Lean  $LEAN_TAG$
SubscriptionDataReaderHistoryProvider.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15 */
16 
17 using System;
18 using System.Collections.Generic;
19 using NodaTime;
20 using QuantConnect.Data;
28 using QuantConnect.Util;
30 
32 {
33  /// <summary>
34  /// Provides an implementation of <see cref="IHistoryProvider"/> that uses <see cref="BaseData"/>
35  /// instances to retrieve historical data
36  /// </summary>
38  {
// Placeholder objects created once in Initialize and passed to the internal Security
// that CreateSubscription builds for each request.
39  private SymbolProperties _nullSymbolProperties;
40  private SecurityCache _nullCache;
41  private Cash _nullCash;
42 
 // Providers and stores copied from the HistoryProviderInitializeParameters in Initialize;
 // CreateSubscription hands them to the SubscriptionDataReader it creates.
43  private IDataProvider _dataProvider;
44  private IMapFileProvider _mapFileProvider;
45  private IFactorFileProvider _factorFileProvider;
46  private IDataCacheProvider _dataCacheProvider;
47  private IObjectStore _objectStore;
 // Selects SubscriptionUtils.CreateAndScheduleWorker (true) or SubscriptionUtils.Create (false)
 // at the end of CreateSubscription.
48  private bool _parallelHistoryRequestsEnabled;
 // Set on the first call to Initialize; a later call throws InvalidOperationException.
49  private bool _initialized;
50 
51  /// <summary>
52  /// Manager used to allow or deny access to a requested datasource for specific users
53  /// </summary>
55 
56  /// <summary>
57  /// Initializes this history provider to work for the specified job
58  /// </summary>
 /// <remarks>
 /// Copies the providers and settings carried by <paramref name="parameters"/> into private fields and
 /// creates the placeholder cache, cash and symbol properties used when building internal securities.
 /// </remarks>
59  /// <param name="parameters">The initialization parameters</param>
 /// <exception cref="InvalidOperationException">Thrown if this method has already been called on this instance</exception>
60  public override void Initialize(HistoryProviderInitializeParameters parameters)
61  {
62  if (_initialized)
63  {
64  // let's make sure no one tries to change our parameters values
65  throw new InvalidOperationException("SubscriptionDataReaderHistoryProvider can only be initialized once");
66  }
 // the flag is raised before any parameter is read, so a call that fails part-way
 // through still counts as the one allowed initialization
67  _initialized = true;
68  _dataProvider = parameters.DataProvider;
69  _mapFileProvider = parameters.MapFileProvider;
70  _dataCacheProvider = parameters.DataCacheProvider;
71  _factorFileProvider = parameters.FactorFileProvider;
72  _objectStore = parameters.ObjectStore;
 // NOTE(review): the embedded numbering jumps from 72 to 74, so one statement appears to be
 // missing from this listing - confirm against the original file.
74  _parallelHistoryRequestsEnabled = parameters.ParallelHistoryRequestsEnabled;
75 
 // placeholders built on the null currency; CreateSubscription passes them to every Security it creates
76  _nullCache = new SecurityCache();
77  _nullCash = new Cash(Currencies.NullCurrency, 0, 1m);
78  _nullSymbolProperties = SymbolProperties.GetDefault(Currencies.NullCurrency);
79  }
80 
/// <summary>
/// Builds one subscription per history request and returns the slices produced from all of them
/// </summary>
/// <param name="requests">The historical data requests; each one is turned into a subscription before this method returns</param>
/// <param name="sliceTimeZone">The time zone used when time stamping the slice instances</param>
/// <returns>An enumerable of the slices of data covering the span specified in each request</returns>
public override IEnumerable<Slice> GetHistory(IEnumerable<HistoryRequest> requests, DateTimeZone sliceTimeZone)
{
    // eagerly turn every request into its own subscription
    var pending = new List<Subscription>();
    foreach (var historyRequest in requests)
    {
        pending.Add(CreateSubscription(historyRequest));
    }

    // CreateSliceEnumerableFromSubscriptions is defined outside this listing
    return CreateSliceEnumerableFromSubscriptions(pending, sliceTimeZone);
}
99 
100  /// <summary>
101  /// Creates a subscription to process the request
102  /// </summary>
 /// <remarks>
 /// The request is read through a <see cref="SubscriptionDataReader"/> whose events are forwarded to this
 /// provider's own On* methods. The resulting enumerator is optionally concatenated with intraday data,
 /// optionally fill-forwarded, and filtered before being handed to <see cref="SubscriptionUtils"/>.
 /// </remarks>
 /// <param name="request">The history request to build the subscription for</param>
 /// <returns>The subscription returned by <see cref="SubscriptionUtils"/> for the request</returns>
103  private Subscription CreateSubscription(HistoryRequest request)
104  {
105  var config = request.ToSubscriptionDataConfig();
 // NOTE(review): embedded line 106 is absent from this listing - confirm nothing is lost here.
107 
108  // this security is internal only, so we do not need to worry about a few of its properties
109  // TODO: we don't need fee/fill/BPM/etc either. Even better we should refactor & remove the need for the security
110  var security = new Security(
111  request.ExchangeHours,
112  config,
113  _nullCash,
114  _nullSymbolProperties,
 // NOTE(review): embedded lines 115-116 are absent from this listing; the constructor
 // presumably takes two more arguments here - restore from the original file.
117  _nullCache
118  );
119 
120  var dataReader = new SubscriptionDataReader(config,
121  request,
122  _mapFileProvider,
123  _factorFileProvider,
124  _dataCacheProvider,
125  _dataProvider,
126  _objectStore);
127 
 // forward each of the reader's events to this provider's matching On* method
128  dataReader.InvalidConfigurationDetected += (sender, args) => { OnInvalidConfigurationDetected(args); };
129  dataReader.NumericalPrecisionLimited += (sender, args) => { OnNumericalPrecisionLimited(args); };
130  dataReader.StartDateLimited += (sender, args) => { OnStartDateLimited(args); };
131  dataReader.DownloadFailed += (sender, args) => { OnDownloadFailed(args); };
132  dataReader.ReaderErrorDetected += (sender, args) => { OnReaderErrorDetected(args); };
133 
 // start from the raw reader; GetIntradayDataEnumerator returns null in this class,
 // in which case the reader is used unchanged
134  IEnumerator<BaseData> reader = dataReader;
135  var intraday = GetIntradayDataEnumerator(dataReader, request);
136  if (intraday != null)
137  {
138  // we optionally concatenate the intraday data enumerator
139  reader = new ConcatEnumerator(true, reader, intraday);
140  }
141 
 // NOTE(review): embedded line 142, the start of the statement that owns the argument list
 // below, is absent from this listing - restore it from the original file.
143  reader,
144  config,
145  _factorFileProvider,
146  dataReader,
147  _mapFileProvider,
148  request.StartTimeLocal,
149  request.EndTimeLocal);
150 
151  // optionally apply fill forward behavior
152  if (request.FillForwardResolution.HasValue)
153  {
154  // copy forward Bid/Ask bars for QuoteBars
155  if (request.DataType == typeof(QuoteBar))
156  {
157  reader = new QuoteBarFillForwardEnumerator(reader);
158  }
159 
 // wrap the requested fill forward resolution, converted to a TimeSpan, in a read-only reference
160  var readOnlyRef = Ref.CreateReadOnly(() => request.FillForwardResolution.Value.ToTimeSpan());
161  reader = new FillForwardEnumerator(reader, security.Exchange, readOnlyRef, request.IncludeExtendedMarketHours, request.EndTimeLocal, config.Increment, config.DataTimeZone);
162  }
163 
164  // since the SubscriptionDataReader performs an any overlap condition on the trade bar's entire
165  // range (time->end time) we can end up passing the incorrect data (too far past, possibly future),
166  // so to combat this we deliberately filter the results from the data reader to fix these cases
167  // which only apply to non-tick data
168 
169  reader = new SubscriptionFilterEnumerator(reader, security, request.EndTimeLocal, config.ExtendedMarketHours, false, request.ExchangeHours);
170 
171  // allow all ticks
172  if (config.Resolution != Resolution.Tick)
173  {
 // TimeBasedFilter drops unrequested auxiliary data and anything outside (StartTimeLocal, EndTimeLocal]
174  var timeBasedFilter = new TimeBasedFilter(request);
175  reader = new FilterEnumerator<BaseData>(reader, timeBasedFilter.Filter);
176  }
177 
178  var subscriptionRequest = new SubscriptionRequest(false, null, security, config, request.StartTimeUtc, request.EndTimeUtc);
 // the parallel flag captured in Initialize picks which SubscriptionUtils factory builds the subscription
179  if (_parallelHistoryRequestsEnabled)
180  {
181  return SubscriptionUtils.CreateAndScheduleWorker(subscriptionRequest, reader, _factorFileProvider, false);
182  }
183  return SubscriptionUtils.Create(subscriptionRequest, reader);
184  }
185 
/// <summary>
/// Hook that lets a derived provider supply an intraday data enumerator for a request
/// </summary>
/// <param name="rawData">The enumerator built for the request; CreateSubscription passes its data reader</param>
/// <param name="request">The history request being processed</param>
/// <returns>
/// Always null in this implementation, which makes CreateSubscription skip the concatenation step;
/// an override may return an enumerator to be concatenated after <paramref name="rawData"/>
/// </returns>
protected virtual IEnumerator<BaseData> GetIntradayDataEnumerator(IEnumerator<BaseData> rawData, HistoryRequest request) => null;
193 
/// <summary>
/// Private helper that decides, item by item, whether a data point belongs in the result of a history request
/// </summary>
private class TimeBasedFilter
{
    /// <summary>
    /// The data type asked for by the request; auxiliary data of any other type is rejected
    /// </summary>
    public Type RequestedType { get; set; }

    /// <summary>
    /// Inclusive upper bound for the end time of accepted data
    /// </summary>
    public DateTime EndTimeLocal { get; set; }

    /// <summary>
    /// Exclusive lower bound for the end time of accepted data
    /// </summary>
    public DateTime StartTimeLocal { get; set; }

    /// <summary>
    /// Captures the requested data type and local time range from the request
    /// </summary>
    /// <param name="request">The history request whose type and time range drive the filter</param>
    public TimeBasedFilter(HistoryRequest request)
    {
        StartTimeLocal = request.StartTimeLocal;
        EndTimeLocal = request.EndTimeLocal;
        RequestedType = request.DataType;
    }

    /// <summary>
    /// Tells whether the given data point should be kept
    /// </summary>
    /// <param name="data">The data point to test</param>
    /// <returns>
    /// False for auxiliary data whose type differs from the requested one and for data whose end time
    /// is after the end or not after the start of the requested range; true otherwise
    /// </returns>
    public bool Filter(BaseData data)
    {
        // auxiliary data only passes when it is exactly what was asked for
        var isAuxiliary = data.DataType == MarketDataType.Auxiliary;
        if (isAuxiliary && data.GetType() != RequestedType)
        {
            return false;
        }

        // keep only data ending inside (StartTimeLocal, EndTimeLocal]
        return data.EndTime <= EndTimeLocal && data.EndTime > StartTimeLocal;
    }
}
218  }
219 }