18 using System.Threading;
19 using System.Threading.Tasks;
// Fallback for the open threshold (5 minutes); used when no openThreshold value is supplied.
37 private static readonly TimeSpan DefaultOpenThreshold = TimeSpan.FromMinutes(5);
// Fallback for the initial delay (15 minutes); used when no initialDelay value is supplied.
38 private static readonly TimeSpan DefaultInitialDelay = TimeSpan.FromMinutes(15);
// NOTE(review): presumably tracks whether the brokerage is currently connected; its
// reads and writes are not visible in this excerpt — confirm against the full file.
40 private volatile bool _connected;
// NOTE(review): not referenced anywhere in the visible lines — confirm how it is used.
42 private readonly
IApi _api;
// Margin around market open: added to the exchange-local time when testing whether an
// exchange is open, and subtracted from the wait until the next market open.
44 private readonly TimeSpan _openThreshold;
// Delay passed to StartCheckReconnected for the first reconnect check.
46 private readonly TimeSpan _initialDelay;
// Replaced for every scheduled reconnect check by a source constructed with the requested
// delay; also cancelled explicitly when it exists and has not already been cancelled.
47 private CancellationTokenSource _cancellationTokenSource;
61 _algorithm = algorithm;
63 _openThreshold = openThreshold ?? DefaultOpenThreshold;
64 _initialDelay = initialDelay ?? DefaultInitialDelay;
86 _algorithm.SetRuntimeError(
new Exception(message.
Message),
95 var open = (from kvp in _algorithm.Securities
96 let security = kvp.Value
98 let exchange = security.Exchange
99 let localTime = _algorithm.UtcTime.ConvertFromUtc(exchange.TimeZone)
100 where exchange.IsOpenDuringBar(
102 localTime + _openThreshold,
103 _algorithm.SubscriptionManager.SubscriptionDataConfigService
104 .GetSubscriptionDataConfigs(security.Symbol)
105 .IsExtendedMarketHours())
106 select security).Any();
114 StartCheckReconnected(_initialDelay, message);
121 DateTime nextMarketOpenUtc;
122 if (_algorithm.Securities.Count != 0)
124 nextMarketOpenUtc = (from kvp in _algorithm.Securities
125 let security = kvp.Value
127 let exchange = security.Exchange
128 let localTime = _algorithm.UtcTime.ConvertFromUtc(exchange.TimeZone)
129 let marketOpen = exchange.Hours.GetNextMarketOpen(localTime,
130 _algorithm.SubscriptionManager.SubscriptionDataConfigService
131 .GetSubscriptionDataConfigs(security.Symbol)
132 .IsExtendedMarketHours())
133 let marketOpenUtc = marketOpen.ConvertToUtc(exchange.TimeZone)
134 select marketOpenUtc).Min();
139 nextMarketOpenUtc = DateTime.UtcNow.AddHours(1);
142 var timeUntilNextMarketOpen = nextMarketOpenUtc - DateTime.UtcNow - _openThreshold;
146 StartCheckReconnected(timeUntilNextMarketOpen, message);
154 if (_cancellationTokenSource !=
null && !_cancellationTokenSource.IsCancellationRequested)
156 _cancellationTokenSource.Cancel();
174 _cancellationTokenSource.DisposeSafely();
175 _cancellationTokenSource =
new CancellationTokenSource(delay);
179 while (!_cancellationTokenSource.IsCancellationRequested)
181 Thread.Sleep(TimeSpan.FromMinutes(1));
184 CheckReconnected(message);
186 }, _cancellationTokenSource.Token);
189 private void CheckReconnected(BrokerageMessageEvent message)
193 Log.
Error(Messages.DefaultBrokerageMessageHandler.StillDisconnected);
194 _algorithm.SetRuntimeError(
new Exception(message.Message),
195 Messages.DefaultBrokerageMessageHandler.BrokerageDisconnectedShutDownContext);