Lean  $LEAN_TAG$
DefaultConnectionHandler.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14 */
15 
16 using System;
17 using System.Threading;
18 using QuantConnect.Logging;
19 using QuantConnect.Util;
20 
22 {
23  /// <summary>
24  /// A default implementation of <see cref="IConnectionHandler"/>
25  /// which signals disconnection if no data is received for a given time span
26  /// and attempts to reconnect automatically.
27  /// </summary>
29  {
// Token source used to cancel the monitor thread's wait loop; created in Initialize, disposed in Dispose.
private CancellationTokenSource _cancellationTokenSource;

// Background thread created in Initialize that periodically checks the idle time.
private Thread _connectionMonitorThread;

// Monitoring switch. Written by EnableMonitoring/Dispose on caller threads and read by the
// monitor thread, so it is volatile just like _connectionLost to keep the read fresh.
private volatile bool _isEnabled;

// Guards reads and writes of _lastDataReceivedTime.
private readonly object _lockerConnectionMonitor = new object();

// Current connection state as seen by the monitor thread; exposed through IsConnectionLost.
private volatile bool _connectionLost;

// Timestamp passed to the most recent KeepAlive call (only accessed under _lockerConnectionMonitor).
private DateTime _lastDataReceivedTime;
37 
/// <summary>
/// Raised by <see cref="OnConnectionLost"/> to report that the connection is considered lost
/// </summary>
public event EventHandler ConnectionLost;

/// <summary>
/// Raised by <see cref="OnConnectionRestored"/> to report that a previously lost connection is back
/// </summary>
public event EventHandler ConnectionRestored;

/// <summary>
/// Raised by <see cref="OnReconnectRequested"/> to ask subscribers to attempt a reconnection
/// </summary>
public event EventHandler ReconnectRequested;

/// <summary>
/// How long the handler tolerates not receiving any data before the connection
/// is treated as lost. Defaults to five seconds.
/// </summary>
public TimeSpan MaximumIdleTimeSpan { get; set; } = TimeSpan.FromSeconds(5);

/// <summary>
/// Lower bound, in seconds, of the delay before a reconnection attempt. Defaults to 1.
/// </summary>
public int MinimumSecondsForNextReconnectionAttempt { get; set; } = 1;

/// <summary>
/// Upper bound, in seconds, of the delay before a reconnection attempt. Defaults to 60.
/// </summary>
public int MaximumSecondsForNextReconnectionAttempt { get; set; } = 60;

/// <summary>
/// Identifier of the monitored connection, as supplied to <see cref="Initialize"/>
/// </summary>
public string ConnectionId { get; private set; }

/// <summary>
/// True while the handler considers the connection to be lost
/// </summary>
public bool IsConnectionLost
{
    get { return _connectionLost; }
}
77 
/// <summary>
/// Initializes the connection handler and starts the background thread that monitors the connection
/// </summary>
/// <remarks>
/// While monitoring is enabled, every pass of the monitor loop compares the time elapsed since the
/// last <see cref="KeepAlive"/> timestamp with <see cref="MaximumIdleTimeSpan"/>:
/// <list type="bullet">
/// <item>connected and idle for too long: the connection is flagged as lost and <see cref="ConnectionLost"/> is raised;</item>
/// <item>lost and data is flowing again: the flag is cleared, the retry interval is reset and <see cref="ConnectionRestored"/> is raised;</item>
/// <item>lost and the retry time has passed: the retry interval is doubled (up to
/// <see cref="MaximumSecondsForNextReconnectionAttempt"/>) and <see cref="ReconnectRequested"/> is raised.</item>
/// </list>
/// Exceptions thrown during a pass (including those thrown by event subscribers) are logged and the loop continues.
/// </remarks>
/// <param name="connectionId">The connection id</param>
public void Initialize(string connectionId)
{
    ConnectionId = connectionId;

    // set by the monitor thread as its first action, so this method returns only once the thread is running
    using var waitHandle = new ManualResetEvent(false);

    _cancellationTokenSource = new CancellationTokenSource();

    _connectionMonitorThread = new Thread(() =>
    {
        waitHandle.Set();

        var nextReconnectionAttemptUtcTime = DateTime.UtcNow;
        var nextReconnectionAttemptSeconds = MinimumSecondsForNextReconnectionAttempt;

        // start counting idle time from the moment the monitor starts
        lock (_lockerConnectionMonitor)
        {
            _lastDataReceivedTime = DateTime.UtcNow;
        }

        try
        {
            // WaitOne returns true when cancellation is signalled, which ends the loop;
            // otherwise it times out and one monitoring pass is executed
            while (!_cancellationTokenSource.IsCancellationRequested
                && !_cancellationTokenSource.Token.WaitHandle.WaitOne(Time.GetSecondUnevenWait(1000)))
            {
                if (!_isEnabled) continue;

                try
                {
                    TimeSpan elapsed;
                    lock (_lockerConnectionMonitor)
                    {
                        elapsed = DateTime.UtcNow - _lastDataReceivedTime;
                    }

                    if (!_connectionLost && elapsed > MaximumIdleTimeSpan)
                    {
                        _connectionLost = true;
                        nextReconnectionAttemptUtcTime = DateTime.UtcNow.AddSeconds(nextReconnectionAttemptSeconds);

                        OnConnectionLost();
                    }
                    else if (_connectionLost)
                    {
                        if (elapsed <= MaximumIdleTimeSpan)
                        {
                            _connectionLost = false;
                            nextReconnectionAttemptSeconds = MinimumSecondsForNextReconnectionAttempt;

                            OnConnectionRestored();
                        }
                        else
                        {
                            if (DateTime.UtcNow > nextReconnectionAttemptUtcTime)
                            {
                                // double the interval between attempts (capped to MaximumSecondsForNextReconnectionAttempt)
                                nextReconnectionAttemptSeconds = Math.Min(nextReconnectionAttemptSeconds * 2, MaximumSecondsForNextReconnectionAttempt);
                                nextReconnectionAttemptUtcTime = DateTime.UtcNow.AddSeconds(nextReconnectionAttemptSeconds);

                                OnReconnectRequested();
                            }
                        }
                    }
                }
                catch (Exception exception)
                {
                    // keep the monitor alive: a failing pass (or subscriber) must not stop future checks
                    Log.Error($"Error in DefaultConnectionHandler: {exception}");
                }
            }
        }
        catch (Exception exception)
        {
            Log.Error(exception);
        }
    }) { IsBackground = true };

    _connectionMonitorThread.Start();

    waitHandle.WaitOne();
}
162 
/// <summary>
/// Turns connection monitoring on or off
/// </summary>
/// <param name="isEnabled">True to enable monitoring, false otherwise</param>
public void EnableMonitoring(bool isEnabled)
{
    // on an off -> on transition, restart the idle clock from the current time
    var turningOn = isEnabled && !_isEnabled;
    if (turningOn)
    {
        KeepAlive(DateTime.UtcNow);
    }

    _isEnabled = isEnabled;
}
177 
/// <summary>
/// Records the time at which data was last received on the connection
/// </summary>
/// <param name="lastDataReceivedTime">The UTC timestamp of the last data point received</param>
public void KeepAlive(DateTime lastDataReceivedTime)
{
    // the monitor thread reads this field under the same lock
    lock (_lockerConnectionMonitor)
    {
        _lastDataReceivedTime = lastDataReceivedTime;
    }
}
189 
/// <summary>
/// Logs the connection loss and raises the <see cref="ConnectionLost"/> event
/// </summary>
protected virtual void OnConnectionLost()
{
    Log.Error("DefaultConnectionHandler.OnConnectionLost(): WebSocket connection lost.");

    // copy the delegate so the null check and the call use the same subscriber list
    var handler = ConnectionLost;
    if (handler != null)
    {
        handler(this, EventArgs.Empty);
    }
}
198 
/// <summary>
/// Logs the recovery and raises the <see cref="ConnectionRestored"/> event
/// </summary>
protected virtual void OnConnectionRestored()
{
    Log.Trace("DefaultConnectionHandler.OnConnectionRestored(): WebSocket connection restored.");

    // copy the delegate so the null check and the call use the same subscriber list
    var handler = ConnectionRestored;
    if (handler != null)
    {
        handler(this, EventArgs.Empty);
    }
}
207 
/// <summary>
/// Raises the <see cref="ReconnectRequested"/> event
/// </summary>
protected virtual void OnReconnectRequested()
{
    // copy the delegate so the null check and the call use the same subscriber list
    var handler = ReconnectRequested;
    if (handler != null)
    {
        handler(this, EventArgs.Empty);
    }
}
215 
/// <summary>
/// Disables monitoring, stops the monitor thread and releases the cancellation token source
/// </summary>
public void Dispose()
{
    // no further connection checks once disposal has started
    _isEnabled = false;

    // hand the token source to StopSafely together with a five second timeout for the thread to stop
    var stopTimeout = TimeSpan.FromSeconds(5);
    _connectionMonitorThread.StopSafely(stopTimeout, _cancellationTokenSource);

    // the token source only exists if Initialize was called
    var tokenSource = _cancellationTokenSource;
    if (tokenSource != null)
    {
        tokenSource.DisposeSafely();
    }
}
227  }
228 }