24 using System.Collections.Generic;
37 private bool _initialized;
43 private List<IHistoryProvider> _historyProviders =
new();
56 _brokerage = brokerage;
68 throw new InvalidOperationException(
"BrokerageHistoryProvider can only be initialized once");
72 var dataProvidersList = parameters.
Job?.HistoryProvider.DeserializeList() ??
new List<string>();
73 if (dataProvidersList.IsNullOrEmpty())
75 dataProvidersList.AddRange(
Config.
Get(
"history-provider",
"SubscriptionDataReaderHistoryProvider").DeserializeList());
78 foreach (var historyProviderName
in dataProvidersList)
85 if (dataQueueHandler ==
null)
90 dataQueueHandler.
SetJob((Packets.LiveNodePacket)parameters.Job);
91 Log.
Trace($
"HistoryProviderManager.Initialize(): Created and wrapped '{brokerageName}' as '{typeof(BrokerageHistoryProvider).Name}'");
95 Log.
Trace($
"HistoryProviderManager.Initialize(): Wrapping '{brokerageName}' instance as '{typeof(BrokerageHistoryProvider).Name}'");
100 brokerageHistoryProvider.SetBrokerage((
IBrokerage)dataQueueHandler);
101 historyProvider = brokerageHistoryProvider;
117 _historyProviders.Add(historyProvider);
120 Log.
Trace($
"HistoryProviderManager.Initialize(): history providers [{string.Join(",
", _historyProviders.Select(x => x.GetType().Name))}]");
// Requests history from every provider in _historyProviders and yields the resulting slices,
// merging slices that are not later than the one currently being accumulated.
// requests: the history requests, forwarded unchanged to each provider.
// sliceTimeZone: forwarded unchanged to each provider's GetHistory call.
// NOTE(review): this view of the method is partial; statements between the numbered lines
// (including the declaration of 'synchronizer' and the bodies of some 'if' checks) are not visible here.
129 public override IEnumerable<Slice>
GetHistory(IEnumerable<HistoryRequest> requests, DateTimeZone sliceTimeZone)
// One enumerator slot per registered provider (capacity hint only).
131 List<IEnumerator<Slice>> historyEnumerators =
new(_historyProviders.Count);
// Materialize the requests once so the same list is handed to every provider.
132 var historyRequets = requests.ToList();
133 foreach (var historyProvider
in _historyProviders)
137 var history = historyProvider.GetHistory(historyRequets, sliceTimeZone);
// NOTE(review): any handling of a null 'history' would sit in the lines not shown here - confirm.
143 historyEnumerators.Add(history.GetEnumerator());
// Slice being accumulated; it is yielded only once a slice with a later UtcTime shows up.
151 Slice latestMergeSlice =
null;
// NOTE(review): 'synchronizer' is created in lines not shown; presumably it combines historyEnumerators - confirm.
152 while (synchronizer.MoveNext())
// NOTE(review): the action taken for a null Current is not visible here - presumably it is skipped.
154 if (synchronizer.Current ==
null)
// First slice seen: it becomes the accumulator.
158 if (latestMergeSlice ==
null)
160 latestMergeSlice = synchronizer.Current;
// A strictly later UtcTime: emit the accumulated slice and start accumulating the new one.
163 if (synchronizer.Current.UtcTime > latestMergeSlice.
UtcTime)
167 yield
return latestMergeSlice;
168 latestMergeSlice = synchronizer.Current;
// Fold the current slice into the accumulator.
// NOTE(review): presumably reached only when the UtcTime check above fails - confirm against the full source.
173 latestMergeSlice.
MergeSlice(synchronizer.Current);
// After the loop, emit the last accumulated slice if there is one.
176 if (latestMergeSlice !=
null)
178 yield
return latestMergeSlice;
/// <summary>
/// Adds up the <c>DataPointCount</c> reported by every provider in <c>_historyProviders</c>.
/// </summary>
/// <returns>The combined data point count; 0 when no providers are registered.</returns>
private int GetDataPointCount() =>
    // Aggregate (not Sum) keeps the same plain integer addition as an explicit += loop.
    _historyProviders.Aggregate(0, (runningTotal, provider) => runningTotal + provider.DataPointCount);