18 using System.Collections.Generic;
19 using System.Collections.Specialized;
// NOTE(review): _liveMode is assigned outside this excerpt and no visible line reads it here — confirm its usage.
40 private readonly
bool _liveMode;
// Guard flag: checked before, and set to true when, the "Found no valid start time for scheduled universe"
// debug message is sent, so that warning appears to be emitted only once per instance.
41 private bool _sentUniverseScheduleWarning;
// Cached list built on demand from _subscriptionManagerSubscriptions.Values when it is null.
// It is reset to null whenever an entry is added to or removed from that dictionary,
// so the next read rebuilds it.
44 private List<SubscriptionDataConfig> _subscriptionDataConfigsEnumerator;
// Registered subscription data configs, keyed by the config itself so that TryGetValue can hand back the
// instance already stored for an equal config. Every visible access takes a lock on this dictionary.
48 private readonly Dictionary<SubscriptionDataConfig, SubscriptionDataConfig> _subscriptionManagerSubscriptions =
new();
77 _timeKeeper = timeKeeper;
78 _marketHoursDatabase = marketHoursDatabase;
80 _registeredTypesProvider = registeredTypesProvider;
81 _dataPermissionManager = dataPermissionManager;
88 case NotifyCollectionChangedAction.Add:
89 foreach (var universe
in args.NewItems.OfType<
Universe>())
91 var config = universe.Configuration;
102 _marketHoursDatabase.GetExchangeHours(config),
114 var universeType = universe.GetType();
127 const int maximumLookback = 60;
129 var startLocalTime = start.ConvertFromUtc(security.Exchange.TimeZone);
130 if (universe.UniverseSettings.Schedule.Initialized)
136 if (universe.UniverseSettings.Schedule.Get(startLocalTime.Date, startLocalTime.Date).Any())
140 startLocalTime = startLocalTime.AddDays(-1);
141 if (++loopCount >= maximumLookback)
144 startLocalTime = algorithm.
UtcTime.ConvertFromUtc(security.Exchange.TimeZone);
145 if (!_sentUniverseScheduleWarning)
148 _sentUniverseScheduleWarning =
true;
149 algorithm.
Debug($
"Warning: Found no valid start time for scheduled universe, will use default");
152 }
while (loopCount < maximumLookback);
157 Time.
OneDay, 1, extendedMarketHours:
false, config.DataTimeZone,
159 start = startLocalTime.ConvertToUtc(security.Exchange.TimeZone);
172 case NotifyCollectionChangedAction.Remove:
173 foreach (var universe
in args.OldItems.OfType<
Universe>())
177 if (!universe.DisposeRequested)
185 throw new NotImplementedException(
"The specified action is not implemented: " + args.Action);
196 .Where(subscription => subscription.Configuration.FillDataForward && subscription.Configuration.Resolution !=
Resolution.Tick)
197 .SelectMany(subscription => subscription.SubscriptionRequests)
200 if(requests.Count > 0)
202 Log.
Trace($
"DataManager(): Fill forward resolution has changed from {changedEvent.Old} to {changedEvent.New} at utc: {algorithm.UtcTime}. " +
203 $
"Restarting {requests.Count} subscriptions...");
209 foreach (var request
in requests)
213 RemoveSubscriptionInternal(request.Configuration, universe: request.Universe, forceSubscriptionRemoval:
true);
217 foreach (var request
in requests)
222 var startUtc = algorithm.
UtcTime;
225 if (!algorithm.
GetLocked() && request.StartTimeUtc < startUtc)
227 startUtc = request.StartTimeUtc;
230 startTimeUtc: startUtc.AddTicks(1),
240 #region IDataFeedSubscriptionManager
259 catch (Exception err)
261 Log.
Error(err,
"DataManager.RemoveAllSubscriptions():" +
262 $
"Error removing: {subscription.Configuration}");
274 lock (_subscriptionManagerSubscriptions)
280 _subscriptionDataConfigsEnumerator =
null;
290 return subscription.Configuration.IsInternalFeed;
295 throw new InvalidOperationException($
"{DataNormalizationMode.ScaledRaw} normalization mode only intended for history requests.");
301 subscription = _dataFeed.CreateSubscription(request);
303 if (subscription ==
null)
305 Log.
Trace($
"DataManager.AddSubscription(): Unable to add subscription for: {request.Configuration}");
313 OnSubscriptionAdded(subscription);
314 Log.
Trace($
"DataManager.AddSubscription(): Added {request.Configuration}." +
315 $
" Start: {request.StartTimeUtc}. End: {request.EndTimeUtc}");
321 Log.
Debug($
"DataManager.AddSubscription(): Added {request.Configuration}." +
322 $
" Start: {request.StartTimeUtc}. End: {request.EndTimeUtc}");
337 return RemoveSubscriptionInternal(configuration, universe, forceSubscriptionRemoval:
false);
356 if (subscription.RemoveSubscriptionRequest(universe))
360 Log.
Error($
"DataManager.RemoveSubscription(): Unable to remove {configuration}");
364 _dataFeed.RemoveSubscription(subscription);
368 OnSubscriptionRemoved(subscription);
371 subscription.Dispose();
373 RemoveSubscriptionDataConfig(subscription);
375 if (forceSubscriptionRemoval)
377 subscription.MarkAsRemovedFromUniverse();
382 Log.
Trace($
"DataManager.RemoveSubscription(): Removed {configuration}");
388 Log.
Debug($
"DataManager.RemoveSubscription(): Removed {configuration}");
393 else if (universe !=
null)
398 lock (_subscriptionManagerSubscriptions)
400 if (_subscriptionManagerSubscriptions.Remove(configuration))
402 _subscriptionDataConfigsEnumerator =
null;
413 private void OnSubscriptionAdded(Subscription subscription)
422 private void OnSubscriptionRemoved(Subscription subscription)
429 #region IAlgorithmSubscriptionManager
438 lock (_subscriptionManagerSubscriptions)
440 if(_subscriptionDataConfigsEnumerator ==
null)
442 _subscriptionDataConfigsEnumerator = _subscriptionManagerSubscriptions.Values.ToList();
444 return _subscriptionDataConfigsEnumerator;
456 lock (_subscriptionManagerSubscriptions)
458 if (!_subscriptionManagerSubscriptions.TryGetValue(newConfig, out config))
460 _subscriptionManagerSubscriptions[newConfig] = config = newConfig;
461 _subscriptionDataConfigsEnumerator =
null;
466 if (!ReferenceEquals(config, newConfig))
472 Log.
Debug(
"DataManager.SubscriptionManagerGetOrAdd(): subscription already added: " + config);
489 private void RemoveSubscriptionDataConfig(
Subscription subscription)
494 lock (_subscriptionManagerSubscriptions)
496 if (_subscriptionManagerSubscriptions.Remove(subscription.
Configuration))
498 _subscriptionDataConfigsEnumerator =
null;
509 lock (_subscriptionManagerSubscriptions)
511 return _subscriptionManagerSubscriptions.Count;
515 #region ISubscriptionDataConfigService
531 bool fillForward =
true,
532 bool extendedMarketHours =
false,
533 bool isFilteredSubscription =
true,
534 bool isInternalFeed =
false,
535 bool isCustomData =
false,
538 uint contractDepthOffset = 0
541 return Add(symbol, resolution, fillForward, extendedMarketHours, isFilteredSubscription, isInternalFeed, isCustomData,
543 dataNormalizationMode, dataMappingMode, contractDepthOffset)
552 public List<SubscriptionDataConfig>
Add(
555 bool fillForward =
true,
556 bool extendedMarketHours =
false,
557 bool isFilteredSubscription =
true,
558 bool isInternalFeed =
false,
559 bool isCustomData =
false,
560 List<Tuple<Type, TickType>> subscriptionDataTypes =
null,
563 uint contractDepthOffset = 0
566 var dataTypes = subscriptionDataTypes;
567 if(dataTypes ==
null)
572 dataTypes =
new List<Tuple<Type, TickType>> {
new Tuple<Type, TickType>(type,
TickType.Trade) };
580 if (!dataTypes.Any())
582 throw new ArgumentNullException(nameof(dataTypes),
"At least one type needed to create new subscriptions");
585 var resolutionWasProvided = resolution.HasValue;
586 foreach (var typeTuple
in dataTypes)
588 var baseInstance = typeTuple.Item1.GetBaseDataInstance();
589 baseInstance.Symbol = symbol;
590 if (!resolutionWasProvided)
592 var defaultResolution = baseInstance.DefaultResolution();
593 if (resolution.HasValue && resolution != defaultResolution)
597 throw new InvalidOperationException(
598 $
"Different data types ({string.Join(",
", dataTypes.Select(tuple => tuple.Item1))})" +
599 $
" provided different default resolutions {defaultResolution} and {resolution}, this is an unexpected invalid operation.");
601 resolution = defaultResolution;
609 var supportedResolutions = baseInstance.SupportedResolutions();
610 if (supportedResolutions.Contains(resolution.Value))
615 throw new ArgumentException($
"Sorry {resolution.ToStringInvariant()} is not a supported resolution for {typeTuple.Item1.Name}" +
616 $
" and SecurityType.{symbol.SecurityType.ToStringInvariant()}." +
617 $
" Please change your AddData to use one of the supported resolutions ({string.Join(",
", supportedResolutions)}).");
621 var marketHoursDbEntry = _marketHoursDatabase.GetEntry(symbol, dataTypes.Select(tuple => tuple.Item1));
623 var exchangeHours = marketHoursDbEntry.ExchangeHours;
624 if (symbol.ID.SecurityType.IsOption() ||
630 if (marketHoursDbEntry.DataTimeZone ==
null)
632 throw new ArgumentNullException(nameof(marketHoursDbEntry.DataTimeZone),
633 "DataTimeZone is a required parameter for new subscriptions. Set to the time zone the raw data is time stamped in.");
636 if (exchangeHours.TimeZone ==
null)
638 throw new ArgumentNullException(nameof(exchangeHours.TimeZone),
639 "ExchangeTimeZone is a required parameter for new subscriptions. Set to the time zone the security exchange resides in.");
642 var result = (from subscriptionDataType in dataTypes
643 let dataType = subscriptionDataType.Item1
644 let tickType = subscriptionDataType.Item2
649 marketHoursDbEntry.DataTimeZone,
650 exchangeHours.TimeZone,
654 subscriptionDataTypes ==
null && tickType ==
TickType.OpenInterest || isInternalFeed,
656 isFilteredSubscription: isFilteredSubscription,
658 dataNormalizationMode: dataNormalizationMode,
659 dataMappingMode: dataMappingMode,
660 contractDepthOffset: contractDepthOffset)).ToList();
662 for (
int i = 0; i < result.Count; i++)
667 _registeredTypesProvider.RegisterType(result[i].Type);
688 if (symbolSecurityType.IsOption())
690 return new List<Tuple<Type, TickType>> {
new Tuple<Type, TickType>(typeof(
OptionUniverse),
TickType.Quote) };
693 return new List<Tuple<Type, TickType>> {
new Tuple<Type, TickType>(typeof(
FutureUniverse),
TickType.Quote) };
700 var result = availableDataType
701 .Select(tickType =>
new Tuple<Type, TickType>(
LeanData.
GetDataType(resolution, tickType), tickType)).ToList();
716 lock (_subscriptionManagerSubscriptions)
718 return _subscriptionManagerSubscriptions.Keys
719 .Where(config => (includeInternalConfigs || !config.IsInternalFeed) && (symbol ==
null || config.Symbol.ID == symbol.ID))
720 .OrderBy(config => config.IsInternalFeed)