Lean  $LEAN_TAG$
SynchronizingHistoryProvider.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14 */
15 
16 using System;
17 using System.Collections.Generic;
18 using System.Linq;
19 using System.Threading;
20 using NodaTime;
21 using QuantConnect.Data;
28 using QuantConnect.Util;
29 
31 {
32  /// <summary>
33  /// Provides an abstract implementation of <see cref="IHistoryProvider"/>
34  /// which provides synchronization of multiple history results
35  /// </summary>
37  {
// running total of emitted data points; incremented atomically while slices are built
private int _dataPointCount;

/// <summary>
/// Gets how many data points this history provider has emitted so far
/// </summary>
public override int DataPointCount
{
    get { return _dataPointCount; }
}
44 
/// <summary>
/// Enumerates the subscriptions into slices, synchronizing them on a shared time frontier
/// </summary>
/// <remarks>
/// The returned enumerable is lazy. Once enumeration has started, every subscription is
/// disposed when the enumeration completes, when the consumer stops early and disposes the
/// enumerator (for example by breaking out of a <c>foreach</c>), or when an exception escapes.
/// </remarks>
/// <param name="subscriptions">The subscriptions to merge; they are advanced and finally disposed by this method</param>
/// <param name="sliceTimeZone">The time zone handed to the <see cref="TimeSliceFactory"/> that builds the slices</param>
/// <returns>One slice per frontier step for which at least one subscription had data to emit</returns>
protected IEnumerable<Slice> CreateSliceEnumerableFromSubscriptions(List<Subscription> subscriptions, DateTimeZone sliceTimeZone)
{
    // required by TimeSlice.Create, but we don't need its behavior
    var frontier = DateTime.MinValue;
    // never changes, there's no selection during a history request
    var universeSelectionData = new Dictionary<Universe, BaseDataCollection>();
    var timeSliceFactory = new TimeSliceFactory(sliceTimeZone);

    try
    {
        while (true)
        {
            var earlyBirdTicks = long.MaxValue;
            var data = new List<DataFeedPacket>();
            foreach (var subscription in subscriptions)
            {
                // skip exhausted subscriptions; a plain check avoids allocating a LINQ iterator per step
                if (subscription.EndOfStream)
                {
                    continue;
                }

                if (subscription.Current == null && !subscription.MoveNext())
                {
                    // initial pump. We do it here and not when creating the subscriptions so
                    // that parallel workers can all start as fast as possible
                    continue;
                }

                DataFeedPacket packet = null;
                while (subscription.Current.EmitTimeUtc <= frontier)
                {
                    if (packet == null)
                    {
                        // for performance, lets be selfish about creating a new instance
                        packet = new DataFeedPacket(subscription.Security, subscription.Configuration);

                        // only add if we have data
                        data.Add(packet);
                    }

                    packet.Add(subscription.Current.Data);
                    Interlocked.Increment(ref _dataPointCount);
                    if (!subscription.MoveNext())
                    {
                        break;
                    }
                }

                // update our early bird ticks (next frontier time)
                if (subscription.Current != null)
                {
                    // the next frontier is the earliest pending emit time across all subscriptions
                    earlyBirdTicks = Math.Min(earlyBirdTicks, subscription.Current.EmitTimeUtc.Ticks);
                }
            }

            if (data.Count != 0)
            {
                // reuse the slice construction code from TimeSlice.Create
                yield return timeSliceFactory.Create(frontier, data, SecurityChanges.None, universeSelectionData).Slice;
            }

            // end of subscriptions, after we emit, else we might drop a data point
            if (earlyBirdTicks == long.MaxValue)
            {
                break;
            }

            // never move the frontier backwards
            frontier = new DateTime(Math.Max(earlyBirdTicks, frontier.Ticks), DateTimeKind.Utc);
        }
    }
    finally
    {
        // make sure we clean up after ourselves, even if the consumer stopped
        // enumerating early or an exception was thrown while producing slices
        foreach (var subscription in subscriptions)
        {
            subscription.Dispose();
        }
    }
}
113 
/// <summary>
/// Creates a subscription to process the history request
/// </summary>
/// <param name="request">The history request supplying the configuration, exchange hours, time range and fill forward settings</param>
/// <param name="history">The data for the request; its enumerator is wrapped and consumed by the returned subscription</param>
/// <returns>The subscription created by <c>SubscriptionUtils.Create</c> over the (optionally fill forwarded) data</returns>
protected Subscription CreateSubscription(HistoryRequest request, IEnumerable<BaseData> history)
{
    var config = request.ToSubscriptionDataConfig();
    var security = new Security(
        request.ExchangeHours,
        config,
        new Cash(Currencies.NullCurrency, 0, 1m),
        new SecurityCache()
    );

    var reader = history.GetEnumerator();

    // optionally apply fill forward behavior
    if (request.FillForwardResolution.HasValue)
    {
        // FillForwardEnumerator expects the end time in the exchange's local time
        var end = request.EndTimeUtc.ConvertFromUtc(request.ExchangeHours.TimeZone);

        // copy forward Bid/Ask bars for QuoteBars
        if (request.DataType == typeof(QuoteBar))
        {
            reader = new QuoteBarFillForwardEnumerator(reader);
        }

        // the resolution is read through a read-only reference, evaluated on demand
        var readOnlyRef = Ref.CreateReadOnly(() => request.FillForwardResolution.Value.ToTimeSpan());
        reader = new FillForwardEnumerator(reader, security.Exchange, readOnlyRef, request.IncludeExtendedMarketHours, end, config.Increment, config.DataTimeZone);
    }

    var subscriptionRequest = new SubscriptionRequest(false, null, security, config, request.StartTimeUtc, request.EndTimeUtc);

    return SubscriptionUtils.Create(subscriptionRequest, reader);
}
153  }
154 }