Lean  $LEAN_TAG$
FileSystemDataFeed.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15 */
16 
17 using System;
18 using System.Collections.Generic;
19 using System.Linq;
20 using QuantConnect.Data;
29 using QuantConnect.Logging;
30 using QuantConnect.Packets;
32 using QuantConnect.Util;
33 
35 {
36  /// <summary>
37  /// Historical datafeed stream reader for processing files on a local disk.
38  /// </summary>
39  /// <remarks>Filesystem datafeeds are incredibly fast</remarks>
41  {
42  private IAlgorithm _algorithm; // the algorithm instance handed to Initialize
43  private ITimeProvider _timeProvider; // taken from dataFeedTimeProvider.FrontierTimeProvider in Initialize
44  private IResultHandler _resultHandler; // the result handler handed to Initialize
45  private IMapFileProvider _mapFileProvider; // the map file provider handed to Initialize
46  private IFactorFileProvider _factorFileProvider; // the factor file provider handed to Initialize
47  private IDataProvider _dataProvider; // the data provider handed to Initialize
48  private IDataCacheProvider _cacheProvider; // ZipDataCacheProvider created in Initialize, disposed in Exit
49  private SubscriptionCollection _subscriptions; // taken from subscriptionManager.DataFeedSubscriptions in Initialize
50  private MarketHoursDatabase _marketHoursDatabase; // loaded through MarketHoursDatabase.FromDataFolder() in Initialize
51  private SubscriptionDataReaderSubscriptionEnumeratorFactory _subscriptionFactory; // created in Initialize, disposed in Exit
52 
53  /// <summary>
54  /// Flag indicating whether the data feed is active. In this class it is set to true by Initialize and set back to false by Exit.
55  /// </summary>
56  public bool IsActive { get; private set; }
57 
58  /// <summary>
59  /// Initializes the data feed for the specified job and algorithm
60  /// </summary>
  /// <remarks>NOTE(review): the summary mentions a job, but no job parameter is visible in the signature below - confirm against the data feed interface</remarks>
  /// <param name="algorithm">The algorithm instance; stored and also passed to the subscription enumerator factory</param>
  /// <param name="resultHandler">The result handler; stored and passed to the subscription enumerator factory</param>
  /// <param name="mapFileProvider">The map file provider; stored and passed to the subscription enumerator factory</param>
  /// <param name="factorFileProvider">The factor file provider; stored and passed to the subscription enumerator factory</param>
  /// <param name="dataProvider">The data provider; stored and wrapped by the zip data cache provider</param>
  /// <param name="subscriptionManager">Provides the data feed subscription collection kept by this feed</param>
  /// <param name="dataFeedTimeProvider">Provides the frontier time provider kept by this feed</param>
  /// <param name="dataChannelProvider">Not referenced by this implementation</param>
61  public virtual void Initialize(IAlgorithm algorithm,
63  IResultHandler resultHandler,
64  IMapFileProvider mapFileProvider,
65  IFactorFileProvider factorFileProvider,
66  IDataProvider dataProvider,
67  IDataFeedSubscriptionManager subscriptionManager,
68  IDataFeedTimeProvider dataFeedTimeProvider,
69  IDataChannelProvider dataChannelProvider)
70  {
71  _algorithm = algorithm;
72  _resultHandler = resultHandler;
73  _mapFileProvider = mapFileProvider;
74  _factorFileProvider = factorFileProvider;
75  _dataProvider = dataProvider;
76  _timeProvider = dataFeedTimeProvider.FrontierTimeProvider;
77  _subscriptions = subscriptionManager.DataFeedSubscriptions;
  // the cache provider created here is disposed in Exit
78  _cacheProvider = new ZipDataCacheProvider(dataProvider, isDataEphemeral: false);
  // the factory is created with price scaling disabled; it is also disposed in Exit
79  _subscriptionFactory = new SubscriptionDataReaderSubscriptionEnumeratorFactory(
80  _resultHandler,
81  _mapFileProvider,
82  _factorFileProvider,
83  _cacheProvider,
84  algorithm,
85  enablePriceScaling: false);
86 
  // the feed is flagged active before the market hours database is loaded
87  IsActive = true;
88  _marketHoursDatabase = MarketHoursDatabase.FromDataFolder();
89  }
90 
/// <summary>
/// Builds the file based enumerator serving the given subscription request: universe subscriptions
/// go through <see cref="CreateUniverseEnumerator"/>, every other request through the data enumerator
/// </summary>
/// <param name="request">The subscription request to create an enumerator for</param>
/// <param name="fillForwardResolution">Optional fill forward resolution forwarded to the created enumerator</param>
/// <returns>The enumerator providing the data of the request</returns>
/// <remarks>Protected so it can be used by the <see cref="LiveTradingDataFeed"/> to warmup requests</remarks>
protected IEnumerator<BaseData> CreateEnumerator(SubscriptionRequest request, Resolution? fillForwardResolution = null)
{
    if (request.IsUniverseSubscription)
    {
        return CreateUniverseEnumerator(request, CreateDataEnumerator, fillForwardResolution);
    }

    return CreateDataEnumerator(request, fillForwardResolution);
}
99 
/// <summary>
/// Creates the enumerator of the request through the subscription factory and then applies
/// <see cref="ConfigureEnumerator"/> to it without aggregation
/// </summary>
/// <param name="request">The subscription request to create an enumerator for</param>
/// <param name="fillForwardResolution">Optional fill forward resolution forwarded to the configuration step</param>
/// <returns>The configured enumerator</returns>
private IEnumerator<BaseData> CreateDataEnumerator(SubscriptionRequest request, Resolution? fillForwardResolution)
{
    // ReSharper disable once PossibleMultipleEnumeration
    var rawEnumerator = _subscriptionFactory.CreateEnumerator(request, _dataProvider);

    return ConfigureEnumerator(request, false, rawEnumerator, fillForwardResolution);
}
108 
109  /// <summary>
110  /// Creates a new subscription to provide data for the specified security.
111  /// </summary>
112  /// <param name="request">Defines the subscription to be added, including start/end times the universe and security</param>
113  /// <returns>The created <see cref="Subscription"/> if successful, null otherwise</returns>
115  {
116  IEnumerator<BaseData> enumerator;
117  if(_algorithm.IsWarmingUp)
118  {
119  var pivotTimeUtc = _algorithm.StartDate.ConvertToUtc(_algorithm.TimeZone);
120 
121  var warmupRequest = new SubscriptionRequest(request, endTimeUtc: pivotTimeUtc,
122  configuration: new SubscriptionDataConfig(request.Configuration, resolution: _algorithm.Settings.WarmupResolution));
123  IEnumerator<BaseData> warmupEnumerator = null;
124  if (warmupRequest.TradableDaysInDataTimeZone.Any()
125  // since we change the resolution, let's validate it's still valid configuration (example daily equity quotes are not!)
126  && LeanData.IsValidConfiguration(warmupRequest.Configuration.SecurityType, warmupRequest.Configuration.Resolution, warmupRequest.Configuration.TickType))
127  {
128  // let them overlap a day if possible to avoid data gaps, since each request will fill forward its own data because they use different resolutions
129  pivotTimeUtc = Time.GetStartTimeForTradeBars(request.Security.Exchange.Hours,
130  _algorithm.StartDate.ConvertTo(_algorithm.TimeZone, request.Security.Exchange.TimeZone),
131  Time.OneDay,
132  1,
133  false,
134  warmupRequest.Configuration.DataTimeZone,
135  LeanData.UseDailyStrictEndTimes(_algorithm.Settings, request, request.Security.Symbol, Time.OneDay))
136  .ConvertToUtc(request.Security.Exchange.TimeZone);
137  if (pivotTimeUtc < warmupRequest.StartTimeUtc)
138  {
139  pivotTimeUtc = warmupRequest.StartTimeUtc;
140  }
141 
142  warmupEnumerator = CreateEnumerator(warmupRequest, _algorithm.Settings.WarmupResolution);
143  // don't let future data pass
144  warmupEnumerator = new FilterEnumerator<BaseData>(warmupEnumerator, data => data == null || data.EndTime <= warmupRequest.EndTimeLocal);
145  }
146 
147  var normalEnumerator = CreateEnumerator(new SubscriptionRequest(request, startTimeUtc: pivotTimeUtc));
148  // don't let pre start data pass, since we adjust start so they overlap 1 day let's not let this data pass, we just want it for fill forwarding after the target start
149  // this is also useful to drop any initial selection point which was already emitted during warmup
150  normalEnumerator = new FilterEnumerator<BaseData>(normalEnumerator, data => data == null || data.EndTime >= warmupRequest.EndTimeLocal);
151 
152  // after the warmup enumerator we concatenate the 'normal' one
153  enumerator = new ConcatEnumerator(true, warmupEnumerator, normalEnumerator);
154  }
155  else
156  {
157  enumerator = CreateEnumerator(request);
158  }
159 
160  enumerator = AddScheduleWrapper(request, enumerator, null);
161 
162  if (request.IsUniverseSubscription && request.Universe is UserDefinedUniverse)
163  {
164  // for user defined universe we do not use a worker task, since calls to AddData can happen in any moment
165  // and we have to be able to inject selection data points into the enumerator
166  return SubscriptionUtils.Create(request, enumerator, _algorithm.Settings.DailyPreciseEndTime);
167  }
168  return SubscriptionUtils.CreateAndScheduleWorker(request, enumerator, _factorFileProvider, true, _algorithm.Settings.DailyPreciseEndTime);
169  }
170 
/// <summary>
/// Removes the given subscription from the data feed. The body of this implementation is empty;
/// the method is virtual so derived feeds can override it
/// </summary>
/// <param name="subscription">The subscription to remove</param>
public virtual void RemoveSubscription(Subscription subscription)
{
    // nothing to do in this implementation
}
178 
179  /// <summary>
180  /// Creates a universe enumerator from the Subscription request, the underlying enumerator func and the fill forward resolution (in some cases)
181  /// </summary>
  /// <remarks>NOTE(review): as shown here neither createUnderlyingEnumerator nor fillForwardResolution is referenced by the body - confirm against the complete source</remarks>
182  protected IEnumerator<BaseData> CreateUniverseEnumerator(SubscriptionRequest request, Func<SubscriptionRequest, Resolution?, IEnumerator<BaseData>> createUnderlyingEnumerator, Resolution? fillForwardResolution = null)
183  {
  // the subscription factory is the default; the branches below are where a different factory would be selected
184  ISubscriptionEnumeratorFactory factory = _subscriptionFactory;
185  if (request.Universe is ITimeTriggeredUniverse)
186  {
  // NOTE(review): the start of this statement is missing - only its trailing arguments remain, presumably of a factory assignment; restore it from the complete source
188  _marketHoursDatabase,
189  _timeProvider);
190  }
191  else if (request.Configuration.Type == typeof(FundamentalUniverse))
192  {
  // NOTE(review): this branch is empty as shown; presumably a factory assignment belongs here - confirm
194  }
195 
196  // define our data enumerator
197  var enumerator = factory.CreateEnumerator(request, _dataProvider);
198  return enumerator;
199  }
200 
201  /// <summary>
202  /// Returns a scheduled enumerator from the given arguments. It can also return the given underlying enumerator
203  /// </summary>
  /// <param name="request">The subscription request; its universe settings schedule is queried between the request start and end local times</param>
  /// <param name="underlying">The enumerator to wrap</param>
  /// <param name="timeProvider">The time provider handed to the scheduled enumerator</param>
  /// <returns>A <see cref="ScheduledEnumerator"/> wrapping <paramref name="underlying"/> when a schedule is available, otherwise <paramref name="underlying"/> itself</returns>
204  protected IEnumerator<BaseData> AddScheduleWrapper(SubscriptionRequest request, IEnumerator<BaseData> underlying, ITimeProvider timeProvider)
205  {
  // NOTE(review): no condition guards this block, so as shown the method always returns here and the code below is unreachable - a guard appears to be missing, restore it from the complete source
207  {
208  return underlying;
209  }
210 
211  var schedule = request.Universe.UniverseSettings.Schedule.Get(request.StartTimeLocal, request.EndTimeLocal);
  // a null schedule means there is nothing to wrap with
212  if (schedule != null)
213  {
214  return new ScheduledEnumerator(underlying, schedule, timeProvider, request.Configuration.ExchangeTimeZone, request.StartTimeLocal);
215  }
216  return underlying;
217  }
218 
/// <summary>
/// For option requests whose symbol has an underlying, combines the given enumerator with an enumerator
/// of the underlying symbol. Any other request gets the given enumerator back untouched
/// </summary>
/// <param name="request">The subscription request being served</param>
/// <param name="parent">The enumerator to combine with the data of the underlying</param>
/// <param name="createEnumerator">Function used to create the enumerator of the underlying</param>
/// <param name="fillForwardResolution">Optional fill forward resolution forwarded to the created enumerators</param>
/// <returns>The combined enumerator, or <paramref name="parent"/> when no underlying is appended</returns>
protected IEnumerator<BaseData> TryAppendUnderlyingEnumerator(SubscriptionRequest request, IEnumerator<BaseData> parent, Func<SubscriptionRequest, Resolution?, IEnumerator<BaseData>> createEnumerator, Resolution? fillForwardResolution)
{
    var requestedSymbol = request.Configuration.Symbol;
    if (!requestedSymbol.SecurityType.IsOption() || !requestedSymbol.HasUnderlying)
    {
        return parent;
    }

    var underlyingSymbol = requestedSymbol.Underlying;
    var marketHoursEntry = _marketHoursDatabase.GetEntry(underlyingSymbol.ID.Market, underlyingSymbol, underlyingSymbol.SecurityType);

    // TODO: creating this subscription request/config is bad
    // the time zones come from the market hours entry of the underlying: there's no guarantee they
    // are the same as the option ones, specially the data timezone (index & index options)
    var underlyingConfiguration = new SubscriptionDataConfig(request.Configuration,
        symbol: underlyingSymbol,
        objectType: typeof(TradeBar),
        tickType: TickType.Trade,
        dataTimeZone: marketHoursEntry.DataTimeZone,
        exchangeTimeZone: marketHoursEntry.ExchangeHours.TimeZone);
    var underlyingRequest = new SubscriptionRequest(request,
        isUniverseSubscription: false,
        configuration: underlyingConfiguration);

    // auxiliary data points of the underlying are dropped; the predicate dereferences each point directly
    var underlyingData = createEnumerator(underlyingRequest, fillForwardResolution);
    underlyingData = new FilterEnumerator<BaseData>(underlyingData, point => point.DataType != MarketDataType.Auxiliary);

    IEnumerator<BaseData> combined = new SynchronizingBaseDataEnumerator(parent, underlyingData);
    // we aggregate both underlying and chain data
    combined = new BaseDataCollectionAggregatorEnumerator(combined, requestedSymbol);
    // only let through if underlying and chain data present
    combined = new FilterEnumerator<BaseData>(combined, point => (point as BaseDataCollection).Underlying != null);

    return ConfigureEnumerator(request, false, combined, fillForwardResolution);
}
250 
/// <summary>
/// Shuts the data feed down. While <see cref="IsActive"/> is set, the flag is cleared and the subscription
/// factory and the data cache provider are disposed; otherwise the call does nothing
/// </summary>
public virtual void Exit()
{
    if (!IsActive)
    {
        return;
    }

    IsActive = false;
    Log.Trace("FileSystemDataFeed.Exit(): Start. Setting cancellation token...");
    _subscriptionFactory?.DisposeSafely();
    _cacheProvider.DisposeSafely();
    Log.Trace("FileSystemDataFeed.Exit(): Exit Finished.");
}
265 
266  /// <summary>
267  /// Configure the enumerator with aggregation/fill-forward/filter behaviors. Returns new instance if re-configured
268  /// </summary>
  /// <param name="request">The subscription request the enumerator belongs to</param>
  /// <param name="aggregate">True to wrap the enumerator in a <see cref="BaseDataCollectionAggregatorEnumerator"/> for the request symbol</param>
  /// <param name="enumerator">The enumerator to configure</param>
  /// <param name="fillForwardResolution">Optional fill forward resolution handed to <see cref="TryAddFillForwardEnumerator"/></param>
  /// <returns>The configured enumerator</returns>
269  protected IEnumerator<BaseData> ConfigureEnumerator(SubscriptionRequest request, bool aggregate, IEnumerator<BaseData> enumerator, Resolution? fillForwardResolution)
270  {
271  if (aggregate)
272  {
273  enumerator = new BaseDataCollectionAggregatorEnumerator(enumerator, request.Configuration.Symbol);
274  }
275 
  // fill forward is requested according to the FillDataForward flag of the request configuration
276  enumerator = TryAddFillForwardEnumerator(request, enumerator, request.Configuration.FillDataForward, fillForwardResolution);
277 
278  // optionally apply exchange/user filters
  // NOTE(review): the comment above says "optionally", yet no condition guards this block, so as shown the filter is always applied - a guard appears to be missing, restore it from the complete source
280  {
281  enumerator = SubscriptionFilterEnumerator.WrapForDataFeed(_resultHandler, enumerator, request.Security,
282  request.EndTimeLocal, request.Configuration.ExtendedMarketHours, false, request.ExchangeHours);
283  }
284 
285  return enumerator;
286  }
287 
288  /// <summary>
289  /// Will add a fill forward enumerator if requested
290  /// </summary>
  /// <param name="request">The subscription request the enumerator belongs to</param>
  /// <param name="enumerator">The enumerator to wrap</param>
  /// <param name="fillForward">True to request fill forwarding; ignored for tick resolution configurations</param>
  /// <param name="fillForwardResolution">Optional resolution that, when set and not tick, replaces the fill forward span coming from the subscription collection</param>
  /// <returns>The wrapped enumerator, or the given one when no fill forwarding is applied</returns>
291  protected IEnumerator<BaseData> TryAddFillForwardEnumerator(SubscriptionRequest request, IEnumerator<BaseData> enumerator, bool fillForward, Resolution? fillForwardResolution)
292  {
293  // optionally apply fill forward logic, but never for tick data
294  if (fillForward && request.Configuration.Resolution != Resolution.Tick)
295  {
296  // copy forward Bid/Ask bars for QuoteBars
297  if (request.Configuration.Type == typeof(QuoteBar))
298  {
299  enumerator = new QuoteBarFillForwardEnumerator(enumerator);
300  }
301 
  // default span comes from the subscription collection, which is updated with this configuration
302  var fillForwardSpan = _subscriptions.UpdateAndGetFillForwardResolution(request.Configuration);
303  if (fillForwardResolution != null && fillForwardResolution != Resolution.Tick)
304  {
305  // if we are given a fill forward resolution we use it instead of the collection based one. This is useful during warmup when the warmup resolution has been set
306  fillForwardSpan = Ref.Create(fillForwardResolution.Value.ToTimeSpan());
307  }
308 
309  // Pass the security exchange hours explicitly to avoid using the ones in the request, since
310  // those could be different. e.g. when requests are created for open interest data the exchange
311  // hours are set to always open to avoid OI data being filtered out due to the exchange being closed.
312  // This way we allow OI data to be fill-forwarded to the market close time when strict end times is enabled,
313  // so that OI data is available at the same time as trades and quotes.
314  var useDailyStrictEndTimes = LeanData.UseDailyStrictEndTimes(_algorithm.Settings, request, request.Configuration.Symbol,
315  request.Configuration.Increment, request.Security.Exchange.Hours);
  // NOTE(review): argument list kept exactly as found; confirm it matches the FillForwardEnumerator constructor
316  enumerator = new FillForwardEnumerator(enumerator, request.Security.Exchange, fillForwardSpan,
318  request.Configuration.DataTimeZone, useDailyStrictEndTimes, request.Configuration.Type);
319  }
320 
321  return enumerator;
322  }
323  }
324 }