Lean  $LEAN_TAG$
SubscriptionSynchronizer.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15 */
16 
17 using System;
18 using System.Collections.Generic;
19 using System.Linq;
20 using System.Threading;
23 
25 {
26  /// <summary>
27  /// Provides the ability to synchronize subscriptions into time slices
28  /// </summary>
30  {
31  private readonly UniverseSelection _universeSelection;
32  private TimeSliceFactory _timeSliceFactory;
33  private ITimeProvider _timeProvider;
34  private ManualTimeProvider _frontierTimeProvider;
35 
36  /// <summary>
37  /// Event fired when a <see cref="Subscription"/> is finished
38  /// </summary>
39  public event EventHandler<Subscription> SubscriptionFinished;
40 
41  /// <summary>
42  /// Initializes a new instance of the <see cref="SubscriptionSynchronizer"/> class
43  /// </summary>
44  /// <param name="universeSelection">The universe selection instance used to handle universe
45  /// selection subscription output</param>
46  /// <returns>A time slice for the specified frontier time</returns>
47  public SubscriptionSynchronizer(UniverseSelection universeSelection)
48  {
49  _universeSelection = universeSelection;
50  }
51 
52  /// <summary>
53  /// Sets the time provider. If already set will throw.
54  /// </summary>
55  /// <param name="timeProvider">The time provider, used to obtain the current frontier UTC value</param>
56  public void SetTimeProvider(ITimeProvider timeProvider)
57  {
58  if (_timeProvider != null)
59  {
60  throw new Exception("SubscriptionSynchronizer.SetTimeProvider(): can only be called once");
61  }
62  _timeProvider = timeProvider;
63  _frontierTimeProvider = new ManualTimeProvider(_timeProvider.GetUtcNow());
64  }
65 
66  /// <summary>
67  /// Sets the <see cref="TimeSliceFactory"/> instance to use
68  /// </summary>
69  /// <param name="timeSliceFactory">Used to create the new <see cref="TimeSlice"/></param>
70  public void SetTimeSliceFactory(TimeSliceFactory timeSliceFactory)
71  {
72  if (_timeSliceFactory != null)
73  {
74  throw new Exception("SubscriptionSynchronizer.SetTimeSliceFactory(): can only be called once");
75  }
76  _timeSliceFactory = timeSliceFactory;
77  }
78 
79  /// <summary>
80  /// Syncs the specified subscriptions. The frontier time used for synchronization is
81  /// managed internally and dependent upon previous synchronization operations.
82  /// </summary>
83  /// <param name="subscriptions">The subscriptions to sync</param>
84  /// <param name="cancellationToken">The cancellation token to stop enumeration</param>
85  public IEnumerable<TimeSlice> Sync(IEnumerable<Subscription> subscriptions,
86  CancellationToken cancellationToken)
87  {
88  var delayedSubscriptionFinished = new Queue<Subscription>();
89 
90  while (!cancellationToken.IsCancellationRequested)
91  {
92  var changes = SecurityChanges.None;
93  var data = new List<DataFeedPacket>(1);
94  // NOTE: Tight coupling in UniverseSelection.ApplyUniverseSelection
95  Dictionary<Universe, BaseDataCollection> universeData = null; // lazy construction for performance
96  var universeDataForTimeSliceCreate = new Dictionary<Universe, BaseDataCollection>();
97 
98  var frontierUtc = _timeProvider.GetUtcNow();
99  _frontierTimeProvider.SetCurrentTimeUtc(frontierUtc);
100 
101  SecurityChanges newChanges;
102  do
103  {
104  newChanges = SecurityChanges.None;
105  foreach (var subscription in subscriptions)
106  {
107  if (subscription.EndOfStream)
108  {
109  OnSubscriptionFinished(subscription);
110  continue;
111  }
112 
113  // prime if needed
114  if (subscription.Current == null)
115  {
116  if (!subscription.MoveNext())
117  {
118  OnSubscriptionFinished(subscription);
119  continue;
120  }
121  }
122 
123  DataFeedPacket packet = null;
124 
125  while (subscription.Current != null && subscription.Current.EmitTimeUtc <= frontierUtc)
126  {
127  if (packet == null)
128  {
129  // for performance, lets be selfish about creating a new instance
130  packet = new DataFeedPacket(
131  subscription.Security,
132  subscription.Configuration,
133  subscription.RemovedFromUniverse
134  );
135  }
136 
137  // If our subscription is a universe, and we get a delisting event emitted for it, then
138  // the universe itself should be unselected and removed, because the Symbol that the
139  // universe is based on has been delisted. Doing the disposal here allows us to
140  // process the delisting at this point in time before emitting out to the algorithm.
141  // This is very useful for universes that can be delisted, such as ETF constituent
142  // universes (e.g. for ETF constituent universes, since the ETF itself is used to create
143  // the universe Symbol (and set as its underlying), once the ETF is delisted, the
144  // universe should cease to exist, since there are no more constituents of that ETF).
145  if (subscription.Current.Data.DataType == MarketDataType.Auxiliary && subscription.Current.Data is Delisting delisting)
146  {
147  if(subscription.IsUniverseSelectionSubscription)
148  {
149  subscription.Universes.Single().Dispose();
150  }
151  else if(delisting.Type == DelistingType.Delisted)
152  {
153  changes += _universeSelection.HandleDelisting(subscription.Current.Data, subscription.Configuration.IsInternalFeed);
154  }
155  }
156 
157  packet.Add(subscription.Current.Data);
158 
159  if (!subscription.MoveNext())
160  {
161  delayedSubscriptionFinished.Enqueue(subscription);
162  break;
163  }
164  }
165 
166  if (packet?.Count > 0)
167  {
168  // we have new universe data to select based on, store the subscription data until the end
169  if (!subscription.IsUniverseSelectionSubscription)
170  {
171  data.Add(packet);
172  }
173  else
174  {
175  // assume that if the first item is a base data collection then the enumerator handled the aggregation,
176  // otherwise, load all the the data into a new collection instance
177  var packetBaseDataCollection = packet.Data[0] as BaseDataCollection;
178  var packetData = packetBaseDataCollection == null
179  ? packet.Data
180  : packetBaseDataCollection.Data;
181 
182  BaseDataCollection collection;
183  if (universeData != null
184  && universeData.TryGetValue(subscription.Universes.Single(), out collection))
185  {
186  collection.AddRange(packetData);
187  }
188  else
189  {
190  collection = new BaseDataCollection(frontierUtc, frontierUtc, subscription.Configuration.Symbol, packetData, packetBaseDataCollection?.Underlying, packetBaseDataCollection?.FilteredContracts);
191  if (universeData == null)
192  {
193  universeData = new Dictionary<Universe, BaseDataCollection>();
194  }
195  universeData[subscription.Universes.Single()] = collection;
196  }
197  }
198  }
199 
200  if (subscription.IsUniverseSelectionSubscription
201  && subscription.Universes.Single().DisposeRequested)
202  {
203  var universe = subscription.Universes.Single();
204  // check if a universe selection isn't already scheduled for this disposed universe
205  if (universeData == null || !universeData.ContainsKey(universe))
206  {
207  if (universeData == null)
208  {
209  universeData = new Dictionary<Universe, BaseDataCollection>();
210  }
211  // we force trigger one last universe selection for this disposed universe, so it deselects all subscriptions it added
212  universeData[universe] = new BaseDataCollection(frontierUtc, subscription.Configuration.Symbol);
213  }
214 
215  // we need to do this after all usages of subscription.Universes
216  OnSubscriptionFinished(subscription);
217  }
218  }
219 
220  if (universeData != null && universeData.Count > 0)
221  {
222  // if we are going to perform universe selection we emit an empty
223  // time pulse to align algorithm time with current frontier
224  yield return _timeSliceFactory.CreateTimePulse(frontierUtc);
225 
226  foreach (var kvp in universeData)
227  {
228  var universe = kvp.Key;
229  var baseDataCollection = kvp.Value;
230  universeDataForTimeSliceCreate[universe] = baseDataCollection;
231  newChanges += _universeSelection.ApplyUniverseSelection(universe, frontierUtc, baseDataCollection);
232  }
233  universeData.Clear();
234  }
235 
236  changes += newChanges;
237  }
238  while (newChanges != SecurityChanges.None
239  || _universeSelection.AddPendingInternalDataFeeds(frontierUtc));
240 
241  var timeSlice = _timeSliceFactory.Create(frontierUtc, data, changes, universeDataForTimeSliceCreate);
242 
243  while (delayedSubscriptionFinished.Count > 0)
244  {
245  // these subscriptions added valid data to the packet
246  // we need to trigger OnSubscriptionFinished after we create the TimeSlice
247  // else it will drop the data
248  var subscription = delayedSubscriptionFinished.Dequeue();
249  OnSubscriptionFinished(subscription);
250  }
251 
252  yield return timeSlice;
253  }
254  }
255 
256  /// <summary>
257  /// Event invocator for the <see cref="SubscriptionFinished"/> event
258  /// </summary>
259  protected virtual void OnSubscriptionFinished(Subscription subscription)
260  {
261  SubscriptionFinished?.Invoke(this, subscription);
262  }
263 
264  /// <summary>
265  /// Returns the current UTC frontier time
266  /// </summary>
267  public DateTime GetUtcNow()
268  {
269  return _frontierTimeProvider.GetUtcNow();
270  }
271  }
272 }