Lean  $LEAN_TAG$
HistoryProviderManager.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14 */
15 
16 using NodaTime;
18 using QuantConnect.Data;
21 using QuantConnect.Logging;
22 using QuantConnect.Util;
23 using System;
24 using System.Collections.Generic;
25 using System.Linq;
27 
29 {
30  /// <summary>
31  /// Provides an implementation of <see cref="IHistoryProvider"/> which
32  /// acts as a wrapper to use multiple history providers together
33  /// </summary>
35  {
36  private IBrokerage _brokerage;
37  private bool _initialized;
38 
39  /// <summary>
40  /// Collection of history providers being used
41  /// </summary>
42  /// <remarks>Protected for testing purposes</remarks>
43  private List<IHistoryProvider> _historyProviders = new();
44 
45  /// <summary>
46  /// Gets the total number of data points emitted by this history provider
47  /// </summary>
48  public override int DataPointCount => GetDataPointCount();
49 
50  /// <summary>
51  /// Sets the brokerage to be used for historical requests
52  /// </summary>
53  /// <param name="brokerage">The brokerage instance</param>
54  public void SetBrokerage(IBrokerage brokerage)
55  {
56  _brokerage = brokerage;
57  }
58 
59  /// <summary>
60  /// Initializes this history provider to work for the specified job
61  /// </summary>
62  /// <param name="parameters">The initialization parameters</param>
63  public override void Initialize(HistoryProviderInitializeParameters parameters)
64  {
65  if (_initialized)
66  {
67  // let's make sure no one tries to change our parameters values
68  throw new InvalidOperationException("BrokerageHistoryProvider can only be initialized once");
69  }
70  _initialized = true;
71 
72  var dataProvidersList = parameters.Job?.HistoryProvider.DeserializeList() ?? new List<string>();
73  if (dataProvidersList.IsNullOrEmpty())
74  {
75  dataProvidersList.AddRange(Config.Get("history-provider", "SubscriptionDataReaderHistoryProvider").DeserializeList());
76  }
77 
78  foreach (var historyProviderName in dataProvidersList)
79  {
80  IHistoryProvider historyProvider;
81  if (HistoryExtensions.TryGetBrokerageName(historyProviderName, out var brokerageName))
82  {
83  // we get the data queue handler if it already exists
84  var dataQueueHandler = Composer.Instance.GetPart<IDataQueueHandler>((x) => x.GetType().Name == brokerageName);
85  if (dataQueueHandler == null)
86  {
87  // we need to create the brokerage/data queue handler
88  dataQueueHandler = Composer.Instance.GetExportedValueByTypeName<IDataQueueHandler>(brokerageName);
89  // initialize it
90  dataQueueHandler.SetJob((Packets.LiveNodePacket)parameters.Job);
91  Log.Trace($"HistoryProviderManager.Initialize(): Created and wrapped '{brokerageName}' as '{typeof(BrokerageHistoryProvider).Name}'");
92  }
93  else
94  {
95  Log.Trace($"HistoryProviderManager.Initialize(): Wrapping '{brokerageName}' instance as '{typeof(BrokerageHistoryProvider).Name}'");
96  }
97 
98  // wrap it
99  var brokerageHistoryProvider = new BrokerageHistoryProvider();
100  brokerageHistoryProvider.SetBrokerage((IBrokerage)dataQueueHandler);
101  historyProvider = brokerageHistoryProvider;
102  }
103  else
104  {
105  historyProvider = Composer.Instance.GetExportedValueByTypeName<IHistoryProvider>(historyProviderName);
106  if (historyProvider is BrokerageHistoryProvider)
107  {
108  (historyProvider as BrokerageHistoryProvider).SetBrokerage(_brokerage);
109  }
110  }
111  historyProvider.Initialize(parameters);
112  historyProvider.InvalidConfigurationDetected += (sender, args) => { OnInvalidConfigurationDetected(args); };
113  historyProvider.NumericalPrecisionLimited += (sender, args) => { OnNumericalPrecisionLimited(args); };
114  historyProvider.StartDateLimited += (sender, args) => { OnStartDateLimited(args); };
115  historyProvider.DownloadFailed += (sender, args) => { OnDownloadFailed(args); };
116  historyProvider.ReaderErrorDetected += (sender, args) => { OnReaderErrorDetected(args); };
117  _historyProviders.Add(historyProvider);
118  }
119 
120  Log.Trace($"HistoryProviderManager.Initialize(): history providers [{string.Join(",", _historyProviders.Select(x => x.GetType().Name))}]");
121  }
122 
123  /// <summary>
124  /// Gets the history for the requested securities
125  /// </summary>
126  /// <param name="requests">The historical data requests</param>
127  /// <param name="sliceTimeZone">The time zone used when time stamping the slice instances</param>
128  /// <returns>An enumerable of the slices of data covering the span specified in each request</returns>
129  public override IEnumerable<Slice> GetHistory(IEnumerable<HistoryRequest> requests, DateTimeZone sliceTimeZone)
130  {
131  List<IEnumerator<Slice>> historyEnumerators = new(_historyProviders.Count);
132  var historyRequets = requests.ToList();
133  foreach (var historyProvider in _historyProviders)
134  {
135  try
136  {
137  var history = historyProvider.GetHistory(historyRequets, sliceTimeZone);
138  if (history == null)
139  {
140  // doesn't support this history request, that's okay
141  continue;
142  }
143  historyEnumerators.Add(history.GetEnumerator());
144  }
145  catch (Exception e)
146  {
147  // ignore
148  }
149  }
150  using var synchronizer = new SynchronizingSliceEnumerator(historyEnumerators);
151  Slice latestMergeSlice = null;
152  while (synchronizer.MoveNext())
153  {
154  if (synchronizer.Current == null)
155  {
156  continue;
157  }
158  if (latestMergeSlice == null)
159  {
160  latestMergeSlice = synchronizer.Current;
161  continue;
162  }
163  if (synchronizer.Current.UtcTime > latestMergeSlice.UtcTime)
164  {
165  // a newer slice we emit the old and keep a reference of the new
166  // so in the next loop we merge if required
167  yield return latestMergeSlice;
168  latestMergeSlice = synchronizer.Current;
169  }
170  else
171  {
172  // a new slice with same time we merge them into 'latestMergeSlice'
173  latestMergeSlice.MergeSlice(synchronizer.Current);
174  }
175  }
176  if (latestMergeSlice != null)
177  {
178  yield return latestMergeSlice;
179  }
180  }
181 
182  private int GetDataPointCount()
183  {
184  var dataPointCount = 0;
185  foreach (var historyProvider in _historyProviders)
186  {
187  dataPointCount += historyProvider.DataPointCount;
188  }
189  return dataPointCount;
190  }
191  }
192 }