Lean  $LEAN_TAG$
PeriodCountConsolidatorBase.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15 */
16 
17 using System;
18 using System.Runtime.CompilerServices;
20 using Python.Runtime;
21 
23 {
24  /// <summary>
25  /// Provides a base class for consolidators that emit data based on the passing of a period of time
26  /// or after seeing a max count of data points.
27  /// </summary>
28  /// <typeparam name="T">The input type of the consolidator</typeparam>
29  /// <typeparam name="TConsolidated">The output type of the consolidator</typeparam>
30  public abstract class PeriodCountConsolidatorBase<T, TConsolidated> : DataConsolidator<T>
31  where T : IBaseData
32  where TConsolidated : BaseData
33  {
// The SecurityIdentifier that we are consolidating for.
private SecurityIdentifier _securityIdentifier;
// True once _securityIdentifier has been captured from the first data point passed to Update.
private bool _securityIdentifierIsSet;
// The number of data updates between creating new bars; null when the consolidator is not in count mode.
// Only assigned by constructors.
private readonly int? _maxCount;
// Strategy object that rounds a time down to a bar start and exposes the bar period.
// Only assigned by the private constructor.
private readonly IPeriodSpecification _periodSpecification;
// The minimum timespan between creating new bars.
private TimeSpan? _period;
// The number of pieces of data we've accumulated since our last emit
private int _currentCount;
// The working bar used for aggregating the data
private TConsolidated _workingBar;
// The last time we emitted a consolidated bar
private DateTime? _lastEmit;
// True once the one-time check of the input data length against the period has been performed.
private bool _validateTimeSpan;
50 
/// <summary>
/// Shared initialization used by every protected constructor: stores the period specification
/// and seeds the working period from it.
/// </summary>
/// <param name="periodSpecification">Defines how bar start times are rounded and which period, if any, applies</param>
private PeriodCountConsolidatorBase(IPeriodSpecification periodSpecification)
{
    _periodSpecification = periodSpecification;
    _period = periodSpecification.Period;
}
56 
/// <summary>
/// Creates a consolidator to produce a new <typeparamref name="TConsolidated"/> instance representing the period
/// </summary>
/// <param name="period">The minimum span of time before emitting a consolidated bar</param>
protected PeriodCountConsolidatorBase(TimeSpan period)
    : this(new TimeSpanPeriodSpecification(period))
{
    // nothing else to do: the chained constructor already copies the specification's period into _period
}
66 
/// <summary>
/// Creates a count-based consolidator that produces a new <typeparamref name="TConsolidated"/> instance
/// out of the last <paramref name="maxCount"/> pieces of data
/// </summary>
/// <param name="maxCount">The number of pieces to accept before emitting a consolidated bar</param>
protected PeriodCountConsolidatorBase(int maxCount)
    : this(new BarCountPeriodSpecification())
{
    // the bar count specification carries no period, only the counter drives the emission
    _maxCount = maxCount;
}
76 
/// <summary>
/// Creates a consolidator to produce a new <typeparamref name="TConsolidated"/> instance representing the last count pieces of data or the period, whichever comes first
/// </summary>
/// <param name="maxCount">The number of pieces to accept before emitting a consolidated bar</param>
/// <param name="period">The minimum span of time before emitting a consolidated bar</param>
protected PeriodCountConsolidatorBase(int maxCount, TimeSpan period)
    : this(new MixedModePeriodSpecification(period))
{
    // the chained constructor already copies the specification's period into _period,
    // so only the counter has to be stored here
    _maxCount = maxCount;
}
88 
/// <summary>
/// Creates a consolidator to produce a new <typeparamref name="TConsolidated"/> instance whose start time
/// and period are computed by the given function
/// </summary>
/// <param name="func">Func that defines the start time of a consolidated data</param>
protected PeriodCountConsolidatorBase(Func<DateTime, CalendarInfo> func)
    : this(new FuncPeriodSpecification(func))
{
    // The function-based specification has no period until it is asked to round a time, so start
    // with a one second placeholder. GetRoundedBarTime replaces it with the specification's
    // period when it is called while there is no working bar.
    _period = Time.OneSecond;
}
98 
/// <summary>
/// Creates a consolidator to produce a new <typeparamref name="TConsolidated"/> instance whose period is
/// described by a Python object, either a function or a timespan
/// </summary>
/// <param name="pyObject">Python object that defines either a function object that defines the start time of a consolidated data or a timespan</param>
protected PeriodCountConsolidatorBase(PyObject pyObject)
    : this(GetPeriodSpecificationFromPyObject(pyObject))
{
    // all the work is done by the conversion helper and the chained constructor
}
107 
/// <summary>
/// Gets the type of the bars emitted by this consolidator, i.e. <typeparamref name="TConsolidated"/>
/// </summary>
public override Type OutputType
{
    get { return typeof(TConsolidated); }
}
112 
/// <summary>
/// Gets a clone of the bar currently being built, or null when there is no working bar
/// </summary>
public override IBaseData WorkingData
{
    get { return _workingBar?.Clone(); }
}
117 
/// <summary>
/// Raised every time a consolidated bar is produced. Declared with 'new' so that subscribers
/// receive a <typeparamref name="TConsolidated"/> instead of a <see cref="BaseData"/> instance
/// </summary>
public new event EventHandler<TConsolidated> DataConsolidated;
123 
/// <summary>
/// Feeds a new data point into this consolidator and raises the DataConsolidated event
/// whenever a bar gets completed.
/// In time span mode, the bar range is closed on the left and open on the right: [T, T+TimeSpan).
/// For example, if time span is 1 minute, we have [10:00, 10:01): so data at 10:01 is not
/// included in the bar starting at 10:00.
/// </summary>
/// <exception cref="InvalidOperationException">Thrown when multiple symbols are being consolidated.</exception>
/// <exception cref="ArgumentException">Thrown by the one-time check of a time span consolidator when the data point covers a longer span than the consolidation period.</exception>
/// <param name="data">The new data for the consolidator</param>
public override void Update(T data)
{
    // remember the identifier of the very first data point and reject any other identifier afterwards
    if (_securityIdentifierIsSet)
    {
        if (!data.Symbol.ID.Equals(_securityIdentifier))
        {
            throw new InvalidOperationException($"Consolidators can only be used with a single symbol. The previous consolidated SecurityIdentifier ({_securityIdentifier}) is not the same as in the current data ({data.Symbol.ID}).");
        }
    }
    else
    {
        _securityIdentifierIsSet = true;
        _securityIdentifier = data.Symbol.ID;
    }

    // give derived classes the chance to filter the data out before any counter is touched
    if (!ShouldProcess(data))
    {
        return;
    }

    var inputLengthCheckPending = !_validateTimeSpan && _period.HasValue && _periodSpecification is TimeSpanPeriodSpecification;
    if (inputLengthCheckPending)
    {
        // performed a single time: the input data can not span more than the consolidation period
        _validateTimeSpan = true;
        var inputSpan = data.EndTime - data.Time;
        if (inputSpan > _period)
        {
            throw new ArgumentException($"For Symbol {data.Symbol} can not consolidate bars of period: {_period}, using data of the same or higher period: {data.EndTime - data.Time}");
        }
    }

    // in counting mode the incoming point is always aggregated before the event is fired
    var aggregateFirst = _maxCount.HasValue;
    // whether a consolidated bar has to be emitted during this call
    var shouldEmit = false;

    if (aggregateFirst)
    {
        // count mode: emit as soon as enough data points were seen
        if (++_currentCount >= _maxCount.Value)
        {
            _currentCount = 0;
            shouldEmit = true;
        }
    }

    if (!_lastEmit.HasValue)
    {
        // seed the last emit time used by the period computations below
        if (IsTimeBased)
        {
            _lastEmit = DateTime.MinValue;
        }
        else
        {
            _lastEmit = data.Time;
        }
    }

    if (_period.HasValue)
    {
        // time span mode: the working bar is done once the new data is at least one period away
        // from the bar start and its rounded bar time is later than the last emit
        var periodElapsed = _workingBar != null
            && data.Time - _workingBar.Time >= _period.Value
            && GetRoundedBarTime(data) > _lastEmit;
        if (periodElapsed)
        {
            shouldEmit = true;
        }

        // special case: with a zero period every data point is aggregated first and emitted right away
        if (_period.Value == TimeSpan.Zero)
        {
            shouldEmit = true;
            aggregateFirst = true;
        }
    }

    if (aggregateFirst && data.Time >= _lastEmit)
    {
        AggregateBar(ref _workingBar, data);
    }

    if (shouldEmit)
    {
        var tradeBar = _workingBar as TradeBar;
        if (tradeBar != null)
        {
            if (_period.HasValue)
            {
                // stamp the consolidation period on the emitted trade bar
                tradeBar.Period = _period.Value;
            }
            else if (!(data is TradeBar))
            {
                // no period defined and the input is not a trade bar: use the time elapsed since the last emit
                tradeBar.Period = data.Time - _lastEmit.Value;
            }
        }

        // _lastEmit has to be updated before emitting because OnDataConsolidated clears _workingBar
        if (IsTimeBased && _workingBar != null)
        {
            _lastEmit = _workingBar.EndTime;
        }
        else
        {
            _lastEmit = data.Time;
        }
        OnDataConsolidated(_workingBar);
    }

    if (!aggregateFirst && data.Time >= _lastEmit)
    {
        // time span mode: the new data point belongs to the next bar
        AggregateBar(ref _workingBar, data);
    }
}
242 
/// <summary>
/// Checks whether enough time has passed to emit the working bar and, if so, emits it.
/// Nothing happens when there is no working bar, no period or a zero period.
/// </summary>
/// <param name="currentLocalTime">The current time in the local time zone (same as <see cref="BaseData.Time"/>)</param>
public override void Scan(DateTime currentLocalTime)
{
    if (_workingBar == null || !_period.HasValue || _period.Value == TimeSpan.Zero)
    {
        return;
    }

    if (currentLocalTime - _workingBar.Time < _period.Value)
    {
        // the working bar is still within its period
        return;
    }

    if (!(GetRoundedBarTime(currentLocalTime) > _lastEmit))
    {
        // the rounded bar time is not later than the last emit
        return;
    }

    // update _lastEmit first, OnDataConsolidated clears the working bar
    _lastEmit = _workingBar.EndTime;
    OnDataConsolidated(_workingBar);
}
256 
/// <summary>
/// True when no max count was specified for this consolidator, false otherwise
/// </summary>
protected bool IsTimeBased
{
    get { return !_maxCount.HasValue; }
}
261 
/// <summary>
/// Gets the time period currently used by this consolidator, null if there is none
/// </summary>
protected TimeSpan? Period
{
    get { return _period; }
}
266 
/// <summary>
/// Hook that lets derived classes discard data before it is consolidated.
/// The default implementation accepts every data point.
/// </summary>
/// <param name="data">The data to check</param>
/// <returns>True if the consolidator should process this data, false otherwise</returns>
protected virtual bool ShouldProcess(T data)
{
    return true;
}
273 
/// <summary>
/// Merges the new 'data' into the 'workingBar'. After a consolidated bar is emitted the
/// 'workingBar' is null, so implementations receive null when a new bar is being started
/// </summary>
/// <param name="workingBar">The bar we're building, null if the event was just fired and we're starting a new consolidated bar</param>
/// <param name="data">The new data</param>
protected abstract void AggregateBar(ref TConsolidated workingBar, T data);
281 
/// <summary>
/// Rounds the given time down to a bar time using the period specification.
/// Called by AggregateBar in derived classes.
/// When there is no working bar, the consolidator period is refreshed from the specification.
/// </summary>
/// <param name="time">The bar time to be rounded down</param>
/// <returns>The rounded bar time</returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
protected DateTime GetRoundedBarTime(DateTime time)
{
    // ask the specification first: the period is read from it only afterwards
    var roundedTime = _periodSpecification.GetRoundedBarTime(time);

    if (_workingBar == null)
    {
        // a new bar is about to be opened: take the period that applies at opening time
        _period = _periodSpecification.Period;
    }

    return roundedTime;
}
300 
/// <summary>
/// Computes the bar start time for the given data point. Called by AggregateBar in derived classes.
/// Falls back to the data point's own start time when the rounded start plus the period
/// would end before the data point does.
/// </summary>
/// <param name="inputData">The input data point</param>
/// <returns>The rounded bar start time</returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
protected DateTime GetRoundedBarTime(IBaseData inputData)
{
    var roundedStart = GetRoundedBarTime(inputData.Time);

    // the end time of the given data is beyond the end of the rounded bar,
    // so the start time of the given data is used instead
    var inputOverflowsBar = _period.HasValue && roundedStart + _period.Value < inputData.EndTime;

    return inputOverflowsBar ? inputData.Time : roundedStart;
}
318 
/// <summary>
/// Raises the base class notification and then the typed <see cref="DataConsolidated"/> event,
/// and finally clears the working bar so that a new one gets started
/// </summary>
/// <param name="e">The consolidated data</param>
protected virtual void OnDataConsolidated(TConsolidated e)
{
    base.OnDataConsolidated(e);

    var handler = DataConsolidated;
    if (handler != null)
    {
        handler(this, e);
    }

    // the emitted bar is done, the next data point opens a new one
    _workingBar = null;
}
330 
/// <summary>
/// Builds the period specification described by a PyObject: a function based specification when the
/// object can be converted into a delegate, a time span based specification otherwise.
/// </summary>
/// <param name="pyObject">Python object that defines either a function object that defines the start time of a consolidated data or a timespan</param>
/// <returns>IPeriodSpecification that represents the PyObject</returns>
private static IPeriodSpecification GetPeriodSpecificationFromPyObject(PyObject pyObject)
{
    Func<DateTime, CalendarInfo> calendarFunc;
    var converted = pyObject.TryConvertToDelegate(out calendarFunc);
    if (!converted)
    {
        // not a function: read the object as a TimeSpan inside a Py.GIL() scope
        using (Py.GIL())
        {
            var period = pyObject.As<TimeSpan>();
            return new TimeSpanPeriodSpecification(period);
        }
    }

    return new FuncPeriodSpecification(calendarFunc);
}
349 
/// <summary>
/// Abstraction over the different ways the start time of a consolidated bar can be specified
/// </summary>
private interface IPeriodSpecification
{
    /// <summary>
    /// The period of the bars, may be null
    /// </summary>
    TimeSpan? Period { get; }

    /// <summary>
    /// Gets the bar time that corresponds to the given time
    /// </summary>
    /// <param name="time">The time to be rounded</param>
    /// <returns>The bar time for the given time</returns>
    DateTime GetRoundedBarTime(DateTime time);
}
358 
/// <summary>
/// Specification used when the bars are defined by a number of data points only
/// </summary>
private class BarCountPeriodSpecification : IPeriodSpecification
{
    /// <summary>
    /// Always null, count based bars have no period
    /// </summary>
    public TimeSpan? Period => null;

    /// <summary>
    /// Returns the given time unchanged
    /// </summary>
    public DateTime GetRoundedBarTime(DateTime time)
    {
        return time;
    }
}
368 
/// <summary>
/// Specification used when the bars are defined by both a counter and a period (mixed mode)
/// </summary>
private class MixedModePeriodSpecification : IPeriodSpecification
{
    /// <summary>
    /// The period given at construction
    /// </summary>
    public TimeSpan? Period { get; }

    /// <summary>
    /// Creates a new mixed mode specification for the given period
    /// </summary>
    /// <param name="period">The period of the bars</param>
    public MixedModePeriodSpecification(TimeSpan period)
    {
        Period = period;
    }

    /// <summary>
    /// Returns the given time unchanged
    /// </summary>
    public DateTime GetRoundedBarTime(DateTime time)
    {
        return time;
    }
}
383 
/// <summary>
/// Specification used when the bars are defined by a time span
/// </summary>
private class TimeSpanPeriodSpecification : IPeriodSpecification
{
    /// <summary>
    /// The period given at construction
    /// </summary>
    public TimeSpan? Period { get; }

    /// <summary>
    /// Creates a new time span specification for the given period
    /// </summary>
    /// <param name="period">The period of the bars</param>
    public TimeSpanPeriodSpecification(TimeSpan period)
    {
        Period = period;
    }

    /// <summary>
    /// Rounds the given time down to a multiple of the period, unless the period is longer than one day
    /// </summary>
    /// <param name="time">The time to be rounded</param>
    /// <returns>The rounded time, or the given time for periods longer than a day</returns>
    public DateTime GetRoundedBarTime(DateTime time)
    {
        var barPeriod = Period.Value;
        if (barPeriod > Time.OneDay)
        {
            // #4915 For periods larger than a day, don't use a rounding schedule.
            return time;
        }

        return time.RoundDown(barPeriod);
    }
}
401 
/// <summary>
/// Special case for bars where the open time is defined by a function.
/// We assert on construction that the function returns a date time in the past or equal to the given time instant.
/// </summary>
private class FuncPeriodSpecification : IPeriodSpecification
{
    // fixed instant used to sanity check the user function at construction time
    private static readonly DateTime _verificationDate = new DateTime(2022, 01, 03, 10, 10, 10);

    // user function mapping a time to the calendar info (start and period) of its bar
    private readonly Func<DateTime, CalendarInfo> _calendarInfoFunc;

    /// <summary>
    /// The period reported by the function for the last time passed to <see cref="GetRoundedBarTime"/>,
    /// null until that method is called for the first time
    /// </summary>
    public TimeSpan? Period { get; private set; }

    /// <summary>
    /// Creates a new function based specification after verifying the function against a fixed date
    /// </summary>
    /// <param name="expiryFunc">Function that computes the start and period of the bar for a given time</param>
    /// <exception cref="ArgumentNullException">Thrown when <paramref name="expiryFunc"/> is null</exception>
    /// <exception cref="ArgumentException">Thrown when the function returns a start time later than the time passed in</exception>
    public FuncPeriodSpecification(Func<DateTime, CalendarInfo> expiryFunc)
    {
        if (expiryFunc == null)
        {
            // fail with a descriptive error instead of a NullReferenceException on the invocation below
            throw new ArgumentNullException(nameof(expiryFunc));
        }

        if (expiryFunc(_verificationDate).Start > _verificationDate)
        {
            throw new ArgumentException($"{nameof(FuncPeriodSpecification)}: Please use a function that computes the start of the bar associated with the given date time. Should never return a time later than the one passed in.");
        }
        _calendarInfoFunc = expiryFunc;
    }

    /// <summary>
    /// Evaluates the function for the given time, remembers the resulting period and returns the bar start
    /// </summary>
    /// <param name="time">The time to get the bar start for</param>
    /// <returns>The start time provided by the function</returns>
    public DateTime GetRoundedBarTime(DateTime time)
    {
        var calendarInfo = _calendarInfoFunc(time);
        Period = calendarInfo.Period;
        return calendarInfo.Start;
    }
}
429  }
430 }