Lean  $LEAN_TAG$
Engine.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15 */
16 
17 using System;
18 using System.IO;
19 using System.Linq;
20 using System.Runtime.CompilerServices;
21 using System.Threading;
22 using System.Threading.Tasks;
25 using QuantConnect.Data;
32 using QuantConnect.Logging;
33 using QuantConnect.Orders;
34 using QuantConnect.Packets;
36 using QuantConnect.Util;
37 using static QuantConnect.StringExtensions;
38 
40 {
41  /// <summary>
42  /// LEAN ALGORITHMIC TRADING ENGINE: ENTRY POINT.
43  ///
44  /// The engine loads new tasks, creates the algorithms and threads, and sends them
45  /// to Algorithm Manager to be executed. It is the primary operating loop.
46  /// </summary>
47  public class Engine
48  {
49  private bool _historyStartDateLimitedWarningEmitted;
50  private bool _historyNumericalPrecisionLimitedWarningEmitted;
51  private readonly bool _liveMode;
52 
53  /// <summary>
54  /// Gets the configured system handlers for this engine instance
55  /// </summary>
57 
58  /// <summary>
59  /// Gets the configured algorithm handlers for this engine instance
60  /// </summary>
62 
/// <summary>
/// Creates an <see cref="Engine"/> bound to the supplied handler sets and run mode
/// </summary>
/// <param name="systemHandlers">The system handlers for controlling acquisition of jobs, messaging, and api calls</param>
/// <param name="algorithmHandlers">The algorithm handlers for managing algorithm initialization, data, results, transaction, and real time events</param>
/// <param name="liveMode">True when running in live mode, false otherwise</param>
public Engine(LeanEngineSystemHandlers systemHandlers, LeanEngineAlgorithmHandlers algorithmHandlers, bool liveMode)
{
    // store the handler sets supplied by the caller
    SystemHandlers = systemHandlers;
    AlgorithmHandlers = algorithmHandlers;

    // remember the run mode; it is read when a job is run
    _liveMode = liveMode;
}
75 
76  /// <summary>
77  /// Runs a single backtest/live job from the job queue
78  /// </summary>
    /// <remarks>
    /// The method works in three phases: setup (create the algorithm, brokerage, data manager and
    /// history provider, then invoke the setup handler), execution (only when setup succeeded, run
    /// under an <c>Isolator</c> time limit), and teardown (exit the handlers, wait roughly 30 seconds
    /// at most for them to become inactive, then disconnect the brokerage). Exceptions raised inside
    /// the outer try block are logged rather than rethrown.
    /// </remarks>
79  /// <param name="job">The algorithm job to be processed</param>
80  /// <param name="manager">The algorithm manager instance</param>
81  /// <param name="assemblyPath">The path to the algorithm's assembly</param>
82  /// <param name="workerThread">The worker thread instance</param>
83  public void Run(AlgorithmNodePacket job, AlgorithmManager manager, string assemblyPath, WorkerThread workerThread)
84  {
    // Start the slow static initialization on a background task right away; its result is
    // read (blocking) just before the algorithm instance is created further down.
85  var marketHoursDatabaseTask = Task.Run(() => StaticInitializations());
86 
87  var algorithm = default(IAlgorithm);
88  var algorithmManager = manager;
89 
90  try
91  {
92  Log.Trace($"Engine.Run(): Resource limits '{job.Controls.CpuAllocation}' CPUs. {job.Controls.RamAllocation} MB RAM.");
94 
95  //Tracks whether setup succeeded; the algorithm is only executed below when this is true.
96  var initializeComplete = false;
97 
98  //-> Initialize messaging system
100 
101  //-> Set the result handler type for this algorithm job, and launch the associated result thread.
103 
104  IBrokerage brokerage = null;
105  DataManager dataManager = null;
    // the synchronizer implementation depends on whether the engine runs in live mode
106  var synchronizer = _liveMode ? new LiveSynchronizer() : new Synchronizer();
107  try
108  {
109  // we get the mhdb before creating the algorithm instance,
110  // since the algorithm constructor will use it
111  var marketHoursDatabase = marketHoursDatabaseTask.Result;
112 
113  AlgorithmHandlers.Setup.WorkerThread = workerThread;
114 
115  // Save algorithm to cache, load algorithm instance:
116  algorithm = AlgorithmHandlers.Setup.CreateAlgorithmInstance(job, assemblyPath);
117 
118  algorithm.ProjectId = job.ProjectId;
119 
120  // Set algorithm in ILeanManager
121  SystemHandlers.LeanManager.SetAlgorithm(algorithm);
122 
123  // initialize the object store
125 
126  // initialize the data permission manager
128 
129  // notify the user of any errors w/ object store persistence
130  AlgorithmHandlers.ObjectStore.ErrorRaised += (sender, args) => algorithm.Debug($"ObjectStore Persistence Error: {args.Error.Message}");
131 
132  // set the order processor on the transaction manager,needs to be done before initializing the brokerage which might start using it
133  algorithm.Transactions.SetOrderProcessor(AlgorithmHandlers.Transactions);
134 
135  // Initialize the brokerage
136  IBrokerageFactory factory;
137  brokerage = AlgorithmHandlers.Setup.CreateBrokerage(job, algorithm, out factory);
138 
139  // forward brokerage message events to the result handler
140  brokerage.Message += (_, e) => AlgorithmHandlers.Results.BrokerageMessage(e);
141 
142  var symbolPropertiesDatabase = SymbolPropertiesDatabase.FromDataFolder();
143  var mapFilePrimaryExchangeProvider = new MapFilePrimaryExchangeProvider(AlgorithmHandlers.MapFileProvider);
144  var registeredTypesProvider = new RegisteredSecurityDataTypesProvider();
145  var securityService = new SecurityService(algorithm.Portfolio.CashBook,
146  marketHoursDatabase,
147  symbolPropertiesDatabase,
148  algorithm,
149  registeredTypesProvider,
150  new SecurityCacheProvider(algorithm.Portfolio),
151  mapFilePrimaryExchangeProvider,
152  algorithm);
153 
154  algorithm.Securities.SetSecurityService(securityService);
155 
156  dataManager = new DataManager(AlgorithmHandlers.DataFeed,
157  new UniverseSelection(
158  algorithm,
159  securityService,
162  algorithm,
163  algorithm.TimeKeeper,
164  marketHoursDatabase,
165  _liveMode,
166  registeredTypesProvider,
168 
169  algorithm.SubscriptionManager.SetDataManager(dataManager);
170 
171  synchronizer.Initialize(algorithm, dataManager);
172 
173  // Set the algorithm's object store before initializing the data feed, which might use it
174  algorithm.SetObjectStore(AlgorithmHandlers.ObjectStore);
175 
176  // Initialize the data feed before we initialize the algorithm so it can intercept added securities/universes via events
178  algorithm,
179  job,
184  dataManager,
185  (IDataFeedTimeProvider) synchronizer,
187 
188  // set the history provider before setting up the algorithm
189  var historyProvider = GetHistoryProvider();
190  historyProvider.SetBrokerage(brokerage);
191  historyProvider.Initialize(
193  job,
199  progress =>
200  {
201  // send progress updates to the result handler only during initialization
202  if (!algorithm.GetLocked() || algorithm.IsWarmingUp)
203  {
204  AlgorithmHandlers.Results.SendStatusUpdate(AlgorithmStatus.History,
205  Invariant($"Processing history {progress}%..."));
206  }
207  },
208  // disable parallel history requests for live trading
209  parallelHistoryRequestsEnabled: !_liveMode,
210  dataPermissionManager: AlgorithmHandlers.DataPermissionsManager,
211  objectStore: algorithm.ObjectStore,
212  algorithmSettings: algorithm.Settings
213  )
214  );
215 
    // NOTE(review): GetHistoryProvider() already attaches handlers with the same bodies to these
    // three events, so each event appears to reach the result handler twice - confirm this is intended.
216  historyProvider.InvalidConfigurationDetected += (sender, args) => { AlgorithmHandlers.Results.ErrorMessage(args.Message); };
217  historyProvider.DownloadFailed += (sender, args) => { AlgorithmHandlers.Results.ErrorMessage(args.Message, args.StackTrace); };
218  historyProvider.ReaderErrorDetected += (sender, args) => { AlgorithmHandlers.Results.RuntimeError(args.Message, args.StackTrace); };
219 
220  Composer.Instance.AddPart(historyProvider);
221  algorithm.HistoryProvider = historyProvider;
222 
223  // initialize the default brokerage message handler
224  algorithm.BrokerageMessageHandler = factory.CreateBrokerageMessageHandler(algorithm, job, SystemHandlers.Api);
225 
226  //Initialize the internal state of algorithm and job: executes the algorithm.Initialize() method.
227  initializeComplete = AlgorithmHandlers.Setup.Setup(new SetupHandlerParameters(dataManager.UniverseSelection, algorithm,
230 
231  // set this again now that we've actually added securities
233 
234  //If there are any reasons it failed, pass these back to the IDE.
235  if (!initializeComplete || AlgorithmHandlers.Setup.Errors.Count > 0)
236  {
237  initializeComplete = false;
238  //Get all the error messages: internal in algorithm and external in setup handler.
239  var errorMessage = string.Join(",", algorithm.ErrorMessages);
240  string stackTrace = "";
    // NOTE(review): the setup-handler errors are appended directly after the algorithm errors,
    // with no separator between the two groups.
241  errorMessage += string.Join(",", AlgorithmHandlers.Setup.Errors.Select(e =>
242  {
243  var message = e.Message;
244  if (e.InnerException != null)
245  {
246  var interpreter = StackExceptionInterpreter.Instance.Value;
247  var err = interpreter.Interpret(e.InnerException);
248  var stackMessage = interpreter.GetExceptionMessageHeader(err);
249  message += stackMessage;
250  stackTrace += stackMessage;
251  }
252  return message;
253  }));
254  Log.Error("Engine.Run(): " + errorMessage);
255  AlgorithmHandlers.Results.RuntimeError(errorMessage, stackTrace);
256  SystemHandlers.Api.SetAlgorithmStatus(job.AlgorithmId, AlgorithmStatus.RuntimeError, errorMessage);
257  }
258  }
259  catch (Exception err)
260  {
    // any exception during setup is reported as a runtime error; initializeComplete stays false
261  Log.Error(err);
262 
263  // for python we don't add the ugly pythonNet stack trace
264  var stackTrace = job.Language == Language.Python ? err.Message : err.ToString();
265 
266  var runtimeMessage = "Algorithm.Initialize() Error: " + err.Message + " Stack Trace: " + stackTrace;
267  AlgorithmHandlers.Results.RuntimeError(runtimeMessage, stackTrace);
268  SystemHandlers.Api.SetAlgorithmStatus(job.AlgorithmId, AlgorithmStatus.RuntimeError, runtimeMessage);
269  }
270 
271 
    // algorithm may still be null here if creating the instance failed, hence the null-conditional access
272  var historyProviderName = algorithm?.HistoryProvider != null ? algorithm.HistoryProvider.GetType().FullName : string.Empty;
273  // log the job endpoints
274  Log.Trace($"JOB HANDLERS:{Environment.NewLine}" +
275  $" DataFeed: {AlgorithmHandlers.DataFeed.GetType().FullName}{Environment.NewLine}" +
276  $" Setup: {AlgorithmHandlers.Setup.GetType().FullName}{Environment.NewLine}" +
277  $" RealTime: {AlgorithmHandlers.RealTime.GetType().FullName}{Environment.NewLine}" +
278  $" Results: {AlgorithmHandlers.Results.GetType().FullName}{Environment.NewLine}" +
279  $" Transactions: {AlgorithmHandlers.Transactions.GetType().FullName}{Environment.NewLine}" +
280  $" Object Store: {AlgorithmHandlers.ObjectStore.GetType().FullName}{Environment.NewLine}" +
281  $" History Provider: {historyProviderName}{Environment.NewLine}" +
282  $" Brokerage: {brokerage?.GetType().FullName}{Environment.NewLine}" +
283  $" Data Provider: {AlgorithmHandlers.DataProvider.GetType().FullName}{Environment.NewLine}");
284 
285  //-> Using the job + initialization: load the designated handlers:
286  if (initializeComplete)
287  {
288  // notify the LEAN manager that the algorithm is initialized and starting
289  SystemHandlers.LeanManager.OnAlgorithmStart();
290 
291  //-> Reset the backtest stopwatch; we're now running the algorithm.
292  var startTime = DateTime.UtcNow;
293 
294  //Set algorithm as locked; set it to live mode if we're trading live, and set it to locked for no further updates.
295  algorithm.SetAlgorithmId(job.AlgorithmId);
296  algorithm.SetLocked();
297 
298  //Load the associated handlers for transaction and realtime events:
299  AlgorithmHandlers.Transactions.Initialize(algorithm, brokerage, AlgorithmHandlers.Results);
300  AlgorithmHandlers.RealTime.Setup(algorithm, job, AlgorithmHandlers.Results, SystemHandlers.Api, algorithmManager.TimeLimit);
301 
302  // wire up the brokerage message handler
303  brokerage.Message += (sender, message) =>
304  {
305  algorithm.BrokerageMessageHandler.HandleMessage(message);
306 
307  // fire brokerage message events
308  algorithm.OnBrokerageMessage(message);
309  switch (message.Type)
310  {
311  case BrokerageMessageType.Disconnect:
312  algorithm.OnBrokerageDisconnect();
313  break;
314  case BrokerageMessageType.Reconnect:
315  algorithm.OnBrokerageReconnect();
316  break;
317  }
318  };
319 
320  // Result manager scanning message queue: (started earlier)
321  AlgorithmHandlers.Results.DebugMessage(
322  $"Launching analysis for {job.AlgorithmId} with LEAN Engine v{Globals.Version}");
323 
324  try
325  {
326  //Create a new engine isolator class
327  var isolator = new Isolator();
328 
329  // Execute the Algorithm Code:
330  var complete = isolator.ExecuteWithTimeLimit(AlgorithmHandlers.Setup.MaximumRuntime, algorithmManager.TimeLimit.IsWithinLimit, () =>
331  {
332  try
333  {
334  //Run Algorithm Job:
335  // -> Using this Data Feed,
336  // -> Send Orders to this TransactionHandler,
337  // -> Send Results to ResultHandler.
338  algorithmManager.Run(job, algorithm, synchronizer, AlgorithmHandlers.Transactions, AlgorithmHandlers.Results, AlgorithmHandlers.RealTime, SystemHandlers.LeanManager, isolator.CancellationTokenSource);
339  }
340  catch (Exception err)
341  {
342  algorithm.SetRuntimeError(err, "AlgorithmManager.Run");
343  return;
344  }
345 
346  Log.Trace("Engine.Run(): Exiting Algorithm Manager");
347  }, job.Controls.RamAllocation, workerThread:workerThread, sleepIntervalMillis: algorithm.LiveMode ? 10000 : 1000);
348 
349  if (!complete)
350  {
351  Log.Error("Engine.Main(): Failed to complete in time: " + AlgorithmHandlers.Setup.MaximumRuntime.ToStringInvariant("F"));
    // this exception is caught by the catch block directly below and recorded as a runtime error
352  throw new Exception("Failed to complete algorithm within " + AlgorithmHandlers.Setup.MaximumRuntime.ToStringInvariant("F")
353  + " seconds. Please make it run faster.");
354  }
355  }
356  catch (Exception err)
357  {
358  //Error running the user algorithm: purge datafeed, send error messages, set algorithm status to failed.
359  algorithm.SetRuntimeError(err, "Engine Isolator");
360  }
361 
362  // Algorithm runtime error:
363  if (algorithm.RunTimeError != null)
364  {
365  HandleAlgorithmError(job, algorithm.RunTimeError);
366  }
367 
368  // notify the LEAN manager that the algorithm has finished
369  SystemHandlers.LeanManager.OnAlgorithmEnd();
370 
371  try
372  {
    // the trade log is only written when the "transaction-log" setting is non-empty
373  var csvTransactionsFileName = Config.Get("transaction-log");
374  if (!string.IsNullOrEmpty(csvTransactionsFileName))
375  {
376  SaveListOfTrades(AlgorithmHandlers.Transactions, csvTransactionsFileName);
377  }
378 
379  if (!_liveMode)
380  {
381  //Diagnostics Completed, Send Result Packet:
382  var totalSeconds = (DateTime.UtcNow - startTime).TotalSeconds;
383  var dataPoints = algorithmManager.DataPoints + algorithm.HistoryProvider.DataPointCount;
    // throughput in thousands of data points per second
384  var kps = dataPoints / (double) 1000 / totalSeconds;
385  AlgorithmHandlers.Results.DebugMessage($"Algorithm Id:({job.AlgorithmId}) completed in {totalSeconds:F2} seconds at {kps:F0}k data points per second. Processing total of {dataPoints:N0} data points.");
386  }
387  }
388  catch (Exception err)
389  {
390  Log.Error(err, "Error sending analysis results");
391  }
392 
393  //Before we return, send terminate commands to close up the threads
396  dataManager?.RemoveAllSubscriptions();
397  workerThread?.Dispose();
398  }
399 
400  synchronizer.DisposeSafely();
401  // Close data feed, alphas. Could be running even if algorithm initialization failed
402  AlgorithmHandlers.DataFeed.Exit();
403 
404  //Close result handler:
405  AlgorithmHandlers.Results.Exit();
406 
407  //Wait for the threads to complete:
    // Poll every 10 ms until every handler reports inactive, giving up after roughly 30 seconds
    // of accumulated wait; the trace line is written once per 100 ms of accumulated wait.
408  var millisecondInterval = 10;
409  var millisecondTotalWait = 0;
410  while ((AlgorithmHandlers.Results.IsActive
411  || (AlgorithmHandlers.Transactions != null && AlgorithmHandlers.Transactions.IsActive)
412  || (AlgorithmHandlers.DataFeed != null && AlgorithmHandlers.DataFeed.IsActive)
413  || (AlgorithmHandlers.RealTime != null && AlgorithmHandlers.RealTime.IsActive))
414  && millisecondTotalWait < 30*1000)
415  {
416  Thread.Sleep(millisecondInterval);
417  if (millisecondTotalWait % (millisecondInterval * 10) == 0)
418  {
419  Log.Trace("Waiting for threads to exit...");
420  }
421  millisecondTotalWait += millisecondInterval;
422  }
423 
424  if (brokerage != null)
425  {
426  Log.Trace("Engine.Run(): Disconnecting from brokerage...");
427  brokerage.Disconnect();
428  brokerage.Dispose();
429  }
430  if (AlgorithmHandlers.Setup != null)
431  {
432  Log.Trace("Engine.Run(): Disposing of setup handler...");
433  AlgorithmHandlers.Setup.Dispose();
434  }
435 
436  Log.Trace("Engine.Main(): Analysis Completed and Results Posted.");
437  }
438  catch (Exception err)
439  {
440  Log.Error(err, "Error running algorithm");
441  }
442  finally
443  {
444  //No matter what for live mode; make sure we've set algorithm status in the API for "not running" conditions:
445  if (_liveMode && algorithmManager.State != AlgorithmStatus.Running && algorithmManager.State != AlgorithmStatus.RuntimeError)
446  SystemHandlers.Api.SetAlgorithmStatus(job.AlgorithmId, algorithmManager.State);
447 
    // Results and DataFeed are also asked to exit inside the try block above; requesting it here
    // as well covers the path where an exception skipped that part of the try block.
448  AlgorithmHandlers.Results.Exit();
449  AlgorithmHandlers.DataFeed.Exit();
450  AlgorithmHandlers.Transactions.Exit();
451  AlgorithmHandlers.RealTime.Exit();
452  AlgorithmHandlers.DataMonitor.Exit();
453  }
454  }
455 
/// <summary>
/// Reports a runtime error raised by the algorithm: asks the data feed to exit, forwards the
/// error to the result handler and sets the job's status to runtime error through the API.
/// </summary>
/// <param name="job">Job we're processing</param>
/// <param name="err">Error from algorithm stack</param>
private void HandleAlgorithmError(AlgorithmNodePacket job, Exception err)
{
    AlgorithmHandlers.DataFeed?.Exit();

    // without a result handler there is nowhere to report the error to
    if (AlgorithmHandlers.Results == null)
    {
        return;
    }

    var runtimeMessage = "Runtime Error: " + err.Message;
    Log.Trace("Engine.Run(): Sending runtime error to user...");
    AlgorithmHandlers.Results.LogMessage(runtimeMessage);

    // for python we don't add the ugly pythonNet stack trace
    string trace;
    if (job.Language == Language.Python)
    {
        trace = err.Message;
    }
    else
    {
        trace = err.ToString();
    }

    AlgorithmHandlers.Results.RuntimeError(runtimeMessage, trace);
    SystemHandlers.Api.SetAlgorithmStatus(
        job.AlgorithmId,
        AlgorithmStatus.RuntimeError,
        runtimeMessage + " Stack Trace: " + trace);
}
477 
/// <summary>
/// Creates a new <see cref="HistoryProviderManager"/> and connects its events to the result handler
/// </summary>
/// <returns>The newly created manager with its event handlers attached</returns>
private HistoryProviderManager GetHistoryProvider()
{
    var manager = new HistoryProviderManager();

    manager.InvalidConfigurationDetected += (sender, args) => AlgorithmHandlers.Results.ErrorMessage(args.Message);

    manager.NumericalPrecisionLimited += (sender, args) =>
    {
        // this warning is sent at most once per engine instance
        if (_historyNumericalPrecisionLimitedWarningEmitted)
        {
            return;
        }

        _historyNumericalPrecisionLimitedWarningEmitted = true;
        AlgorithmHandlers.Results.DebugMessage("Warning: when performing history requests, the start date will be adjusted if there are numerical precision errors in the factor files.");
    };

    manager.StartDateLimited += (sender, args) =>
    {
        // this warning is sent at most once per engine instance
        if (_historyStartDateLimitedWarningEmitted)
        {
            return;
        }

        _historyStartDateLimitedWarningEmitted = true;
        AlgorithmHandlers.Results.DebugMessage("Warning: when performing history requests, the start date will be adjusted if it is before the first known date for the symbol.");
    };

    // download failures are reported as error messages, reader errors as runtime errors
    manager.DownloadFailed += (sender, args) => AlgorithmHandlers.Results.ErrorMessage(args.Message, args.StackTrace);
    manager.ReaderErrorDetected += (sender, args) => AlgorithmHandlers.Results.RuntimeError(args.Message, args.StackTrace);

    return manager;
}
507 
/// <summary>
/// Save a list of trades to disk for a given path
/// </summary>
/// <remarks>
/// Writes one comma-separated line per order whose status satisfies <c>IsFill()</c>:
/// time (yyyy-MM-dd HH:mm:ss), symbol value, direction, quantity and price.
/// The target directory is created when the path names one that does not exist yet.
/// </remarks>
/// <param name="transactions">Transactions list via an OrderProvider</param>
/// <param name="csvFileName">File path to create</param>
private static void SaveListOfTrades(IOrderProvider transactions, string csvFileName)
{
    var orders = transactions.GetOrders(x => x.Status.IsFill());

    // Path.GetDirectoryName yields an empty string for a bare file name such as "trades.csv"
    // (and null for a root path). Directory.CreateDirectory rejects an empty path, so only
    // create the directory when there actually is a directory component.
    var directory = Path.GetDirectoryName(csvFileName);
    if (!string.IsNullOrEmpty(directory) && !Directory.Exists(directory))
    {
        Directory.CreateDirectory(directory);
    }

    using (var writer = new StreamWriter(csvFileName))
    {
        foreach (var order in orders)
        {
            var line = Invariant($"{order.Time.ToStringInvariant("yyyy-MM-dd HH:mm:ss")},") +
                Invariant($"{order.Symbol.Value},{order.Direction},{order.Quantity},{order.Price}");
            writer.WriteLine(line);
        }
    }
}
531 
/// <summary>
/// Initialize slow static variables
/// </summary>
/// <remarks>
/// Intended to be run on a background task so the cost is paid while other startup work proceeds.
/// Optimization and inlining are disabled so the otherwise unused read below is kept.
/// </remarks>
/// <returns>The market hours database loaded from the data folder</returns>
[MethodImpl(MethodImplOptions.NoOptimization | MethodImplOptions.NoInlining)]
private static MarketHoursDatabase StaticInitializations()
{
    // This is slow because it creates all static time zones
    var nyTime = TimeZones.NewYork;
    // slow because it goes to disk and parses json
    // NOTE(review): the return statement was absent from this block; restored by analogy with
    // SymbolPropertiesDatabase.FromDataFolder() used elsewhere in this class - confirm.
    return MarketHoursDatabase.FromDataFolder();
}
543 
544  } // End Algorithm Node Core Thread
545 } // End Namespace