Lean  $LEAN_TAG$
Subscription.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15 */
16 
17 using System;
18 using System.Collections;
19 using System.Collections.Generic;
20 using System.Linq;
21 using NodaTime;
22 using QuantConnect.Data;
25 using QuantConnect.Util;
26 
28 {
29  /// <summary>
30  /// Represents the data required for a data feed to process a single subscription
31  /// </summary>
32  public class Subscription : IEnumerator<SubscriptionData>
33  {
34  private bool _removedFromUniverse;
35  private readonly IEnumerator<SubscriptionData> _enumerator;
36 
37  /// <summary>
38  /// The subcription requests associated with this subscription
39  /// </summary>
40  internal List<SubscriptionRequest> SubscriptionRequests { get; private set; }
41 
42  /// <summary>
43  /// Event fired when a new data point is available
44  /// </summary>
45  public event EventHandler NewDataAvailable;
46 
47  /// <summary>
48  /// Gets the universe for this subscription
49  /// </summary>
50  public IEnumerable<Universe> Universes => SubscriptionRequests
51  .Where(x => x.Universe != null)
52  .Select(x => x.Universe);
53 
54  /// <summary>
55  /// Gets the security this subscription points to
56  /// </summary>
57  public readonly ISecurityPrice Security;
58 
59  /// <summary>
60  /// Gets the configuration for this subscritions
61  /// </summary>
63 
64  /// <summary>
65  /// Gets the exchange time zone associated with this subscription
66  /// </summary>
67  public DateTimeZone TimeZone { get; }
68 
69  /// <summary>
70  /// Gets the offset provider for time zone conversions to and from the data's local time
71  /// </summary>
73 
74  /// <summary>
75  /// Gets the most current value from the subscription source
76  /// </summary>
77  public decimal RealtimePrice { get; set; }
78 
79  /// <summary>
80  /// Gets true if this subscription is finished, false otherwise
81  /// </summary>
82  public bool EndOfStream { get; private set; }
83 
84  /// <summary>
85  /// Gets true if this subscription is used in universe selection
86  /// </summary>
87  public bool IsUniverseSelectionSubscription { get; }
88 
89  /// <summary>
90  /// Gets the start time of this subscription in UTC
91  /// </summary>
92  public DateTime UtcStartTime { get; }
93 
94  /// <summary>
95  /// Gets the end time of this subscription in UTC
96  /// </summary>
97  public DateTime UtcEndTime { get; }
98 
99  /// <summary>
100  /// Gets whether or not this subscription has been removed from its parent universe
101  /// </summary>
103 
104  /// <summary>
105  /// Initializes a new instance of the <see cref="Subscription"/> class with a universe
106  /// </summary>
107  /// <param name="subscriptionRequest">Specified for universe subscriptions</param>
108  /// <param name="enumerator">The subscription's data source</param>
109  /// <param name="timeZoneOffsetProvider">The offset provider used to convert data local times to utc</param>
110  public Subscription(
111  SubscriptionRequest subscriptionRequest,
112  IEnumerator<SubscriptionData> enumerator,
113  TimeZoneOffsetProvider timeZoneOffsetProvider)
114  {
115  SubscriptionRequests = new List<SubscriptionRequest> { subscriptionRequest };
116  _enumerator = enumerator;
117  Security = subscriptionRequest.Security;
119  Configuration = subscriptionRequest.Configuration;
120  OffsetProvider = timeZoneOffsetProvider;
121  TimeZone = subscriptionRequest.Security.Exchange.TimeZone;
122  UtcStartTime = subscriptionRequest.StartTimeUtc;
123  UtcEndTime = subscriptionRequest.EndTimeUtc;
124 
125  RemovedFromUniverse = Ref.CreateReadOnly(() => _removedFromUniverse);
126  }
127 
128  /// <summary>
129  /// Adds a <see cref="SubscriptionRequest"/> for this subscription
130  /// </summary>
131  /// <param name="subscriptionRequest">The <see cref="SubscriptionRequest"/> to add</param>
132  public bool AddSubscriptionRequest(SubscriptionRequest subscriptionRequest)
133  {
135  || subscriptionRequest.IsUniverseSubscription)
136  {
137  throw new Exception("Subscription.AddSubscriptionRequest(): Universe selection" +
138  " subscriptions should not have more than 1 SubscriptionRequest");
139  }
140 
141  // this shouldn't happen but just in case..
142  if (subscriptionRequest.Configuration != Configuration)
143  {
144  throw new Exception("Subscription.AddSubscriptionRequest(): Requesting to add" +
145  "a different SubscriptionDataConfig");
146  }
147 
148  // Only allow one subscription request per universe
149  if (!Universes.Contains(subscriptionRequest.Universe))
150  {
151  SubscriptionRequests.Add(subscriptionRequest);
152  // TODO this might update the 'UtcStartTime' and 'UtcEndTime' of this subscription
153  return true;
154  }
155  return false;
156  }
157 
158  /// <summary>
159  /// Removes one or all <see cref="SubscriptionRequest"/> from this subscription
160  /// </summary>
161  /// <param name="universe">Universe requesting to remove <see cref="SubscriptionRequest"/>.
162  /// Default value, null, will remove all universes</param>
163  /// <returns>True, if the subscription is empty and ready to be removed</returns>
164  public bool RemoveSubscriptionRequest(Universe universe = null)
165  {
166  // TODO this might update the 'UtcStartTime' and 'UtcEndTime' of this subscription
167  IEnumerable<Universe> removedUniverses;
168  if (universe == null)
169  {
170  var subscriptionRequests = SubscriptionRequests;
171  SubscriptionRequests = new List<SubscriptionRequest>();
172  removedUniverses = subscriptionRequests.Where(x => x.Universe != null)
173  .Select(x => x.Universe);
174  }
175  else
176  {
177  SubscriptionRequests.RemoveAll(x => x.Universe == universe);
178  removedUniverses = new[] {universe};
179  }
180 
181  var emptySubscription = !SubscriptionRequests.Any();
182  if (emptySubscription)
183  {
184  // if the security is no longer a member of the universe, then mark the subscription properly
185  // universe may be null for internal currency conversion feeds
186  // TODO : Put currency feeds in their own internal universe
187  if (!removedUniverses.Any(x => x.Securities.ContainsKey(Configuration.Symbol)))
188  {
190  }
191  }
192 
193  return emptySubscription;
194  }
195 
196  /// <summary>
197  /// Advances the enumerator to the next element of the collection.
198  /// </summary>
199  /// <returns>
200  /// true if the enumerator was successfully advanced to the next element; false if the enumerator has passed the end of the collection.
201  /// </returns>
202  /// <exception cref="T:System.InvalidOperationException">The collection was modified after the enumerator was created. </exception><filterpriority>2</filterpriority>
203  public virtual bool MoveNext()
204  {
205  if (EndOfStream)
206  {
207  return false;
208  }
209 
210  var moveNext = _enumerator.MoveNext();
211  EndOfStream = !moveNext;
212  Current = _enumerator.Current;
213  return moveNext;
214  }
215 
216  /// <summary>
217  /// Sets the enumerator to its initial position, which is before the first element in the collection.
218  /// </summary>
219  /// <exception cref="T:System.InvalidOperationException">The collection was modified after the enumerator was created. </exception><filterpriority>2</filterpriority>
220  public void Reset()
221  {
222  _enumerator.Reset();
223  }
224 
225  /// <summary>
226  /// Gets the element in the collection at the current position of the enumerator.
227  /// </summary>
228  /// <returns>
229  /// The element in the collection at the current position of the enumerator.
230  /// </returns>
231  public SubscriptionData Current { get; private set; }
232 
233  /// <summary>
234  /// Gets the current element in the collection.
235  /// </summary>
236  /// <returns>
237  /// The current element in the collection.
238  /// </returns>
239  /// <filterpriority>2</filterpriority>
240  object IEnumerator.Current => Current;
241 
242  /// <summary>
243  /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
244  /// </summary>
245  /// <filterpriority>2</filterpriority>
246  public void Dispose()
247  {
248  EndOfStream = true;
249  _enumerator.DisposeSafely();
250  }
251 
252  /// <summary>
253  /// Mark this subscription as having been removed from the universe.
254  /// Data for this time step will be discarded.
255  /// </summary>
257  {
258  _removedFromUniverse = true;
259  }
260 
261  /// <summary>
262  /// Serves as a hash function for a particular type.
263  /// </summary>
264  /// <returns>
265  /// A hash code for the current <see cref="T:System.Object"/>.
266  /// </returns>
267  /// <filterpriority>2</filterpriority>
268  public override int GetHashCode()
269  {
270  return Configuration.GetHashCode();
271  }
272 
273  /// <summary>Determines whether the specified object is equal to the current object.</summary>
274  /// <param name="obj">The object to compare with the current object. </param>
275  /// <returns>
276  /// <see langword="true" /> if the specified object is equal to the current object; otherwise, <see langword="false" />.</returns>
277  public override bool Equals(object obj)
278  {
279  var subscription = obj as Subscription;
280  if (subscription == null)
281  {
282  return false;
283  }
284 
285  return subscription.Configuration.Equals(Configuration);
286  }
287 
288  /// <summary>Returns a string that represents the current object.</summary>
289  /// <returns>A string that represents the current object.</returns>
290  /// <filterpriority>2</filterpriority>
291  public override string ToString()
292  {
293  return Configuration.ToString();
294  }
295 
296  /// <summary>
297  /// Event invocator for the <see cref="NewDataAvailable"/> event
298  /// </summary>
299  public void OnNewDataAvailable()
300  {
301  NewDataAvailable?.Invoke(this, EventArgs.Empty);
302  }
303  }
304 }