Lean  $LEAN_TAG$
SynchronizingHistoryProvider.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14 */
15 
16 using System;
17 using System.Collections.Generic;
18 using System.Linq;
19 using System.Threading;
20 using NodaTime;
21 using QuantConnect.Data;
28 using QuantConnect.Util;
29 
31 {
32  /// <summary>
33  /// Provides an abstract implementation of <see cref="IHistoryProvider"/>
34  /// which provides synchronization of multiple history results
35  /// </summary>
37  {
38  /// <summary>
39  /// The market hours database
40  /// </summary>
42  private int _dataPointCount;
43 
44  /// <summary>
45  /// The algorithm settings instance to use
46  /// </summary>
48 
49  /// <summary>
50  /// Gets the total number of data points emitted by this history provider
51  /// </summary>
52  public override int DataPointCount => _dataPointCount;
53 
54  /// <summary>
55  /// Enumerates the subscriptions into slices
56  /// </summary>
57  protected IEnumerable<Slice> CreateSliceEnumerableFromSubscriptions(List<Subscription> subscriptions, DateTimeZone sliceTimeZone)
58  {
59  // required by TimeSlice.Create, but we don't need it's behavior
60  var frontier = DateTime.MinValue;
61  // never changes, there's no selection during a history request
62  var universeSelectionData = new Dictionary<Universe, BaseDataCollection>();
63  var timeSliceFactory = new TimeSliceFactory(sliceTimeZone);
64  while (true)
65  {
66  var earlyBirdTicks = long.MaxValue;
67  var data = new List<DataFeedPacket>();
68  foreach (var subscription in subscriptions.Where(subscription => !subscription.EndOfStream))
69  {
70  if (subscription.Current == null && !subscription.MoveNext())
71  {
72  // initial pump. We do it here and not when creating the subscriptions so
73  // that parallel workers can all start as fast as possible
74  continue;
75  }
76 
77  DataFeedPacket packet = null;
78  while (subscription.Current.EmitTimeUtc <= frontier)
79  {
80  if (packet == null)
81  {
82  // for performance, lets be selfish about creating a new instance
83  packet = new DataFeedPacket(subscription.Security, subscription.Configuration);
84 
85  // only add if we have data
86  data.Add(packet);
87  }
88 
89  packet.Add(subscription.Current.Data);
90  Interlocked.Increment(ref _dataPointCount);
91  if (!subscription.MoveNext())
92  {
93  break;
94  }
95  }
96  // update our early bird ticks (next frontier time)
97  if (subscription.Current != null)
98  {
99  // take the earliest between the next piece of data or the next tz discontinuity
100  earlyBirdTicks = Math.Min(earlyBirdTicks, subscription.Current.EmitTimeUtc.Ticks);
101  }
102  }
103 
104  if (data.Count != 0)
105  {
106  // reuse the slice construction code from TimeSlice.Create
107  yield return timeSliceFactory.Create(frontier, data, SecurityChanges.None, universeSelectionData).Slice;
108  }
109 
110  // end of subscriptions, after we emit, else we might drop a data point
111  if (earlyBirdTicks == long.MaxValue) break;
112 
113  frontier = new DateTime(Math.Max(earlyBirdTicks, frontier.Ticks), DateTimeKind.Utc);
114  }
115 
116  // make sure we clean up after ourselves
117  foreach (var subscription in subscriptions)
118  {
119  subscription.Dispose();
120  }
121  }
122 
123  /// <summary>
124  /// Retrieves the appropriate <see cref="SecurityExchange"/> based on the data type and symbol.
125  /// </summary>
126  /// <param name="exchange">The default exchange instance.</param>
127  /// <param name="dataType">The type of data being processed.</param>
128  /// <param name="symbol">The security symbol.</param>
129  /// <returns>The security exchange with appropriate market hours.</returns>
130  protected static SecurityExchange GetSecurityExchange(SecurityExchange exchange, Type dataType, Symbol symbol)
131  {
132  if (dataType == typeof(OpenInterest))
133  {
134  // Retrieve the original market hours, which include holidays and closed days.
135  var originalExchangeHours = MarketHours.GetExchangeHours(symbol.ID.Market, symbol, symbol.SecurityType);
136  // Use the original market hours to prevent fill-forwarding on non-trading hours.
137  return new SecurityExchange(originalExchangeHours);
138  }
139  return exchange;
140  }
141 
142  /// <summary>
143  /// Creates a subscription to process the history request
144  /// </summary>
145  protected Subscription CreateSubscription(HistoryRequest request, IEnumerable<BaseData> history)
146  {
147  var config = request.ToSubscriptionDataConfig();
148  var security = new Security(
149  request.ExchangeHours,
150  config,
151  new Cash(Currencies.NullCurrency, 0, 1m),
155  new SecurityCache()
156  );
157 
158  var reader = history.GetEnumerator();
159 
160  var useDailyStrictEndTimes = LeanData.UseDailyStrictEndTimes(AlgorithmSettings, request, config.Symbol, config.Increment);
161  if (useDailyStrictEndTimes)
162  {
163  reader = new StrictDailyEndTimesEnumerator(reader, request.ExchangeHours, request.StartTimeLocal);
164  }
165 
166  // optionally apply fill forward behavior
167  if (request.FillForwardResolution.HasValue)
168  {
169  // FillForwardEnumerator expects these values in local times
170  var start = request.StartTimeUtc.ConvertFromUtc(request.ExchangeHours.TimeZone);
171  var end = request.EndTimeUtc.ConvertFromUtc(request.ExchangeHours.TimeZone);
172 
173  // copy forward Bid/Ask bars for QuoteBars
174  if (request.DataType == typeof(QuoteBar))
175  {
176  reader = new QuoteBarFillForwardEnumerator(reader);
177  }
178 
179  var readOnlyRef = Ref.CreateReadOnly(() => request.FillForwardResolution.Value.ToTimeSpan());
180  var exchange = GetSecurityExchange(security.Exchange, request.DataType, request.Symbol);
181  reader = new FillForwardEnumerator(reader, exchange, readOnlyRef, request.IncludeExtendedMarketHours, end, config.Increment, config.DataTimeZone, useDailyStrictEndTimes, request.DataType);
182  }
183 
184  var subscriptionRequest = new SubscriptionRequest(false, null, security, config, request.StartTimeUtc, request.EndTimeUtc);
185 
186  return SubscriptionUtils.Create(subscriptionRequest, reader, AlgorithmSettings.DailyPreciseEndTime);
187  }
188  }
189 }