20 using System.Runtime.CompilerServices;
21 using System.Threading;
22 using System.Threading.Tasks;
49 private bool _historyStartDateLimitedWarningEmitted;
50 private bool _historyNumericalPrecisionLimitedWarningEmitted;
51 private readonly
bool _liveMode;
85 var marketHoursDatabaseTask = Task.Run(() => StaticInitializations());
88 var algorithmManager = manager;
92 Log.
Trace($
"Engine.Run(): Resource limits '{job.Controls.CpuAllocation}' CPUs. {job.Controls.RamAllocation} MB RAM.");
96 var initializeComplete =
false;
111 var marketHoursDatabase = marketHoursDatabaseTask.Result;
145 var securityService =
new SecurityService(algorithm.Portfolio.CashBook,
147 symbolPropertiesDatabase,
149 registeredTypesProvider,
151 mapFilePrimaryExchangeProvider,
154 algorithm.Securities.SetSecurityService(securityService);
163 algorithm.TimeKeeper,
166 registeredTypesProvider,
169 algorithm.SubscriptionManager.SetDataManager(dataManager);
171 synchronizer.Initialize(algorithm, dataManager);
189 var historyProvider = GetHistoryProvider();
190 historyProvider.SetBrokerage(brokerage);
191 historyProvider.Initialize(
202 if (!algorithm.GetLocked() || algorithm.IsWarmingUp)
204 AlgorithmHandlers.Results.SendStatusUpdate(AlgorithmStatus.History,
205 Invariant($
"Processing history {progress}%..."));
209 parallelHistoryRequestsEnabled: !_liveMode,
211 objectStore: algorithm.ObjectStore,
212 algorithmSettings: algorithm.Settings
216 historyProvider.InvalidConfigurationDetected += (sender, args) => {
AlgorithmHandlers.
Results.ErrorMessage(args.Message); };
217 historyProvider.DownloadFailed += (sender, args) => {
AlgorithmHandlers.
Results.ErrorMessage(args.Message, args.StackTrace); };
218 historyProvider.ReaderErrorDetected += (sender, args) => {
AlgorithmHandlers.
Results.RuntimeError(args.Message, args.StackTrace); };
221 algorithm.HistoryProvider = historyProvider;
224 algorithm.BrokerageMessageHandler = factory.CreateBrokerageMessageHandler(algorithm, job,
SystemHandlers.
Api);
237 initializeComplete =
false;
239 var errorMessage =
string.Join(
",", algorithm.ErrorMessages);
240 string stackTrace =
"";
243 var message = e.Message;
244 if (e.InnerException !=
null)
247 var err = interpreter.Interpret(e.InnerException);
248 var stackMessage = interpreter.GetExceptionMessageHeader(err);
249 message += stackMessage;
250 stackTrace += stackMessage;
254 Log.
Error(
"Engine.Run(): " + errorMessage);
259 catch (Exception err)
264 var stackTrace = job.Language ==
Language.Python ? err.Message : err.ToString();
266 var runtimeMessage =
"Algorithm.Initialize() Error: " + err.Message +
" Stack Trace: " + stackTrace;
272 var historyProviderName = algorithm?.HistoryProvider !=
null ? algorithm.HistoryProvider.GetType().FullName :
string.Empty;
274 Log.
Trace($
"JOB HANDLERS:{Environment.NewLine}" +
275 $
" DataFeed: {AlgorithmHandlers.DataFeed.GetType().FullName}{Environment.NewLine}" +
276 $
" Setup: {AlgorithmHandlers.Setup.GetType().FullName}{Environment.NewLine}" +
277 $
" RealTime: {AlgorithmHandlers.RealTime.GetType().FullName}{Environment.NewLine}" +
278 $
" Results: {AlgorithmHandlers.Results.GetType().FullName}{Environment.NewLine}" +
279 $
" Transactions: {AlgorithmHandlers.Transactions.GetType().FullName}{Environment.NewLine}" +
280 $
" Object Store: {AlgorithmHandlers.ObjectStore.GetType().FullName}{Environment.NewLine}" +
281 $
" History Provider: {historyProviderName}{Environment.NewLine}" +
282 $
" Brokerage: {brokerage?.GetType().FullName}{Environment.NewLine}" +
283 $
" Data Provider: {AlgorithmHandlers.DataProvider.GetType().FullName}{Environment.NewLine}");
286 if (initializeComplete)
292 var startTime = DateTime.UtcNow;
295 algorithm.SetAlgorithmId(job.AlgorithmId);
296 algorithm.SetLocked();
303 brokerage.Message += (sender, message) =>
305 algorithm.BrokerageMessageHandler.HandleMessage(message);
308 algorithm.OnBrokerageMessage(message);
309 switch (message.Type)
312 algorithm.OnBrokerageDisconnect();
315 algorithm.OnBrokerageReconnect();
322 $
"Launching analysis for {job.AlgorithmId} with LEAN Engine v{Globals.Version}");
327 var isolator =
new Isolator();
338 algorithmManager.Run(job, algorithm, synchronizer, AlgorithmHandlers.Transactions, AlgorithmHandlers.Results, AlgorithmHandlers.RealTime, SystemHandlers.LeanManager, isolator.CancellationTokenSource);
340 catch (Exception err)
342 algorithm.SetRuntimeError(err,
"AlgorithmManager.Run");
346 Log.
Trace(
"Engine.Run(): Exiting Algorithm Manager");
347 }, job.Controls.RamAllocation, workerThread:workerThread, sleepIntervalMillis: algorithm.LiveMode ? 10000 : 1000);
353 +
" seconds. Please make it run faster.");
356 catch (Exception err)
359 algorithm.SetRuntimeError(err,
"Engine Isolator");
363 if (algorithm.RunTimeError !=
null)
365 HandleAlgorithmError(job, algorithm.RunTimeError);
373 var csvTransactionsFileName =
Config.
Get(
"transaction-log");
374 if (!
string.IsNullOrEmpty(csvTransactionsFileName))
382 var totalSeconds = (DateTime.UtcNow - startTime).TotalSeconds;
383 var dataPoints = algorithmManager.DataPoints + algorithm.HistoryProvider.DataPointCount;
384 var kps = dataPoints / (double) 1000 / totalSeconds;
385 AlgorithmHandlers.
Results.DebugMessage($
"Algorithm Id:({job.AlgorithmId}) completed in {totalSeconds:F2} seconds at {kps:F0}k data points per second. Processing total of {dataPoints:N0} data points.");
388 catch (Exception err)
390 Log.
Error(err,
"Error sending analysis results");
396 dataManager?.RemoveAllSubscriptions();
397 workerThread?.Dispose();
400 synchronizer.DisposeSafely();
402 AlgorithmHandlers.DataFeed.Exit();
405 AlgorithmHandlers.Results.Exit();
408 var millisecondInterval = 10;
409 var millisecondTotalWait = 0;
410 while ((AlgorithmHandlers.Results.IsActive
411 || (AlgorithmHandlers.Transactions !=
null && AlgorithmHandlers.Transactions.IsActive)
412 || (AlgorithmHandlers.DataFeed !=
null && AlgorithmHandlers.DataFeed.IsActive)
413 || (AlgorithmHandlers.RealTime !=
null && AlgorithmHandlers.RealTime.IsActive))
414 && millisecondTotalWait < 30*1000)
416 Thread.Sleep(millisecondInterval);
417 if (millisecondTotalWait % (millisecondInterval * 10) == 0)
419 Log.
Trace(
"Waiting for threads to exit...");
421 millisecondTotalWait += millisecondInterval;
424 if (brokerage !=
null)
426 Log.
Trace(
"Engine.Run(): Disconnecting from brokerage...");
427 brokerage.Disconnect();
430 if (AlgorithmHandlers.Setup !=
null)
432 Log.
Trace(
"Engine.Run(): Disposing of setup handler...");
433 AlgorithmHandlers.Setup.Dispose();
436 Log.
Trace(
"Engine.Main(): Analysis Completed and Results Posted.");
438 catch (Exception err)
440 Log.
Error(err,
"Error running algorithm");
446 SystemHandlers.Api.SetAlgorithmStatus(job.AlgorithmId, algorithmManager.State);
448 AlgorithmHandlers.Results.Exit();
449 AlgorithmHandlers.DataFeed.Exit();
450 AlgorithmHandlers.Transactions.Exit();
451 AlgorithmHandlers.RealTime.Exit();
452 AlgorithmHandlers.DataMonitor.Exit();
463 AlgorithmHandlers.DataFeed?.Exit();
464 if (AlgorithmHandlers.Results !=
null)
466 var message = $
"Runtime Error: {err.Message}";
467 Log.
Trace(
"Engine.Run(): Sending runtime error to user...");
468 AlgorithmHandlers.Results.LogMessage(message);
471 var stackTrace = job.
Language ==
Language.Python ? err.Message : err.ToString();
473 AlgorithmHandlers.Results.RuntimeError(message, stackTrace);
485 provider.InvalidConfigurationDetected += (sender, args) => { AlgorithmHandlers.Results.ErrorMessage(args.Message); };
486 provider.NumericalPrecisionLimited += (sender, args) =>
488 if (!_historyNumericalPrecisionLimitedWarningEmitted)
490 _historyNumericalPrecisionLimitedWarningEmitted =
true;
491 AlgorithmHandlers.Results.DebugMessage(
"Warning: when performing history requests, the start date will be adjusted if there are numerical precision errors in the factor files.");
494 provider.StartDateLimited += (sender, args) =>
496 if (!_historyStartDateLimitedWarningEmitted)
498 _historyStartDateLimitedWarningEmitted =
true;
499 AlgorithmHandlers.Results.DebugMessage(
"Warning: when performing history requests, the start date will be adjusted if it is before the first known date for the symbol.");
502 provider.DownloadFailed += (sender, args) => { AlgorithmHandlers.Results.ErrorMessage(args.Message, args.StackTrace); };
503 provider.ReaderErrorDetected += (sender, args) => { AlgorithmHandlers.Results.RuntimeError(args.Message, args.StackTrace); };
513 private static void SaveListOfTrades(
IOrderProvider transactions,
string csvFileName)
515 var orders = transactions.
GetOrders(x => x.Status.IsFill());
517 var path = Path.GetDirectoryName(csvFileName);
518 if (path !=
null && !Directory.Exists(path))
519 Directory.CreateDirectory(path);
521 using (var writer =
new StreamWriter(csvFileName))
523 foreach (var order
in orders)
525 var line = Invariant($
"{order.Time.ToStringInvariant("yyyy-MM-dd HH:mm:ss
")},") +
526 Invariant($
"{order.Symbol.Value},{order.Direction},{order.Quantity},{order.Price}");
527 writer.WriteLine(line);
535 [MethodImpl(MethodImplOptions.NoOptimization | MethodImplOptions.NoInlining)]
539 var nyTime = TimeZones.NewYork;