Lean  $LEAN_TAG$
LiveTradingRealTimeHandler.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15 */
16 
17 using System;
18 using System.Linq;
19 using System.Threading;
20 using QuantConnect.Util;
21 using QuantConnect.Logging;
22 using QuantConnect.Packets;
27 
29 {
30  /// <summary>
31  /// Live trading realtime event processing.
32  /// </summary>
34  {
35  private Thread _realTimeThread;
36  private CancellationTokenSource _cancellationTokenSource = new();
37 
38  /// <summary>
39  /// Gets the current market hours database instance
40  /// </summary>
42 
43  /// <summary>
44  /// Gets the current symbol properties database instance
45  /// </summary>
47 
48  /// <summary>
49  /// Gets the time provider
50  /// </summary>
51  /// <remarks>
52  /// This should be fixed to RealTimeHandler, but made a protected property for testing purposes
53  /// </remarks>
54  protected virtual ITimeProvider TimeProvider { get; } = RealTimeProvider.Instance;
55 
56  /// <summary>
57  /// Boolean flag indicating thread state.
58  /// </summary>
59  public override bool IsActive { get; protected set; }
60 
61  /// <summary>
62  /// Initializes the real time handler for the specified algorithm and job
63  /// </summary>
64  public override void Setup(IAlgorithm algorithm, AlgorithmNodePacket job, IResultHandler resultHandler, IApi api, IIsolatorLimitResultProvider isolatorLimitProvider)
65  {
66  base.Setup(algorithm, job, resultHandler, api, isolatorLimitProvider);
67 
68  var utcNow = TimeProvider.GetUtcNow();
69  var todayInAlgorithmTimeZone = utcNow.ConvertFromUtc(Algorithm.TimeZone).Date;
70 
71  // set up an scheduled event to refresh market hours and symbol properties every certain period of time
72  var times = Time.DateTimeRange(utcNow.Date, Time.EndOfTime, Algorithm.Settings.DatabasesRefreshPeriod).Where(date => date > utcNow);
73 
74  Add(new ScheduledEvent("RefreshMarketHoursAndSymbolProperties", times, (name, triggerTime) =>
75  {
78  }));
79  }
80 
81  /// <summary>
82  /// Get's the timeout the scheduled task time monitor should use
83  /// </summary>
84  protected override int GetTimeMonitorTimeout()
85  {
86  return 500;
87  }
88 
89  /// <summary>
90  /// Execute the live realtime event thread montioring.
91  /// It scans every second monitoring for an event trigger.
92  /// </summary>
93  private void Run()
94  {
95  IsActive = true;
96 
97  // continue thread until cancellation is requested
98  while (!_cancellationTokenSource.IsCancellationRequested)
99  {
100  var time = TimeProvider.GetUtcNow();
101 
102  // pause until the next second
103  var nextSecond = time.RoundUp(TimeSpan.FromSeconds(1));
104  var delay = Convert.ToInt32((nextSecond - time).TotalMilliseconds);
105  Thread.Sleep(delay < 0 ? 1 : delay);
106 
107  // poke each event to see if it should fire, we order by unique id to be deterministic
108  foreach (var kvp in ScheduledEvents.OrderBySafe(pair => pair.Value))
109  {
110  var scheduledEvent = kvp.Key;
111  try
112  {
113  IsolatorLimitProvider.Consume(scheduledEvent, time, TimeMonitor);
114  }
115  catch (Exception exception)
116  {
117  Algorithm.SetRuntimeError(exception, $"Scheduled event: '{scheduledEvent.Name}' at {time}");
118  }
119  }
120  }
121 
122  IsActive = false;
123  Log.Trace("LiveTradingRealTimeHandler.Run(): Exiting thread... Exit triggered: " + _cancellationTokenSource.IsCancellationRequested);
124  }
125 
126  /// <summary>
127  /// Set the current time. If the date changes re-start the realtime event setup routines.
128  /// </summary>
129  /// <param name="time"></param>
130  public override void SetTime(DateTime time)
131  {
133  {
134  base.SetTime(time);
135  }
136  else if (_realTimeThread == null)
137  {
138  // in live mode we use current time for our time keeping
139  // this method is used by backtesting to set time based on the data
140  _realTimeThread = new Thread(Run) { IsBackground = true, Name = "RealTime Thread" };
141  _realTimeThread.Start(); // RealTime scan time for time based events
142  }
143  }
144 
145  /// <summary>
146  /// Scan for past events that didn't fire because there was no data at the scheduled time.
147  /// </summary>
148  /// <param name="time">Current time.</param>
149  public override void ScanPastEvents(DateTime time)
150  {
152  {
153  base.ScanPastEvents(time);
154  }
155  // in live mode we use current time for our time keeping
156  // this method is used by backtesting to scan for past events based on the data
157  }
158 
159  /// <summary>
160  /// Stop the real time thread
161  /// </summary>
162  public override void Exit()
163  {
164  _realTimeThread.StopSafely(TimeSpan.FromMinutes(1), _cancellationTokenSource);
165  _cancellationTokenSource.DisposeSafely();
166  base.Exit();
167  }
168 
169  /// <summary>
170  /// Resets the market hours database, forcing a reload when reused.
171  /// Called in tests where multiple algorithms are run sequentially,
172  /// and we need to guarantee that every test starts with the same environment.
173  /// </summary>
174  protected virtual void ResetMarketHoursDatabase()
175  {
176  MarketHoursDatabase.UpdateDataFolderDatabase();
177  Log.Trace("LiveTradingRealTimeHandler.ResetMarketHoursDatabase(): Updated market hours database.");
178  }
179 
180  /// <summary>
181  /// Resets the symbol properties database, forcing a reload when reused.
182  /// </summary>
183  protected virtual void ResetSymbolPropertiesDatabase()
184  {
185  SymbolPropertiesDatabase.UpdateDataFolderDatabase();
186  Log.Trace("LiveTradingRealTimeHandler.ResetSymbolPropertiesDatabase(): Updated symbol properties database.");
187  }
188  }
189 }