18 using System.Collections.Generic;
20 using System.Threading;
49 private readonly
object _lock;
50 private readonly
bool _liveMode;
99 CreateTokenBucket(job?.
Controls?.TrainingLimits),
100 TimeSpan.FromMinutes(
Config.
GetDouble(
"algorithm-manager-time-loop-maximum", 20))
120 _algorithm = algorithm;
123 var methodInvokers =
new Dictionary<Type, MethodInvoker>();
124 var marginCallFrequency = TimeSpan.FromMinutes(5);
125 var nextMarginCallTime = DateTime.MinValue;
129 var pendingDelistings =
new List<Delisting>();
130 var splitWarnings =
new List<Split>();
136 foreach (var config
in algorithm.SubscriptionManager.Subscriptions)
139 if (config.IsCustomData)
142 var genericMethod = (algorithm.GetType()).GetMethod(
"OnData",
new[] { config.Type });
145 if (methodInvokers.ContainsKey(config.Type))
continue;
147 if (genericMethod !=
null)
149 methodInvokers.Add(config.Type, genericMethod.DelegateForCallMethod());
155 algorithm.Schedule.On(
"Daily Sampling", algorithm.Schedule.DateRules.EveryDay(),
156 algorithm.Schedule.TimeRules.Midnight, () =>
158 results.Sample(algorithm.UtcTime);
162 Log.
Trace($
"AlgorithmManager.Run(): Begin DataStream - Start: {algorithm.StartDate} Stop: {algorithm.EndDate} Time: {algorithm.Time} Warmup: {algorithm.IsWarmingUp}");
163 foreach (var timeSlice
in Stream(algorithm, synchronizer, results, token))
171 Log.
Error($
"AlgorithmManager.Run(): Algorithm state changed to {_algorithm.Status} at {timeSlice.Time.ToStringInvariant()}");
176 if (token.IsCancellationRequested)
178 Log.
Error($
"AlgorithmManager.Run(): CancellationRequestion at {timeSlice.Time.ToStringInvariant()}");
183 leanManager.Update();
185 time = timeSlice.Time;
188 if (backtestMode && algorithm.Portfolio.TotalPortfolioValue <= 0)
190 var logMessage =
"AlgorithmManager.Run(): Portfolio value is less than or equal to zero, stopping algorithm.";
192 results.SystemDebugMessage(logMessage);
199 realtime.ScanPastEvents(time);
202 algorithm.SubscriptionManager.ScanPastConsolidators(time, algorithm);
205 algorithm.SetDateTime(time);
208 if (timeSlice.IsTimePulse)
214 algorithm.SetCurrentSlice(timeSlice.Slice);
216 if (timeSlice.Slice.SymbolChangedEvents.Count != 0)
220 algorithm.OnSymbolChangedEvents(timeSlice.Slice.SymbolChangedEvents);
222 catch (Exception err)
224 algorithm.SetRuntimeError(err,
"OnSymbolChangedEvents");
228 foreach (var symbol
in timeSlice.Slice.SymbolChangedEvents.Keys)
231 foreach (var ticket
in transactions.GetOpenOrderTickets(x => x.Symbol == symbol))
233 ticket.Cancel(
"Open order cancelled on symbol changed event");
240 algorithm.ProcessSecurityChanges(timeSlice.SecurityChanges);
242 leanManager.OnSecuritiesChanged(timeSlice.SecurityChanges);
243 realtime.OnSecuritiesChanged(timeSlice.SecurityChanges);
244 results.OnSecuritiesChanged(timeSlice.SecurityChanges);
248 foreach (var update
in timeSlice.SecuritiesUpdateData)
250 var security = update.Target;
252 security.Update(update.Data, update.DataType, update.ContainsFillForwardData);
255 algorithm.TradeBuilder.SetMarketPrice(security.Symbol, security.Price);
259 if (time >= nextSecurityModelScan)
261 foreach (var security
in algorithm.Securities.Values)
272 if (timeSlice.UniverseData.Count > 0)
274 foreach (var dataCollection
in timeSlice.UniverseData.Values)
276 if (!dataCollection.ShouldCacheToSecurity())
continue;
278 foreach (var data
in dataCollection.Data)
280 if (algorithm.Securities.TryGetValue(data.Symbol, out var security))
282 security.Cache.StoreData(
new[] { data }, data.GetType());
289 foreach (var cash
in algorithm.Portfolio.CashBook.Values.Where(x => x.CurrencyConversion !=
null))
295 algorithm.Portfolio.InvalidateTotalPortfolioValue();
298 transactions.ProcessSynchronousEvents();
301 realtime.SetTime(timeSlice.Time);
304 ProcessSplitSymbols(algorithm, splitWarnings, pendingDelistings);
309 Log.
Error($
"AlgorithmManager.Run(): Algorithm state changed to {_algorithm.Status} at {timeSlice.Time.ToStringInvariant()}");
312 if (algorithm.RunTimeError !=
null)
314 Log.
Error($
"AlgorithmManager.Run(): Stopping, encountered a runtime error at {algorithm.UtcTime} UTC.");
319 if (time >= nextMarginCallTime || (_liveMode && nextMarginCallTime > DateTime.UtcNow))
322 bool issueMarginCallWarning;
323 var marginCallOrders = algorithm.Portfolio.MarginCallModel.GetMarginCallOrders(out issueMarginCallWarning);
324 if (marginCallOrders.Count != 0)
326 var executingMarginCall =
false;
330 algorithm.OnMarginCall(marginCallOrders);
332 executingMarginCall =
true;
335 var executedTickets = algorithm.Portfolio.MarginCallModel.ExecuteMarginCall(marginCallOrders);
336 foreach (var ticket
in executedTickets)
338 algorithm.Error($
"{algorithm.Time.ToStringInvariant()} - Executed MarginCallOrder: {ticket.Symbol} - " +
339 $
"Quantity: {ticket.Quantity.ToStringInvariant()} @ {ticket.AverageFillPrice.ToStringInvariant()}"
343 catch (Exception err)
345 algorithm.SetRuntimeError(err, executingMarginCall ?
"Portfolio.MarginCallModel.ExecuteMarginCall" :
"OnMarginCall");
350 else if (issueMarginCallWarning)
354 algorithm.OnMarginCallWarning();
356 catch (Exception err)
358 algorithm.SetRuntimeError(err,
"OnMarginCallWarning");
363 nextMarginCallTime = time + marginCallFrequency;
371 var algorithmSecurityChanges =
new SecurityChanges(timeSlice.SecurityChanges)
374 FilterCustomSecurities =
true,
376 FilterInternalSecurities =
true
379 algorithm.OnSecuritiesChanged(algorithmSecurityChanges);
380 algorithm.OnFrameworkSecuritiesChanged(algorithmSecurityChanges);
382 catch (Exception err)
384 algorithm.SetRuntimeError(err,
"OnSecuritiesChanged");
398 if (timeSlice.ConsolidatorUpdateData.Count > 0)
400 var timeKeeper = algorithm.TimeKeeper;
401 foreach (var update
in timeSlice.ConsolidatorUpdateData)
403 var localTime = timeKeeper.GetLocalTimeKeeper(update.Target.ExchangeTimeZone).LocalTime;
404 var consolidators = update.Target.Consolidators;
405 foreach (var consolidator
in consolidators)
407 foreach (var dataPoint
in update.Data)
410 if (EndTimeIsInNativeResolution(update.Target, dataPoint.EndTime))
412 consolidator.Update(dataPoint);
417 consolidator.Scan(localTime);
422 catch (Exception err)
424 algorithm.SetRuntimeError(err,
"Consolidators update");
429 foreach (var update
in timeSlice.CustomData)
431 MethodInvoker methodInvoker;
432 if (!methodInvokers.TryGetValue(update.DataType, out methodInvoker))
439 foreach (var dataPoint
in update.Data)
441 if (update.DataType.IsInstanceOfType(dataPoint))
443 methodInvoker(algorithm, dataPoint);
447 catch (Exception err)
449 algorithm.SetRuntimeError(err,
"Custom Data");
456 if (timeSlice.Slice.Splits.Count != 0)
458 algorithm.OnSplits(timeSlice.Slice.Splits);
461 catch (Exception err)
463 algorithm.SetRuntimeError(err,
"OnSplits");
469 if (timeSlice.Slice.Dividends.Count != 0)
471 algorithm.OnDividends(timeSlice.Slice.Dividends);
474 catch (Exception err)
476 algorithm.SetRuntimeError(err,
"OnDividends");
482 if (timeSlice.Slice.Delistings.Count != 0)
484 algorithm.OnDelistings(timeSlice.Slice.Delistings);
487 catch (Exception err)
489 algorithm.SetRuntimeError(err,
"OnDelistings");
494 if (!algorithm.LiveMode)
497 foreach (var delisting
in timeSlice.Slice.Delistings.Values)
502 pendingDelistings.Add(delisting);
507 var index = pendingDelistings.FindIndex(x => x.Symbol == delisting.Symbol);
510 pendingDelistings.RemoveAt(index);
517 HandleSplitSymbols(timeSlice.Slice.Splits, splitWarnings);
521 if (timeSlice.Slice.HasData)
524 algorithm.OnData(algorithm.CurrentSlice);
528 algorithm.OnFrameworkData(timeSlice.Slice);
530 catch (Exception err)
532 algorithm.SetRuntimeError(err,
"OnData");
538 transactions.ProcessSynchronousEvents();
541 results.ProcessSynchronousEvents();
544 algorithm.OnEndOfTimeStep();
552 Log.
Trace(
"AlgorithmManager.Run(): Firing On End Of Algorithm...");
555 algorithm.OnEndOfAlgorithm();
557 catch (Exception err)
559 algorithm.SetRuntimeError(err,
"OnEndOfAlgorithm");
564 results.ProcessSynchronousEvents(forceProcess:
true);
569 Log.
Trace(
"AlgorithmManager.Run(): Liquidating algorithm holdings...");
570 algorithm.Liquidate();
571 results.LogMessage(
"Algorithm Liquidated");
578 Log.
Trace(
"AlgorithmManager.Run(): Stopping algorithm...");
579 results.LogMessage(
"Algorithm Stopped");
586 Log.
Trace(
"AlgorithmManager.Run(): Deleting algorithm...");
587 results.DebugMessage(
"Algorithm Id:(" + job.AlgorithmId +
") Deleted by request.");
596 results.Sample(time);
619 var nextWarmupStatusTime = DateTime.MinValue;
621 var warmingUpPercent = 0;
624 nextWarmupStatusTime = DateTime.UtcNow.AddSeconds(1);
625 algorithm.
Debug(
"Algorithm starting warm up...");
637 var startTimeTicks = algorithm.
UtcTime.Ticks;
643 warmupEndTicks = DateTime.UtcNow.Ticks;
647 foreach (var timeSlice
in synchronizer.
StreamData(cancellationToken))
651 var now = DateTime.UtcNow;
652 if (now > nextWarmupStatusTime)
656 nextWarmupStatusTime = now.AddSeconds(2);
657 var newPercent = (int)(100 * (timeSlice.Time.Ticks - startTimeTicks) / (double)(warmupEndTicks - startTimeTicks));
659 if (newPercent != warmingUpPercent)
661 warmingUpPercent = newPercent;
662 algorithm.
Debug($
"Processing algorithm warm-up request {warmingUpPercent}%...");
673 algorithm.
Debug(
"Algorithm finished warming up.");
676 yield
return timeSlice;
688 Log.
Trace(
"ProcessVolatilityHistoryRequirements(): Updating volatility models with historical data...");
696 Log.
Trace(
"ProcessVolatilityHistoryRequirements(): finished.");
704 foreach (var split
in timeSlice.Slice.Splits.Values)
709 if (split.Type !=
SplitType.SplitOccurred)
717 Log.
Trace($
"AlgorithmManager.Run(): {algorithm.Time}: Skip Split during live warmup: {split}");
723 Log.
Debug($
"AlgorithmManager.Run(): {algorithm.Time}: Applying Split for {split.Symbol}");
729 Log.
Trace($
"AlgorithmManager.Run(): {algorithm.Time}: Pre-Split for {split}. Security Price: {security.Price} Holdings: {security.Holdings.Quantity}");
734 .DataNormalizationMode();
743 ApplySplitOrDividendToVolatilityModel(algorithm, security, liveMode, mode);
745 if (liveMode && security !=
null)
747 Log.
Trace($
"AlgorithmManager.Run(): {algorithm.Time}: Post-Split for {split}. Security Price: {security.Price} Holdings: {security.Holdings.Quantity}");
758 catch (Exception err)
760 algorithm.SetRuntimeError(err,
"Split event");
771 foreach (var dividend
in timeSlice.Slice.Dividends.Values)
776 Log.
Trace($
"AlgorithmManager.Run(): {algorithm.Time}: Skip Dividend during live warmup: {dividend}");
782 Log.
Debug($
"AlgorithmManager.Run(): {algorithm.Time}: Applying Dividend: {dividend}");
788 Log.
Trace($
"AlgorithmManager.Run(): {algorithm.Time}: Pre-Dividend: {dividend}. " +
789 $
"Security Holdings: {security.Holdings.Quantity} Account Currency Holdings: " +
790 $
"{algorithm.Portfolio.CashBook[algorithm.AccountCurrency].Amount}");
795 .DataNormalizationMode();
801 ApplySplitOrDividendToVolatilityModel(algorithm, security, liveMode, mode);
803 if (liveMode && security !=
null)
805 Log.
Trace($
"AlgorithmManager.Run(): {algorithm.Time}: Post-Dividend: {dividend}. Security " +
806 $
"Holdings: {security.Holdings.Quantity} Account Currency Holdings: " +
807 $
"{algorithm.Portfolio.CashBook[algorithm.AccountCurrency].Amount}");
815 private void HandleSplitSymbols(
Splits newSplits, List<Split> splitWarnings)
817 foreach (var split
in newSplits.
Values)
823 Log.
Debug($
"AlgorithmManager.HandleSplitSymbols(): {_algorithm.Time} - Security split occurred: Split Factor: {split} Reference Price: {split.ReferencePrice}");
830 Log.
Debug($
"AlgorithmManager.HandleSplitSymbols(): {_algorithm.Time} - Security split warning: {split}");
833 if (!splitWarnings.Any(x => x.Symbol == split.Symbol && x.Type ==
SplitType.Warning))
835 splitWarnings.Add(split);
843 private void ProcessSplitSymbols(
IAlgorithm algorithm, List<Split> splitWarnings, List<Delisting> pendingDelistings)
849 for (
int i = splitWarnings.Count - 1; i >= 0; i--)
851 var split = splitWarnings[i];
852 var security = algorithm.
Securities[split.Symbol];
854 if (!security.IsTradable
857 Log.
Debug($
"AlgorithmManager.ProcessSplitSymbols(): {_algorithm.Time} - Removing split warning for {security.Symbol}");
860 splitWarnings.RemoveAt(i);
868 var nextMarketClose = security.Exchange.Hours.GetNextMarketClose(security.LocalTime,
false);
874 if (configs.Count == 0)
878 $
"AlgorithmManager.ProcessSplitSymbols(): {_algorithm.Time} - No subscriptions found for {security.Symbol}" +
879 $
", IsTradable: {security.IsTradable}" +
880 $
", Active: {algorithm.UniverseManager.ActiveSecurities.Keys.Contains(split.Symbol)}");
884 .RoundDownInTimeZone(configs.GetHighestResolution().ToTimeSpan(), security.Exchange.TimeZone, configs.First().DataTimeZone);
887 if (security.LocalTime < latestMarketOnCloseTimeRoundedDownByResolution)
continue;
891 potentialDerivate.Symbol.SecurityType.IsOption() &&
892 potentialDerivate.Symbol.Underlying == security.Symbol &&
893 !potentialDerivate.Symbol.Underlying.IsCanonical() &&
894 potentialDerivate.HoldStock
897 foreach (var derivative
in derivatives)
899 var optionContractSymbol = derivative.Symbol;
900 var optionContractSecurity = (
Option)derivative;
902 if (pendingDelistings.Any(x => x.Symbol == optionContractSymbol
903 && x.Time.Date == optionContractSecurity.LocalTime.Date))
910 algorithm.
Transactions.
CancelOpenOrders(optionContractSymbol,
"Canceled due to impending split. Separate MarketOnClose order submitted to liquidate position.");
913 -optionContractSecurity.Holdings.Quantity, 0, 0, algorithm.
UtcTime,
914 "Liquidated due to impending split. Option splits are not currently supported."
921 optionContractSecurity.IsTradable =
false;
923 algorithm.
Debug($
"MarketOnClose order submitted for option contract '{optionContractSymbol}' due to impending {split.Symbol.Value} split event. "
924 +
"Option splits are not currently supported.");
928 splitWarnings.RemoveAt(i);
935 private static void ApplySplitOrDividendToVolatilityModel(
IAlgorithm algorithm,
Security security,
bool liveMode,
941 algorithm.
TimeZone, liveMode, dataNormalizationMode);
956 && dataPointEndTime.Ticks % config.
Increment.Ticks == 0)
962 return dataPointEndTime == roundedDataPointEndTime;
971 if (controls ==
null)
979 Log.
Trace(
"AlgorithmManager.CreateTokenBucket(): Initializing LeakyBucket: " +
980 $
"Capacity: {controls.Capacity} " +
981 $
"RefillAmount: {controls.RefillAmount} " +
982 $
"TimeInterval: {controls.TimeIntervalMinutes}"