Lean  $LEAN_TAG$
Engine.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15 */
16 
17 using System;
18 using System.IO;
19 using System.Linq;
20 using System.Runtime.CompilerServices;
21 using System.Threading;
22 using System.Threading.Tasks;
25 using QuantConnect.Data;
32 using QuantConnect.Logging;
33 using QuantConnect.Orders;
34 using QuantConnect.Packets;
36 using QuantConnect.Util;
37 using static QuantConnect.StringExtensions;
38 
40 {
41  /// <summary>
42  /// LEAN ALGORITHMIC TRADING ENGINE: ENTRY POINT.
43  ///
44  /// The engine loads new tasks, creates the algorithms and threads, and sends them
45  /// to Algorithm Manager to be executed. It is the primary operating loop.
46  /// </summary>
47  public class Engine
48  {
49  private bool _historyStartDateLimitedWarningEmitted;
50  private bool _historyNumericalPrecisionLimitedWarningEmitted;
51  private readonly bool _liveMode;
52 
53  /// <summary>
54  /// Gets the configured system handlers for this engine instance
55  /// </summary>
57 
58  /// <summary>
59  /// Gets the configured algorithm handlers for this engine instance
60  /// </summary>
62 
/// <summary>
/// Creates an <see cref="Engine"/> bound to the supplied system and algorithm handler sets
/// </summary>
/// <param name="systemHandlers">The system handlers for controlling acquisition of jobs, messaging, and api calls</param>
/// <param name="algorithmHandlers">The algorithm handlers for managing algorithm initialization, data, results, transaction, and real time events</param>
/// <param name="liveMode">True when running in live mode, false otherwise</param>
public Engine(LeanEngineSystemHandlers systemHandlers, LeanEngineAlgorithmHandlers algorithmHandlers, bool liveMode)
{
    // the handler sets are stored exactly as given; no validation happens here
    SystemHandlers = systemHandlers;
    AlgorithmHandlers = algorithmHandlers;

    // remembered for the lifetime of the engine instance
    _liveMode = liveMode;
}
75 
76  /// <summary>
77  /// Runs a single backtest/live job from the job queue
78  /// </summary>
 /// <remarks>
 /// Exceptions raised inside the method's main <c>try</c> block are caught and reported (log, result
 /// handler and API status) instead of being rethrown. The <c>finally</c> block asks the result,
 /// data feed, transaction, real time and data monitor handlers to exit whatever the outcome.
 /// </remarks>
79  /// <param name="job">The algorithm job to be processed</param>
80  /// <param name="manager">The algorithm manager instance</param>
81  /// <param name="assemblyPath">The path to the algorithm's assembly</param>
82  /// <param name="workerThread">The worker thread instance</param>
83  public void Run(AlgorithmNodePacket job, AlgorithmManager manager, string assemblyPath, WorkerThread workerThread)
84  {
 // NOTE(review): this copy of the method appears to be missing several lines (some comments below
 // are not followed by the statement they describe, and a few calls are visibly truncated) --
 // restore them from the canonical source before relying on this text.

 // start the slow static initialization in the background; its result is blocked on below,
 // right before the algorithm instance is created
85  var marketHoursDatabaseTask = Task.Run(() => StaticInitializations());
86 
87  var algorithm = default(IAlgorithm);
88  var algorithmManager = manager;
89 
90  try
91  {
92  Log.Trace($"Engine.Run(): Resource limits '{job.Controls.CpuAllocation}' CPUs. {job.Controls.RamAllocation} MB RAM.");
94 
95  //Reset thread holders.
96  var initializeComplete = false;
97 
98  //-> Initialize messaging system
100 
101  //-> Set the result handler type for this algorithm job, and launch the associated result thread.
103 
104  IBrokerage brokerage = null;
105  DataManager dataManager = null;
106  var synchronizer = _liveMode ? new LiveSynchronizer() : new Synchronizer();
 // setup phase: any exception thrown in here is reported as an initialization error by the catch below
107  try
108  {
109  // we get the mhdb before creating the algorithm instance,
110  // since the algorithm constructor will use it
111  var marketHoursDatabase = marketHoursDatabaseTask.Result;
112 
113  AlgorithmHandlers.Setup.WorkerThread = workerThread;
114 
115  // Save algorithm to cache, load algorithm instance:
116  algorithm = AlgorithmHandlers.Setup.CreateAlgorithmInstance(job, assemblyPath);
117 
118  algorithm.ProjectId = job.ProjectId;
119 
120  // Set algorithm in ILeanManager
121  SystemHandlers.LeanManager.SetAlgorithm(algorithm);
122 
123  // initialize the object store
125 
126  // initialize the data permission manager
128 
129  // notify the user of any errors w/ object store persistence
130  AlgorithmHandlers.ObjectStore.ErrorRaised += (sender, args) => algorithm.Debug($"ObjectStore Persistence Error: {args.Error.Message}");
131 
132  // set the order processor on the transaction manager; needs to be done before initializing the brokerage, which might start using it
133  algorithm.Transactions.SetOrderProcessor(AlgorithmHandlers.Transactions);
134 
135  // Initialize the brokerage
136  IBrokerageFactory factory;
137  brokerage = AlgorithmHandlers.Setup.CreateBrokerage(job, algorithm, out factory);
138 
139  // forward brokerage message events to the result handler
140  brokerage.Message += (_, e) => AlgorithmHandlers.Results.BrokerageMessage(e);
141 
142  var symbolPropertiesDatabase = SymbolPropertiesDatabase.FromDataFolder();
143  var mapFilePrimaryExchangeProvider = new MapFilePrimaryExchangeProvider(AlgorithmHandlers.MapFileProvider);
144  var registeredTypesProvider = new RegisteredSecurityDataTypesProvider();
145  var securityService = new SecurityService(algorithm.Portfolio.CashBook,
146  marketHoursDatabase,
147  symbolPropertiesDatabase,
148  algorithm,
149  registeredTypesProvider,
150  new SecurityCacheProvider(algorithm.Portfolio),
151  mapFilePrimaryExchangeProvider,
152  algorithm);
153 
154  algorithm.Securities.SetSecurityService(securityService);
155 
156  dataManager = new DataManager(AlgorithmHandlers.DataFeed,
157  new UniverseSelection(
158  algorithm,
159  securityService,
162  algorithm,
163  algorithm.TimeKeeper,
164  marketHoursDatabase,
165  _liveMode,
166  registeredTypesProvider,
168 
169  algorithm.SubscriptionManager.SetDataManager(dataManager);
170 
171  synchronizer.Initialize(algorithm, dataManager);
172 
173  // Set the algorithm's object store before initializing the data feed, which might use it
174  algorithm.SetObjectStore(AlgorithmHandlers.ObjectStore);
175 
176  // Initialize the data feed before we initialize the algorithm so it can intercept added securities/universes via events
178  algorithm,
179  job,
184  dataManager,
185  (IDataFeedTimeProvider) synchronizer,
187 
188  // set the history provider before setting up the algorithm
189  var historyProvider = GetHistoryProvider();
190  historyProvider.SetBrokerage(brokerage);
191  historyProvider.Initialize(
193  job,
199  progress =>
200  {
201  // send progress updates to the result handler only during initialization
202  if (!algorithm.GetLocked() || algorithm.IsWarmingUp)
203  {
204  AlgorithmHandlers.Results.SendStatusUpdate(AlgorithmStatus.History,
205  Invariant($"Processing history {progress}%..."));
206  }
207  },
208  // disable parallel history requests for live trading
209  parallelHistoryRequestsEnabled: !_liveMode,
210  dataPermissionManager: AlgorithmHandlers.DataPermissionsManager,
211  objectStore: algorithm.ObjectStore
212  )
213  );
214 
 // NOTE(review): GetHistoryProvider() already attaches handlers for these same three events on the
 // instance it returns, so each of them looks like it is reported twice -- confirm and drop one set.
215  historyProvider.InvalidConfigurationDetected += (sender, args) => { AlgorithmHandlers.Results.ErrorMessage(args.Message); };
216  historyProvider.DownloadFailed += (sender, args) => { AlgorithmHandlers.Results.ErrorMessage(args.Message, args.StackTrace); };
217  historyProvider.ReaderErrorDetected += (sender, args) => { AlgorithmHandlers.Results.RuntimeError(args.Message, args.StackTrace); };
218 
219  algorithm.HistoryProvider = historyProvider;
220 
221  // initialize the default brokerage message handler
222  algorithm.BrokerageMessageHandler = factory.CreateBrokerageMessageHandler(algorithm, job, SystemHandlers.Api);
223 
224  //Initialize the internal state of algorithm and job: executes the algorithm.Initialize() method.
225  initializeComplete = AlgorithmHandlers.Setup.Setup(new SetupHandlerParameters(dataManager.UniverseSelection, algorithm,
228 
229  // set this again now that we've actually added securities
231 
232  //If there are any reasons it failed, pass these back to the IDE.
233  if (!initializeComplete || AlgorithmHandlers.Setup.Errors.Count > 0)
234  {
 // any recorded setup error counts as a failed initialization, even if Setup() returned true
235  initializeComplete = false;
236  //Get all the error messages: internal in algorithm and external in setup handler.
237  var errorMessage = string.Join(",", algorithm.ErrorMessages);
238  string stackTrace = "";
239  errorMessage += string.Join(",", AlgorithmHandlers.Setup.Errors.Select(e =>
240  {
241  var message = e.Message;
242  if (e.InnerException != null)
243  {
244  var interpreter = StackExceptionInterpreter.Instance.Value;
245  var err = interpreter.Interpret(e.InnerException);
246  var stackMessage = interpreter.GetExceptionMessageHeader(err);
247  message += stackMessage;
248  stackTrace += stackMessage;
249  }
250  return message;
251  }));
252  Log.Error("Engine.Run(): " + errorMessage);
253  AlgorithmHandlers.Results.RuntimeError(errorMessage, stackTrace);
254  SystemHandlers.Api.SetAlgorithmStatus(job.AlgorithmId, AlgorithmStatus.RuntimeError, errorMessage);
255  }
256  }
257  catch (Exception err)
258  {
259  Log.Error(err);
260 
261  // for python we don't add the ugly pythonNet stack trace
262  var stackTrace = job.Language == Language.Python ? err.Message : err.ToString();
263 
264  var runtimeMessage = "Algorithm.Initialize() Error: " + err.Message + " Stack Trace: " + stackTrace;
265  AlgorithmHandlers.Results.RuntimeError(runtimeMessage, stackTrace);
266  SystemHandlers.Api.SetAlgorithmStatus(job.AlgorithmId, AlgorithmStatus.RuntimeError, runtimeMessage);
267  }
268 
269 
 // algorithm (and its history provider) may still be null here if setup failed early
270  var historyProviderName = algorithm?.HistoryProvider != null ? algorithm.HistoryProvider.GetType().FullName : string.Empty;
271  // log the job endpoints
272  Log.Trace($"JOB HANDLERS:{Environment.NewLine}" +
273  $" DataFeed: {AlgorithmHandlers.DataFeed.GetType().FullName}{Environment.NewLine}" +
274  $" Setup: {AlgorithmHandlers.Setup.GetType().FullName}{Environment.NewLine}" +
275  $" RealTime: {AlgorithmHandlers.RealTime.GetType().FullName}{Environment.NewLine}" +
276  $" Results: {AlgorithmHandlers.Results.GetType().FullName}{Environment.NewLine}" +
277  $" Transactions: {AlgorithmHandlers.Transactions.GetType().FullName}{Environment.NewLine}" +
278  $" Object Store: {AlgorithmHandlers.ObjectStore.GetType().FullName}{Environment.NewLine}" +
279  $" History Provider: {historyProviderName}{Environment.NewLine}" +
280  $" Brokerage: {brokerage?.GetType().FullName}{Environment.NewLine}" +
281  $" Data Provider: {AlgorithmHandlers.DataProvider.GetType().FullName}{Environment.NewLine}");
282 
283  //-> Using the job + initialization: load the designated handlers:
284  if (initializeComplete)
285  {
286  // notify the LEAN manager that the algorithm is initialized and starting
287  SystemHandlers.LeanManager.OnAlgorithmStart();
288 
289  //-> Reset the backtest stopwatch; we're now running the algorithm.
290  var startTime = DateTime.UtcNow;
291 
292  //Assign the algorithm id and lock the algorithm so no further updates are accepted.
293  algorithm.SetAlgorithmId(job.AlgorithmId);
294  algorithm.SetLocked();
295 
296  //Load the associated handlers for transaction and realtime events:
297  AlgorithmHandlers.Transactions.Initialize(algorithm, brokerage, AlgorithmHandlers.Results);
298  AlgorithmHandlers.RealTime.Setup(algorithm, job, AlgorithmHandlers.Results, SystemHandlers.Api, algorithmManager.TimeLimit);
299 
300  // wire up the brokerage message handler
301  brokerage.Message += (sender, message) =>
302  {
303  algorithm.BrokerageMessageHandler.HandleMessage(message);
304 
305  // fire brokerage message events
306  algorithm.OnBrokerageMessage(message);
307  switch (message.Type)
308  {
309  case BrokerageMessageType.Disconnect:
310  algorithm.OnBrokerageDisconnect();
311  break;
312  case BrokerageMessageType.Reconnect:
313  algorithm.OnBrokerageReconnect();
314  break;
315  }
316  };
317 
318  // Result manager scanning message queue: (started earlier)
319  AlgorithmHandlers.Results.DebugMessage(
320  $"Launching analysis for {job.AlgorithmId} with LEAN Engine v{Globals.Version}");
321 
322  try
323  {
324  //Create a new engine isolator class
325  var isolator = new Isolator();
326 
327  // Execute the Algorithm Code:
328  var complete = isolator.ExecuteWithTimeLimit(AlgorithmHandlers.Setup.MaximumRuntime, algorithmManager.TimeLimit.IsWithinLimit, () =>
329  {
330  try
331  {
332  //Run Algorithm Job:
333  // -> Using this Data Feed,
334  // -> Send Orders to this TransactionHandler,
335  // -> Send Results to ResultHandler.
336  algorithmManager.Run(job, algorithm, synchronizer, AlgorithmHandlers.Transactions, AlgorithmHandlers.Results, AlgorithmHandlers.RealTime, SystemHandlers.LeanManager, isolator.CancellationToken);
337  }
338  catch (Exception err)
339  {
 // record the failure on the algorithm; it is reported after the isolator returns
340  algorithm.SetRuntimeError(err, "AlgorithmManager.Run");
341  return;
342  }
343 
344  Log.Trace("Engine.Run(): Exiting Algorithm Manager");
345  }, job.Controls.RamAllocation, workerThread:workerThread, sleepIntervalMillis: algorithm.LiveMode ? 10000 : 1000);
346 
347  if (!complete)
348  {
349  Log.Error("Engine.Main(): Failed to complete in time: " + AlgorithmHandlers.Setup.MaximumRuntime.ToStringInvariant("F"));
350  throw new Exception("Failed to complete algorithm within " + AlgorithmHandlers.Setup.MaximumRuntime.ToStringInvariant("F")
351  + " seconds. Please make it run faster.");
352  }
353  }
354  catch (Exception err)
355  {
356  //Error running the user algorithm: purge datafeed, send error messages, set algorithm status to failed.
357  algorithm.SetRuntimeError(err, "Engine Isolator");
358  }
359 
360  // Algorithm runtime error:
361  if (algorithm.RunTimeError != null)
362  {
363  HandleAlgorithmError(job, algorithm.RunTimeError);
364  }
365 
366  // notify the LEAN manager that the algorithm has finished
367  SystemHandlers.LeanManager.OnAlgorithmEnd();
368 
 // best-effort reporting: a failure in here is logged and does not abort the shutdown sequence
369  try
370  {
 // the trade list is only written when the "transaction-log" setting is non-empty
371  var csvTransactionsFileName = Config.Get("transaction-log");
372  if (!string.IsNullOrEmpty(csvTransactionsFileName))
373  {
374  SaveListOfTrades(AlgorithmHandlers.Transactions, csvTransactionsFileName);
375  }
376 
377  if (!_liveMode)
378  {
379  //Diagnostics Completed, Send Result Packet:
380  var totalSeconds = (DateTime.UtcNow - startTime).TotalSeconds;
381  var dataPoints = algorithmManager.DataPoints + algorithm.HistoryProvider.DataPointCount;
 // throughput in thousands of data points per second
382  var kps = dataPoints / (double) 1000 / totalSeconds;
383  AlgorithmHandlers.Results.DebugMessage($"Algorithm Id:({job.AlgorithmId}) completed in {totalSeconds:F2} seconds at {kps:F0}k data points per second. Processing total of {dataPoints:N0} data points.");
384  }
385  }
386  catch (Exception err)
387  {
388  Log.Error(err, "Error sending analysis results");
389  }
390 
391  //Before we return, send terminate commands to close up the threads
394  dataManager?.RemoveAllSubscriptions();
395  workerThread?.Dispose();
396  }
397 
398  synchronizer.DisposeSafely();
399  // Close data feed, alphas. Could be running even if algorithm initialization failed
400  AlgorithmHandlers.DataFeed.Exit();
401 
402  //Close result handler:
403  AlgorithmHandlers.Results.Exit();
404 
405  //Wait for the threads to complete: poll every 10 ms for at most 30 seconds,
 //logging a trace message every 100 ms of accumulated wait.
406  var millisecondInterval = 10;
407  var millisecondTotalWait = 0;
408  while ((AlgorithmHandlers.Results.IsActive
409  || (AlgorithmHandlers.Transactions != null && AlgorithmHandlers.Transactions.IsActive)
410  || (AlgorithmHandlers.DataFeed != null && AlgorithmHandlers.DataFeed.IsActive)
411  || (AlgorithmHandlers.RealTime != null && AlgorithmHandlers.RealTime.IsActive))
412  && millisecondTotalWait < 30*1000)
413  {
414  Thread.Sleep(millisecondInterval);
415  if (millisecondTotalWait % (millisecondInterval * 10) == 0)
416  {
417  Log.Trace("Waiting for threads to exit...");
418  }
419  millisecondTotalWait += millisecondInterval;
420  }
421 
422  if (brokerage != null)
423  {
424  Log.Trace("Engine.Run(): Disconnecting from brokerage...");
425  brokerage.Disconnect();
426  brokerage.Dispose();
427  }
428  if (AlgorithmHandlers.Setup != null)
429  {
430  Log.Trace("Engine.Run(): Disposing of setup handler...");
431  AlgorithmHandlers.Setup.Dispose();
432  }
433 
434  Log.Trace("Engine.Main(): Analysis Completed and Results Posted.");
435  }
436  catch (Exception err)
437  {
438  Log.Error(err, "Error running algorithm");
439  }
440  finally
441  {
442  //No matter what for live mode; make sure we've set algorithm status in the API for "not running" conditions:
443  if (_liveMode && algorithmManager.State != AlgorithmStatus.Running && algorithmManager.State != AlgorithmStatus.RuntimeError)
444  SystemHandlers.Api.SetAlgorithmStatus(job.AlgorithmId, algorithmManager.State);
445 
 // NOTE(review): Results.Exit() and DataFeed.Exit() were already called above on the normal path,
 // so they run a second time here -- presumably Exit() tolerates repeated calls; verify.
446  AlgorithmHandlers.Results.Exit();
447  AlgorithmHandlers.DataFeed.Exit();
448  AlgorithmHandlers.Transactions.Exit();
449  AlgorithmHandlers.RealTime.Exit();
450  AlgorithmHandlers.DataMonitor.Exit();
451  }
452  }
453 
/// <summary>
/// Reports a runtime error of the algorithm: asks the data feed to exit, forwards the error
/// to the result handler and sets the job's status to <c>RuntimeError</c> through the API.
/// </summary>
/// <param name="job">Job we're processing</param>
/// <param name="err">Error from algorithm stack</param>
private void HandleAlgorithmError(AlgorithmNodePacket job, Exception err)
{
    AlgorithmHandlers.DataFeed?.Exit();

    // without a result handler there is nowhere to report to
    if (AlgorithmHandlers.Results == null)
    {
        return;
    }

    var userMessage = "Runtime Error: " + err.Message;
    Log.Trace("Engine.Run(): Sending runtime error to user...");
    AlgorithmHandlers.Results.LogMessage(userMessage);

    string trace;
    if (job.Language == Language.Python)
    {
        // for python we don't add the ugly pythonNet stack trace
        trace = err.Message;
    }
    else
    {
        trace = err.ToString();
    }

    AlgorithmHandlers.Results.RuntimeError(userMessage, trace);

    var statusMessage = $"{userMessage} Stack Trace: {trace}";
    SystemHandlers.Api.SetAlgorithmStatus(job.AlgorithmId, AlgorithmStatus.RuntimeError, statusMessage);
}
475 
/// <summary>
/// Creates a <see cref="HistoryProviderManager"/> whose notification events are routed to the result handler
/// </summary>
/// <returns>A new manager instance with its event handlers attached</returns>
private HistoryProviderManager GetHistoryProvider()
{
    var manager = new HistoryProviderManager();

    // error events are forwarded to the result handler every time they are raised
    manager.InvalidConfigurationDetected += (sender, args) => AlgorithmHandlers.Results.ErrorMessage(args.Message);
    manager.DownloadFailed += (sender, args) => AlgorithmHandlers.Results.ErrorMessage(args.Message, args.StackTrace);
    manager.ReaderErrorDetected += (sender, args) => AlgorithmHandlers.Results.RuntimeError(args.Message, args.StackTrace);

    // each of the two warnings below is sent at most once per engine instance
    manager.StartDateLimited += (sender, args) =>
    {
        if (_historyStartDateLimitedWarningEmitted)
        {
            return;
        }

        _historyStartDateLimitedWarningEmitted = true;
        AlgorithmHandlers.Results.DebugMessage("Warning: when performing history requests, the start date will be adjusted if it is before the first known date for the symbol.");
    };
    manager.NumericalPrecisionLimited += (sender, args) =>
    {
        if (_historyNumericalPrecisionLimitedWarningEmitted)
        {
            return;
        }

        _historyNumericalPrecisionLimitedWarningEmitted = true;
        AlgorithmHandlers.Results.DebugMessage("Warning: when performing history requests, the start date will be adjusted if there are numerical precision errors in the factor files.");
    };

    return manager;
}
505 
/// <summary>
/// Writes the orders whose status reports a fill (<c>Status.IsFill()</c>) to a CSV file, one order per line
/// </summary>
/// <remarks>
/// Each line has the form <c>time,symbol,direction,quantity,price</c>, with the time formatted as
/// <c>yyyy-MM-dd HH:mm:ss</c>. The target directory is created when the path names one;
/// a bare file name is written relative to the current directory.
/// </remarks>
/// <param name="transactions">Transactions list via an OrderProvider</param>
/// <param name="csvFileName">File path to create</param>
private static void SaveListOfTrades(IOrderProvider transactions, string csvFileName)
{
    var orders = transactions.GetOrders(x => x.Status.IsFill());

    // GetDirectoryName returns null for a root path and an empty string for a bare file name;
    // neither may be handed to Directory.CreateDirectory, which rejects them with an exception
    var directory = Path.GetDirectoryName(csvFileName);
    if (!string.IsNullOrEmpty(directory))
    {
        // CreateDirectory does nothing when the directory already exists
        Directory.CreateDirectory(directory);
    }

    using (var writer = new StreamWriter(csvFileName))
    {
        foreach (var order in orders)
        {
            var line = Invariant($"{order.Time.ToStringInvariant("yyyy-MM-dd HH:mm:ss")},") +
                       Invariant($"{order.Symbol.Value},{order.Direction},{order.Quantity},{order.Price}");
            writer.WriteLine(line);
        }
    }
}
529 
530  /// <summary>
531  /// Initialize slow static variables
532  /// </summary>
 /// <remarks>
 /// Run() starts this method on a background task and blocks on its result before creating the algorithm instance.
 /// </remarks>
 // NoOptimization/NoInlining: presumably keeps the otherwise unused TimeZones read below from being optimized away -- confirm
533  [MethodImpl(MethodImplOptions.NoOptimization | MethodImplOptions.NoInlining)]
534  private static MarketHoursDatabase StaticInitializations()
535  {
536  // This is slow because it creates all static timezones
537  var nyTime = TimeZones.NewYork;
538  // slow because it goes to disk and parses json
 // NOTE(review): the statement this comment describes (and the method's return value) appears to be
 // missing from this copy of the file -- restore it from the canonical source.
540  }
541 
542  } // End Algorithm Node Core Thread
543 } // End Namespace