Lean  $LEAN_TAG$
Isolator.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14 */
15 
16 using System;
17 using System.Threading;
18 using System.Threading.Tasks;
19 using QuantConnect.Logging;
20 using QuantConnect.Util;
21 using static QuantConnect.StringExtensions;
22 
23 namespace QuantConnect
24 {
25  /// <summary>
26  /// Isolator class - create a new instance of the algorithm and ensure it doesn't
27  /// exceed memory or time execution limits.
28  /// </summary>
29  public class Isolator
30  {
/// <summary>
/// Algo cancellation controls - cancel source.
/// Cancelled by the isolator when the monitored work breaches one of its limits;
/// it can only be replaced from inside this class.
/// </summary>
public CancellationTokenSource CancellationTokenSource
{
    get; private set;
}
38 
/// <summary>
/// Algo cancellation controls - cancellation token for algorithm thread.
/// Read straight from <see cref="CancellationTokenSource"/> on every access.
/// </summary>
public CancellationToken CancellationToken
{
    get { return CancellationTokenSource.Token; }
}
46 
/// <summary>
/// Gets whether cancellation has been requested on this isolator's cancellation source,
/// so callers can stop their analysis early.
/// </summary>
public bool IsCancellationRequested => CancellationTokenSource.IsCancellationRequested;
54 
/// <summary>
/// Initializes a new instance of the <see cref="Isolator"/> class
/// with its own cancellation source.
/// </summary>
public Isolator()
{
    // The token, the cancellation check and the task launches all dereference
    // the source, so it has to exist before any other member is used.
    CancellationTokenSource = new CancellationTokenSource();
}
62 
/// <summary>
/// Runs a code block and supervises it against a time limit, a memory limit and
/// optional caller-defined limits.
/// </summary>
/// <param name="timeSpan">Maximum time the code block is allowed to run</param>
/// <param name="withinCustomLimits">Function polled on every check to decide whether the code block is still within
/// custom limits (e.g. the algorithm manager timing individual time loops); may be null, in which case
/// no custom limits are applied</param>
/// <param name="codeBlock">Action codeblock to execute</param>
/// <param name="memoryCap">Maximum memory allocation in megabytes, default 1024Mb</param>
/// <param name="sleepIntervalMillis">Sleep interval between each check in ms</param>
/// <param name="workerThread">The worker thread instance that will execute the provided action, if null
/// will use a <see cref="Task"/></param>
/// <returns>True if the code block completed within the limits.</returns>
/// <exception cref="TimeoutException">Thrown when the time, memory or custom limits are exceeded</exception>
public bool ExecuteWithTimeLimit(TimeSpan timeSpan, Func<IsolatorLimitResult> withinCustomLimits, Action codeBlock, long memoryCap = 1024, int sleepIntervalMillis = 1000, WorkerThread workerThread = null)
{
    Task monitored;
    if (workerThread == null)
    {
        // no dedicated thread supplied: run the code block on a task
        monitored = Task.Factory.StartNew(codeBlock, CancellationTokenSource.Token);
    }
    else
    {
        // hand the work to the worker thread, then wrap its completion signal
        // in a task so the same monitoring loop can be reused
        workerThread.Add(codeBlock);
        monitored = Task.Factory.StartNew(() => workerThread.FinishedWorkItem.WaitOne(), CancellationTokenSource.Token);
    }

    try
    {
        return MonitorTask(monitored, timeSpan, withinCustomLimits, memoryCap, sleepIntervalMillis);
    }
    catch (Exception)
    {
        // release the wrapper task even if the worker thread didn't finish
        if (workerThread != null && !monitored.IsCompleted)
        {
            workerThread.FinishedWorkItem.Set();
        }
        throw;
    }
}
98 
/// <summary>
/// Polls a running task until it completes or breaches the time limit, the memory
/// limit or the caller-defined limits.
/// </summary>
/// <param name="task">The task being supervised</param>
/// <param name="timeSpan">Maximum time the task is allowed to run</param>
/// <param name="withinCustomLimits">Function polled on every iteration for caller-defined limits;
/// when null every check is treated as within limits</param>
/// <param name="memoryCap">Maximum memory allocation in megabytes, default 1024Mb</param>
/// <param name="sleepIntervalMillis">Sleep interval between each check in ms</param>
/// <returns>True if the task completed without breaching a limit.</returns>
/// <exception cref="TimeoutException">Thrown, after cancelling the cancellation source, when the time limit
/// elapses, the memory limit is exceeded or the custom limits report a violation</exception>
private bool MonitorTask(Task task,
    TimeSpan timeSpan,
    Func<IsolatorLimitResult> withinCustomLimits,
    long memoryCap = 1024,
    int sleepIntervalMillis = 1000)
{
    // default to always within custom limits
    withinCustomLimits = withinCustomLimits ?? (() => new IsolatorLimitResult(TimeSpan.Zero, string.Empty));

    var message = string.Empty;
    var emaPeriod = 60d;
    var memoryUsed = 0L;
    var utcNow = DateTime.UtcNow;
    var end = utcNow + timeSpan;
    var memoryLogger = utcNow + Time.OneMinute;
    var isolatorLimitResult = new IsolatorLimitResult(TimeSpan.Zero, string.Empty);

    // convert the cap from megabytes to bytes
    memoryCap *= 1024 * 1024;
    var spikeLimit = memoryCap * 2;
    // loop invariant: usage above 80% of the cap triggers forced GC and an error log
    var warningThreshold = memoryCap * 0.8;

    while (!task.IsCompleted && utcNow < end)
    {
        // if over 80% allocation force GC then sample
        var sample = Convert.ToDouble(GC.GetTotalMemory(memoryUsed > warningThreshold));

        // find the EMA of the memory used to prevent spikes killing strategy
        memoryUsed = Convert.ToInt64((emaPeriod - 1) / emaPeriod * memoryUsed + (1 / emaPeriod) * sample);

        // if the rolling EMA > cap; or the spike is more than 2x the allocation.
        if (memoryUsed > memoryCap || sample > spikeLimit)
        {
            message = Messages.Isolator.MemoryUsageMaxedOut(PrettyFormatRam(memoryCap), PrettyFormatRam((long)sample));
            break;
        }

        // periodic memory usage report, at most once a minute
        if (utcNow > memoryLogger)
        {
            if (memoryUsed > warningThreshold)
            {
                Log.Error(Messages.Isolator.MemoryUsageOver80Percent(sample));
            }

            Log.Trace("Isolator.ExecuteWithTimeLimit(): " +
                Messages.Isolator.MemoryUsageInfo(
                    PrettyFormatRam(memoryUsed),
                    PrettyFormatRam((long)sample),
                    PrettyFormatRam(OS.ApplicationMemoryUsed * 1024 * 1024),
                    isolatorLimitResult.CurrentTimeStepElapsed,
                    (int)Math.Ceiling(OS.CpuUsage)));

            memoryLogger = utcNow.AddMinutes(1);
        }

        // check to see if we're within other custom limits defined by the caller
        isolatorLimitResult = withinCustomLimits();
        if (!isolatorLimitResult.IsWithinCustomLimits)
        {
            message = isolatorLimitResult.ErrorMessage;
            break;
        }

        // wait for the task to finish, giving up after the sleep interval so limits are re-checked
        if (task.Wait(utcNow.GetSecondUnevenWait(sleepIntervalMillis)))
        {
            break;
        }

        utcNow = DateTime.UtcNow;
    }

    // the loop ended without a limit message and the task is still running: the time limit elapsed
    if (task.IsCompleted == false && string.IsNullOrEmpty(message))
    {
        message = Messages.Isolator.MemoryUsageMonitorTaskTimedOut(timeSpan);
        Log.Trace($"Isolator.ExecuteWithTimeLimit(): {message}");
    }

    if (!string.IsNullOrEmpty(message))
    {
        // signal the running work to stop before reporting the breach to the caller
        CancellationTokenSource.Cancel();
        Log.Error($"Isolator.ExecuteWithTimeLimit(): {message}");
        throw new TimeoutException(message);
    }
    return task.IsCompleted;
}
183 
/// <summary>
/// Runs a code block and supervises it against a time limit and a memory limit,
/// without any caller-defined custom limits.
/// </summary>
/// <param name="timeSpan">Maximum time the code block is allowed to run</param>
/// <param name="codeBlock">Action codeblock to execute</param>
/// <param name="memoryCap">Maximum memory allocation in megabytes</param>
/// <param name="sleepIntervalMillis">Sleep interval between each check in ms</param>
/// <param name="workerThread">The worker thread instance that will execute the provided action, if null
/// will use a <see cref="Task"/></param>
/// <returns>True if the code block completed within the limits.</returns>
/// <exception cref="TimeoutException">Thrown when the time or memory limits are exceeded</exception>
public bool ExecuteWithTimeLimit(TimeSpan timeSpan, Action codeBlock, long memoryCap, int sleepIntervalMillis = 1000, WorkerThread workerThread = null)
    => ExecuteWithTimeLimit(
        timeSpan,
        withinCustomLimits: null,
        codeBlock: codeBlock,
        memoryCap: memoryCap,
        sleepIntervalMillis: sleepIntervalMillis,
        workerThread: workerThread);
198 
/// <summary>
/// Converts a byte count to whole megabytes for string display.
/// </summary>
/// <param name="ramInBytes">Amount of memory in bytes</param>
/// <returns>The amount in megabytes, rounded to the nearest whole number, formatted with the invariant culture</returns>
private static string PrettyFormatRam(long ramInBytes)
{
    // divide in floating point: integer division would truncate before rounding
    // and turn the Math.Round call into a no-op
    const double bytesPerMegabyte = 1024d * 1024d;
    return Math.Round(ramInBytes / bytesPerMegabyte).ToStringInvariant();
}
208  }
209 }