25 using System.Collections.Generic;
/// <summary>
/// Number of days used to bound the history-based warmup window. The value is read from the
/// "maximum-warmup-history-days-look-back" configuration key in the static field initializer and
/// falls back to 5 when the key is not set. The warmup code subtracts this many days from the
/// warmup request's UTC end time and, when the result is later than the request's UTC start time,
/// uses it as the start time of the history warmup request.
/// </summary>
43 private static readonly
int MaximumWarmupHistoryDaysLookBack =
Config.
GetInt(
"maximum-warmup-history-days-look-back", 5);
/// <summary>
/// UTC time-of-day threshold used by the scheduled universe predicate, which only returns true once
/// the current UTC time of day is greater than this value.
/// </summary>
/// <remarks>
/// The value is 11 hours plus N minutes, where N is the seconds component (0-59) of
/// <see cref="DateTime.UtcNow"/> at the moment this instance field is initialized; note the seconds
/// value is added to a minute count, so the result lies between 11:00 and 11:59.
/// NOTE(review): presumably the per-instance offset is meant to spread out when different instances
/// trigger - confirm the intent with the original author.
/// </remarks>
62 private readonly TimeSpan _scheduledUniverseUtcTimeShift = TimeSpan.FromMinutes(11 * 60 + DateTime.UtcNow.Second);
/// <summary>
/// Keys of the unsupported configurations that have already been reported. The unsupported
/// configuration handling code builds a key of the form "{Market} {SecurityType} {Type.Name}",
/// takes a lock on this set, and only writes the trace log and the algorithm debug warning when
/// <see cref="HashSet{T}.Add"/> returns true, so each key is reported once.
/// </summary>
63 private readonly HashSet<string> _unsupportedConfigurations =
new();
88 throw new ArgumentException(
"The LiveTradingDataFeed requires a LiveNodePacket.");
91 _algorithm = algorithm;
94 _dataProvider = dataProvider;
95 _mapFileProvider = mapFileProvider;
96 _factorFileProvider = factorFileProvider;
97 _channelProvider = dataChannelProvider;
103 _dataQueueHandler?.
SetJob(_job);
106 _customExchange.
Start();
110 base.Initialize(algorithm, job, resultHandler, mapFileProvider, factorFileProvider, dataProvider, subscriptionManager, dataFeedTimeProvider, dataChannelProvider);
125 ? CreateUniverseSubscription(request)
126 : CreateDataSubscription(request);
128 catch (Exception err)
130 Log.
Error(err, $
"CreateSubscription(): Failed configuration: '{request.Configuration}'");
132 _algorithm.SetRuntimeError(err, $
"Failed to subscribe to {request.Configuration.Symbol}");
153 _dataQueueHandler.UnsubscribeWithMapping(subscription.
Configuration);
165 Log.
Trace(
"LiveTradingDataFeed.Exit(): Start. Setting cancellation token...");
168 manager.UnsupportedConfiguration -= HandleUnsupportedConfigurationEvent;
170 _customExchange?.
Stop();
171 Log.
Trace(
"LiveTradingDataFeed.Exit(): Exit Finished.");
185 result.UnsupportedConfiguration += HandleUnsupportedConfigurationEvent;
210 IEnumerator<BaseData> enumerator =
null;
220 var enumeratorStack = factory.CreateEnumerator(request, _dataProvider);
225 enqueable.Enqueue(data);
227 subscription?.OnNewDataAvailable();
230 enumerator = enqueable;
234 var auxEnumerators =
new List<IEnumerator<BaseData>>();
237 _factorFileProvider, request.
StartTimeLocal, out var auxDataEnumator))
239 auxEnumerators.Add(auxDataEnumator);
242 EventHandler handler = (_, _) => subscription?.OnNewDataAvailable();
243 enumerator = Subscribe(request.
Configuration, handler, IsExpired);
245 if (auxEnumerators.Count > 0)
253 if (request.
Configuration.PricesShouldBeScaled(liveMode:
true))
284 enumerator = GetWarmupEnumerator(request, enumerator);
288 subscription =
new Subscription(request, subscriptionDataEnumerator, timeZoneOffsetProvider);
300 var mapFile = _mapFileProvider.ResolveMapFile(dataConfig);
301 var delistingDate = dataConfig.
Symbol.GetDelistingDate(mapFile);
305 private IEnumerator<BaseData> Subscribe(
SubscriptionDataConfig dataConfig, EventHandler newDataAvailableHandler, Func<SubscriptionDataConfig, bool> isExpired)
316 Subscription subscription =
null;
325 IEnumerator<BaseData> enumerator =
null;
328 if (timeTriggered !=
null)
330 Log.
Trace($
"LiveTradingDataFeed.CreateUniverseSubscription(): Creating user defined universe: {config.Symbol.ID}");
334 enumerator = enumeratorFactory.CreateEnumerator(request, _dataProvider);
339 _customExchange.
AddEnumerator(
new EnumeratorHandler(config.Symbol, enumerator, enqueueable));
340 enumerator = enqueueable;
347 Log.
Trace($
"LiveTradingDataFeed.CreateUniverseSubscription(): Creating {config.Type.Name} universe: {config.Symbol.ID}");
355 TimeSpan.FromMinutes(10)
357 var enumeratorStack = factory.CreateEnumerator(request, _dataProvider);
362 _customExchange.
AddEnumerator(config.Symbol, aggregator, handleData: data =>
364 enqueable.Enqueue(data);
365 subscription?.OnNewDataAvailable();
368 enumerator = GetConfiguredFrontierAwareEnumerator(enqueable, tzOffsetProvider,
370 time => time.Hour < 23 && time.Hour > 5 && time.DayOfWeek != DayOfWeek.Saturday);
374 Log.
Trace(
"LiveTradingDataFeed.CreateUniverseSubscription(): Creating custom universe: " + config.Symbol.ID);
377 var enumeratorStack = factory.CreateEnumerator(request, _dataProvider);
381 _customExchange.
AddEnumerator(
new EnumeratorHandler(config.Symbol, enumerator, enqueueable));
382 enumerator = enqueueable;
385 enumerator =
AddScheduleWrapper(request, enumerator,
new PredicateTimeProvider(_frontierTimeProvider, (currentUtcDateTime) => {
387 return currentUtcDateTime.TimeOfDay > _scheduledUniverseUtcTimeShift;
390 enumerator = GetWarmupEnumerator(request, enumerator);
395 subscription =
new Subscription(request, subscriptionDataEnumerator, tzOffsetProvider);
403 private IEnumerator<BaseData> GetWarmupEnumerator(
SubscriptionRequest request, IEnumerator<BaseData> liveEnumerator)
411 if (warmupRequest.TradableDaysInDataTimeZone.Any()
414 && warmupRequest.EndTimeUtc - warmupRequest.StartTimeUtc >= warmupRequest.Configuration.Resolution.ToTimeSpan()
416 &&
LeanData.
IsValidConfiguration(warmupRequest.Configuration.SecurityType, warmupRequest.Configuration.Resolution, warmupRequest.Configuration.TickType))
420 var historyWarmup = warmupRequest;
421 var warmupHistoryStartDate = warmupRequest.EndTimeUtc.AddDays(-MaximumWarmupHistoryDaysLookBack);
422 if (warmupHistoryStartDate > warmupRequest.StartTimeUtc)
424 historyWarmup =
new SubscriptionRequest(warmupRequest, startTimeUtc: warmupHistoryStartDate);
429 var lastPointTracker =
new LastPointTracker();
433 new ConcatEnumerator(
true, GetFileBasedWarmupEnumerator(warmupRequest, lastPointTracker), GetHistoryWarmupEnumerator(historyWarmup, lastPointTracker)) { CanEmitNull =
false },
437 synchronizedWarmupEnumerator =
AddScheduleWrapper(warmupRequest, synchronizedWarmupEnumerator,
null);
440 synchronizedWarmupEnumerator =
new FilterEnumerator<BaseData>(synchronizedWarmupEnumerator, data => data ==
null || data.EndTime <= warmupRequest.EndTimeLocal);
443 liveEnumerator =
new ConcatEnumerator(
true, synchronizedWarmupEnumerator, liveEnumerator);
446 return liveEnumerator;
452 private IEnumerator<BaseData> GetFileBasedWarmupEnumerator(
SubscriptionRequest warmup, LastPointTracker lastPointTracker)
454 IEnumerator<BaseData> result =
null;
461 if (data ==
null || data.EndTime < warmup.
EndTimeLocal && !data.IsFillForward)
465 lastPointTracker.LastDataPoint = data;
474 Log.
Error(e, $
"File based warmup: {warmup.Configuration}");
482 private IEnumerator<BaseData> GetHistoryWarmupEnumerator(
SubscriptionRequest warmup, LastPointTracker lastPointTracker)
484 IEnumerator<BaseData> result;
488 result = CreateUniverseEnumerator(warmup, createUnderlyingEnumerator: (req, _) => GetHistoryWarmupEnumerator(req, lastPointTracker));
494 result =
new[] { warmup }.SelectMany(_ =>
497 if (lastPointTracker !=
null && lastPointTracker.LastDataPoint !=
null)
499 var lastPointExchangeTime = lastPointTracker.LastDataPoint.Time;
500 if (warmup.Configuration.Resolution == Resolution.Daily)
503 lastPointExchangeTime = lastPointExchangeTime.Date;
507 if (utcLastPointTime > startTimeUtc)
511 Log.Debug($
"LiveTradingDataFeed.GetHistoryWarmupEnumerator(): Adjusting history warmup start time to {utcLastPointTime} from {startTimeUtc} for {warmup.Configuration}");
513 startTimeUtc = utcLastPointTime;
519 return _algorithm.HistoryProvider.GetHistory(new[] { historyRequest }, _algorithm.
TimeZone).Select(slice =>
523 var data = slice.Get(historyRequest.DataType);
528 Log.
Error(e, $
"History warmup: {warmup.Configuration}");
537 return Enumerable.Empty<
BaseData>();
543 data => data ==
null || data.EndTime < warmup.
EndTimeLocal && !data.IsFillForward);
554 private IEnumerator<BaseData> GetConfiguredFrontierAwareEnumerator(
555 IEnumerator<BaseData> enumerator,
556 TimeZoneOffsetProvider tzOffsetProvider,
557 Func<DateTime, bool> customStepEvaluator)
559 var stepTimeProvider =
new PredicateTimeProvider(_frontierTimeProvider, customStepEvaluator);
568 throw new NotSupportedException($
"The DataQueueHandler does not support {securityType}.");
575 if (_algorithm !=
null)
577 lock (_unsupportedConfigurations)
579 var key = $
"{config.Symbol.ID.Market} {config.Symbol.ID.SecurityType} {config.Type.Name}";
580 if (_unsupportedConfigurations.Add(key))
582 Log.
Trace($
"LiveTradingDataFeed.HandleUnsupportedConfigurationEvent(): detected unsupported configuration: {config}");
584 _algorithm.
Debug($
"Warning: {key} data not supported. Please consider reviewing the data providers selection.");
593 private class EnumeratorHandler : BaseDataExchange.EnumeratorHandler
596 : base(symbol, enumerator, handleData: enqueueable.Enqueue)
598 EnumeratorFinished += (_, _) => enqueueable.
Stop();
602 private class LastPointTracker
604 public BaseData LastDataPoint {
get;
set; }