Lean  $LEAN_TAG$
BaseWebsocketsBrokerage.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14 */
15 
16 using Newtonsoft.Json;
17 using QuantConnect.Data;
18 using QuantConnect.Logging;
19 using RestSharp;
20 using System;
21 using System.Collections.Concurrent;
22 using System.Collections.Generic;
23 using System.Linq;
24 using System.Threading;
25 
27 {
28 
/// <summary>
/// Provides the shared websocket plumbing for brokerages: holds the socket, rest client,
/// credentials and json settings, and implements a blocking connect with a timeout.
/// Message handling and subscription logic are supplied by derived classes.
/// </summary>
public abstract class BaseWebsocketsBrokerage : Brokerage
{
    /// <summary>
    /// Maximum time, in milliseconds, that <see cref="ConnectSync"/> waits for the socket to open
    /// </summary>
    private const int ConnectionTimeout = 30000;

    /// <summary>
    /// True once <see cref="Initialize"/> has run; later calls to it are ignored
    /// </summary>
    protected bool IsInitialized { get; set; }

    /// <summary>
    /// The websockets client instance
    /// </summary>
    protected IWebSocket WebSocket { get; set; }

    /// <summary>
    /// The rest client instance
    /// </summary>
    protected IRestClient RestClient { get; set; }

    /// <summary>
    /// Json parsing settings; <see cref="Initialize"/> configures them to read floating point values as decimal
    /// </summary>
    protected JsonSerializerSettings JsonSettings { get; set; }

    /// <summary>
    /// Cache of currently active orders, keyed by an integer id
    /// </summary>
    public ConcurrentDictionary<int, Orders.Order> CachedOrderIDs { get; set; }

    /// <summary>
    /// The api secret
    /// </summary>
    protected string ApiSecret { get; set; }

    /// <summary>
    /// The api key
    /// </summary>
    protected string ApiKey { get; set; }

    // NOTE(review): the listing has a doc comment at this position reading
    // "Count subscribers for each (symbol, tickType) combination", but the member it
    // documents is not present. Presumably it is the SubscriptionManager that
    // GetSubscribed() reads - confirm against the upstream file and restore the declaration.

    /// <summary>
    /// Initialize the instance of this class. Only the first call has any effect.
    /// </summary>
    /// <param name="wssUrl">The web socket base url</param>
    /// <param name="websocket">instance of websockets client</param>
    /// <param name="restClient">instance of rest client</param>
    /// <param name="apiKey">api key</param>
    /// <param name="apiSecret">api secret</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="websocket"/> is null on the first (effective) call
    /// </exception>
    protected void Initialize(string wssUrl, IWebSocket websocket, IRestClient restClient, string apiKey, string apiSecret)
    {
        if (IsInitialized)
        {
            return;
        }

        // Validate before latching IsInitialized: otherwise a null socket would fail further down
        // with the flag already set, and every later Initialize call would silently do nothing.
        if (websocket == null)
        {
            throw new ArgumentNullException(nameof(websocket));
        }

        IsInitialized = true;
        JsonSettings = new JsonSerializerSettings { FloatParseHandling = FloatParseHandling.Decimal };
        CachedOrderIDs = new ConcurrentDictionary<int, Orders.Order>();

        WebSocket = websocket;
        WebSocket.Initialize(wssUrl);
        // NOTE(review): nothing in this listing attaches OnMessage to the socket. Presumably a
        // message-event subscription belongs here - confirm against IWebSocket.

        WebSocket.Open += (sender, args) =>
        {
            Log.Trace("BaseWebsocketsBrokerage(): WebSocket.Open. Subscribing");

            // Re-establish the current subscriptions every time the socket opens,
            // which is what the trace message above announces.
            Subscribe(GetSubscribed());
        };

        RestClient = restClient;
        ApiSecret = apiSecret;
        ApiKey = apiKey;
    }

    /// <summary>
    /// Creates an instance of a websockets brokerage
    /// </summary>
    /// <param name="name">Name of brokerage</param>
    protected BaseWebsocketsBrokerage(string name) : base(name)
    {
    }

    /// <summary>
    /// Handles websocket received messages
    /// </summary>
    /// <param name="sender">The object that raised the event</param>
    /// <param name="e">The received message</param>
    protected abstract void OnMessage(object sender, WebSocketMessage e);

    /// <summary>
    /// Opens the websocket connection if the brokerage is not already connected.
    /// Blocks until the socket reports it is open.
    /// </summary>
    /// <exception cref="TimeoutException">The socket did not open within the connection timeout</exception>
    public override void Connect()
    {
        if (IsConnected)
        {
            return;
        }

        Log.Trace("BaseWebSocketsBrokerage.Connect(): Connecting...");

        ConnectSync();
    }

    /// <summary>
    /// Handles the creation of websocket subscriptions
    /// </summary>
    /// <param name="symbols">The symbols to subscribe to</param>
    /// <returns>Implementation defined - presumably true when the subscription succeeded (confirm in derived classes)</returns>
    protected abstract bool Subscribe(IEnumerable<Symbol> symbols);

    /// <summary>
    /// Gets a list of current subscriptions
    /// </summary>
    /// <returns>
    /// The symbols reported by the subscription manager, or an empty sequence when
    /// the manager (or its result) is null
    /// </returns>
    protected virtual IEnumerable<Symbol> GetSubscribed()
    {
        return SubscriptionManager?.GetSubscribedSymbols() ?? Enumerable.Empty<Symbol>();
    }

    /// <summary>
    /// Starts the websocket connection and blocks until the socket raises its Open event
    /// </summary>
    /// <exception cref="TimeoutException">
    /// The Open event was not raised within <see cref="ConnectionTimeout"/> milliseconds
    /// </exception>
    protected void ConnectSync()
    {
        // NOTE(review): the wait handle is deliberately left to the finalizer rather than disposed;
        // a late Open callback could otherwise call Set() on a disposed handle. Confirm which
        // thread raises Open before tightening this.
        var resetEvent = new ManualResetEvent(false);
        EventHandler triggerEvent = (o, args) => resetEvent.Set();
        WebSocket.Open += triggerEvent;

        try
        {
            WebSocket.Connect();

            if (!resetEvent.WaitOne(ConnectionTimeout))
            {
                throw new TimeoutException("Websockets connection timeout.");
            }
        }
        finally
        {
            // Always detach the temporary handler - on timeout or a failed Connect() too -
            // so repeated connection attempts do not pile up stale subscriptions.
            WebSocket.Open -= triggerEvent;
        }
    }
}
170 }