Lean  $LEAN_TAG$
WebSocketClientWrapper.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14 */
15 
16 using QuantConnect.Logging;
17 using QuantConnect.Util;
18 using System;
19 using System.IO;
20 using System.Net.WebSockets;
21 using System.Text;
22 using System.Threading;
23 using System.Threading.Tasks;
24 
26 {
27  /// <summary>
28  /// Wrapper for System.Net.Websockets.ClientWebSocket to enhance testability
29  /// </summary>
31  {
// Size, in bytes, of the buffer handed to each receive call.
private const int ReceiveBufferSize = 8192;

// Connection settings stored by Initialize().
private string _url;
private string _sessionToken;

// Created by Connect(); cancelled, disposed and reset to null by Close().
private CancellationTokenSource _cts;

// Recreated by HandleConnection() on every connection attempt.
private ClientWebSocket _client;

// Task started by Connect() that runs HandleConnection().
private Task _taskConnect;

// Held for the whole of Connect(), including its wait for the socket to be created.
// Lock objects must never be reassigned, hence readonly.
private readonly object _connectLock = new object();

// Held by Send(), Close(), the start-up part of Connect() and the socket creation in HandleConnection().
private readonly object _locker = new object();
41 
/// <summary>
/// Stores the connection settings; no connection is opened here
/// </summary>
/// <param name="url">The target websocket url</param>
/// <param name="sessionToken">The websocket session token, sent as a request header when not null</param>
public void Initialize(string url, string sessionToken = null)
{
    (_url, _sessionToken) = (url, sessionToken);
}
52 
/// <summary>
/// Sends the given text as a single, complete UTF-8 text message and blocks until the send finishes
/// </summary>
/// <param name="data">The text to send</param>
public void Send(string data)
{
    lock (_locker)
    {
        var payload = Encoding.UTF8.GetBytes(data);
        var segment = new ArraySegment<byte>(payload);

        // 'true' marks this frame as the end of the message
        var sendTask = _client.SendAsync(segment, WebSocketMessageType.Text, true, _cts.Token);
        sendTask.SynchronouslyAwaitTask();
    }
}
65 
/// <summary>
/// Starts the background connection task when none is active, then waits a bounded time
/// for the socket instance to be created
/// </summary>
public void Connect()
{
    // Body of the long-running task: runs the connection loop and logs anything that escapes it.
    void RunConnectionTask()
    {
        Log.Trace($"WebSocketClientWrapper connection task started: {_url}");

        try
        {
            HandleConnection();
        }
        catch (Exception e)
        {
            Log.Error(e, $"Error in WebSocketClientWrapper connection task: {_url}: ");
        }

        Log.Trace($"WebSocketClientWrapper connection task ended: {_url}");
    }

    lock (_connectLock)
    {
        lock (_locker)
        {
            // a null token source means no connection task is currently active
            if (_cts == null)
            {
                _cts = new CancellationTokenSource();
                _client = null;
                _taskConnect = Task.Factory.StartNew(
                    RunConnectionTask,
                    _cts.Token,
                    TaskCreationOptions.LongRunning,
                    TaskScheduler.Default);
            }
        }

        // Poll outside '_locker' because 'HandleConnection' needs that lock to create the socket.
        // At most 100 checks, each waiting up to 50ms; stops early once '_client' is set or
        // the token is cancelled.
        for (var attempt = 0; attempt < 100; attempt++)
        {
            if (_client != null || _cts.Token.WaitHandle.WaitOne(50))
            {
                break;
            }
        }
    }
}
113 
/// <summary>
/// Requests a normal closure, cancels the connection task, waits up to five seconds for it
/// to finish and raises the close notification if a socket instance exists
/// </summary>
public void Close()
{
    // Best effort: any failure while sending the close frame is deliberately ignored.
    void CloseOutputQuietly()
    {
        try
        {
            _client?.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "", _cts.Token).SynchronouslyAwaitTask();
        }
        catch
        {
            // ignored
        }
    }

    lock (_locker)
    {
        try
        {
            CloseOutputQuietly();

            _cts?.Cancel();

            var timeout = TimeSpan.FromSeconds(5);
            _taskConnect?.Wait(timeout);

            _cts.DisposeSafely();
        }
        catch (Exception e)
        {
            Log.Error($"WebSocketClientWrapper.Close({_url}): {e}");
        }

        // a null token source lets the next Connect() start a fresh connection task
        _cts = null;
    }

    if (_client == null)
    {
        return;
    }

    OnClose(new WebSocketCloseData(0, string.Empty, true));
}
151 
/// <summary>
/// True when a socket instance exists and its state is <see cref="WebSocketState.Open"/>
/// </summary>
public bool IsOpen => _client is { State: WebSocketState.Open };

/// <summary>
/// Raised for every message received from the socket
/// </summary>
public event EventHandler<WebSocketMessage> Message;

/// <summary>
/// Raised when the connection loop catches an exception
/// </summary>
public event EventHandler<WebSocketError> Error;

/// <summary>
/// Raised after a connection attempt succeeds
/// </summary>
public event EventHandler Open;

/// <summary>
/// Raised when the connection is closed through <see cref="Close"/>
/// </summary>
public event EventHandler<WebSocketCloseData> Closed;
176 
/// <summary>
/// Raises the <see cref="Message"/> event when it has subscribers
/// </summary>
/// <param name="e">The received message</param>
protected virtual void OnMessage(WebSocketMessage e)
{
    var handler = Message;
    if (handler != null)
    {
        handler(this, e);
    }
}
184 
/// <summary>
/// Event invocator for the <see cref="Error"/> event: logs the error, then notifies subscribers
/// </summary>
/// <param name="e">The error details, including the originating exception</param>
protected virtual void OnError(WebSocketError e)
{
    // '_client' can be null here (no connection attempted yet, or reset by a new Connect()),
    // so read its state null-safely: reporting an error must never throw on its own.
    Log.Error(e.Exception, $"WebSocketClientWrapper.OnError(): (IsOpen:{IsOpen}, State:{_client?.State}): {_url}: {e.Message}");
    Error?.Invoke(this, e);
}
194 
/// <summary>
/// Event invocator for the <see cref="Open"/> event: logs the new connection, then notifies subscribers
/// </summary>
protected virtual void OnOpen()
{
    // read the state null-safely so the log line cannot throw if '_client' has been reset
    Log.Trace($"WebSocketClientWrapper.OnOpen(): Connection opened (IsOpen:{IsOpen}, State:{_client?.State}): {_url}");
    Open?.Invoke(this, EventArgs.Empty);
}
203 
/// <summary>
/// Event invocator for the <see cref="Closed"/> event: logs the closure, then notifies subscribers
/// </summary>
/// <param name="e">The close details</param>
protected virtual void OnClose(WebSocketCloseData e)
{
    // read the state null-safely so the log line cannot throw if '_client' has been reset
    Log.Trace($"WebSocketClientWrapper.OnClose(): Connection closed (IsOpen:{IsOpen}, State:{_client?.State}): {_url}");
    Closed?.Invoke(this, e);
}
212 
/// <summary>
/// Connection loop run by the task started in <see cref="Connect"/>. Until the token source is
/// cancelled or cleared it (re)creates the socket, connects, raises the open notification and
/// forwards every received message. After a <see cref="WebSocketException"/> it waits before
/// retrying, doubling the wait on each consecutive failure up to a maximum.
/// </summary>
private void HandleConnection()
{
    const int maximumWaitTimeOnError = 120 * 1000;
    const int minimumWaitTimeOnError = 2 * 1000;

    var receiveBuffer = new byte[ReceiveBufferSize];

    // Declared outside the loop so the wait time survives across reconnect attempts;
    // otherwise every attempt would start again from the minimum and never back off.
    var waitTimeOnError = minimumWaitTimeOnError;

    // Capture the token source once per iteration: Close() sets the field to null, so
    // re-reading '_cts' after the check could throw a NullReferenceException.
    while (_cts is { IsCancellationRequested: false } cts)
    {
        Log.Trace($"WebSocketClientWrapper.HandleConnection({_url}): Connecting...");

        using (var connectionCts = CancellationTokenSource.CreateLinkedTokenSource(cts.Token))
        {
            try
            {
                lock (_locker)
                {
                    _client.DisposeSafely();
                    _client = new ClientWebSocket();
                    if (_sessionToken != null)
                    {
                        _client.Options.SetRequestHeader("x-session-token", _sessionToken);
                    }
                    _client.ConnectAsync(new Uri(_url), connectionCts.Token).SynchronouslyAwaitTask();
                }
                OnOpen();

                while ((_client.State == WebSocketState.Open || _client.State == WebSocketState.CloseSent) &&
                    !connectionCts.IsCancellationRequested)
                {
                    var messageData = ReceiveMessage(_client, connectionCts.Token, receiveBuffer);

                    // null means a close frame (or an unknown frame type) was received
                    if (messageData == null)
                    {
                        break;
                    }

                    // the connection is healthy again: reset wait time
                    waitTimeOnError = minimumWaitTimeOnError;
                    OnMessage(new WebSocketMessage(this, messageData));
                }
            }
            catch (OperationCanceledException) { }
            catch (WebSocketException ex)
            {
                OnError(new WebSocketError(ex.Message, ex));

                // returns early if the connection is cancelled while waiting
                connectionCts.Token.WaitHandle.WaitOne(waitTimeOnError);

                // double the wait time, capped at the maximum. This is useful during brokerage down times
                waitTimeOnError = Math.Min(maximumWaitTimeOnError, waitTimeOnError * 2);
            }
            catch (Exception ex)
            {
                OnError(new WebSocketError(ex.Message, ex));
            }
            connectionCts.Cancel();
        }
    }
}
272 
/// <summary>
/// Blocks until one complete websocket message has been received, assembling it from as many
/// frames as needed
/// </summary>
/// <param name="webSocket">The socket to read from</param>
/// <param name="ct">Token used to cancel the pending receive</param>
/// <param name="receiveBuffer">Scratch buffer filled by each receive call</param>
/// <param name="maxSize">Maximum accepted message size in bytes</param>
/// <returns>
/// A <see cref="BinaryMessage"/> or <see cref="TextMessage"/> holding the full payload, or null
/// when a close frame or any other message type is received
/// </returns>
/// <exception cref="InvalidOperationException">The accumulated message is larger than <paramref name="maxSize"/></exception>
private MessageData ReceiveMessage(
    WebSocket webSocket,
    CancellationToken ct,
    byte[] receiveBuffer,
    long maxSize = long.MaxValue)
{
    var buffer = new ArraySegment<byte>(receiveBuffer);

    using (var ms = new MemoryStream())
    {
        WebSocketReceiveResult result;

        // a message may span several frames: accumulate until the final one arrives
        do
        {
            result = webSocket.ReceiveAsync(buffer, ct).SynchronouslyAwaitTask();
            ms.Write(buffer.Array, buffer.Offset, result.Count);
            if (ms.Length > maxSize)
            {
                throw new InvalidOperationException($"Maximum size of the message was exceeded: {_url}");
            }
        }
        while (!result.EndOfMessage);

        switch (result.MessageType)
        {
            case WebSocketMessageType.Binary:
            {
                var data = ms.ToArray();
                return new BinaryMessage
                {
                    Data = data,
                    // 'result.Count' only covers the last frame; report the whole payload length
                    // so 'Count' always agrees with 'Data' for multi-frame messages
                    Count = data.Length,
                };
            }

            case WebSocketMessageType.Text:
                return new TextMessage
                {
                    Message = Encoding.UTF8.GetString(ms.GetBuffer(), 0, (int)ms.Length),
                };

            case WebSocketMessageType.Close:
                Log.Trace($"WebSocketClientWrapper.HandleConnection({_url}): WebSocketMessageType.Close - Data: {Encoding.UTF8.GetString(ms.GetBuffer(), 0, (int)ms.Length)}");
                return null;

            default:
                return null;
        }
    }
}
319 
/// <summary>
/// Base type for the payload of a received websocket message
/// </summary>
public abstract class MessageData
{
    /// <summary>
    /// The websocket message type of this payload
    /// </summary>
    public WebSocketMessageType MessageType { get; set; }
}
330 
/// <summary>
/// Payload of a text websocket message
/// </summary>
public class TextMessage : MessageData
{
    /// <summary>
    /// Creates an instance tagged as <see cref="WebSocketMessageType.Text"/>
    /// </summary>
    public TextMessage() => MessageType = WebSocketMessageType.Text;

    /// <summary>
    /// The text carried by the message
    /// </summary>
    public string Message { get; set; }
}
349 
/// <summary>
/// Payload of a binary websocket message
/// </summary>
public class BinaryMessage : MessageData
{
    /// <summary>
    /// Creates an instance tagged as <see cref="WebSocketMessageType.Binary"/>
    /// </summary>
    public BinaryMessage() => MessageType = WebSocketMessageType.Binary;

    /// <summary>
    /// The bytes carried by the message
    /// </summary>
    public byte[] Data { get; set; }

    /// <summary>
    /// Byte count reported for the message
    /// </summary>
    public int Count { get; set; }
}
373  }
374 }