Lean  $LEAN_TAG$
BrokerageMultiWebSocketSubscriptionManager.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 using System;
17 using System.Linq;
18 using System.Threading;
19 using QuantConnect.Data;
20 using QuantConnect.Util;
21 using QuantConnect.Logging;
22 using System.Threading.Tasks;
23 using System.Collections.Generic;
24 using System.Runtime.CompilerServices;
25 
27 {
28  /// <summary>
29  /// Handles brokerage data subscriptions with multiple websocket connections, with optional symbol weighting
30  /// </summary>
32  {
33  private readonly string _webSocketUrl;
34  private readonly int _maximumSymbolsPerWebSocket;
35  private readonly int _maximumWebSocketConnections;
36  private readonly Func<WebSocketClientWrapper> _webSocketFactory;
37  private readonly Func<IWebSocket, Symbol, bool> _subscribeFunc;
38  private readonly Func<IWebSocket, Symbol, bool> _unsubscribeFunc;
39  private readonly Action<WebSocketMessage> _messageHandler;
40  private readonly RateGate _connectionRateLimiter;
41  private readonly System.Timers.Timer _reconnectTimer;
42 
43  private const int ConnectionTimeout = 30000;
44 
45  private readonly object _locker = new();
46  private readonly List<BrokerageMultiWebSocketEntry> _webSocketEntries = new();
47 
48  /// <summary>
49  /// Initializes a new instance of the <see cref="BrokerageMultiWebSocketSubscriptionManager"/> class
50  /// </summary>
51  /// <param name="webSocketUrl">The URL for websocket connections</param>
52  /// <param name="maximumSymbolsPerWebSocket">The maximum number of symbols per websocket connection</param>
53  /// <param name="maximumWebSocketConnections">The maximum number of websocket connections allowed (if zero, symbol weighting is disabled)</param>
54  /// <param name="symbolWeights">A dictionary for the symbol weights</param>
55  /// <param name="webSocketFactory">A function which returns a new websocket instance</param>
56  /// <param name="subscribeFunc">A function which subscribes a symbol</param>
57  /// <param name="unsubscribeFunc">A function which unsubscribes a symbol</param>
58  /// <param name="messageHandler">The websocket message handler</param>
59  /// <param name="webSocketConnectionDuration">The maximum duration of the websocket connection, TimeSpan.Zero for no duration limit</param>
60  /// <param name="connectionRateLimiter">The rate limiter for creating new websocket connections</param>
62  string webSocketUrl,
63  int maximumSymbolsPerWebSocket,
64  int maximumWebSocketConnections,
65  Dictionary<Symbol, int> symbolWeights,
66  Func<WebSocketClientWrapper> webSocketFactory,
67  Func<IWebSocket, Symbol, bool> subscribeFunc,
68  Func<IWebSocket, Symbol, bool> unsubscribeFunc,
69  Action<WebSocketMessage> messageHandler,
70  TimeSpan webSocketConnectionDuration,
71  RateGate connectionRateLimiter = null)
72  {
73  _webSocketUrl = webSocketUrl;
74  _maximumSymbolsPerWebSocket = maximumSymbolsPerWebSocket;
75  _maximumWebSocketConnections = maximumWebSocketConnections;
76  _webSocketFactory = webSocketFactory;
77  _subscribeFunc = subscribeFunc;
78  _unsubscribeFunc = unsubscribeFunc;
79  _messageHandler = messageHandler;
80  _connectionRateLimiter = connectionRateLimiter;
81 
82  if (_maximumWebSocketConnections > 0)
83  {
84  // symbol weighting enabled, create all websocket instances
85  for (var i = 0; i < _maximumWebSocketConnections; i++)
86  {
87  var webSocket = CreateWebSocket();
88 
89  _webSocketEntries.Add(new BrokerageMultiWebSocketEntry(symbolWeights, webSocket));
90  }
91  }
92 
93  // Some exchanges (e.g. Binance) require a daily restart for websocket connections
94  if (webSocketConnectionDuration != TimeSpan.Zero)
95  {
96  _reconnectTimer = new System.Timers.Timer
97  {
98  Interval = webSocketConnectionDuration.TotalMilliseconds
99  };
100  _reconnectTimer.Elapsed += (_, _) =>
101  {
102  Log.Trace("BrokerageMultiWebSocketSubscriptionManager(): Restarting websocket connections");
103 
104  lock (_locker)
105  {
106  foreach (var entry in _webSocketEntries)
107  {
108  if (entry.WebSocket.IsOpen)
109  {
110  Task.Factory.StartNew(() =>
111  {
112  Log.Trace($"BrokerageMultiWebSocketSubscriptionManager(): Websocket restart - disconnect: ({entry.WebSocket.GetHashCode()})");
113  Disconnect(entry.WebSocket);
114 
115  Log.Trace($"BrokerageMultiWebSocketSubscriptionManager(): Websocket restart - connect: ({entry.WebSocket.GetHashCode()})");
116  Connect(entry.WebSocket);
117  });
118  }
119  }
120  }
121  };
122  _reconnectTimer.Start();
123 
124  Log.Trace($"BrokerageMultiWebSocketSubscriptionManager(): WebSocket connections will be restarted every: {webSocketConnectionDuration}");
125  }
126  }
127 
128  /// <summary>
129  /// Subscribes to the symbols
130  /// </summary>
131  /// <param name="symbols">Symbols to subscribe</param>
132  /// <param name="tickType">Type of tick data</param>
133  protected override bool Subscribe(IEnumerable<Symbol> symbols, TickType tickType)
134  {
135  Log.Trace($"BrokerageMultiWebSocketSubscriptionManager.Subscribe(): {string.Join(",", symbols.Select(x => x.Value))}");
136 
137  var success = true;
138 
139  foreach (var symbol in symbols)
140  {
141  var webSocket = GetWebSocketForSymbol(symbol);
142 
143  success &= _subscribeFunc(webSocket, symbol);
144  }
145 
146  return success;
147  }
148 
149  /// <summary>
150  /// Unsubscribes from the symbols
151  /// </summary>
152  /// <param name="symbols">Symbols to subscribe</param>
153  /// <param name="tickType">Type of tick data</param>
154  protected override bool Unsubscribe(IEnumerable<Symbol> symbols, TickType tickType)
155  {
156  Log.Trace($"BrokerageMultiWebSocketSubscriptionManager.Unsubscribe(): {string.Join(",", symbols.Select(x => x.Value))}");
157 
158  var success = true;
159 
160  foreach (var symbol in symbols)
161  {
162  var entry = GetWebSocketEntryBySymbol(symbol);
163  if (entry != null)
164  {
165  entry.RemoveSymbol(symbol);
166 
167  success &= _unsubscribeFunc(entry.WebSocket, symbol);
168  }
169  }
170 
171  return success;
172  }
173 
174  /// <summary>
175  /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
176  /// </summary>
177  public override void Dispose()
178  {
179  _reconnectTimer?.Stop();
180  _reconnectTimer.DisposeSafely();
181  lock (_locker)
182  {
183  foreach (var entry in _webSocketEntries)
184  {
185  try
186  {
187  entry.WebSocket.Open -= OnOpen;
188  entry.WebSocket.Message -= EventHandler;
189  entry.WebSocket.Close();
190  }
191  catch (Exception ex)
192  {
193  Log.Error(ex);
194  }
195  }
196  _webSocketEntries.Clear();
197  }
198  }
199 
200  private BrokerageMultiWebSocketEntry GetWebSocketEntryBySymbol(Symbol symbol)
201  {
202  lock (_locker)
203  {
204  foreach (var entry in _webSocketEntries.Where(entry => entry.Contains(symbol)))
205  {
206  return entry;
207  }
208  }
209 
210  return null;
211  }
212 
213  /// <summary>
214  /// Adds a symbol to an existing or new websocket connection
215  /// </summary>
216  private IWebSocket GetWebSocketForSymbol(Symbol symbol)
217  {
218  BrokerageMultiWebSocketEntry entry;
219 
220  lock (_locker)
221  {
222  if (_webSocketEntries.All(x => x.SymbolCount >= _maximumSymbolsPerWebSocket))
223  {
224  if (_maximumWebSocketConnections > 0)
225  {
226  throw new NotSupportedException($"Maximum symbol count reached for the current configuration [MaxSymbolsPerWebSocket={_maximumSymbolsPerWebSocket}, MaxWebSocketConnections:{_maximumWebSocketConnections}]");
227  }
228 
229  // symbol limit reached on all, create new websocket instance
230  var webSocket = CreateWebSocket();
231 
232  _webSocketEntries.Add(new BrokerageMultiWebSocketEntry(webSocket));
233  }
234 
235  // sort by weight ascending, taking into account the symbol limit per websocket
236  _webSocketEntries.Sort((x, y) =>
237  x.SymbolCount >= _maximumSymbolsPerWebSocket
238  ? 1
239  : y.SymbolCount >= _maximumSymbolsPerWebSocket
240  ? -1
241  : Math.Sign(x.TotalWeight - y.TotalWeight));
242 
243  entry = _webSocketEntries.First();
244  }
245 
246  if (!entry.WebSocket.IsOpen)
247  {
248  Connect(entry.WebSocket);
249  }
250 
251  entry.AddSymbol(symbol);
252 
253  Log.Trace($"BrokerageMultiWebSocketSubscriptionManager.GetWebSocketForSymbol(): added symbol: {symbol} to websocket: {entry.WebSocket.GetHashCode()} - Count: {entry.SymbolCount}");
254 
255  return entry.WebSocket;
256  }
257 
258  /// <summary>
259  /// When we create a websocket we will subscribe to it's events once and initialize it
260  /// </summary>
261  /// <remarks>Note that the websocket is no connected yet <see cref="Connect(IWebSocket)"/></remarks>
262  private IWebSocket CreateWebSocket()
263  {
264  var webSocket = _webSocketFactory();
265  webSocket.Open += OnOpen;
266  webSocket.Message += EventHandler;
267  webSocket.Initialize(_webSocketUrl);
268 
269  return webSocket;
270  }
271 
272  [MethodImpl(MethodImplOptions.AggressiveInlining)]
273  private void EventHandler(object _, WebSocketMessage message)
274  {
275  _messageHandler(message);
276  }
277 
278  private void Connect(IWebSocket webSocket)
279  {
280  var connectedEvent = new ManualResetEvent(false);
281  EventHandler onOpenAction = (_, _) =>
282  {
283  connectedEvent.Set();
284  };
285 
286  webSocket.Open += onOpenAction;
287 
288  if (_connectionRateLimiter is { IsRateLimited: false })
289  {
290  _connectionRateLimiter.WaitToProceed();
291  }
292 
293  try
294  {
295  webSocket.Connect();
296 
297  if (!connectedEvent.WaitOne(ConnectionTimeout))
298  {
299  throw new TimeoutException($"BrokerageMultiWebSocketSubscriptionManager.Connect(): WebSocket connection timeout: {webSocket.GetHashCode()}");
300  }
301  }
302  finally
303  {
304  webSocket.Open -= onOpenAction;
305 
306  connectedEvent.DisposeSafely();
307  }
308  }
309 
310  private void Disconnect(IWebSocket webSocket)
311  {
312  webSocket.Close();
313  }
314 
315  private void OnOpen(object sender, EventArgs e)
316  {
317  var webSocket = (IWebSocket)sender;
318 
319  lock (_locker)
320  {
321  foreach (var entry in _webSocketEntries)
322  {
323  if (entry.WebSocket == webSocket && entry.Symbols.Count > 0)
324  {
325  Log.Trace($"BrokerageMultiWebSocketSubscriptionManager.Connect(): WebSocket opened: {webSocket.GetHashCode()} - Resubscribing existing symbols: {entry.Symbols.Count}");
326 
327  Task.Factory.StartNew(() =>
328  {
329  foreach (var symbol in entry.Symbols)
330  {
331  _subscribeFunc(webSocket, symbol);
332  }
333  });
334  }
335  }
336  }
337  }
338  }
339 }