Lean  $LEAN_TAG$
DataManager.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15 */
16 
17 using System;
18 using System.Collections.Generic;
19 using System.Collections.Specialized;
20 using System.Linq;
21 using QuantConnect.Data;
26 using QuantConnect.Logging;
28 using QuantConnect.Util;
29 
31 {
32  /// <summary>
33  /// DataManager will manage the subscriptions for both the DataFeeds and the SubscriptionManager
34  /// </summary>
36  {
37  private readonly IDataFeed _dataFeed;
38  private readonly MarketHoursDatabase _marketHoursDatabase;
39  private readonly ITimeKeeper _timeKeeper;
40  private readonly bool _liveMode;
41  private bool _sentUniverseScheduleWarning;
42  private readonly IRegisteredSecurityDataTypesProvider _registeredTypesProvider;
43  private readonly IDataPermissionManager _dataPermissionManager;
44  private List<SubscriptionDataConfig> _subscriptionDataConfigsEnumerator;
45 
46  /// Maps each registered configuration to itself so the stored instance can be retrieved from an equal one.
47  /// This is a plain Dictionary, not a concurrent collection: the accesses in this class take a lock on it
48  private readonly Dictionary<SubscriptionDataConfig, SubscriptionDataConfig> _subscriptionManagerSubscriptions = new();
49 
50  /// <summary>
51  /// Event fired when a new subscription is added
52  /// </summary>
53  public event EventHandler<Subscription> SubscriptionAdded;
54 
55  /// <summary>
56  /// Event fired when an existing subscription is removed
57  /// </summary>
58  public event EventHandler<Subscription> SubscriptionRemoved;
59 
60  /// <summary>
61  /// Creates a new instance of the DataManager
62  /// </summary>
63  public DataManager(
64  IDataFeed dataFeed,
65  UniverseSelection universeSelection,
66  IAlgorithm algorithm,
67  ITimeKeeper timeKeeper,
68  MarketHoursDatabase marketHoursDatabase,
69  bool liveMode,
70  IRegisteredSecurityDataTypesProvider registeredTypesProvider,
71  IDataPermissionManager dataPermissionManager)
72  {
73  _dataFeed = dataFeed;
74  UniverseSelection = universeSelection;
77  _timeKeeper = timeKeeper;
78  _marketHoursDatabase = marketHoursDatabase;
79  _liveMode = liveMode;
80  _registeredTypesProvider = registeredTypesProvider;
81  _dataPermissionManager = dataPermissionManager;
82 
83  // wire ourselves up to receive notifications when universes are added/removed
84  algorithm.UniverseManager.CollectionChanged += (sender, args) =>
85  {
86  switch (args.Action)
87  {
88  case NotifyCollectionChangedAction.Add:
89  foreach (var universe in args.NewItems.OfType<Universe>())
90  {
91  var config = universe.Configuration;
92  var start = algorithm.UtcTime;
93 
94  var end = algorithm.LiveMode ? Time.EndOfTime
95  : algorithm.EndDate.ConvertToUtc(algorithm.TimeZone);
96 
97  Security security;
98  if (!algorithm.Securities.TryGetValue(config.Symbol, out security))
99  {
100  // create a canonical security object if it doesn't exist
101  security = new Security(
102  _marketHoursDatabase.GetExchangeHours(config),
103  config,
104  algorithm.Portfolio.CashBook[algorithm.AccountCurrency],
106  algorithm.Portfolio.CashBook,
108  new SecurityCache()
109  );
110  }
111 
112  // Let's adjust the start time to the previous tradable date
113  // so universe selection always happens right away at the start of the algorithm.
114  var universeType = universe.GetType();
115  if (
116  // We exclude the UserDefinedUniverse because their selection already happens at the algorithm start time.
117  // For instance, ETFs universe selection depends its first trigger time to be before the equity universe
118  // (the UserDefinedUniverse), because the ETFs are EndTime-indexed and that would make their first selection
119  // time to be before the algorithm start time, with the EndTime being the algorithms's start date,
120  // and both the Equity and the ETFs constituents first selection to happen together.
121  !universeType.IsAssignableTo(typeof(UserDefinedUniverse)) &&
122  // We exclude the ScheduledUniverse because it's already scheduled to run at a specific time.
123  // Adjusting the start time would cause the first selection trigger time to be before the algorithm start time,
124  // making the selection to be triggered at the first algorithm time, which would be the exact StartDate.
125  universeType != typeof(ScheduledUniverse))
126  {
127  const int maximumLookback = 60;
128  var loopCount = 0;
129  var startLocalTime = start.ConvertFromUtc(security.Exchange.TimeZone);
130  if (universe.UniverseSettings.Schedule.Initialized)
131  {
132  do
133  {
134  // determine if there's a scheduled selection time at the current start local time date, note that next
135  // we get the previous day of the first scheduled date we find, so we are sure the data is available to trigger selection
136  if (universe.UniverseSettings.Schedule.Get(startLocalTime.Date, startLocalTime.Date).Any())
137  {
138  break;
139  }
140  startLocalTime = startLocalTime.AddDays(-1);
141  if (++loopCount >= maximumLookback)
142  {
143  // fallback to the original, we found none
144  startLocalTime = algorithm.UtcTime.ConvertFromUtc(security.Exchange.TimeZone);
145  if (!_sentUniverseScheduleWarning)
146  {
147  // just in case
148  _sentUniverseScheduleWarning = true;
149  algorithm.Debug($"Warning: Found no valid start time for scheduled universe, will use default");
150  }
151  }
152  } while (loopCount < maximumLookback);
153  }
154 
155  startLocalTime = Time.GetStartTimeForTradeBars(security.Exchange.Hours, startLocalTime,
156  // disable universe selection on extended market hours, for example futures/index options have a sunday pre market we are not interested on
157  Time.OneDay, 1, extendedMarketHours: false, config.DataTimeZone,
158  LeanData.UseDailyStrictEndTimes(algorithm.Settings, config.Type, security.Symbol, Time.OneDay, security.Exchange.Hours));
159  start = startLocalTime.ConvertToUtc(security.Exchange.TimeZone);
160  }
161 
163  new SubscriptionRequest(true,
164  universe,
165  security,
166  config,
167  start,
168  end));
169  }
170  break;
171 
172  case NotifyCollectionChangedAction.Remove:
173  foreach (var universe in args.OldItems.OfType<Universe>())
174  {
175  // removing the subscription will be handled by the SubscriptionSynchronizer
176  // in the next loop as well as executing a UniverseSelection one last time.
177  if (!universe.DisposeRequested)
178  {
179  universe.Dispose();
180  }
181  }
182  break;
183 
184  default:
185  throw new NotImplementedException("The specified action is not implemented: " + args.Action);
186  }
187  };
188 
190  if (!_liveMode)
191  {
193  {
194  var requests = DataFeedSubscriptions
195  // we don't fill forward tick resolution so we don't need to touch their subscriptions
196  .Where(subscription => subscription.Configuration.FillDataForward && subscription.Configuration.Resolution != Resolution.Tick)
197  .SelectMany(subscription => subscription.SubscriptionRequests)
198  .ToList();
199 
200  if(requests.Count > 0)
201  {
202  Log.Trace($"DataManager(): Fill forward resolution has changed from {changedEvent.Old} to {changedEvent.New} at utc: {algorithm.UtcTime}. " +
203  $"Restarting {requests.Count} subscriptions...");
204 
205  // disable reentry while we remove and re add
207 
208  // remove
209  foreach (var request in requests)
210  {
211  // force because we want them actually removed even if still a member of the universe, because the FF res changed
212  // which means we will drop any data points that could be in the next potential slice being created
213  RemoveSubscriptionInternal(request.Configuration, universe: request.Universe, forceSubscriptionRemoval: true);
214  }
215 
216  // re add
217  foreach (var request in requests)
218  {
219  // If it is an add we will set time 1 tick ahead to properly sync data
220  // with next timeslice, avoid emitting now twice.
221  // We do the same in the 'TimeTriggeredUniverseSubscriptionEnumeratorFactory' when handling changes
222  var startUtc = algorithm.UtcTime;
223  // If the algorithm is not initialized (locked) the request start time can be even before the algorithm start time,
224  // like in the case of universe requests that are scheduled to run at a specific time in the past for immediate selection.
225  if (!algorithm.GetLocked() && request.StartTimeUtc < startUtc)
226  {
227  startUtc = request.StartTimeUtc;
228  }
230  startTimeUtc: startUtc.AddTicks(1),
231  configuration: new SubscriptionDataConfig(request.Configuration)));
232  }
233 
235  }
236  };
237  }
238  }
239 
240  #region IDataFeedSubscriptionManager
241 
242  /// <summary>
243  /// Gets the data feed subscription collection
244  /// </summary>
246 
247  /// <summary>
248  /// Will remove all current <see cref="Subscription"/>
249  /// </summary>
251  {
252  // remove each subscription from our collection
253  foreach (var subscription in DataFeedSubscriptions)
254  {
255  try
256  {
257  RemoveSubscription(subscription.Configuration);
258  }
259  catch (Exception err)
260  {
261  Log.Error(err, "DataManager.RemoveAllSubscriptions():" +
262  $"Error removing: {subscription.Configuration}");
263  }
264  }
265  }
266 
267  /// <summary>
268  /// Adds a new <see cref="Subscription"/> to provide data for the specified security.
269  /// </summary>
270  /// <param name="request">Defines the <see cref="SubscriptionRequest"/> to be added</param>
271  /// <returns>True if the subscription was created and added successfully, false otherwise</returns>
273  {
274  lock (_subscriptionManagerSubscriptions)
275  {
276  // guarantee the configuration is present in our config collection
277  // this is related to GH issue 3877: where we added a configuration which we also removed
278  if(_subscriptionManagerSubscriptions.TryAdd(request.Configuration, request.Configuration))
279  {
280  _subscriptionDataConfigsEnumerator = null;
281  }
282  }
283 
284  Subscription subscription;
285  if (DataFeedSubscriptions.TryGetValue(request.Configuration, out subscription))
286  {
287  // duplicate subscription request
288  subscription.AddSubscriptionRequest(request);
289  // only result true if the existing subscription is internal, we actually added something from the users perspective
290  return subscription.Configuration.IsInternalFeed;
291  }
292 
294  {
295  throw new InvalidOperationException($"{DataNormalizationMode.ScaledRaw} normalization mode only intended for history requests.");
296  }
297 
298  // before adding the configuration to the data feed let's assert it's valid
299  _dataPermissionManager.AssertConfiguration(request.Configuration, request.StartTimeLocal, request.EndTimeLocal);
300 
301  subscription = _dataFeed.CreateSubscription(request);
302 
303  if (subscription == null)
304  {
305  Log.Trace($"DataManager.AddSubscription(): Unable to add subscription for: {request.Configuration}");
306  // subscription will be null when there's no tradeable dates for the security between the requested times, so
307  // don't even try to load the data
308  return false;
309  }
310 
311  if (_liveMode)
312  {
313  OnSubscriptionAdded(subscription);
314  Log.Trace($"DataManager.AddSubscription(): Added {request.Configuration}." +
315  $" Start: {request.StartTimeUtc}. End: {request.EndTimeUtc}");
316  }
317  else if(Log.DebuggingEnabled)
318  {
319  // for performance lets not create the message string if debugging is not enabled
320  // this can be executed many times and its in the algorithm thread
321  Log.Debug($"DataManager.AddSubscription(): Added {request.Configuration}." +
322  $" Start: {request.StartTimeUtc}. End: {request.EndTimeUtc}");
323  }
324 
325  return DataFeedSubscriptions.TryAdd(subscription);
326  }
327 
/// <summary>
/// Removes the <see cref="Subscription"/> matching the given configuration, if there is one.
/// This public entry point never forces the removal; it forwards to the internal
/// implementation with <c>forceSubscriptionRemoval</c> set to false.
/// </summary>
/// <param name="configuration">The <see cref="SubscriptionDataConfig"/> of the subscription to remove</param>
/// <param name="universe">Universe requesting to remove <see cref="Subscription"/>.
/// Default value, null, will remove all universes</param>
/// <returns>True if the subscription was successfully removed, false otherwise</returns>
public bool RemoveSubscription(SubscriptionDataConfig configuration, Universe universe = null)
    => RemoveSubscriptionInternal(configuration, universe, forceSubscriptionRemoval: false);
339 
/// <summary>
/// Removes the <see cref="Subscription"/> matching the given configuration, if there is one,
/// detaching it from the data feed and disposing it once no subscription request is left.
/// </summary>
/// <param name="configuration">The <see cref="SubscriptionDataConfig"/> of the subscription to remove</param>
/// <param name="universe">Universe requesting to remove <see cref="Subscription"/>.
/// Null will remove all universes</param>
/// <param name="forceSubscriptionRemoval">When true the subscription is marked as removed from the universe
/// after it has been taken out of the data feed, so that all its data is dropped</param>
/// <returns>True if the subscription was successfully removed, false otherwise</returns>
private bool RemoveSubscriptionInternal(SubscriptionDataConfig configuration, Universe universe, bool forceSubscriptionRemoval)
{
    Subscription target;
    if (!DataFeedSubscriptions.TryGetValue(configuration, out target))
    {
        // The data feed no longer holds this subscription. When a universe asked for the removal this can
        // happen because the subscription already ended: it left the data feed collection but its configuration
        // was kept until the universe removed it (fill models use these configurations to decide which data to use).
        if (universe != null)
        {
            lock (_subscriptionManagerSubscriptions)
            {
                var configWasRemoved = _subscriptionManagerSubscriptions.Remove(configuration);
                if (configWasRemoved)
                {
                    // invalidate the cached snapshot of configurations
                    _subscriptionDataConfigsEnumerator = null;
                }
            }
        }
        return false;
    }

    // the subscription only goes away once there are no other requests left for it
    if (!target.RemoveSubscriptionRequest(universe))
    {
        return false;
    }

    if (!DataFeedSubscriptions.TryRemove(configuration, out target))
    {
        Log.Error($"DataManager.RemoveSubscription(): Unable to remove {configuration}");
        return false;
    }

    _dataFeed.RemoveSubscription(target);

    // removal notifications are only raised in live mode
    if (_liveMode)
    {
        OnSubscriptionRemoved(target);
    }

    target.Dispose();

    // drops the configuration only if the subscription is already flagged as removed from its universe
    RemoveSubscriptionDataConfig(target);

    if (forceSubscriptionRemoval)
    {
        target.MarkAsRemovedFromUniverse();
    }

    if (_liveMode)
    {
        Log.Trace($"DataManager.RemoveSubscription(): Removed {configuration}");
    }
    else if (Log.DebuggingEnabled)
    {
        // this runs many times on the algorithm thread, so the message is only built when debugging is enabled
        Log.Debug($"DataManager.RemoveSubscription(): Removed {configuration}");
    }

    return true;
}
408 
/// <summary>
/// Raises the <see cref="SubscriptionAdded"/> event, if anyone is listening
/// </summary>
/// <param name="subscription">The subscription that was just added; passed to handlers as the event argument</param>
private void OnSubscriptionAdded(Subscription subscription)
{
    // copy the delegate so the null check and the invocation see the same handler list
    var handler = SubscriptionAdded;
    if (handler != null)
    {
        handler(this, subscription);
    }
}
417 
/// <summary>
/// Raises the <see cref="SubscriptionRemoved"/> event, if anyone is listening
/// </summary>
/// <param name="subscription">The subscription that was just removed; passed to handlers as the event argument</param>
private void OnSubscriptionRemoved(Subscription subscription)
{
    // copy the delegate so the null check and the invocation see the same handler list
    var handler = SubscriptionRemoved;
    if (handler != null)
    {
        handler(this, subscription);
    }
}
426 
427  #endregion
428 
429  #region IAlgorithmSubscriptionManager
430 
/// <summary>
/// Gets all the current data config subscriptions that are being processed for the SubscriptionManager.
/// The returned collection is a cached list built from the configuration dictionary; it is rebuilt
/// on the next read after the cache has been reset to null.
/// </summary>
public IEnumerable<SubscriptionDataConfig> SubscriptionManagerSubscriptions
{
    get
    {
        lock (_subscriptionManagerSubscriptions)
        {
            // lazily materialize the snapshot, reusing the previous one while it is still valid
            return _subscriptionDataConfigsEnumerator ??= _subscriptionManagerSubscriptions.Values.ToList();
        }
    }
}
448 
449  /// <summary>
450  /// Gets existing or adds new <see cref="SubscriptionDataConfig" />
451  /// </summary>
452  /// <returns>Returns the SubscriptionDataConfig instance used</returns>
454  {
455  SubscriptionDataConfig config;
456  lock (_subscriptionManagerSubscriptions)
457  {
458  if (!_subscriptionManagerSubscriptions.TryGetValue(newConfig, out config))
459  {
460  _subscriptionManagerSubscriptions[newConfig] = config = newConfig;
461  _subscriptionDataConfigsEnumerator = null;
462  }
463  }
464 
465  // if the reference is not the same, means it was already there and we did not add anything new
466  if (!ReferenceEquals(config, newConfig))
467  {
468  // for performance lets not create the message string if debugging is not enabled
469  // this can be executed many times and its in the algorithm thread
470  if (Log.DebuggingEnabled)
471  {
472  Log.Debug("DataManager.SubscriptionManagerGetOrAdd(): subscription already added: " + config);
473  }
474  }
475  else
476  {
477  // add the time zone to our time keeper
478  _timeKeeper.AddTimeZone(newConfig.ExchangeTimeZone);
479  }
480 
481  return config;
482  }
483 
/// <summary>
/// Removes the <see cref="SubscriptionDataConfig"/> owned by the given subscription from the
/// configuration collection, but only when the subscription is flagged as removed from its universe
/// </summary>
/// <param name="subscription">The <see cref="Subscription"/> owning the configuration to remove</param>
private void RemoveSubscriptionDataConfig(Subscription subscription)
{
    // the subscription could have ended while still being part of the universe: keep its configuration then
    if (!subscription.RemovedFromUniverse.Value)
    {
        return;
    }

    lock (_subscriptionManagerSubscriptions)
    {
        var configWasRemoved = _subscriptionManagerSubscriptions.Remove(subscription.Configuration);
        if (configWasRemoved)
        {
            // invalidate the cached snapshot of configurations
            _subscriptionDataConfigsEnumerator = null;
        }
    }
}
503 
504  /// <summary>
505  /// Returns the amount of data config subscriptions processed for the SubscriptionManager
506  /// </summary>
508  {
509  lock (_subscriptionManagerSubscriptions)
510  {
511  return _subscriptionManagerSubscriptions.Count;
512  }
513  }
514 
515  #region ISubscriptionDataConfigService
516 
517  /// <summary>
518  /// The different <see cref="TickType" /> each <see cref="SecurityType" /> supports
519  /// </summary>
520  public Dictionary<SecurityType, List<TickType>> AvailableDataTypes { get; }
521 
522  /// <summary>
523  /// Creates and adds a list of <see cref="SubscriptionDataConfig" /> for a given symbol and configuration.
524  /// Can optionally pass in desired subscription data type to use.
525  /// If the config already existed will return existing instance instead
526  /// </summary>
528  Type dataType,
529  Symbol symbol,
530  Resolution? resolution = null,
531  bool fillForward = true,
532  bool extendedMarketHours = false,
533  bool isFilteredSubscription = true,
534  bool isInternalFeed = false,
535  bool isCustomData = false,
536  DataNormalizationMode dataNormalizationMode = DataNormalizationMode.Adjusted,
537  DataMappingMode dataMappingMode = DataMappingMode.OpenInterest,
538  uint contractDepthOffset = 0
539  )
540  {
541  return Add(symbol, resolution, fillForward, extendedMarketHours, isFilteredSubscription, isInternalFeed, isCustomData,
542  new List<Tuple<Type, TickType>> { new Tuple<Type, TickType>(dataType, LeanData.GetCommonTickTypeForCommonDataTypes(dataType, symbol.SecurityType))},
543  dataNormalizationMode, dataMappingMode, contractDepthOffset)
544  .First();
545  }
546 
/// <summary>
/// Creates and adds a list of <see cref="SubscriptionDataConfig" /> for a given symbol and configuration.
/// Can optionally pass in desired subscription data types to use.
/// If the config already existed will return existing instance instead
/// </summary>
/// <param name="symbol">The symbol the configurations are created for</param>
/// <param name="resolution">The requested resolution. When null, the default resolution reported by the data types is used;
/// when provided, it is validated against the resolutions each data type supports (backtesting only)</param>
/// <param name="fillForward">Forwarded to each new <see cref="SubscriptionDataConfig" /></param>
/// <param name="extendedMarketHours">Forwarded to each new <see cref="SubscriptionDataConfig" /></param>
/// <param name="isFilteredSubscription">Forwarded to each new <see cref="SubscriptionDataConfig" /></param>
/// <param name="isInternalFeed">True to create internal configurations. Open interest configurations are also made
/// internal when <paramref name="subscriptionDataTypes"/> was not provided</param>
/// <param name="isCustomData">Forwarded to each new <see cref="SubscriptionDataConfig" /></param>
/// <param name="subscriptionDataTypes">The data type and tick type pairs to create configurations for.
/// When null they are resolved from the symbol: a registered custom data type for <see cref="SecurityType.Base"/> symbols,
/// otherwise the result of <see cref="LookupSubscriptionConfigDataTypes"/></param>
/// <param name="dataNormalizationMode">The requested normalization mode; overridden with
/// <see cref="DataNormalizationMode.Raw"/> for option and index symbols</param>
/// <param name="dataMappingMode">Forwarded to each new <see cref="SubscriptionDataConfig" /></param>
/// <param name="contractDepthOffset">Forwarded to each new <see cref="SubscriptionDataConfig" /></param>
/// <returns>One configuration per data type, each being the instance held by the subscription manager</returns>
/// <exception cref="ArgumentNullException">No data types are available, or the market hours entry has no data
/// time zone or exchange time zone</exception>
/// <exception cref="InvalidOperationException">No resolution was provided and the data types disagree on their default resolution</exception>
/// <exception cref="ArgumentException">In backtesting, the provided resolution is not supported by one of the data types</exception>
public List<SubscriptionDataConfig> Add(
    Symbol symbol,
    Resolution? resolution = null,
    bool fillForward = true,
    bool extendedMarketHours = false,
    bool isFilteredSubscription = true,
    bool isInternalFeed = false,
    bool isCustomData = false,
    List<Tuple<Type, TickType>> subscriptionDataTypes = null,
    DataNormalizationMode dataNormalizationMode = DataNormalizationMode.Adjusted,
    DataMappingMode dataMappingMode = DataMappingMode.OpenInterest,
    uint contractDepthOffset = 0
    )
{
    // resolve the data types to subscribe to when the caller did not specify them
    var dataTypes = subscriptionDataTypes;
    if (dataTypes == null)
    {
        var isRegisteredCustomData = symbol.SecurityType == SecurityType.Base
            && SecurityIdentifier.TryGetCustomDataTypeInstance(symbol.ID.Symbol, out var customType);
        if (isRegisteredCustomData)
        {
            // a custom data request for which we know the type, let's use it
            dataTypes = new List<Tuple<Type, TickType>> { new Tuple<Type, TickType>(customType, TickType.Trade) };
        }
        else
        {
            dataTypes = LookupSubscriptionConfigDataTypes(symbol.SecurityType, resolution ?? Resolution.Minute, symbol.IsCanonical());
        }
    }

    if (dataTypes.Count == 0)
    {
        throw new ArgumentNullException(nameof(dataTypes), "At least one type needed to create new subscriptions");
    }

    // captured before the loop: the loop itself assigns 'resolution' when it was not provided
    var resolutionWasProvided = resolution.HasValue;
    foreach (var entry in dataTypes)
    {
        var prototype = entry.Item1.GetBaseDataInstance();
        prototype.Symbol = symbol;

        if (resolutionWasProvided)
        {
            // only assert the resolution in backtesting, live can use other data sources,
            // for example daily data for options
            if (_liveMode)
            {
                continue;
            }

            var supportedResolutions = prototype.SupportedResolutions();
            if (!supportedResolutions.Contains(resolution.Value))
            {
                throw new ArgumentException($"Sorry {resolution.ToStringInvariant()} is not a supported resolution for {entry.Item1.Name}" +
                    $" and SecurityType.{symbol.SecurityType.ToStringInvariant()}." +
                    $" Please change your AddData to use one of the supported resolutions ({string.Join(",", supportedResolutions)}).");
            }
            continue;
        }

        var defaultResolution = prototype.DefaultResolution();
        if (resolution.HasValue && resolution != defaultResolution)
        {
            // 'resolution' was set by a previous data type in this loop and this one disagrees with it,
            // this shouldn't happen
            throw new InvalidOperationException(
                $"Different data types ({string.Join(",", dataTypes.Select(tuple => tuple.Item1))})" +
                $" provided different default resolutions {defaultResolution} and {resolution}, this is an unexpected invalid operation.");
        }
        resolution = defaultResolution;
    }

    var marketHoursDbEntry = _marketHoursDatabase.GetEntry(symbol, dataTypes.Select(tuple => tuple.Item1));
    var exchangeHours = marketHoursDbEntry.ExchangeHours;

    if (marketHoursDbEntry.DataTimeZone == null)
    {
        throw new ArgumentNullException(nameof(marketHoursDbEntry.DataTimeZone),
            "DataTimeZone is a required parameter for new subscriptions. Set to the time zone the raw data is time stamped in.");
    }

    if (exchangeHours.TimeZone == null)
    {
        throw new ArgumentNullException(nameof(exchangeHours.TimeZone),
            "ExchangeTimeZone is a required parameter for new subscriptions. Set to the time zone the security exchange resides in.");
    }

    // options and indexes always use raw data, whatever was requested
    var isOptionOrIndex = symbol.ID.SecurityType.IsOption() || symbol.ID.SecurityType == SecurityType.Index;
    if (isOptionOrIndex)
    {
        dataNormalizationMode = DataNormalizationMode.Raw;
    }

    // first build every candidate configuration...
    var configs = dataTypes
        .Select(entry => new SubscriptionDataConfig(
            entry.Item1,
            symbol,
            resolution.Value,
            marketHoursDbEntry.DataTimeZone,
            exchangeHours.TimeZone,
            fillForward,
            extendedMarketHours,
            // open interest is made internal when the subscription data types were not provided
            isInternalFeed || (subscriptionDataTypes == null && entry.Item2 == TickType.OpenInterest),
            isCustomData,
            isFilteredSubscription: isFilteredSubscription,
            tickType: entry.Item2,
            dataNormalizationMode: dataNormalizationMode,
            dataMappingMode: dataMappingMode,
            contractDepthOffset: contractDepthOffset))
        .ToList();

    // ...then swap each candidate for the instance the subscription manager holds (existing or newly added)
    for (var index = 0; index < configs.Count; index++)
    {
        var registered = SubscriptionManagerGetOrAdd(configs[index]);
        configs[index] = registered;

        // track all registered data types
        _registeredTypesProvider.RegisterType(registered.Type);
    }

    return configs;
}
671 
/// <summary>
/// Get the data feed types for a given <see cref="SecurityType" /> <see cref="Resolution" />
/// </summary>
/// <param name="symbolSecurityType">The <see cref="SecurityType" /> used to determine the types</param>
/// <param name="resolution">The resolution of the data requested</param>
/// <param name="isCanonical">Indicates whether the security is Canonical (future and options)</param>
/// <returns>Types that should be added to the <see cref="SubscriptionDataConfig" />: a single universe type for
/// canonical securities, otherwise one entry per available tick type that is valid for the security type and resolution</returns>
/// <remarks>TODO: data type additions are very related to ticktype and should be more generic/independent of each other</remarks>
public List<Tuple<Type, TickType>> LookupSubscriptionConfigDataTypes(
    SecurityType symbolSecurityType,
    Resolution resolution,
    bool isCanonical
    )
{
    if (isCanonical)
    {
        // canonical securities get a single universe data type: option universe for options, future universe otherwise
        var universeType = symbolSecurityType.IsOption() ? typeof(OptionUniverse) : typeof(FutureUniverse);
        return new List<Tuple<Type, TickType>> { new Tuple<Type, TickType>(universeType, TickType.Quote) };
    }

    var dataTypes = new List<Tuple<Type, TickType>>();
    foreach (var tickType in AvailableDataTypes[symbolSecurityType])
    {
        // skip tick types which are not valid for this security type and resolution,
        // for instance equities will only look for trades in case of low resolutions
        if (!LeanData.IsValidConfiguration(symbolSecurityType, resolution, tickType))
        {
            continue;
        }

        dataTypes.Add(new Tuple<Type, TickType>(LeanData.GetDataType(resolution, tickType), tickType));
    }

    // crypto futures additionally subscribe to the margin interest rate
    if (symbolSecurityType == SecurityType.CryptoFuture)
    {
        dataTypes.Add(new Tuple<Type, TickType>(typeof(MarginInterestRate), TickType.Quote));
    }

    return dataTypes;
}
709 
/// <summary>
/// Gets a list of all registered <see cref="SubscriptionDataConfig"/> for a given <see cref="Symbol"/>
/// </summary>
/// <param name="symbol">The symbol to get the configurations of; null returns the configurations of every symbol</param>
/// <param name="includeInternalConfigs">True to also return internal configurations</param>
/// <returns>A new list with the matching configurations, non internal ones first</returns>
/// <remarks>Will not return internal subscriptions by default</remarks>
public List<SubscriptionDataConfig> GetSubscriptionDataConfigs(Symbol symbol = null, bool includeInternalConfigs = false)
{
    lock (_subscriptionManagerSubscriptions)
    {
        var matches =
            from config in _subscriptionManagerSubscriptions.Keys
            where (includeInternalConfigs || !config.IsInternalFeed) && (symbol == null || config.Symbol.ID == symbol.ID)
            // false sorts before true, so internal feeds end up last
            orderby config.IsInternalFeed
            select config;

        // materialize while still holding the lock
        return matches.ToList();
    }
}
724 
725  #endregion
726 
727  #endregion
728 
729  #region IDataManager
730 
731  /// <summary>
732  /// Get the universe selection instance
733  /// </summary>
735 
736  #endregion
737  }
738 }