Lean  $LEAN_TAG$
KeyStringSynchronizer.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15 */
16 
17 using System;
18 using System.Threading;
19 using System.Collections.Generic;
20 
21 namespace QuantConnect.Util
22 {
/// <summary>
/// Helper class to synchronize execution based on a string key
/// </summary>
/// <remarks>
/// Keys are compared by value, so distinct string instances with the same content share one lock.
/// Locks are reentrant: an action may call back into the synchronizer with the same key on the same thread.
/// </remarks>
public class KeyStringSynchronizer
{
    // Lock entries currently in use, indexed by key. The dictionary instance itself is the gate
    // protecting both the map and the reference counts of its entries.
    private readonly Dictionary<string, KeyLock> _keyLocks = new();

    /// <summary>
    /// Execute the given action synchronously with any other thread using the same key
    /// </summary>
    /// <param name="key">The synchronization key</param>
    /// <param name="singleExecution">True if execution should happen only once at the same time for multiple threads:
    /// a caller that had to wait for another thread holding the key returns without running the action</param>
    /// <param name="action">The action to execute</param>
    /// <exception cref="ArgumentNullException">The key is null</exception>
    public void Execute(string key, bool singleExecution, Action action)
    {
        ExecuteImplementation(key, singleExecution, action);
    }

    /// <summary>
    /// Execute the given function synchronously with any other thread using the same key
    /// </summary>
    /// <typeparam name="T">The type returned by the function</typeparam>
    /// <param name="key">The synchronization key</param>
    /// <param name="action">The function to execute</param>
    /// <returns>The value returned by the function</returns>
    /// <exception cref="ArgumentNullException">The key is null</exception>
    public T Execute<T>(string key, Func<T> action)
    {
        T result = default;
        // never single execution here: every caller needs its own result
        ExecuteImplementation(key, singleExecution: false, () =>
        {
            result = action();
        });
        return result;
    }

    /// <summary>
    /// Runs the action while holding the lock associated with the key
    /// </summary>
    private void ExecuteImplementation(string key, bool singleExecution, Action action)
    {
        KeyLock keyLock;
        lock (_keyLocks)
        {
            if (!_keyLocks.TryGetValue(key, out keyLock))
            {
                _keyLocks[key] = keyLock = new KeyLock();
            }
            // registering interest under the gate keeps the entry alive until we are done with it,
            // so every thread using an equal key is guaranteed to see this very same lock object
            keyLock.ReferenceCount++;
        }

        try
        {
            // succeeds right away when the lock is free or already held by this thread (reentry)
            var acquiredWithoutWaiting = Monitor.TryEnter(keyLock);
            if (!acquiredWithoutWaiting)
            {
                // another thread is executing for this key, wait for it to finish
                Monitor.Enter(keyLock);
            }

            try
            {
                // for single execution a caller that had to wait skips the action:
                // the thread it waited for already did the work
                if (acquiredWithoutWaiting || !singleExecution)
                {
                    action();
                }
            }
            finally
            {
                Monitor.Exit(keyLock);
            }
        }
        finally
        {
            lock (_keyLocks)
            {
                // even if we fail we need to release it; the last user drops the entry
                if (--keyLock.ReferenceCount == 0)
                {
                    _keyLocks.Remove(key);
                }
            }
        }
    }

    /// <summary>
    /// Private lock object for a key, with the count of callers currently using it
    /// </summary>
    private sealed class KeyLock
    {
        /// <summary>
        /// Number of in-flight calls referencing this entry; only touched while holding the dictionary gate
        /// </summary>
        public int ReferenceCount { get; set; }
    }
}
116 }