Lean  $LEAN_TAG$
LiveTradingDataFeed.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15 */
16 
17 using System;
18 using System.Linq;
19 using QuantConnect.Data;
20 using QuantConnect.Util;
21 using QuantConnect.Logging;
22 using QuantConnect.Packets;
25 using System.Collections.Generic;
34 
36 {
37  /// <summary>
38  /// Provides an implementation of <see cref="IDataFeed"/> that is designed to deal with
39  /// live, remote data sources
40  /// </summary>
42  {
 // Number of days used in GetWarmupEnumerator to cap how far back the history-based part of warmup may start;
 // read from the "maximum-warmup-history-days-look-back" configuration key, 5 when not configured.
43  private static readonly int MaximumWarmupHistoryDaysLookBack = Config.GetInt("maximum-warmup-history-days-look-back", 5);
44 
 // the live job this feed was initialized with (assigned in Initialize)
45  private LiveNodePacket _job;
46 
47  // used to get current time
48  private ITimeProvider _timeProvider;
49  private IAlgorithm _algorithm;
 // time provider handed to the frontier aware / fill forward enumerators
50  private ITimeProvider _frontierTimeProvider;
51  private IDataProvider _dataProvider;
52  private IMapFileProvider _mapFileProvider;
 // source of streamed data; created through GetDataQueueHandler()
53  private IDataQueueHandler _dataQueueHandler;
 // exchange that pumps the enumerators of subscriptions which are not streamed; created through GetBaseDataExchange()
54  private BaseDataExchange _customExchange;
55  private SubscriptionCollection _subscriptions;
56  private IFactorFileProvider _factorFileProvider;
 // decides, per configuration, whether a subscription is streamed through the data queue handler
57  private IDataChannelProvider _channelProvider;
58  // in live trading we delay scheduled universe selection between 11 & 12 hours after midnight UTC so that we allow new selection data to be piped in
59  // NY goes from -4/-5 UTC time, so:
60  // 11 UTC - 4 => 7am NY
61  // 12 UTC - 4 => 8am NY
 // the seconds component (0-59) of the construction time is added as MINUTES, so the shift falls between 11:00 and 11:59
62  private readonly TimeSpan _scheduledUniverseUtcTimeShift = TimeSpan.FromMinutes(11 * 60 + DateTime.UtcNow.Second);
 // keys already reported by HandleUnsupportedConfigurationEvent; the set is also the lock object used there
63  private readonly HashSet<string> _unsupportedConfigurations = new();
64 
/// <summary>
/// Gets whether the data feed is running: set to true at the end of initialization
/// and reset to false when the feed is asked to exit.
/// </summary>
public bool IsActive { get; private set; }
72 
73  /// <summary>
74  /// Initializes the data feed for the specified job and algorithm: stores the supplied providers,
 /// creates the custom data exchange and the data queue handler, starts the exchange and marks the feed as active
75  /// </summary>
 /// <exception cref="ArgumentException">Thrown when the supplied job is not a <see cref="LiveNodePacket"/></exception>
76  public override void Initialize(IAlgorithm algorithm,
78  IResultHandler resultHandler,
79  IMapFileProvider mapFileProvider,
80  IFactorFileProvider factorFileProvider,
81  IDataProvider dataProvider,
82  IDataFeedSubscriptionManager subscriptionManager,
83  IDataFeedTimeProvider dataFeedTimeProvider,
84  IDataChannelProvider dataChannelProvider)
85  {
 // NOTE(review): 'job' is used here and passed second to base.Initialize below, but no 'job' parameter is
 // visible in the parameter list above - a parameter line appears to be missing from this view; confirm
 // against the original file.
86  if (!(job is LiveNodePacket))
87  {
88  throw new ArgumentException("The LiveTradingDataFeed requires a LiveNodePacket.");
89  }
90 
 // _algorithm has to be assigned before GetDataQueueHandler() runs, since that method reads _algorithm.Settings
91  _algorithm = algorithm;
92  _job = (LiveNodePacket)job;
93  _timeProvider = dataFeedTimeProvider.TimeProvider;
94  _dataProvider = dataProvider;
95  _mapFileProvider = mapFileProvider;
96  _factorFileProvider = factorFileProvider;
97  _channelProvider = dataChannelProvider;
98  _frontierTimeProvider = dataFeedTimeProvider.FrontierTimeProvider;
99  _customExchange = GetBaseDataExchange();
100  _subscriptions = subscriptionManager.DataFeedSubscriptions;
101 
 // the handler is allowed to be null here (null-conditional call)
102  _dataQueueHandler = GetDataQueueHandler();
103  _dataQueueHandler?.SetJob(_job);
104 
105  // run the custom data exchange
106  _customExchange.Start();
107 
108  IsActive = true;
109 
110  base.Initialize(algorithm, job, resultHandler, mapFileProvider, factorFileProvider, dataProvider, subscriptionManager, dataFeedTimeProvider, dataChannelProvider);
111  }
112 
113  /// <summary>
114  /// Creates a new subscription to provide data for the specified security.
 /// Universe requests are routed to CreateUniverseSubscription, every other request to CreateDataSubscription.
115  /// </summary>
 /// <remarks>Any exception raised while building the subscription is logged and reported to the algorithm
 /// as a runtime error; in that case null is returned instead of rethrowing</remarks>
116  /// <param name="request">Defines the subscription to be added, including start/end times the universe and security</param>
117  /// <returns>The created <see cref="Subscription"/> if successful, null otherwise</returns>
 // NOTE(review): the method declaration line is not visible between the documentation above and the opening
 // brace below (the log message suggests it is named CreateSubscription) - confirm against the original file.
119  {
120  Subscription subscription = null;
121  try
122  {
123  // create and add the subscription to our collection
124  subscription = request.IsUniverseSubscription
125  ? CreateUniverseSubscription(request)
126  : CreateDataSubscription(request);
127  }
128  catch (Exception err)
129  {
130  Log.Error(err, $"CreateSubscription(): Failed configuration: '{request.Configuration}'");
131  // kill the algorithm, this shouldn't happen
132  _algorithm.SetRuntimeError(err, $"Failed to subscribe to {request.Configuration.Symbol}");
133  }
134 
135  return subscription;
136  }
137 
/// <summary>
/// Stops providing data for the given subscription
/// </summary>
/// <remarks>Configurations that are streamed are unsubscribed from the data queue handler,
/// all the others have their enumerator removed from the custom data exchange</remarks>
/// <param name="subscription">The subscription to remove</param>
public override void RemoveSubscription(Subscription subscription)
{
    var symbol = subscription.Configuration.Symbol;

    if (_channelProvider.ShouldStreamSubscription(subscription.Configuration))
    {
        // streamed data: let the queue handler drop it
        _dataQueueHandler.UnsubscribeWithMapping(subscription.Configuration);
        return;
    }

    // non streamed data is pumped by the custom exchange
    _customExchange.RemoveEnumerator(symbol);
}
156 
/// <summary>
/// Shuts the data feed down; calling it again once the feed is inactive does nothing.
/// </summary>
/// <remarks>Detaches the unsupported configuration handler from the data queue handler manager (when one is in use),
/// stops the custom data exchange and finally lets the base implementation exit</remarks>
public override void Exit()
{
    if (!IsActive)
    {
        // already stopped, or never initialized
        return;
    }

    IsActive = false;
    Log.Trace("LiveTradingDataFeed.Exit(): Start. Setting cancellation token...");

    var queueManager = _dataQueueHandler as DataQueueHandlerManager;
    if (queueManager != null)
    {
        queueManager.UnsupportedConfiguration -= HandleUnsupportedConfigurationEvent;
    }

    _customExchange?.Stop();
    Log.Trace("LiveTradingDataFeed.Exit(): Exit Finished.");

    base.Exit();
}
176 
177  /// <summary>
178  /// Gets the <see cref="IDataQueueHandler"/> to use, by default a <see cref="DataQueueHandlerManager"/>
 /// built from the algorithm settings with <see cref="HandleUnsupportedConfigurationEvent"/> attached
 /// to its UnsupportedConfiguration event
179  /// </summary>
180  /// <remarks>Useful for testing</remarks>
181  /// <returns>The loaded <see cref="IDataQueueHandler"/></returns>
 // NOTE(review): the method declaration line is not visible in this view (Initialize calls GetDataQueueHandler(),
 // which presumably is this method) - confirm against the original file.
183  {
184  var result = new DataQueueHandlerManager(_algorithm.Settings);
 // this handler is detached again in Exit()
185  result.UnsupportedConfiguration += HandleUnsupportedConfigurationEvent;
186  return result;
187  }
188 
189  /// <summary>
190  /// Gets the <see cref="BaseDataExchange"/> to use: an exchange named "CustomDataExchange" with a sleep interval of 100
191  /// </summary>
192  /// <remarks>Useful for testing</remarks>
 // NOTE(review): the method declaration line is not visible in this view (Initialize calls GetBaseDataExchange(),
 // which presumably is this method) - confirm against the original file.
194  {
 // NOTE(review): SleepInterval is presumably expressed in milliseconds - confirm against BaseDataExchange
195  return new BaseDataExchange("CustomDataExchange") { SleepInterval = 100 };
196  }
197 
198  /// <summary>
199  /// Creates a new subscription for the specified security by stacking enumerators on top of the data source:
 /// source (custom exchange or data queue handler), optional price scaling, optional fill forward,
 /// frontier awareness, subscription filtering and finally the warmup enumerator
200  /// </summary>
201  /// <param name="request">The subscription request</param>
202  /// <returns>A new subscription instance of the specified security</returns>
203  private Subscription CreateDataSubscription(SubscriptionRequest request)
204  {
 // declared up-front so the data handlers below can capture it; they use '?.' because they may
 // run before the subscription is assigned at the end of this method
205  Subscription subscription = null;
206 
207  var localEndTime = request.EndTimeUtc.ConvertFromUtc(request.Security.Exchange.TimeZone);
208  var timeZoneOffsetProvider = new TimeZoneOffsetProvider(request.Configuration.ExchangeTimeZone, request.StartTimeUtc, request.EndTimeUtc);
209 
210  IEnumerator<BaseData> enumerator = null;
 // configurations that are not streamed are read through the custom data exchange
211  if (!_channelProvider.ShouldStreamSubscription(request.Configuration))
212  {
213  if (!Tiingo.IsAuthCodeSet)
214  {
215  // we're not using the SubscriptionDataReader, so be sure to set the auth token here
216  Tiingo.SetAuthCode(Config.Get("tiingo-auth-token"));
217  }
218 
219  var factory = new LiveCustomDataSubscriptionEnumeratorFactory(_timeProvider, _algorithm.ObjectStore);
220  var enumeratorStack = factory.CreateEnumerator(request, _dataProvider);
221 
 // the exchange pushes each data point it reads into this enqueueable, which is what the subscription consumes
222  var enqueable = new EnqueueableEnumerator<BaseData>();
223  _customExchange.AddEnumerator(request.Configuration.Symbol, enumeratorStack, handleData: data =>
224  {
225  enqueable.Enqueue(data);
226 
227  subscription?.OnNewDataAvailable();
228  });
229 
230  enumerator = enqueable;
231  }
232  else
233  {
 // streamed data: subscribe through the data queue handler and, when available, merge in live auxiliary data
234  var auxEnumerators = new List<IEnumerator<BaseData>>();
235 
236  if (LiveAuxiliaryDataEnumerator.TryCreate(request.Configuration, _timeProvider, request.Security.Cache, _mapFileProvider,
237  _factorFileProvider, request.StartTimeLocal, out var auxDataEnumator))
238  {
239  auxEnumerators.Add(auxDataEnumator);
240  }
241 
242  EventHandler handler = (_, _) => subscription?.OnNewDataAvailable();
243  enumerator = Subscribe(request.Configuration, handler, IsExpired);
244 
245  if (auxEnumerators.Count > 0)
246  {
247  enumerator = new LiveAuxiliaryDataSynchronizingEnumerator(_timeProvider, request.Configuration.ExchangeTimeZone, enumerator, auxEnumerators);
248  }
249  }
250 
251  // scale prices before 'SubscriptionFilterEnumerator' since it updates securities realtime price
252  // and before fill forwarding so we don't happen to apply twice the factor
253  if (request.Configuration.PricesShouldBeScaled(liveMode: true))
254  {
255  enumerator = new PriceScaleFactorEnumerator(
256  enumerator,
257  request.Configuration,
258  _factorFileProvider,
259  liveMode: true);
260  }
261 
262  if (request.Configuration.FillDataForward)
263  {
264  var fillForwardResolution = _subscriptions.UpdateAndGetFillForwardResolution(request.Configuration);
265  // Pass the security exchange hours explicitly to avoid using the ones in the request, since
266  // those could be different. e.g. when requests are created for open interest data the exchange
267  // hours are set to always open to avoid OI data being filtered out due to the exchange being closed.
268  var useDailyStrictEndTimes = LeanData.UseDailyStrictEndTimes(_algorithm.Settings, request, request.Configuration.Symbol, request.Configuration.Increment, request.Security.Exchange.Hours);
269 
 // NOTE(review): the embedded numbering skips a line between the two lines of this call, so one line of
 // constructor arguments may be missing from this view - confirm against the original file.
270  enumerator = new LiveFillForwardEnumerator(_frontierTimeProvider, enumerator, request.Security.Exchange, fillForwardResolution,
272  useDailyStrictEndTimes, request.Configuration.Type);
273  }
274 
275  // make our subscriptions aware of the frontier of the data feed, prevents future data from spewing into the feed
276  enumerator = new FrontierAwareEnumerator(enumerator, _frontierTimeProvider, timeZoneOffsetProvider);
277 
278  // define market hours and user filters to incoming data after the frontier enumerator so during warmup we avoid any realtime data making its way into the securities
 // NOTE(review): as shown, the braces below form an unconditional block; the embedded numbering skips a line
 // right before them, so a guarding condition may be missing from this view - confirm against the original file.
280  {
281  enumerator = new SubscriptionFilterEnumerator(enumerator, request.Security, localEndTime, request.Configuration.ExtendedMarketHours, true, request.ExchangeHours);
282  }
283 
 // while the algorithm is warming up this puts the warmup data in front of the live enumerator
284  enumerator = GetWarmupEnumerator(request, enumerator);
285 
286  var subscriptionDataEnumerator = new SubscriptionDataEnumerator(request.Configuration, request.Security.Exchange.Hours, timeZoneOffsetProvider,
287  enumerator, request.IsUniverseSubscription, _algorithm.Settings.DailyPreciseEndTime);
288  subscription = new Subscription(request, subscriptionDataEnumerator, timeZoneOffsetProvider);
289 
290  return subscription;
291  }
292 
/// <summary>
/// Determines whether the symbol of the given configuration is already past its delisting date
/// </summary>
/// <remarks>During warmup we can be asked to add an asset that has already expired. Such an asset should not be
/// sent to the live <see cref="_dataQueueHandler"/> instance, but its warmup enumerators are still wanted</remarks>
/// <param name="dataConfig">The configuration whose symbol is checked</param>
/// <returns>True when the current UTC date is after the delisting date converted to UTC</returns>
private bool IsExpired(SubscriptionDataConfig dataConfig)
{
    var delistingDate = dataConfig.Symbol.GetDelistingDate(_mapFileProvider.ResolveMapFile(dataConfig));

    var todayUtc = _timeProvider.GetUtcNow().Date;
    var delistingUtc = delistingDate.ConvertToUtc(dataConfig.ExchangeTimeZone);

    return todayUtc > delistingUtc;
}
304 
/// <summary>
/// Creates the live enumerator that sources data for the given configuration from the data queue handler
/// </summary>
/// <param name="dataConfig">The configuration to subscribe</param>
/// <param name="newDataAvailableHandler">Handler handed to the enumerator to signal that new data is available</param>
/// <param name="isExpired">Function handed to the enumerator to check whether the configuration's symbol is expired</param>
/// <returns>A new <see cref="LiveSubscriptionEnumerator"/></returns>
private IEnumerator<BaseData> Subscribe(SubscriptionDataConfig dataConfig, EventHandler newDataAvailableHandler, Func<SubscriptionDataConfig, bool> isExpired)
    => new LiveSubscriptionEnumerator(dataConfig, _dataQueueHandler, newDataAvailableHandler, isExpired);
309 
310  /// <summary>
311  /// Creates a new subscription for universe selection
312  /// </summary>
 /// <remarks>Three kinds of universes are handled: time triggered universes, file based universes
 /// (ETF constituents, fundamentals, option and future chains) and any other custom universe. In all cases
 /// the resulting enumerator is wrapped with the schedule wrapper and the warmup enumerator</remarks>
313  /// <param name="request">The subscription request</param>
 /// <returns>The new universe subscription</returns>
314  private Subscription CreateUniverseSubscription(SubscriptionRequest request)
315  {
 // declared up-front so the data handler of the file based branch can capture it
316  Subscription subscription = null;
317 
318  // TODO : Consider moving the creating of universe subscriptions to a separate, testable class
319 
320  // grab the relevant exchange hours
321  var config = request.Universe.Configuration;
 // NOTE(review): localEndTime is not referenced again within this method
322  var localEndTime = request.EndTimeUtc.ConvertFromUtc(request.Security.Exchange.TimeZone);
323  var tzOffsetProvider = new TimeZoneOffsetProvider(request.Configuration.ExchangeTimeZone, request.StartTimeUtc, request.EndTimeUtc);
324 
325  IEnumerator<BaseData> enumerator = null;
326 
327  var timeTriggered = request.Universe as ITimeTriggeredUniverse;
328  if (timeTriggered != null)
329  {
330  Log.Trace($"LiveTradingDataFeed.CreateUniverseSubscription(): Creating user defined universe: {config.Symbol.ID}");
331 
332  // spoof a tick on the requested interval to trigger the universe selection function
333  var enumeratorFactory = new TimeTriggeredUniverseSubscriptionEnumeratorFactory(timeTriggered, MarketHoursDatabase.FromDataFolder(), _frontierTimeProvider);
334  enumerator = enumeratorFactory.CreateEnumerator(request, _dataProvider);
335 
336  enumerator = new FrontierAwareEnumerator(enumerator, _timeProvider, tzOffsetProvider);
337 
 // the custom exchange pumps the enumerator into the enqueueable, which becomes the subscription's source
338  var enqueueable = new EnqueueableEnumerator<BaseData>();
339  _customExchange.AddEnumerator(new EnumeratorHandler(config.Symbol, enumerator, enqueueable));
340  enumerator = enqueueable;
341  }
342  else if (config.Type.IsAssignableTo(typeof(ETFConstituentUniverse)) ||
343  config.Type.IsAssignableTo(typeof(FundamentalUniverse)) ||
344  request.Universe is OptionChainUniverse ||
345  request.Universe is FuturesChainUniverse)
346  {
347  Log.Trace($"LiveTradingDataFeed.CreateUniverseSubscription(): Creating {config.Type.Name} universe: {config.Symbol.ID}");
348 
349  // Will try to pull data from the data folder every 10min, file with yesterdays date.
350  // If lean is started today it will trigger initial coarse universe selection
351  var factory = new LiveCustomDataSubscriptionEnumeratorFactory(_timeProvider,
352  _algorithm.ObjectStore,
353  // we adjust time to the previous tradable date
354  time => Time.GetStartTimeForTradeBars(request.Security.Exchange.Hours, time, Time.OneDay, 1, false, config.DataTimeZone, _algorithm.Settings.DailyPreciseEndTime),
355  TimeSpan.FromMinutes(10)
356  );
357  var enumeratorStack = factory.CreateEnumerator(request, _dataProvider);
358 
359  // aggregates each coarse data point into a single BaseDataCollection
360  var aggregator = new BaseDataCollectionAggregatorEnumerator(enumeratorStack, config.Symbol, true);
361  var enqueable = new EnqueueableEnumerator<BaseData>();
362  _customExchange.AddEnumerator(config.Symbol, aggregator, handleData: data =>
363  {
364  enqueable.Enqueue(data);
365  subscription?.OnNewDataAvailable();
366  });
367 
368  enumerator = GetConfiguredFrontierAwareEnumerator(enqueable, tzOffsetProvider,
369  // advance time only when the hour is after 5 AND before 23 (hours 6 through 22) and it is not Saturday
370  time => time.Hour < 23 && time.Hour > 5 && time.DayOfWeek != DayOfWeek.Saturday);
371  }
372  else
373  {
374  Log.Trace("LiveTradingDataFeed.CreateUniverseSubscription(): Creating custom universe: " + config.Symbol.ID);
375 
376  var factory = new LiveCustomDataSubscriptionEnumeratorFactory(_timeProvider, _algorithm.ObjectStore);
377  var enumeratorStack = factory.CreateEnumerator(request, _dataProvider);
378  enumerator = new BaseDataCollectionAggregatorEnumerator(enumeratorStack, config.Symbol, liveMode: true);
379 
380  var enqueueable = new EnqueueableEnumerator<BaseData>();
381  _customExchange.AddEnumerator(new EnumeratorHandler(config.Symbol, enumerator, enqueueable));
382  enumerator = enqueueable;
383  }
384 
 // the time provider given to the schedule wrapper only steps once the UTC time of day is past _scheduledUniverseUtcTimeShift
385  enumerator = AddScheduleWrapper(request, enumerator, new PredicateTimeProvider(_frontierTimeProvider, (currentUtcDateTime) => {
386  // will only let time advance after it's passed the live time shift frontier
387  return currentUtcDateTime.TimeOfDay > _scheduledUniverseUtcTimeShift;
388  }));
389 
390  enumerator = GetWarmupEnumerator(request, enumerator);
391 
392  // create the subscription
393  var subscriptionDataEnumerator = new SubscriptionDataEnumerator(request.Configuration, request.Security.Exchange.Hours, tzOffsetProvider,
394  enumerator, request.IsUniverseSubscription, _algorithm.Settings.DailyPreciseEndTime);
395  subscription = new Subscription(request, subscriptionDataEnumerator, tzOffsetProvider);
396 
397  return subscription;
398  }
399 
400  /// <summary>
401  /// Build and apply the warmup enumerators when required
402  /// </summary>
 /// <remarks>Only acts while the algorithm is warming up and the warmup request is usable; in that case the
 /// file based and history based warmup enumerators are concatenated and placed in front of the live enumerator</remarks>
 /// <param name="request">The original subscription request</param>
 /// <param name="liveEnumerator">The live enumerator to be consumed once warmup data is exhausted</param>
 /// <returns>The live enumerator, preceded by the warmup data when warmup applies</returns>
403  private IEnumerator<BaseData> GetWarmupEnumerator(SubscriptionRequest request, IEnumerator<BaseData> liveEnumerator)
404  {
405  if (_algorithm.IsWarmingUp)
406  {
 // the warmup request ends 'now' and uses the warmup resolution from the algorithm settings
407  var warmupRequest = new SubscriptionRequest(request, endTimeUtc: _timeProvider.GetUtcNow(),
408  // we will not fill forward each warmup enumerators separately but concatenated below
409  configuration: new SubscriptionDataConfig(request.Configuration, fillForward: false,
410  resolution: _algorithm.Settings.WarmupResolution));
411  if (warmupRequest.TradableDaysInDataTimeZone.Any()
412  // make sure there is at least room for a single bar of the requested resolution, else can cause issues with some history providers
413  // this could happen when we create some internal subscription whose start time is 'Now', which we don't really want to warmup
414  && warmupRequest.EndTimeUtc - warmupRequest.StartTimeUtc >= warmupRequest.Configuration.Resolution.ToTimeSpan()
415  // since we change the resolution, let's validate it's still valid configuration (example daily equity quotes are not!)
416  && LeanData.IsValidConfiguration(warmupRequest.Configuration.SecurityType, warmupRequest.Configuration.Resolution, warmupRequest.Configuration.TickType))
417  {
418  // since we will source data locally and from the history provider, let's limit the history request size
419  // by setting a start date respecting the 'MaximumWarmupHistoryDaysLookBack'
420  var historyWarmup = warmupRequest;
421  var warmupHistoryStartDate = warmupRequest.EndTimeUtc.AddDays(-MaximumWarmupHistoryDaysLookBack);
422  if (warmupHistoryStartDate > warmupRequest.StartTimeUtc)
423  {
424  historyWarmup = new SubscriptionRequest(warmupRequest, startTimeUtc: warmupHistoryStartDate);
425  }
426 
427  // let's keep track of the last point we got from the file based enumerator and start our history enumeration from this point
428  // this is much more efficient since these duplicated points would otherwise be dropped by the filter right away, causing memory usage spikes
429  var lastPointTracker = new LastPointTracker();
430 
431  var synchronizedWarmupEnumerator = TryAddFillForwardEnumerator(warmupRequest,
432  // we concatenate the file based and history based warmup enumerators, dropping duplicate time stamps
433  new ConcatEnumerator(true, GetFileBasedWarmupEnumerator(warmupRequest, lastPointTracker), GetHistoryWarmupEnumerator(historyWarmup, lastPointTracker)) { CanEmitNull = false },
434  // if required by the original request, we will fill forward the Synced warmup data
 // NOTE(review): the embedded numbering skips a line here, so the argument the comment above refers to
 // may be missing from this view - confirm against the original file.
436  _algorithm.Settings.WarmupResolution);
437  synchronizedWarmupEnumerator = AddScheduleWrapper(warmupRequest, synchronizedWarmupEnumerator, null);
438 
439  // don't let future data past. We let null pass because that's letting the next enumerator know we've ended because we always return true in live
440  synchronizedWarmupEnumerator = new FilterEnumerator<BaseData>(synchronizedWarmupEnumerator, data => data == null || data.EndTime <= warmupRequest.EndTimeLocal);
441 
442  // the order here is important, concat enumerator will keep the last enumerator given and dispose of the rest
443  liveEnumerator = new ConcatEnumerator(true, synchronizedWarmupEnumerator, liveEnumerator);
444  }
445  }
446  return liveEnumerator;
447  }
448 
/// <summary>
/// Builds the warmup enumerator that reads from local files
/// </summary>
/// <remarks>Every non fill forward data point that is let through is recorded in the given tracker.
/// If the enumerator can not be created the error is logged and null is returned</remarks>
/// <param name="warmup">The warmup subscription request</param>
/// <param name="lastPointTracker">Receives the last data point emitted by this enumerator</param>
/// <returns>The filtered file based enumerator, null if its creation failed</returns>
private IEnumerator<BaseData> GetFileBasedWarmupEnumerator(SubscriptionRequest warmup, LastPointTracker lastPointTracker)
{
    try
    {
        return new FilterEnumerator<BaseData>(CreateEnumerator(warmup), data =>
        {
            if (data == null)
            {
                // nulls are always let through
                return true;
            }

            // drop future data and fill forwarded points, fill forwarding is applied after merging with the history response
            if (data.EndTime >= warmup.EndTimeLocal || data.IsFillForward)
            {
                return false;
            }

            lastPointTracker.LastDataPoint = data;
            return true;
        });
    }
    catch (Exception e)
    {
        Log.Error(e, $"File based warmup: {warmup.Configuration}");
        return null;
    }
}
478 
/// <summary>
/// History based warmup enumerator
/// </summary>
/// <remarks>For universe requests the universe enumerator is built on top of this same method. For any other
/// request the history request is created lazily, on first enumeration, so that the last data point
/// seen by the file based enumerator can be used to move the history start time forward.
/// Failures of the history provider are best-effort: they are logged and yield no data</remarks>
/// <param name="warmup">The warmup subscription request</param>
/// <param name="lastPointTracker">Tracker holding the last data point emitted by the file based enumerator, can be null</param>
/// <returns>An enumerator which lets through nulls and non fill forward data ending before the warmup end time</returns>
private IEnumerator<BaseData> GetHistoryWarmupEnumerator(SubscriptionRequest warmup, LastPointTracker lastPointTracker)
{
    IEnumerator<BaseData> result;
    if (warmup.IsUniverseSubscription)
    {
        // we ignore the fill forward time span argument because we will fill forward the concatenated file and history based enumerators next in the stack
        result = CreateUniverseEnumerator(warmup, createUnderlyingEnumerator: (req, _) => GetHistoryWarmupEnumerator(req, lastPointTracker));
    }
    else
    {
        // we create an enumerable of which we get the enumerator to defer the creation of the history request until the file based enumeration ended
        // and potentially the 'lastPointTracker' is available to adjust our start time
        result = new[] { warmup }.SelectMany(_ =>
        {
            var startTimeUtc = warmup.StartTimeUtc;
            var lastDataPoint = lastPointTracker?.LastDataPoint;
            if (lastDataPoint != null)
            {
                var lastPointExchangeTime = lastDataPoint.Time;
                if (warmup.Configuration.Resolution == Resolution.Daily)
                {
                    // time could be 9.30 for example using strict daily end times, but we just want the date in this case
                    lastPointExchangeTime = lastPointExchangeTime.Date;
                }

                var utcLastPointTime = lastPointExchangeTime.ConvertToUtc(warmup.ExchangeHours.TimeZone);
                if (utcLastPointTime > startTimeUtc)
                {
                    if (Log.DebuggingEnabled)
                    {
                        Log.Debug($"LiveTradingDataFeed.GetHistoryWarmupEnumerator(): Adjusting history warmup start time to {utcLastPointTime} from {startTimeUtc} for {warmup.Configuration}");
                    }
                    startTimeUtc = utcLastPointTime;
                }
            }

            var historyRequest = new Data.HistoryRequest(warmup.Configuration, warmup.ExchangeHours, startTimeUtc, warmup.EndTimeUtc);
            try
            {
                // the projection below runs lazily: this try only guards the GetHistory call itself,
                // failures while reading a slice are handled inside the projection
                return _algorithm.HistoryProvider.GetHistory(new[] { historyRequest }, _algorithm.TimeZone).Select(slice =>
                {
                    try
                    {
                        var data = slice.Get(historyRequest.DataType);
                        return (BaseData)data[warmup.Configuration.Symbol];
                    }
                    catch (Exception e)
                    {
                        Log.Error(e, $"History warmup: {warmup.Configuration}");
                    }
                    return null;
                });
            }
            catch (Exception e)
            {
                // some history providers could throw if they do not support a type: keep this best-effort,
                // but leave a trace so a missing history warmup can be diagnosed
                if (Log.DebuggingEnabled)
                {
                    Log.Debug($"LiveTradingDataFeed.GetHistoryWarmupEnumerator(): History provider failed for {warmup.Configuration}: {e.Message}");
                }
            }
            return Enumerable.Empty<BaseData>();
        }).GetEnumerator();
    }

    return new FilterEnumerator<BaseData>(result,
        // don't let future data past, nor fill forward, that will be handled after merging with the file based enumerator
        data => data == null || (data.EndTime < warmup.EndTimeLocal && !data.IsFillForward));
}
545 
/// <summary>
/// Wraps the given enumerator with a <see cref="FrontierAwareEnumerator"/> whose time comes from a
/// <see cref="PredicateTimeProvider"/> built on the frontier time provider and the supplied step function
/// </summary>
/// <remarks>The caller decides, through <paramref name="customStepEvaluator"/>, when time may advance. This allows
/// preventing universe selection from occurring at certain hours so that subscription changes are handled correctly</remarks>
/// <param name="enumerator">The enumerator to wrap</param>
/// <param name="tzOffsetProvider">The time zone offset provider handed to the frontier aware enumerator</param>
/// <param name="customStepEvaluator">Function handed to the predicate time provider</param>
/// <returns>The wrapping frontier aware enumerator</returns>
private IEnumerator<BaseData> GetConfiguredFrontierAwareEnumerator(
    IEnumerator<BaseData> enumerator,
    TimeZoneOffsetProvider tzOffsetProvider,
    Func<DateTime, bool> customStepEvaluator)
    => new FrontierAwareEnumerator(
        enumerator,
        new PredicateTimeProvider(_frontierTimeProvider, customStepEvaluator),
        tzOffsetProvider);
563 
/// <summary>
/// Gets the data queue handler as a universe provider
/// </summary>
/// <param name="securityType">The security type, only used in the error message</param>
/// <returns>The data queue handler cast to <see cref="IDataQueueUniverseProvider"/></returns>
/// <exception cref="NotSupportedException">Thrown when the data queue handler is not a universe provider, or when it is
/// a <see cref="DataQueueHandlerManager"/> reporting it has no universe provider</exception>
private IDataQueueUniverseProvider GetUniverseProvider(SecurityType securityType)
{
    if (_dataQueueHandler is IDataQueueUniverseProvider universeProvider
        && _dataQueueHandler is not DataQueueHandlerManager { HasUniverseProvider: false })
    {
        return universeProvider;
    }

    throw new NotSupportedException($"The DataQueueHandler does not support {securityType}.");
}
572 
/// <summary>
/// Event handler that warns the user, once per market / security type / data type combination,
/// about a configuration that is not supported
/// </summary>
/// <param name="_">The event sender, unused</param>
/// <param name="config">The unsupported configuration</param>
private void HandleUnsupportedConfigurationEvent(object _, SubscriptionDataConfig config)
{
    if (_algorithm == null)
    {
        // nobody to report to
        return;
    }

    lock (_unsupportedConfigurations)
    {
        var key = $"{config.Symbol.ID.Market} {config.Symbol.ID.SecurityType} {config.Type.Name}";
        if (!_unsupportedConfigurations.Add(key))
        {
            // this combination was already reported
            return;
        }

        Log.Trace($"LiveTradingDataFeed.HandleUnsupportedConfigurationEvent(): detected unsupported configuration: {config}");

        _algorithm.Debug($"Warning: {key} data not supported. Please consider reviewing the data providers selection.");
    }
}
589 
/// <summary>
/// Enumerator handler that forwards every data point to an <see cref="EnqueueableEnumerator{T}"/>
/// and stops that enqueueable once the source enumerator has finished
/// </summary>
private class EnumeratorHandler : BaseDataExchange.EnumeratorHandler
{
    /// <summary>
    /// Creates a new handler for the given symbol
    /// </summary>
    /// <param name="symbol">The symbol the enumerator belongs to</param>
    /// <param name="enumerator">The source enumerator</param>
    /// <param name="enqueueable">The enqueueable receiving the data</param>
    public EnumeratorHandler(Symbol symbol, IEnumerator<BaseData> enumerator, EnqueueableEnumerator<BaseData> enqueueable)
        : base(symbol, enumerator, handleData: enqueueable.Enqueue)
    {
        EnumeratorFinished += (sender, args) => enqueueable.Stop();
    }
}
601 
/// <summary>
/// Mutable holder shared between the file based and history based warmup enumerators
/// </summary>
private class LastPointTracker
{
    /// <summary>
    /// The last data point emitted by the file based warmup enumerator, null until one is emitted
    /// </summary>
    public BaseData LastDataPoint { get; set; }
}
606  }
607 }