20 using System.Runtime.CompilerServices;
21 using System.Threading;
22 using System.Threading.Tasks;
49 private bool _historyStartDateLimitedWarningEmitted;
50 private bool _historyNumericalPrecisionLimitedWarningEmitted;
51 private readonly
bool _liveMode;
85 var marketHoursDatabaseTask = Task.Run(() => StaticInitializations());
88 var algorithmManager = manager;
92 Log.
Trace($
"Engine.Run(): Resource limits '{job.Controls.CpuAllocation}' CPUs. {job.Controls.RamAllocation} MB RAM.");
96 var initializeComplete =
false;
111 var marketHoursDatabase = marketHoursDatabaseTask.Result;
145 var securityService =
new SecurityService(algorithm.Portfolio.CashBook,
147 symbolPropertiesDatabase,
149 registeredTypesProvider,
151 mapFilePrimaryExchangeProvider,
154 algorithm.Securities.SetSecurityService(securityService);
163 algorithm.TimeKeeper,
166 registeredTypesProvider,
169 algorithm.SubscriptionManager.SetDataManager(dataManager);
171 synchronizer.Initialize(algorithm, dataManager);
189 var historyProvider = GetHistoryProvider();
190 historyProvider.SetBrokerage(brokerage);
191 historyProvider.Initialize(
202 if (!algorithm.GetLocked() || algorithm.IsWarmingUp)
204 AlgorithmHandlers.Results.SendStatusUpdate(AlgorithmStatus.History,
205 Invariant($
"Processing history {progress}%..."));
209 parallelHistoryRequestsEnabled: !_liveMode,
211 objectStore: algorithm.ObjectStore
215 historyProvider.InvalidConfigurationDetected += (sender, args) => {
AlgorithmHandlers.
Results.ErrorMessage(args.Message); };
216 historyProvider.DownloadFailed += (sender, args) => {
AlgorithmHandlers.
Results.ErrorMessage(args.Message, args.StackTrace); };
217 historyProvider.ReaderErrorDetected += (sender, args) => {
AlgorithmHandlers.
Results.RuntimeError(args.Message, args.StackTrace); };
219 algorithm.HistoryProvider = historyProvider;
222 algorithm.BrokerageMessageHandler = factory.CreateBrokerageMessageHandler(algorithm, job,
SystemHandlers.
Api);
235 initializeComplete =
false;
237 var errorMessage =
string.Join(
",", algorithm.ErrorMessages);
238 string stackTrace =
"";
241 var message = e.Message;
242 if (e.InnerException !=
null)
245 var err = interpreter.Interpret(e.InnerException);
246 var stackMessage = interpreter.GetExceptionMessageHeader(err);
247 message += stackMessage;
248 stackTrace += stackMessage;
252 Log.
Error(
"Engine.Run(): " + errorMessage);
257 catch (Exception err)
262 var stackTrace = job.Language ==
Language.Python ? err.Message : err.ToString();
264 var runtimeMessage =
"Algorithm.Initialize() Error: " + err.Message +
" Stack Trace: " + stackTrace;
270 var historyProviderName = algorithm?.HistoryProvider !=
null ? algorithm.HistoryProvider.GetType().FullName :
string.Empty;
272 Log.
Trace($
"JOB HANDLERS:{Environment.NewLine}" +
273 $
" DataFeed: {AlgorithmHandlers.DataFeed.GetType().FullName}{Environment.NewLine}" +
274 $
" Setup: {AlgorithmHandlers.Setup.GetType().FullName}{Environment.NewLine}" +
275 $
" RealTime: {AlgorithmHandlers.RealTime.GetType().FullName}{Environment.NewLine}" +
276 $
" Results: {AlgorithmHandlers.Results.GetType().FullName}{Environment.NewLine}" +
277 $
" Transactions: {AlgorithmHandlers.Transactions.GetType().FullName}{Environment.NewLine}" +
278 $
" Object Store: {AlgorithmHandlers.ObjectStore.GetType().FullName}{Environment.NewLine}" +
279 $
" History Provider: {historyProviderName}{Environment.NewLine}" +
280 $
" Brokerage: {brokerage?.GetType().FullName}{Environment.NewLine}" +
281 $
" Data Provider: {AlgorithmHandlers.DataProvider.GetType().FullName}{Environment.NewLine}");
284 if (initializeComplete)
290 var startTime = DateTime.UtcNow;
293 algorithm.SetAlgorithmId(job.AlgorithmId);
294 algorithm.SetLocked();
301 brokerage.Message += (sender, message) =>
303 algorithm.BrokerageMessageHandler.HandleMessage(message);
306 algorithm.OnBrokerageMessage(message);
307 switch (message.Type)
310 algorithm.OnBrokerageDisconnect();
313 algorithm.OnBrokerageReconnect();
320 $
"Launching analysis for {job.AlgorithmId} with LEAN Engine v{Globals.Version}");
325 var isolator =
new Isolator();
336 algorithmManager.Run(job, algorithm, synchronizer, AlgorithmHandlers.Transactions, AlgorithmHandlers.Results, AlgorithmHandlers.RealTime, SystemHandlers.LeanManager, isolator.CancellationToken);
338 catch (Exception err)
340 algorithm.SetRuntimeError(err,
"AlgorithmManager.Run");
344 Log.
Trace(
"Engine.Run(): Exiting Algorithm Manager");
345 }, job.Controls.RamAllocation, workerThread:workerThread, sleepIntervalMillis: algorithm.LiveMode ? 10000 : 1000);
351 +
" seconds. Please make it run faster.");
354 catch (Exception err)
357 algorithm.SetRuntimeError(err,
"Engine Isolator");
361 if (algorithm.RunTimeError !=
null)
363 HandleAlgorithmError(job, algorithm.RunTimeError);
371 var csvTransactionsFileName =
Config.
Get(
"transaction-log");
372 if (!
string.IsNullOrEmpty(csvTransactionsFileName))
380 var totalSeconds = (DateTime.UtcNow - startTime).TotalSeconds;
381 var dataPoints = algorithmManager.DataPoints + algorithm.HistoryProvider.DataPointCount;
382 var kps = dataPoints / (double) 1000 / totalSeconds;
383 AlgorithmHandlers.
Results.DebugMessage($
"Algorithm Id:({job.AlgorithmId}) completed in {totalSeconds:F2} seconds at {kps:F0}k data points per second. Processing total of {dataPoints:N0} data points.");
386 catch (Exception err)
388 Log.
Error(err,
"Error sending analysis results");
394 dataManager?.RemoveAllSubscriptions();
395 workerThread?.Dispose();
398 synchronizer.DisposeSafely();
400 AlgorithmHandlers.DataFeed.Exit();
403 AlgorithmHandlers.Results.Exit();
406 var millisecondInterval = 10;
407 var millisecondTotalWait = 0;
408 while ((AlgorithmHandlers.Results.IsActive
409 || (AlgorithmHandlers.Transactions !=
null && AlgorithmHandlers.Transactions.IsActive)
410 || (AlgorithmHandlers.DataFeed !=
null && AlgorithmHandlers.DataFeed.IsActive)
411 || (AlgorithmHandlers.RealTime !=
null && AlgorithmHandlers.RealTime.IsActive))
412 && millisecondTotalWait < 30*1000)
414 Thread.Sleep(millisecondInterval);
415 if (millisecondTotalWait % (millisecondInterval * 10) == 0)
417 Log.
Trace(
"Waiting for threads to exit...");
419 millisecondTotalWait += millisecondInterval;
422 if (brokerage !=
null)
424 Log.
Trace(
"Engine.Run(): Disconnecting from brokerage...");
425 brokerage.Disconnect();
428 if (AlgorithmHandlers.Setup !=
null)
430 Log.
Trace(
"Engine.Run(): Disposing of setup handler...");
431 AlgorithmHandlers.Setup.Dispose();
434 Log.
Trace(
"Engine.Main(): Analysis Completed and Results Posted.");
436 catch (Exception err)
438 Log.
Error(err,
"Error running algorithm");
444 SystemHandlers.Api.SetAlgorithmStatus(job.AlgorithmId, algorithmManager.State);
446 AlgorithmHandlers.Results.Exit();
447 AlgorithmHandlers.DataFeed.Exit();
448 AlgorithmHandlers.Transactions.Exit();
449 AlgorithmHandlers.RealTime.Exit();
450 AlgorithmHandlers.DataMonitor.Exit();
461 AlgorithmHandlers.DataFeed?.Exit();
462 if (AlgorithmHandlers.Results !=
null)
464 var message = $
"Runtime Error: {err.Message}";
465 Log.
Trace(
"Engine.Run(): Sending runtime error to user...");
466 AlgorithmHandlers.Results.LogMessage(message);
469 var stackTrace = job.
Language ==
Language.Python ? err.Message : err.ToString();
471 AlgorithmHandlers.Results.RuntimeError(message, stackTrace);
483 provider.InvalidConfigurationDetected += (sender, args) => { AlgorithmHandlers.Results.ErrorMessage(args.Message); };
484 provider.NumericalPrecisionLimited += (sender, args) =>
486 if (!_historyNumericalPrecisionLimitedWarningEmitted)
488 _historyNumericalPrecisionLimitedWarningEmitted =
true;
489 AlgorithmHandlers.Results.DebugMessage(
"Warning: when performing history requests, the start date will be adjusted if there are numerical precision errors in the factor files.");
492 provider.StartDateLimited += (sender, args) =>
494 if (!_historyStartDateLimitedWarningEmitted)
496 _historyStartDateLimitedWarningEmitted =
true;
497 AlgorithmHandlers.Results.DebugMessage(
"Warning: when performing history requests, the start date will be adjusted if it is before the first known date for the symbol.");
500 provider.DownloadFailed += (sender, args) => { AlgorithmHandlers.Results.ErrorMessage(args.Message, args.StackTrace); };
501 provider.ReaderErrorDetected += (sender, args) => { AlgorithmHandlers.Results.RuntimeError(args.Message, args.StackTrace); };
/// <summary>
/// Writes every filled order returned by the order provider to a CSV file, one order per line,
/// in the form <c>time,symbol,direction,quantity,price</c>.
/// </summary>
/// <param name="transactions">Order provider queried for the orders whose status is a fill</param>
/// <param name="csvFileName">Path of the CSV file to write; its directory is created when it does not exist yet</param>
private static void SaveListOfTrades(IOrderProvider transactions, string csvFileName)
{
    var orders = transactions.GetOrders(x => x.Status.IsFill());

    // Path.GetDirectoryName yields an empty string for a bare file name (e.g. "trades.csv")
    // and null for a root path. Directory.CreateDirectory rejects an empty path, so the
    // directory is only created when the file name actually carries a directory component.
    var path = Path.GetDirectoryName(csvFileName);
    if (!string.IsNullOrEmpty(path) && !Directory.Exists(path))
    {
        Directory.CreateDirectory(path);
    }

    using (var writer = new StreamWriter(csvFileName))
    {
        foreach (var order in orders)
        {
            // formatted with the invariant culture so the file does not depend on the host locale
            var line = Invariant($"{order.Time.ToStringInvariant("yyyy-MM-dd HH:mm:ss")},") +
                       Invariant($"{order.Symbol.Value},{order.Direction},{order.Quantity},{order.Price}");
            writer.WriteLine(line);
        }
    }
}
533 [MethodImpl(MethodImplOptions.NoOptimization | MethodImplOptions.NoInlining)]
537 var nyTime = TimeZones.NewYork;