Lean  $LEAN_TAG$
LiveTradingResultHandler.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15 */
16 
17 using System;
18 using System.Collections.Generic;
19 using System.Diagnostics;
20 using System.IO;
21 using System.Linq;
22 using System.Runtime.CompilerServices;
23 using System.Threading;
24 using System.Threading.Tasks;
25 using Newtonsoft.Json;
30 using QuantConnect.Logging;
32 using QuantConnect.Orders;
33 using QuantConnect.Packets;
37 using QuantConnect.Util;
38 
40 {
41  /// <summary>
42  /// Live trading result handler implementation passes the messages to the QC live trading interface.
43  /// </summary>
44  /// <remarks>Live trading result handler is quite busy. It sends constant price updates, equity updates and order/holdings updates.</remarks>
46  {
47  // Required properties for the cloud app.
48  private LiveNodePacket _job;
49 
50  //Update loop:
51  private DateTime _nextUpdate;
52  private DateTime _nextChartsUpdate;
53  private DateTime _nextChartTrimming;
54  private DateTime _nextLogStoreUpdate;
55  private DateTime _nextStatisticsUpdate;
56  private DateTime _nextInsightStoreUpdate;
57  private DateTime _currentUtcDate;
58 
59  private readonly TimeSpan _storeInsightPeriod;
60 
61  private DateTime _nextPortfolioMarginUpdate;
62  private DateTime _previousPortfolioMarginUpdate;
63  private readonly TimeSpan _samplePortfolioPeriod;
64  private readonly Chart _intradayPortfolioState = new(PortfolioMarginKey);
65 
66  /// <summary>
67  /// The earliest time of next dump to the status file
68  /// </summary>
69  private DateTime _nextStatusUpdate;
70 
71  //Log Message Store:
72  private DateTime _nextSample;
73  private IApi _api;
74  private readonly CancellationTokenSource _cancellationTokenSource;
75  private readonly int _streamedChartLimit;
76  private readonly int _streamedChartGroupSize;
77 
78  private bool _sampleChartAlways;
79  private bool _userExchangeIsOpen;
80  private ReferenceWrapper<decimal> _portfolioValue;
81  private ReferenceWrapper<decimal> _benchmarkValue;
82  private DateTime _lastChartSampleLogicCheck;
83  private readonly Dictionary<string, SecurityExchangeHours> _exchangeHours;
84 
85 
86  /// <summary>
87  /// Creates a new instance with empty caches and the default timing and streaming settings.
  /// The resample period is set to 2 seconds, the notification period to 1 second, and both the
  /// portfolio sampling period and the insight storing period to 10 minutes.
88  /// </summary>
  // NOTE(review): the constructor signature line is not present in this listing - confirm against the original file.
90  {
91  _exchangeHours = new Dictionary<string, SecurityExchangeHours>();
92  _cancellationTokenSource = new CancellationTokenSource();
93  ResamplePeriod = TimeSpan.FromSeconds(2);
94  NotificationPeriod = TimeSpan.FromSeconds(1);
  // a single 10 minute value is shared by the portfolio sampling and the insight storing periods
95  _samplePortfolioPeriod = _storeInsightPeriod = TimeSpan.FromMinutes(10);
  // streaming limits come from configuration; 12 and 3 are presumably the fallback values - TODO confirm Config.GetInt semantics
96  _streamedChartLimit = Config.GetInt("streamed-chart-limit", 12);
97  _streamedChartGroupSize = Config.GetInt("streamed-chart-group-size", 3);
98 
  // both wrappers start at zero; the portfolio one is replaced again in SetAlgorithm
99  _portfolioValue = new ReferenceWrapper<decimal>(0);
100  _benchmarkValue = new ReferenceWrapper<decimal>(0);
101  }
102 
/// <summary>
/// Initialize the result handler with this result packet.
/// </summary>
/// <param name="parameters">DTO parameters class to initialize a result handler</param>
/// <exception cref="Exception">Thrown when the submitted job is null or is not a <see cref="LiveNodePacket"/></exception>
public override void Initialize(ResultHandlerInitializeParameters parameters)
{
    _api = parameters.Api;

    // A safe cast lets a job of the wrong type reach the explicit check below,
    // instead of escaping earlier as an InvalidCastException from a hard cast.
    _job = parameters.Job as LiveNodePacket;
    if (_job == null)
    {
        throw new Exception("LiveResultHandler.Constructor(): Submitted Job type invalid.");
    }

    var utcNow = DateTime.UtcNow;
    _currentUtcDate = utcNow.Date;

    // the first portfolio margin update is due at the next sampling period boundary
    _nextPortfolioMarginUpdate = utcNow.RoundDown(_samplePortfolioPeriod).Add(_samplePortfolioPeriod);
    base.Initialize(parameters);
}
118 
/// <summary>
/// Body of the live trading result handler thread: forwards queued packets to the
/// messaging handler and triggers the periodic update until exit is requested and
/// the queue has been drained.
/// </summary>
protected override void Run()
{
    // initial grace period so the algorithm can initialize, otherwise an error would be logged right away
    ExitEvent.WaitOne(3000);

    // keep pumping while exit has not been triggered or there are still messages left to send
    while (!ExitTriggered || !Messages.IsEmpty)
    {
        try
        {
            // forward at most one queued packet per iteration
            if (Messages.TryDequeue(out var queued))
            {
                MessagingHandler.Send(queued);
            }

            // run the periodic update logic
            Update();

            if (Messages.IsEmpty)
            {
                // nothing left to do right now: wait instead of spinning in a tight loop
                var idleWait = Time.GetSecondUnevenWait(1000);
                ExitEvent.WaitOne(idleWait);
            }
        }
        catch (Exception exception)
        {
            Log.Error(exception);
        }
    }

    Log.Trace("LiveTradingResultHandler.Run(): Ending Thread...");
}
156 
157 
158  /// <summary>
159  /// Every so often send an update to the browser with the current state of the algorithm.
160  /// </summary>
  /// <remarks>
  /// Nothing is done until the current UTC time passes <c>_nextUpdate</c>, which is pushed forward by
  /// <c>MainUpdateInterval</c> at the end of every pass. Within a pass the delta packets are always sent,
  /// while storing the complete result, the logs, the usage statistics, the status file, the chart trimming
  /// and the insights are each gated by their own "next" timestamp. Exceptions raised during a pass are logged.
  /// </remarks>
161  private void Update()
162  {
163  //Error checks if the algorithm & threads have not loaded yet, or are closing down.
164  if (Algorithm?.Transactions == null || TransactionHandler.Orders == null || !Algorithm.GetLocked())
165  {
166  Log.Debug("LiveTradingResultHandler.Update(): Algorithm not yet initialized.");
167  ExitEvent.WaitOne(1000);
168  return;
169  }
170 
171  if (ExitTriggered)
172  {
173  return;
174  }
175 
176  var utcNow = DateTime.UtcNow;
177  if (utcNow > _nextUpdate)
178  {
179  try
180  {
181  Dictionary<int, Order> deltaOrders;
182  {
  // the shouldStop callback reports true once 15ms have elapsed; presumably GetDeltaOrders stops collecting then - TODO confirm
183  var stopwatch = Stopwatch.StartNew();
184  deltaOrders = GetDeltaOrders(LastDeltaOrderPosition, shouldStop: orderCount => stopwatch.ElapsedMilliseconds > 15);
185  }
  // take at most 50 not yet streamed order events per pass and remember how far we have read
186  var deltaOrderEvents = TransactionHandler.OrderEvents.Skip(LastDeltaOrderEventsPosition).Take(50).ToList();
187  LastDeltaOrderEventsPosition += deltaOrderEvents.Count;
188 
189  //Create and send back the changes in chart since the algorithm started.
190  var deltaCharts = new Dictionary<string, Chart>();
191  Log.Debug("LiveTradingResultHandler.Update(): Build delta charts");
192  var performanceCharts = new Dictionary<string, Chart>();
193  lock (ChartLock)
194  {
195  //Get the updates since the last chart
196  foreach (var chart in Charts)
197  {
198  var chartUpdates = chart.Value.GetUpdates();
199  // we only want to stream charts that have new updates
200  if (!chartUpdates.IsEmpty())
201  {
202  // remove directory pathing characters from chart names
203  var safeName = chart.Value.Name.Replace('/', '-');
204  DictionarySafeAdd(deltaCharts, safeName, chartUpdates, "deltaCharts");
205  }
206 
  // full clones of the performance charts are the input of the statistics generated below
207  if (AlgorithmPerformanceCharts.Contains(chart.Key))
208  {
209  performanceCharts[chart.Key] = chart.Value.Clone();
210  }
211 
212  if (chartUpdates.Name == PortfolioMarginKey)
213  {
  // NOTE(review): this branch is empty in this listing; a statement appears to be missing here - confirm against the original file
215  }
216  }
217  }
218  Log.Debug("LiveTradingResultHandler.Update(): End build delta charts");
219 
220  //Profit loss changes, get the banner statistics, summary information on the performance for the headers.
221  var serverStatistics = GetServerStatistics(utcNow);
222  var holdings = GetHoldings(Algorithm.Securities.Values, Algorithm.SubscriptionManager.SubscriptionDataConfigService);
223 
224  //Add the algorithm statistics first.
225  Log.Debug("LiveTradingResultHandler.Update(): Build run time stats");
226 
227  var summary = GenerateStatisticsResults(performanceCharts).Summary;
228  var runtimeStatistics = GetAlgorithmRuntimeStatistics(summary);
229  Log.Debug("LiveTradingResultHandler.Update(): End build run time stats");
230 
231 
232  // since we're sending multiple packets, let's do it async and forget about it
233  // chart data can get big so let's break them up into groups
  // NOTE(review): the packets are sent one by one in the loop below; whether Send itself is asynchronous is not visible here
234  var splitPackets = SplitPackets(deltaCharts, deltaOrders, holdings, Algorithm.Portfolio.CashBook, runtimeStatistics, serverStatistics, deltaOrderEvents);
235 
236  foreach (var liveResultPacket in splitPackets)
237  {
238  MessagingHandler.Send(liveResultPacket);
239  }
240 
241  //Send full packet to storage.
242  if (utcNow > _nextChartsUpdate)
243  {
244  Log.Debug("LiveTradingResultHandler.Update(): Pre-store result");
245  var chartComplete = new Dictionary<string, Chart>();
246  lock (ChartLock)
247  {
248  foreach (var chart in Charts)
249  {
250  // remove directory pathing characters from chart names
251  var safeName = chart.Value.Name.Replace('/', '-');
252  DictionarySafeAdd(chartComplete, safeName, chart.Value.Clone(), "chartComplete");
253  }
254  }
255 
256  var orderEvents = GetOrderEventsToStore();
257 
  // the stored packet carries an empty statistics dictionary; the runtime statistics are passed separately
258  var deltaStatistics = new Dictionary<string, string>();
259  var orders = new Dictionary<int, Order>(TransactionHandler.Orders);
260  var complete = new LiveResultPacket(_job, new LiveResult(new LiveResultParameters(chartComplete, orders, Algorithm.Transactions.TransactionRecord, holdings, Algorithm.Portfolio.CashBook, deltaStatistics, runtimeStatistics, orderEvents, serverStatistics, state: GetAlgorithmState())));
261  StoreResult(complete);
262  _nextChartsUpdate = DateTime.UtcNow.Add(ChartUpdateInterval);
263  Log.Debug("LiveTradingResultHandler.Update(): End-store result");
264  }
265 
266  // Upload the logs every 1-2 minutes; this can be a heavy operation depending on amount of live logging and should probably be done asynchronously.
267  if (utcNow > _nextLogStoreUpdate)
268  {
269  List<LogEntry> logs;
270  Log.Debug("LiveTradingResultHandler.Update(): Storing log...");
271  lock (LogStore)
272  {
273  // we need a new container instance so we can store the logs outside the lock
274  logs = new List<LogEntry>(LogStore);
275  LogStore.Clear();
276  }
277  SaveLogs(AlgorithmId, logs);
278 
279  _nextLogStoreUpdate = DateTime.UtcNow.AddMinutes(2);
280  Log.Debug("LiveTradingResultHandler.Update(): Finished storing log");
281  }
282 
283  // Every minute send usage statistics:
284  if (utcNow > _nextStatisticsUpdate)
285  {
  // a failure to send the statistics is only logged, the rest of the update continues
286  try
287  {
  // NOTE(review): several arguments of this call are not present in this listing - confirm against the original file
288  _api.SendStatistics(
289  _job.AlgorithmId,
295  GetNetReturn(),
297  TotalTradesCount(), 0);
298  }
299  catch (Exception err)
300  {
301  Log.Error(err, "Error sending statistics:");
302  }
303  _nextStatisticsUpdate = utcNow.AddMinutes(1);
304  }
305 
306  if (utcNow > _nextStatusUpdate)
307  {
308  var chartComplete = new Dictionary<string, Chart>();
309  lock (ChartLock)
310  {
311  foreach (var chart in Charts)
312  {
313  // remove directory pathing characters from chart names
314  var safeName = chart.Value.Name.Replace('/', '-');
315  DictionarySafeAdd(chartComplete, safeName, chart.Value.Clone(), "chartComplete");
316  }
317  }
  // NOTE(review): one argument of this call (between the charts and the profit-loss record) is not present in this listing - confirm
318  StoreStatusFile(
319  runtimeStatistics,
320  // only store holdings we are invested in
321  holdings.Where(pair => pair.Value.Quantity != 0).ToDictionary(pair => pair.Key, pair => pair.Value),
322  chartComplete,
324  new SortedDictionary<DateTime, decimal>(Algorithm.Transactions.TransactionRecord),
325  serverStatistics);
326 
  // NOTE(review): nothing visible here moves _nextStatusUpdate forward; a statement appears to be missing - confirm against the original file
328  }
329 
  // on a UTC date change flush the order events kept for the previous date before switching
330  if (_currentUtcDate != utcNow.Date)
331  {
332  StoreOrderEvents(_currentUtcDate, GetOrderEventsToStore());
333  // start storing in a new date file
334  _currentUtcDate = utcNow.Date;
335  }
336 
337  if (utcNow > _nextChartTrimming)
338  {
339  Log.Debug("LiveTradingResultHandler.Update(): Trimming charts");
340  var timeLimitUtc = utcNow.AddDays(-2);
341  lock (ChartLock)
342  {
343  foreach (var chart in Charts)
344  {
345  foreach (var series in chart.Value.Series)
346  {
347  // trim data that's older than 2 days
348  series.Value.Values =
349  (from v in series.Value.Values
350  where v.Time > timeLimitUtc
351  select v).ToList();
352  }
353  }
354  }
355  _nextChartTrimming = DateTime.UtcNow.AddMinutes(10);
356  Log.Debug("LiveTradingResultHandler.Update(): Finished trimming charts");
357  }
358 
359  if (utcNow > _nextInsightStoreUpdate)
360  {
361  StoreInsights();
362 
363  _nextInsightStoreUpdate = DateTime.UtcNow.Add(_storeInsightPeriod);
364  }
365  }
366  catch (Exception err)
367  {
368  Log.Error(err, "LiveTradingResultHandler().Update(): ", true);
369  }
370 
371  //Set the new update time after we've finished processing.
372  // The processing can takes time depending on how large the packets are.
373  _nextUpdate = DateTime.UtcNow.Add(MainUpdateInterval);
374  } // End Update Charts:
375  }
376 
/// <summary>
/// Schedules the earliest time at which the next status update may happen
/// </summary>
protected virtual void SetNextStatusUpdate()
{
    // the status json file is refreshed at most once per period
    var statusPeriod = TimeSpan.FromMinutes(10);
    _nextStatusUpdate = DateTime.UtcNow.Add(statusPeriod);
}
385 
/// <summary>
/// Serializes the given order events to a json file named after the algorithm id and the given date.
/// Nothing is written when there are no order events.
/// </summary>
/// <param name="utcTime">The utc date associated with these order events</param>
/// <param name="orderEvents">The order events to store</param>
protected override void StoreOrderEvents(DateTime utcTime, List<OrderEvent> orderEvents)
{
    if (orderEvents.Count > 0)
    {
        // one file per algorithm and date
        var path = GetResultsPath($"{AlgorithmId}-{utcTime:yyyy-MM-dd}-order-events.json");
        var json = JsonConvert.SerializeObject(orderEvents, Formatting.None, SerializerSettings);

        File.WriteAllText(path, json);
    }
}
405 
/// <summary>
/// Collects the order events whose utc time is at or after '_currentUtcDate'
/// </summary>
private List<OrderEvent> GetOrderEventsToStore()
{
    var currentDateEvents =
        from orderEvent in TransactionHandler.OrderEvents
        where orderEvent.UtcTime >= _currentUtcDate
        select orderEvent;

    return currentDateEvents.ToList();
}
413 
414  /// <summary>
415  /// Will store the complete status of the algorithm in a single json file
416  /// </summary>
417  /// <remarks>Will sample charts every 12 hours, 2 data points per day at maximum,
418  /// to reduce file size. Any exception raised while storing is logged and not rethrown.</remarks>
  /// <param name="runtimeStatistics">Runtime statistics to include in the stored result</param>
  /// <param name="holdings">Holdings to include in the stored result</param>
  /// <param name="chartComplete">Charts to store; they are re-sampled with a 12 hour resolution before being saved</param>
  /// <param name="algorithmState">Algorithm state to include in the stored result</param>
  /// <param name="profitLoss">Profit-loss record, used to generate the statistics when none are provided</param>
  /// <param name="serverStatistics">Optional server statistics to include in the stored result</param>
  /// <param name="statistics">Optional precomputed statistics; generated from the charts and the profit-loss record when null</param>
419  private void StoreStatusFile(SortedDictionary<string, string> runtimeStatistics,
420  Dictionary<string, Holding> holdings,
421  Dictionary<string, Chart> chartComplete,
422  Dictionary<string, string> algorithmState,
423  SortedDictionary<DateTime, decimal> profitLoss,
424  Dictionary<string, string> serverStatistics = null,
425  StatisticsResults statistics = null)
426  {
427  try
428  {
429  Log.Debug("LiveTradingResultHandler.Update(): status update start...");
430 
  // statistics are computed from the full resolution charts, before the down-sampling below
431  if (statistics == null)
432  {
433  statistics = GenerateStatisticsResults(chartComplete, profitLoss);
434  }
435 
436  // sample the entire charts with a 12 hours resolution
437  var dailySampler = new SeriesSampler(TimeSpan.FromHours(12));
438  chartComplete = dailySampler.SampleCharts(chartComplete, Time.Start, Time.EndOfTime);
439 
440  if (chartComplete.TryGetValue(PortfolioMarginKey, out var marginChart))
441  {
  // NOTE(review): this branch is empty in this listing; a statement using marginChart appears to be missing - confirm against the original file
443  }
444 
  // NOTE(review): one argument (between the orders and the holdings) is not present in this listing - confirm against the original file
445  var result = new LiveResult(new LiveResultParameters(chartComplete,
446  new Dictionary<int, Order>(TransactionHandler.Orders),
448  holdings,
449  Algorithm?.Portfolio.CashBook ?? new(),
450  statistics: statistics.Summary,
451  runtimeStatistics: runtimeStatistics,
452  orderEvents: null, // we stored order events separately
453  serverStatistics: serverStatistics,
454  state: algorithmState));
455 
  // the status file is named after the algorithm id
456  SaveResults($"{AlgorithmId}.json", result);
457  Log.Debug("LiveTradingResultHandler.Update(): status update end.");
458  }
459  catch (Exception err)
460  {
461  Log.Error(err, "Error storing status update");
462  }
463  }
464 
/// <summary>
/// Breaks the data of an update into several smaller packets so that each of them can reach the terminal.
/// Holdings plus cash book and the statistics come first, then the chart groups and finally,
/// only when there is any, the orders and order events.
/// </summary>
private IEnumerable<LiveResultPacket> SplitPackets(Dictionary<string, Chart> deltaCharts,
    Dictionary<int, Order> deltaOrders,
    Dictionary<string, Holding> holdings,
    CashBook cashbook,
    SortedDictionary<string, string> runtimeStatistics,
    Dictionary<string, string> serverStatistics,
    List<OrderEvent> deltaOrderEvents)
{
    // charts are the heavy part: bundle them in groups of '_streamedChartGroupSize'
    var chartPackets = new List<LiveResultPacket>();
    var group = new Dictionary<string, Chart>();
    foreach (var deltaChart in deltaCharts.Values)
    {
        group.Add(deltaChart.Name, deltaChart);
        if (group.Count < _streamedChartGroupSize)
        {
            continue;
        }

        // the group is full: wrap it in a packet and start a new one
        chartPackets.Add(new LiveResultPacket(_job, new LiveResult { Charts = group }));
        group = new Dictionary<string, Chart>();

        // never stream more than the configured maximum number of charts
        if (chartPackets.Count * _streamedChartGroupSize >= _streamedChartLimit)
        {
            break;
        }
    }

    // a last, partially filled group still has to be sent
    if (group.Count > 0)
    {
        chartPackets.Add(new LiveResultPacket(_job, new LiveResult { Charts = group }));
    }

    // these are small compared to the charts, so they each fit in a single packet
    var holdingsPacket = new LiveResultPacket(_job, new LiveResult { Holdings = holdings, CashBook = cashbook });
    var statisticsPacket = new LiveResultPacket(_job, new LiveResult
    {
        RuntimeStatistics = runtimeStatistics,
        ServerStatistics = serverStatistics
    });

    var output = new List<LiveResultPacket> { holdingsPacket, statisticsPacket };
    output.AddRange(chartPackets);

    // orders and order events only travel when something actually changed
    var hasOrderUpdates = deltaOrders.Count > 0 || deltaOrderEvents.Count > 0;
    if (hasOrderUpdates)
    {
        output.Add(new LiveResultPacket(_job, new LiveResult { Orders = deltaOrders, OrderEvents = deltaOrderEvents }));
    }

    return output;
}
531 
532 
/// <summary>
/// Queues a debug message for the live console and adds it to the log store.
/// </summary>
/// <param name="message">Message we'd like shown in console.</param>
/// <remarks>The message is dropped when there are already more than 500 messages in the queue.</remarks>
public void DebugMessage(string message)
{
    // the queue is backed up already, skip this message altogether
    if (Messages.Count <= 500)
    {
        var packet = new DebugPacket(_job.ProjectId, AlgorithmId, CompileId, message);
        Messages.Enqueue(packet);
        AddToLogStore(message);
    }
}
544 
/// <summary>
/// Queues a system debug message for the live console and adds it to the log store.
/// </summary>
/// <param name="message">Message we'd like shown in console.</param>
public void SystemDebugMessage(string message)
{
    var packet = new SystemDebugPacket(_job.ProjectId, AlgorithmId, CompileId, message);
    Messages.Enqueue(packet);
    AddToLogStore(message);
}
554 
555 
/// <summary>
/// Queues a log message for the console and adds it to the log store.
/// </summary>
/// <param name="message">String message we'd like logged.</param>
/// <remarks>The message is dropped when there are already more than 500 messages in the queue.</remarks>
public void LogMessage(string message)
{
    // live trading pushes the log messages out immediately, unless the queue is saturated
    if (Messages.Count <= 500)
    {
        var packet = new LogPacket(AlgorithmId, message);
        Messages.Enqueue(packet);
        AddToLogStore(message);
    }
}
568 
/// <summary>
/// Saves an algorithm message to the log store, prefixed with the current local time so that
/// debug and logging messages can be interwoven.
/// </summary>
/// <param name="message">String message to add to the log store.</param>
protected override void AddToLogStore(string message)
{
    Log.Debug("LiveTradingResultHandler.AddToLogStore(): Adding");

    var timestamp = DateTime.Now.ToStringInvariant(DateFormat.UI);
    base.AddToLogStore($"{timestamp} {message}");

    Log.Debug("LiveTradingResultHandler.AddToLogStore(): Finished adding");
}
579 
/// <summary>
/// Queues a handled error message for the browser console and adds it to the log store.
/// </summary>
/// <param name="message">Message we'd like shown in console.</param>
/// <param name="stacktrace">Stacktrace to show in the console.</param>
/// <remarks>The message is dropped when there are already more than 500 messages in the queue.</remarks>
public void ErrorMessage(string message, string stacktrace = "")
{
    if (Messages.Count <= 500)
    {
        Messages.Enqueue(new HandledErrorPacket(AlgorithmId, message, stacktrace));

        // the stack trace is only appended to the stored entry when there is one
        var suffix = string.IsNullOrEmpty(stacktrace) ? string.Empty : ": StackTrace: " + stacktrace;
        AddToLogStore(message + suffix);
    }
}
591 
/// <summary>
/// Queues a packet with the list of security types that the algorithm trades, so the browser can
/// show the market clock - is this market open or closed!
/// </summary>
/// <param name="types">List of security types</param>
public void SecurityType(List<SecurityType> types)
{
    Messages.Enqueue(new SecurityTypesPacket { Types = types });
}
601 
/// <summary>
/// Queues a runtime error packet for the user's browser, adds the error to the log store
/// and records it in the algorithm state.
/// </summary>
/// <param name="message">Runtime error message</param>
/// <param name="stacktrace">Associated error stack trace.</param>
public virtual void RuntimeError(string message, string stacktrace = "")
{
    var packet = new RuntimeErrorPacket(_job.UserId, AlgorithmId, message, stacktrace);
    Messages.Enqueue(packet);

    // the stack trace is only appended to the stored entry when there is one
    var suffix = string.IsNullOrEmpty(stacktrace) ? string.Empty : ": StackTrace: " + stacktrace;
    AddToLogStore(message + suffix);

    SetAlgorithmState(message, stacktrace);
}
613 
/// <summary>
/// Handles a brokerage message event. This implementation does nothing with it.
/// </summary>
/// <param name="brokerageMessageEvent">The brokerage message event</param>
public virtual void BrokerageMessage(BrokerageMessageEvent brokerageMessageEvent)
{
    // intentionally left empty
}
622 
623  /// <summary>
624  /// Add a sample to the chart specified by the chartName, and seriesName.
  /// The chart and the series are created on first use, and the whole operation runs under the chart lock.
625  /// </summary>
626  /// <param name="chartName">String chart name to place the sample.</param>
627  /// <param name="seriesName">Series name for the chart.</param>
628  /// <param name="seriesIndex">Series chart index - which chart should this series belong</param>
629  /// <param name="seriesType">Series type for the chart.</param>
630  /// <param name="value">Value for the chart sample.</param>
631  /// <param name="unit">Unit for the chart axis</param>
632  /// <remarks>Sample can be used to create new charts or sample equity - daily performance.</remarks>
633  protected override void Sample(string chartName, string seriesName, int seriesIndex, SeriesType seriesType, ISeriesPoint value,
634  string unit = "$")
635  {
636  // Sampling during warming up period skews statistics
  // NOTE(review): the condition guarding this early return is not present in this listing; as shown the
  // block returns unconditionally and the code below is unreachable - confirm against the original file
638  {
639  return;
640  }
641 
642  Log.Debug("LiveTradingResultHandler.Sample(): Sampling " + chartName + "." + seriesName);
643  lock (ChartLock)
644  {
645  //Add a copy locally:
646  if (!Charts.TryGetValue(chartName, out var chart))
647  {
648  Charts.AddOrUpdate(chartName, new Chart(chartName));
  // read the chart back from the collection so we work on the stored instance
649  chart = Charts[chartName];
650  }
651 
652  //Add the sample to our chart:
653  if (!chart.Series.TryGetValue(seriesName, out var series))
654  {
  // seriesIndex, seriesType and unit are only used when the series is created here
655  series = BaseSeries.Create(seriesType, seriesName, seriesIndex, unit);
656  chart.Series.Add(seriesName, series);
657  }
658 
659  //Add our value:
660  series.Values.Add(value);
661  }
662  Log.Debug("LiveTradingResultHandler.Sample(): Done sampling " + chartName + "." + seriesName);
663  }
664 
/// <summary>
/// Merges a range of chart updates coming from the user's algorithm into the local charts.
/// Missing charts are created, pie series are consolidated into a single point and
/// any other series has its new values appended.
/// </summary>
/// <param name="updates">Chart updates since the last request.</param>
/// <seealso cref="Sample(string,string,int,SeriesType,ISeriesPoint,string)"/>
protected void SampleRange(IEnumerable<Chart> updates)
{
    Log.Debug("LiveTradingResultHandler.SampleRange(): Begin sampling");
    lock (ChartLock)
    {
        foreach (var update in updates)
        {
            // make sure there is a local chart to receive the samples
            if (!Charts.TryGetValue(update.Name, out var target))
            {
                target = new Chart(update.Name);
                Charts.AddOrUpdate(update.Name, target);
            }

            foreach (BaseSeries incoming in update.Series.Values)
            {
                // series without values carry nothing to merge
                if (incoming.Values.Count <= 0)
                {
                    continue;
                }

                var existing = target.TryAddAndGetSeries(incoming.Name, incoming, forceAddNew: false);
                if (incoming.SeriesType != SeriesType.Pie)
                {
                    // regular series: append the new samples at the end
                    existing.Values.AddRange(incoming.Values);
                    continue;
                }

                // pie series: only the consolidated point is added, when there is one
                var consolidated = incoming.ConsolidateChartPoints();
                if (consolidated != null)
                {
                    existing.AddPoint(consolidated);
                }
            }
        }
    }
    Log.Debug("LiveTradingResultHandler.SampleRange(): Finished sampling");
}
710 
711  /// <summary>
712  /// Set the algorithm of the result handler after it has been initialized.
  /// Also queues the traded security types, redirects the console output and error streams to the
  /// algorithm and subscribes to the algorithm name and tags updates.
713  /// </summary>
714  /// <param name="algorithm">Algorithm object matching IAlgorithm interface</param>
715  /// <param name="startingPortfolioValue">Algorithm starting capital for statistics calculations</param>
716  public virtual void SetAlgorithm(IAlgorithm algorithm, decimal startingPortfolioValue)
717  {
718  Algorithm = algorithm;
  // NOTE(review): some statements of this method are not present in this listing (gaps after this line
  // and after the _portfolioValue assignment) - confirm against the original file
720  DailyPortfolioValue = StartingPortfolioValue = startingPortfolioValue;
721  _portfolioValue = new ReferenceWrapper<decimal>(startingPortfolioValue);
724 
  // collect the distinct security types of the algorithm's securities and queue them for the browser
725  var types = new List<SecurityType>();
726  foreach (var kvp in Algorithm.Securities)
727  {
728  var security = kvp.Value;
729 
730  if (!types.Contains(security.Type)) types.Add(security.Type);
731  }
732  SecurityType(types);
733 
734  // we need to forward Console.Write messages to the algorithm's Debug function
  // and, likewise, the console error stream to the algorithm's Error function
735  var debug = new FuncTextWriter(algorithm.Debug);
736  var error = new FuncTextWriter(algorithm.Error);
737  Console.SetOut(debug);
738  Console.SetError(error);
739 
740  UpdateAlgorithmStatus();
741 
742  // Wire algorithm name and tags updates
743  algorithm.NameUpdated += (sender, name) => AlgorithmNameUpdated(name);
744  algorithm.TagsUpdated += (sender, tags) => AlgorithmTagsUpdated(tags);
745  }
746 
747 
/// <summary>
/// Logs and queues an algorithm status update so the user gets to know the running state of the algorithm.
/// </summary>
/// <param name="status">Status enum of the algorithm.</param>
/// <param name="message">Optional string message describing reason for status change.</param>
public void SendStatusUpdate(AlgorithmStatus status, string message = "")
{
    // the reason is only added to the trace when one was given
    var detail = string.IsNullOrEmpty(message) ? string.Empty : " " + message;
    Log.Trace($"LiveTradingResultHandler.SendStatusUpdate(): status: '{status}'. {detail}");

    Messages.Enqueue(new AlgorithmStatusPacket(_job.AlgorithmId, _job.ProjectId, status, message));
}
759 
760 
/// <summary>
/// Set a dynamic runtime statistic to show in the (live) algorithm header.
/// A new key is added and an existing key has its value replaced.
/// </summary>
/// <param name="key">Runtime headline statistic name</param>
/// <param name="value">Runtime headline statistic value</param>
public void RuntimeStatistic(string key, string value)
{
    Log.Debug("LiveTradingResultHandler.RuntimeStatistic(): Begin setting statistic");
    lock (RuntimeStatistics)
    {
        // the indexer setter inserts a missing key and overwrites an existing one,
        // so no separate ContainsKey/Add step is needed
        RuntimeStatistics[key] = value;
    }
    Log.Debug("LiveTradingResultHandler.RuntimeStatistic(): End setting statistic");
}
779 
/// <summary>
/// Stores the final status file, insights and result of the algorithm and then sends
/// a truncated final result packet back to the IDE. Any exception is logged.
/// </summary>
protected void SendFinalResult()
{
    Log.Trace("LiveTradingResultHandler.SendFinalResult(): Starting...");
    try
    {
        var endTime = DateTime.UtcNow;
        var endState = GetAlgorithmState(endTime);
        LiveResultPacket result;

        if (Algorithm == null)
        {
            // could happen if algorithm failed to init: store and send an empty result carrying the end state
            StoreStatusFile(new(), new(), new(), endState, new());

            result = LiveResultPacket.CreateEmpty(_job);
            result.Results.State = endState;
        }
        else
        {
            // snapshot the charts under the lock
            var charts = new Dictionary<string, Chart>();
            lock (ChartLock)
            {
                foreach (var pair in Charts)
                {
                    charts.Add(pair.Key, pair.Value.Clone());
                }
            }

            var orders = new Dictionary<int, Order>(TransactionHandler.Orders);
            var profitLoss = new SortedDictionary<DateTime, decimal>(Algorithm.Transactions.TransactionRecord);
            var holdings = GetHoldings(Algorithm.Securities.Values, Algorithm.SubscriptionManager.SubscriptionDataConfigService, onlyInvested: true);
            var statisticsResults = GenerateStatisticsResults(charts, profitLoss);
            var runtime = GetAlgorithmRuntimeStatistics(statisticsResults.Summary);

            StoreStatusFile(runtime, holdings, charts, endState, profitLoss, statistics: statisticsResults);

            // the final packet itself is built with an empty holdings dictionary
            var parameters = new LiveResultParameters(charts, orders, profitLoss, new Dictionary<string, Holding>(),
                Algorithm.Portfolio.CashBook, statisticsResults.Summary, runtime, GetOrderEventsToStore(),
                algorithmConfiguration: AlgorithmConfiguration.Create(Algorithm, null), state: endState);
            result = new LiveResultPacket(_job, new LiveResult(parameters));
        }

        StoreInsights();

        // persist the complete result first
        StoreResult(result);
        Log.Trace("LiveTradingResultHandler.SendFinalResult(): Finished storing results. Start sending...");

        // only an empty result travels with the packet that gets sent, to keep it small
        result.Results = new LiveResult();
        MessagingHandler.Send(result);
    }
    catch (Exception exception)
    {
        Log.Error(exception);
    }
    Log.Trace("LiveTradingResultHandler.SendFinalResult(): Ended");
}
843 
/// <summary>
/// Appends the messages of the given log entries to the log file of the algorithm
/// </summary>
/// <param name="id">Id that will be incorporated into the algorithm log name</param>
/// <param name="logs">Log list</param>
/// <returns>The location of the logs, or an empty string when saving them failed</returns>
public override string SaveLogs(string id, List<LogEntry> logs)
{
    try
    {
        var messages = logs.Select(entry => entry.Message);
        var path = GetResultsPath($"{id}-log.txt");

        File.AppendAllLines(path, messages);
        return path;
    }
    catch (Exception exception)
    {
        // a failure is logged and reported to the caller through the empty location
        Log.Error(exception);
        return "";
    }
}
866 
867  /// <summary>
868  /// Save the snapshot of the total results to storage.
  /// Packets that are not live results are ignored. The order events are stored separately, then the
  /// results are saved with minute and 10 minute sampled charts, and finally once per chart with the
  /// un-sampled charts truncated to the current hour. Any exception is logged and not rethrown.
869  /// </summary>
870  /// <param name="packet">Packet to store.</param>
871  protected override void StoreResult(Packet packet)
872  {
873  try
874  {
875  Log.Debug("LiveTradingResultHandler.StoreResult(): Begin store result sampling");
876 
877  // Make sure this is the right type of packet:
878  if (packet.Type != PacketType.LiveResult) return;
879 
880  // Port to packet format:
881  var live = packet as LiveResultPacket;
882 
883  if (live != null)
884  {
885  if (live.Results.OrderEvents != null)
886  {
887  // we store order events separately
888  StoreOrderEvents(_currentUtcDate, live.Results.OrderEvents);
889  // lets null the orders events so that they aren't stored again and generate a giant file
890  live.Results.OrderEvents = null;
891  }
892 
893  // we need to down sample
894  var start = DateTime.UtcNow.Date;
895  var stop = start.AddDays(1);
896 
897  // truncate to just today, we don't need more than this for anyone
898  Truncate(live.Results, start, stop);
899 
  // keep a reference to the un-sampled charts; they are saved per chart further below
900  var highResolutionCharts = new Dictionary<string, Chart>(live.Results.Charts);
901 
902  // minute resolution data, save today
903  var minuteSampler = new SeriesSampler(TimeSpan.FromMinutes(1));
904  var minuteCharts = minuteSampler.SampleCharts(live.Results.Charts, start, stop);
905 
906  // swap out our charts with the sampled data
907  minuteCharts.Remove(PortfolioMarginKey);
908  live.Results.Charts = minuteCharts;
909  SaveResults(CreateKey("minute"), live.Results);
910 
911  // 10 minute resolution data, save today
  // note the input here are the minute sampled charts assigned to the results just above
912  var tenminuteSampler = new SeriesSampler(TimeSpan.FromMinutes(10));
913  var tenminuteCharts = tenminuteSampler.SampleCharts(live.Results.Charts, start, stop);
914  lock (_intradayPortfolioState)
915  {
916  var clone = _intradayPortfolioState.Clone();
  // NOTE(review): a statement between the clone and its assignment is not present in this listing - confirm against the original file
918  tenminuteCharts[PortfolioMarginKey] = clone;
919  }
920 
921  live.Results.Charts = tenminuteCharts;
922  SaveResults(CreateKey("10minute"), live.Results);
923 
924  // high resolution data, we only want to save an hour
925  highResolutionCharts.Remove(PortfolioMarginKey);
926  live.Results.Charts = highResolutionCharts;
927  start = DateTime.UtcNow.RoundDown(TimeSpan.FromHours(1));
928  stop = DateTime.UtcNow.RoundUp(TimeSpan.FromHours(1));
929 
930  Truncate(live.Results, start, stop);
931 
  // one result per chart, each carrying copies of the orders and holdings plus that single chart
932  foreach (var name in live.Results.Charts.Keys)
933  {
934  var result = new LiveResult
935  {
936  Orders = new Dictionary<int, Order>(live.Results.Orders),
937  Holdings = new Dictionary<string, Holding>(live.Results.Holdings),
938  Charts = new Dictionary<string, Chart> { { name, live.Results.Charts[name] } }
939  };
940 
941  SaveResults(CreateKey("second_" + CreateSafeChartName(name), "yyyy-MM-dd-HH"), result);
942  }
943  }
944  else
945  {
946  Log.Error("LiveResultHandler.StoreResult(): Result Null.");
947  }
948 
949  Log.Debug("LiveTradingResultHandler.StoreResult(): End store result sampling");
950  }
951  catch (Exception err)
952  {
953  Log.Error(err);
954  }
955  }
956 
/// <summary>
/// Handles a new order event: traces it together with the broker ids of its order,
/// queues an <see cref="OrderEventPacket"/> and echoes the event as a debug message.
/// </summary>
/// <param name="newEvent">The order event to report</param>
public override void OrderEvent(OrderEvent newEvent)
{
    // Broker ids can only be listed when the transaction handler returns the order.
    var order = TransactionHandler.GetOrderById(newEvent.OrderId);
    var brokerIds = order != null && order.BrokerId.Count > 0
        ? string.Join(", ", order.BrokerId)
        : string.Empty;

    Log.Trace("LiveTradingResultHandler.OrderEvent(): " + newEvent + " BrokerId: " + brokerIds, true);

    // Queue the event as a packet.
    Messages.Enqueue(new OrderEventPacket(AlgorithmId, newEvent));

    DebugMessage("New Order Event: " + newEvent);
}
974 
975  /// <summary>
976  /// Terminate the result thread and apply any required exit procedures like sending final results.
     /// Everything is guarded by ExitTriggered, so the shutdown sequence is skipped once the flag is set.
977  /// </summary>
978  public override void Exit()
979  {
980  if (!ExitTriggered)
981  {
     // cancel first so work bound to this token source stops being scheduled
982  _cancellationTokenSource.Cancel();
983 
984  if (Algorithm != null)
985  {
986  // first process synchronous events so we add any new message or log
     // NOTE(review): the embedded numbering skips 987 here, the statement this comment describes
     // is missing from this listing. Restore it from the upstream source.
988  }
989 
990  // Set exit flag, update task will send any message before stopping
991  ExitTriggered = true;
992  ExitEvent.Set();
993 
994  lock (LogStore)
995  {
     // NOTE(review): the embedded numbering skips 996 here, a statement that runs before the
     // log store is cleared is missing from this listing.
997  LogStore.Clear();
998  }
999 
1000  StopUpdateRunner();
1001 
1002  SendFinalResult();
1003 
1004  base.Exit();
1005 
      // the token source is disposed last, after the update runner was stopped
1006  _cancellationTokenSource.DisposeSafely();
1007  }
1008  }
1009 
/// <summary>
/// Restricts the charts and orders of a result to the inclusive window [start, stop]
/// </summary>
/// <remarks>The chart dictionary of the result is replaced by one built from new <see cref="Chart"/> instances.</remarks>
/// <param name="result">Result whose Charts and Orders are replaced by their truncated versions</param>
/// <param name="start">Inclusive lower bound of the window</param>
/// <param name="stop">Inclusive upper bound of the window</param>
private static void Truncate(LiveResult result, DateTime start, DateTime stop)
{
    // Null-aware window check shared by the optional order time stamps.
    bool InWindow(DateTime? moment) => moment != null && moment >= start && moment <= stop;

    var truncatedCharts = new Dictionary<string, Chart>();
    foreach (var entry in result.Charts)
    {
        var sourceChart = entry.Value;
        var chartCopy = new Chart(sourceChart.Name);
        truncatedCharts.Add(entry.Key, chartCopy);

        foreach (var sourceSeries in sourceChart.Series.Values)
        {
            // Start from an empty clone of the series, then add only the points in range.
            var trimmedSeries = sourceSeries.Clone(empty: true);
            var pointsInRange = sourceSeries.Values.Where(point => point.Time >= start && point.Time <= stop);
            trimmedSeries.Values.AddRange(pointsInRange);
            chartCopy.AddSeries(trimmedSeries);
        }
    }
    result.Charts = truncatedCharts;

    // An order is kept when its creation, last fill or last update time falls in the window.
    result.Orders = result.Orders.Values
        .Where(order => (order.Time >= start && order.Time <= stop)
            || InWindow(order.LastFillTime)
            || InWindow(order.LastUpdateTime))
        .ToDictionary(order => order.Id);
}
1040 
/// <summary>
/// Builds a result key of the form "{AlgorithmId}-{current UTC time}_{suffix}.json"
/// </summary>
/// <param name="suffix">Text placed after the time stamp</param>
/// <param name="dateFormat">Format applied to the current UTC time</param>
/// <returns>The key for the result file</returns>
private string CreateKey(string suffix, string dateFormat = "yyyy-MM-dd")
{
    var timestamp = DateTime.UtcNow.ToStringInvariant(dateFormat);
    return $"{AlgorithmId}-{timestamp}_{suffix}.json";
}
1045 
/// <summary>
/// Escapes a chart name so that it can be saved to a file system
/// </summary>
/// <param name="chartName">The name of a chart</param>
/// <returns>The chart name as escaped by <see cref="Uri.EscapeDataString(string)"/></returns>
protected virtual string CreateSafeChartName(string chartName)
    => Uri.EscapeDataString(chartName);
1055 
1056  /// <summary>
1057  /// Process the synchronous result events, sampling and message reading.
1058  /// This method is triggered from the algorithm manager thread.
1059  /// </summary>
1060  /// <remarks>Prime candidate for putting into a base class. Is identical across all result handlers.</remarks>
      /// <param name="forceProcess">When true, the time gates in this method are bypassed, the portfolio and
      /// benchmark values are force-updated and the notification queue is drained without the one second limit.</param>
1061  public virtual void ProcessSynchronousEvents(bool forceProcess = false)
1062  {
1063  var time = DateTime.UtcNow;
1064 
1065  // Check to see if we should update stored portfolio values
1066  UpdatePortfolioValue(time, forceProcess);
1067 
1068  // Update the equity bar
      // NOTE(review): the embedded numbering skips 1069 here, the statement this comment describes
      // is missing from this listing.
1070 
      // portfolio margin samples are taken on boundaries of _samplePortfolioPeriod
1071  if (time > _nextPortfolioMarginUpdate || forceProcess)
1072  {
1073  _nextPortfolioMarginUpdate = time.RoundDown(_samplePortfolioPeriod).Add(_samplePortfolioPeriod);
1074 
1075  var newState = PortfolioState.Create(Algorithm.Portfolio, time, GetPortfolioValue());
1076  lock (_intradayPortfolioState)
1077  {
1078  if (_previousPortfolioMarginUpdate.Date != time.Date)
1079  {
1080  // we crossed into a new day, the intraday series start over
1081  _previousPortfolioMarginUpdate = time.Date;
1082  _intradayPortfolioState.Series.Clear();
1083  }
1084 
      // Create may return null, in that case no sample is added
1085  if (newState != null)
1086  {
1087  PortfolioMarginChart.AddSample(_intradayPortfolioState, newState, MapFileProvider, time);
1088  }
1089  }
1090  }
1091 
1092  if (time > _nextSample || forceProcess)
1093  {
1094  Log.Debug("LiveTradingResultHandler.ProcessSynchronousEvents(): Enter");
1095 
1096  // Set next sample time: one ResamplePeriod from now
1097  _nextSample = time.Add(ResamplePeriod);
1098 
1099  // Check to see if we should update stored bench values
1100  UpdateBenchmarkValue(time, forceProcess);
1101 
1102  //Sample the portfolio value over time for chart.
1103  SampleEquity(time);
1104 
1105  //Also add the user samples / plots to the result handler tracking:
      // NOTE(review): the embedded numbering skips 1106 here, the statement this comment describes
      // is missing from this listing.
1107  }
1108 
1109  ProcessAlgorithmLogs(messageQueueLimit: 500);
1110 
1111  //Set the running statistics:
1112  foreach (var pair in Algorithm.RuntimeStatistics)
1113  {
1114  RuntimeStatistic(pair.Key, pair.Value);
1115  }
1116 
1117  //Send all the notification messages but timeout within a second, or if this is a force process, wait till its done.
1118  var timeout = DateTime.UtcNow.AddSeconds(1);
1119  while (!Algorithm.Notify.Messages.IsEmpty && (DateTime.UtcNow < timeout || forceProcess))
1120  {
1121  Notification message;
1122  if (Algorithm.Notify.Messages.TryDequeue(out message))
1123  {
1124  //Process the notification messages:
1125  Log.Trace("LiveTradingResultHandler.ProcessSynchronousEvents(): Processing Notification...");
1126  try
1127  {
      // NOTE(review): the embedded numbering skips 1128 here, the statement that sends the
      // notification is missing from this listing.
1129  }
1130  catch (Exception err)
1131  {
      // a failed notification is reported to the algorithm and logged, the loop keeps going
1132  Algorithm.Debug(err.Message);
1133  Log.Error(err, "Sending notification: " + message.GetType().FullName);
1134  }
1135  }
1136  }
1137 
1138  Log.Debug("LiveTradingResultHandler.ProcessSynchronousEvents(): Exit");
1139  }
1140 
/// <summary>
/// Event fired each time securities are added to or removed from the data feed; only additions are inspected.
/// A Crypto or Forex security, or any subscription with extended market hours or daily resolution,
/// switches the handler to always sample charts. Otherwise the exchange hours of each newly seen
/// market are stored so they can be queried later on demand.
/// </summary>
/// <param name="changes">The security changes</param>
public override void OnSecuritiesChanged(SecurityChanges changes)
{
    if (_sampleChartAlways)
    {
        // one way trip: once set there is nothing left to decide
        return;
    }

    foreach (var added in changes.AddedSecurities)
    {
        var symbol = added.Symbol;
        var securityType = symbol.SecurityType;
        if (securityType == QuantConnect.SecurityType.Base)
        {
            // custom data does not take part in the decision
            continue;
        }

        var alwaysSample = securityType == QuantConnect.SecurityType.Crypto
            || securityType == QuantConnect.SecurityType.Forex;
        if (!alwaysSample)
        {
            // the subscriptions are only queried when the security type did not already decide
            var configs = Algorithm.SubscriptionManager.SubscriptionDataConfigService.GetSubscriptionDataConfigs(symbol);
            alwaysSample = configs.Any(config => config.ExtendedMarketHours || config.Resolution == Resolution.Daily);
        }

        _sampleChartAlways = alwaysSample;
        if (alwaysSample)
        {
            return;
        }

        // per market we keep the exchange hours of the first security seen
        var market = symbol.ID.Market;
        if (!_exchangeHours.ContainsKey(market))
        {
            _exchangeHours[market] = added.Exchange.Hours;
        }
    }
}
1179 
/// <summary>
/// Samples portfolio equity, benchmark, and daily performance
/// </summary>
/// <remarks>The stored portfolio and benchmark values are refreshed first. The refresh is not forced,
/// so it only happens when <see cref="UserExchangeIsOpen"/> reports an open exchange.</remarks>
/// <param name="time">Current UTC time in the AlgorithmManager loop</param>
public void Sample(DateTime time)
{
    // Bring the stored values up to date before the base class takes its sample.
    UpdatePortfolioValue(time, force: false);
    UpdateBenchmarkValue(time, force: false);

    base.Sample(time);
}
1191 
/// <summary>
/// Gets the stored portfolio value
/// </summary>
/// <remarks>Returns the value held in the portfolio value wrapper rather than computing a new one, which lets
/// the live trading implementation freeze the value while no user exchange is open.</remarks>
/// <returns>The last stored portfolio value</returns>
protected override decimal GetPortfolioValue() => _portfolioValue.Value;
1201 
/// <summary>
/// Gets the stored benchmark value
/// </summary>
/// <remarks>Returns the value held in the benchmark value wrapper rather than computing a new one, which lets
/// the live trading implementation freeze the value while no user exchange is open.</remarks>
/// <param name="time">Time to resolve the benchmark value at; not used by this override</param>
/// <returns>The last stored benchmark value</returns>
protected override decimal GetBenchmarkValue(DateTime time) => _benchmarkValue.Value;
1212 
/// <summary>
/// True if a user exchange is open and we should update the portfolio and benchmark values
/// </summary>
/// <remarks>Always true when charts are sampled always or when no exchange hours are tracked. The answer is
/// cached and reused while the day of month, hour and minute of the requested time match the previous check.</remarks>
/// <param name="utcDateTime">The UTC time to check</param>
/// <returns>True if at least one tracked exchange is open at the given time</returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private bool UserExchangeIsOpen(DateTime utcDateTime)
{
    if (_sampleChartAlways || _exchangeHours.Count == 0)
    {
        return true;
    }

    // the previous answer is reused for the rest of the minute
    var previousCheck = _lastChartSampleLogicCheck;
    var withinCachedMinute = previousCheck.Day == utcDateTime.Day
        && previousCheck.Hour == utcDateTime.Hour
        && previousCheck.Minute == utcDateTime.Minute;
    if (withinCachedMinute)
    {
        return _userExchangeIsOpen;
    }
    _lastChartSampleLogicCheck = utcDateTime;

    // stop at the first exchange that reports open
    var anyOpen = false;
    foreach (var hours in _exchangeHours.Values)
    {
        var exchangeLocalTime = utcDateTime.ConvertFromUtc(hours.TimeZone);
        if (hours.IsOpen(exchangeLocalTime, false))
        {
            anyOpen = true;
            break;
        }
    }

    _userExchangeIsOpen = anyOpen;
    return anyOpen;
}
1249 
/// <summary>
/// Adds a value to the dictionary, logging an error instead of throwing when the key is already present
/// </summary>
/// <param name="dictionary">The dictionary to add to</param>
/// <param name="key">The key of the new entry</param>
/// <param name="value">The value of the new entry</param>
/// <param name="dictionaryName">Name used to identify the dictionary in the error message</param>
private static void DictionarySafeAdd<T>(Dictionary<string, T> dictionary, string key, T value, string dictionaryName)
{
    if (dictionary.TryAdd(key, value))
    {
        return;
    }

    // the existing entry is kept, the duplicate is only reported
    Log.Error($"LiveTradingResultHandler.DictionarySafeAdd(): dictionary {dictionaryName} already contains key {key}");
}
1257 
/// <summary>
/// Reports the running status to the API and schedules itself to run again after one minute,
/// until exit is triggered or cancellation is requested
/// </summary>
private void UpdateAlgorithmStatus()
{
    if (ExitTriggered || _cancellationTokenSource.IsCancellationRequested)
    {
        // shutting down: do not report and do not schedule another run
        return;
    }

    // wait until after we're warmed up to start sending the running status
    if (!Algorithm.IsWarmingUp)
    {
        _api.SetAlgorithmStatus(_job.AlgorithmId, AlgorithmStatus.Running);
    }

    var oneMinute = TimeSpan.FromMinutes(1);
    Task.Delay(oneMinute, _cancellationTokenSource.Token).ContinueWith(_ => UpdateAlgorithmStatus());
}
1274 
/// <summary>
/// Refreshes the stored benchmark value when forced or when a user exchange is open
/// </summary>
/// <param name="time">Time to resolve the benchmark value at</param>
/// <param name="force">True to refresh without checking whether a user exchange is open</param>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void UpdateBenchmarkValue(DateTime time, bool force = false)
{
    if (!force && !UserExchangeIsOpen(time))
    {
        // keep the previously stored value
        return;
    }

    var latest = base.GetBenchmarkValue(time);
    _benchmarkValue = new ReferenceWrapper<decimal>(latest);
}
1283 
/// <summary>
/// Refreshes the stored portfolio value when forced or when a user exchange is open
/// </summary>
/// <param name="time">Time used to check whether a user exchange is open</param>
/// <param name="force">True to refresh without checking whether a user exchange is open</param>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void UpdatePortfolioValue(DateTime time, bool force = false)
{
    if (!force && !UserExchangeIsOpen(time))
    {
        // keep the previously stored value
        return;
    }

    var latest = base.GetPortfolioValue();
    _portfolioValue = new ReferenceWrapper<decimal>(latest);
}
1292 
/// <summary>
/// Helper method to fetch the algorithm holdings
/// </summary>
/// <param name="securities">The securities to build holdings from</param>
/// <param name="subscriptionDataConfigService">Used to check whether a canonical future still has subscriptions</param>
/// <param name="onlyInvested">True to include only invested securities</param>
/// <returns>The holdings keyed by the string form of the symbol id, added in order of symbol value</returns>
public static Dictionary<string, Holding> GetHoldings(IEnumerable<Security> securities, ISubscriptionDataConfigService subscriptionDataConfigService, bool onlyInvested = false)
{
    bool ShouldSend(Security s)
    {
        // If we are invested we send it always
        if (s.Invested)
        {
            return true;
        }
        if (onlyInvested)
        {
            return false;
        }

        // Otherwise we send non internal, non canonical and tradable securities.
        // When securities are removed they are marked as non tradable.
        if (!s.IsInternalFeed() && s.IsTradable && !s.Symbol.IsCanonical())
        {
            return true;
        }

        // Continuous futures are different: the mapped securities are internal and the continuous contract is
        // canonical and non tradable, but we want to send them anyway. We don't want to send non canonical,
        // non tradable futures; these would be the future chain assets, or continuous mapped contracts that
        // have been removed.
        if (s.Symbol.SecurityType != QuantConnect.SecurityType.Future)
        {
            return false;
        }
        return s.IsTradable
            || s.Symbol.IsCanonical() && subscriptionDataConfigService.GetSubscriptionDataConfigs(s.Symbol).Any();
    }

    var holdings = new Dictionary<string, Holding>();

    var selected = securities.Where(ShouldSend).OrderBy(x => x.Symbol.Value);
    foreach (var security in selected)
    {
        DictionarySafeAdd(holdings, security.Symbol.ID.ToString(), new Holding(security), "holdings");
    }

    return holdings;
}
1313 
1314  /// <summary>
1315  /// Calculates and gets the current statistics for the algorithm
1316  /// </summary>
1317  /// <returns>The current statistics, as produced by GenerateStatisticsResults</returns>
      // NOTE(review): the embedded numbering skips 1318 here, the method signature line is missing
      // from this listing. Restore it from the upstream source.
1319  {
1320  return GenerateStatisticsResults();
1321  }
1322 
/// <summary>
/// Sets or updates a custom summary statistic
/// </summary>
/// <remarks>Forwards both arguments unchanged to SummaryStatistic.</remarks>
/// <param name="name">The statistic name</param>
/// <param name="value">The statistic value</param>
public void SetSummaryStatistic(string name, string value) => SummaryStatistic(name, value);
1332 
/// <summary>
/// Handles updates to the algorithm's name by queueing an <see cref="AlgorithmNameUpdatePacket"/>
/// </summary>
/// <param name="name">The new name</param>
public virtual void AlgorithmNameUpdated(string name)
{
    var update = new AlgorithmNameUpdatePacket(AlgorithmId, name);
    Messages.Enqueue(update);
}
1341 
/// <summary>
/// Handles updates to the algorithm's tags by queueing an <see cref="AlgorithmTagsUpdatePacket"/>
/// </summary>
/// <param name="tags">The new tags</param>
public virtual void AlgorithmTagsUpdated(HashSet<string> tags)
{
    var update = new AlgorithmTagsUpdatePacket(AlgorithmId, tags);
    Messages.Enqueue(update);
}
1350  }
1351 }