Lean  $LEAN_TAG$
DefaultBrokerageMessageHandler.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14 */
15 
16 using System;
17 using System.Linq;
18 using System.Threading;
19 using System.Threading.Tasks;
20 using QuantConnect.Data;
22 using QuantConnect.Logging;
23 using QuantConnect.Packets;
24 using QuantConnect.Util;
25 
27 {
28  /// <summary>
29  /// Provides a default implementation o <see cref="IBrokerageMessageHandler"/> that will forward
30  /// messages as follows:
31  /// Information -> IResultHandler.Debug
32  /// Warning -> IResultHandler.Error &amp;&amp; IApi.SendUserEmail
33  /// Error -> IResultHandler.Error &amp;&amp; IAlgorithm.RunTimeError
34  /// </summary>
36  {
37  private static readonly TimeSpan DefaultOpenThreshold = TimeSpan.FromMinutes(5);
38  private static readonly TimeSpan DefaultInitialDelay = TimeSpan.FromMinutes(15);
39 
40  private volatile bool _connected;
41 
42  private readonly IApi _api;
43  private readonly IAlgorithm _algorithm;
44  private readonly TimeSpan _openThreshold;
45  private readonly AlgorithmNodePacket _job;
46  private readonly TimeSpan _initialDelay;
47  private CancellationTokenSource _cancellationTokenSource;
48 
49  /// <summary>
50  /// Initializes a new instance of the <see cref="DefaultBrokerageMessageHandler"/> class
51  /// </summary>
52  /// <param name="algorithm">The running algorithm</param>
53  /// <param name="job">The job that produced the algorithm</param>
54  /// <param name="api">The api for the algorithm</param>
55  /// <param name="initialDelay"></param>
56  /// <param name="openThreshold">Defines how long before market open to re-check for brokerage reconnect message</param>
57  public DefaultBrokerageMessageHandler(IAlgorithm algorithm, AlgorithmNodePacket job, IApi api, TimeSpan? initialDelay = null, TimeSpan? openThreshold = null)
58  {
59  _api = api;
60  _job = job;
61  _algorithm = algorithm;
62  _connected = true;
63  _openThreshold = openThreshold ?? DefaultOpenThreshold;
64  _initialDelay = initialDelay ?? DefaultInitialDelay;
65  }
66 
67  /// <summary>
68  /// Handles the message
69  /// </summary>
70  /// <param name="message">The message to be handled</param>
71  public void HandleMessage(BrokerageMessageEvent message)
72  {
73  // based on message type dispatch to result handler
74  switch (message.Type)
75  {
76  case BrokerageMessageType.Information:
77  _algorithm.Debug(Messages.DefaultBrokerageMessageHandler.BrokerageInfo(message));
78  break;
79 
80  case BrokerageMessageType.Warning:
81  _algorithm.Error(Messages.DefaultBrokerageMessageHandler.BrokerageWarning(message));
82  break;
83 
84  case BrokerageMessageType.Error:
85  // unexpected error, we need to close down shop
86  _algorithm.SetRuntimeError(new Exception(message.Message),
87  Messages.DefaultBrokerageMessageHandler.BrokerageErrorContext);
88  break;
89 
90  case BrokerageMessageType.Disconnect:
91  _connected = false;
93 
94  // check to see if any non-custom security exchanges are open within the next x minutes
95  var open = (from kvp in _algorithm.Securities
96  let security = kvp.Value
97  where security.Type != SecurityType.Base
98  let exchange = security.Exchange
99  let localTime = _algorithm.UtcTime.ConvertFromUtc(exchange.TimeZone)
100  where exchange.IsOpenDuringBar(
101  localTime,
102  localTime + _openThreshold,
103  _algorithm.SubscriptionManager.SubscriptionDataConfigService
104  .GetSubscriptionDataConfigs(security.Symbol)
105  .IsExtendedMarketHours())
106  select security).Any();
107 
108  // if any are open then we need to kill the algorithm
109  if (open)
110  {
111  Log.Trace(Messages.DefaultBrokerageMessageHandler.DisconnectedWhenExchangesAreOpen(_initialDelay));
112 
113  // wait 15 minutes before killing algorithm
114  StartCheckReconnected(_initialDelay, message);
115  }
116  else
117  {
118  Log.Trace(Messages.DefaultBrokerageMessageHandler.DisconnectedWhenExchangesAreClosed);
119 
120  // if they aren't open, we'll need to check again a little bit before markets open
121  DateTime nextMarketOpenUtc;
122  if (_algorithm.Securities.Count != 0)
123  {
124  nextMarketOpenUtc = (from kvp in _algorithm.Securities
125  let security = kvp.Value
126  where security.Type != SecurityType.Base
127  let exchange = security.Exchange
128  let localTime = _algorithm.UtcTime.ConvertFromUtc(exchange.TimeZone)
129  let marketOpen = exchange.Hours.GetNextMarketOpen(localTime,
130  _algorithm.SubscriptionManager.SubscriptionDataConfigService
131  .GetSubscriptionDataConfigs(security.Symbol)
132  .IsExtendedMarketHours())
133  let marketOpenUtc = marketOpen.ConvertToUtc(exchange.TimeZone)
134  select marketOpenUtc).Min();
135  }
136  else
137  {
138  // if we have no securities just make next market open an hour from now
139  nextMarketOpenUtc = DateTime.UtcNow.AddHours(1);
140  }
141 
142  var timeUntilNextMarketOpen = nextMarketOpenUtc - DateTime.UtcNow - _openThreshold;
143  Log.Trace(Messages.DefaultBrokerageMessageHandler.TimeUntilNextMarketOpen(timeUntilNextMarketOpen));
144 
145  // wake up 5 minutes before market open and check if we've reconnected
146  StartCheckReconnected(timeUntilNextMarketOpen, message);
147  }
148  break;
149 
150  case BrokerageMessageType.Reconnect:
151  _connected = true;
153 
154  if (_cancellationTokenSource != null && !_cancellationTokenSource.IsCancellationRequested)
155  {
156  _cancellationTokenSource.Cancel();
157  }
158  break;
159  }
160  }
161 
162  /// <summary>
163  /// Handles a new order placed manually in the brokerage side
164  /// </summary>
165  /// <param name="eventArgs">The new order event</param>
166  /// <returns>Whether the order should be added to the transaction handler</returns>
168  {
169  return true;
170  }
171 
172  private void StartCheckReconnected(TimeSpan delay, BrokerageMessageEvent message)
173  {
174  _cancellationTokenSource.DisposeSafely();
175  _cancellationTokenSource = new CancellationTokenSource(delay);
176 
177  Task.Run(() =>
178  {
179  while (!_cancellationTokenSource.IsCancellationRequested)
180  {
181  Thread.Sleep(TimeSpan.FromMinutes(1));
182  }
183 
184  CheckReconnected(message);
185 
186  }, _cancellationTokenSource.Token);
187  }
188 
189  private void CheckReconnected(BrokerageMessageEvent message)
190  {
191  if (!_connected)
192  {
193  Log.Error(Messages.DefaultBrokerageMessageHandler.StillDisconnected);
194  _algorithm.SetRuntimeError(new Exception(message.Message),
195  Messages.DefaultBrokerageMessageHandler.BrokerageDisconnectedShutDownContext);
196  }
197  }
198  }
199 }