Lean  $LEAN_TAG$
WorkerThread.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14 */
15 
16 using System;
17 using System.Collections.Concurrent;
18 using System.Threading;
19 using QuantConnect.Logging;
20 
21 namespace QuantConnect.Util
22 {
23  /// <summary>
24  /// This worker tread is required to guarantee all python operations are
25  /// executed by the same thread, to enable complete debugging functionality.
26  /// We don't use the main thread, to avoid any chance of blocking the process
27  /// </summary>
28  public class WorkerThread : IDisposable
29  {
30  private readonly BlockingCollection<Action> _blockingCollection;
31  private readonly CancellationTokenSource _threadCancellationTokenSource;
32  private readonly Thread _workerThread;
33 
34  /// <summary>
35  /// The worker thread instance
36  /// </summary>
37  public static WorkerThread Instance = new WorkerThread();
38 
39  /// <summary>
40  /// Will be set when the worker thread finishes a work item
41  /// </summary>
42  public AutoResetEvent FinishedWorkItem { get; }
43 
44  /// <summary>
45  /// Creates a new instance, which internally launches a new worker thread
46  /// </summary>
47  /// <remarks><see cref="Dispose"/></remarks>
48  protected WorkerThread()
49  {
50  _threadCancellationTokenSource = new CancellationTokenSource();
51  FinishedWorkItem = new AutoResetEvent(false);
52  _blockingCollection = new BlockingCollection<Action>();
53  _workerThread = new Thread(() =>
54  {
55  try
56  {
57  foreach (var action in _blockingCollection.GetConsumingEnumerable(_threadCancellationTokenSource.Token))
58  {
59  FinishedWorkItem.Reset();
60  try
61  {
62  action();
63  }
64  catch (Exception exception)
65  {
66  Log.Error(exception, "WorkerThread(): exception thrown when running task");
67  }
68  FinishedWorkItem.Set();
69  }
70  }
71  catch (OperationCanceledException)
72  {
73  // pass, when the token gets cancelled
74  }
75  })
76  {
77  IsBackground = true,
78  Name = "Isolator Thread",
79  Priority = ThreadPriority.Highest
80  };
81  _workerThread.Start();
82  }
83 
84  /// <summary>
85  /// Adds a new item of work
86  /// </summary>
87  /// <param name="action">The work item to add</param>
88  public void Add(Action action)
89  {
90  _blockingCollection.Add(action);
91  }
92 
93  /// <summary>
94  /// Disposes the worker thread.
95  /// </summary>
96  /// <remarks>Note that the worker thread is a background thread,
97  /// so it won't block the process from terminating even if not disposed</remarks>
98  public void Dispose()
99  {
100  try
101  {
102  _blockingCollection.CompleteAdding();
103  _workerThread.StopSafely(TimeSpan.FromMilliseconds(50), _threadCancellationTokenSource);
104  _threadCancellationTokenSource.DisposeSafely();
105  }
106  catch (Exception exception)
107  {
108  Log.Error(exception);
109  }
110  }
111  }
112 }