Lean  $LEAN_TAG$
RateGate.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14 */
15 
16 using System;
17 using System.Threading;
18 using System.Collections.Generic;
19 
20 namespace QuantConnect.Util
21 {
22  /// <summary>
23  /// Used to control the rate of some occurrence per unit of time.
24  /// </summary>
25  /// <see href="http://www.jackleitch.net/2010/10/better-rate-limiting-with-dot-net/"/>
26  /// <remarks>
27  /// <para>
28  /// To control the rate of an action using a <see cref="RateGate"/>,
29  /// code should simply call <see cref="WaitToProceed()"/> prior to
30  /// performing the action. <see cref="WaitToProceed()"/> will block
31  /// the current thread until the action is allowed based on the rate
32  /// limit.
33  /// </para>
34  /// <para>
35  /// This class is thread safe. A single <see cref="RateGate"/> instance
36  /// may be used to control the rate of an occurrence across multiple
37  /// threads.
38  /// </para>
39  /// </remarks>
40  public class RateGate : IDisposable
41  {
42  // Semaphore used to count and limit the number of occurrences per
43  // unit time.
44  private readonly SemaphoreSlim _semaphore;
45 
46  // Times (in millisecond ticks) at which the semaphore should be exited.
47  private readonly Queue<int> _exitTimes;
48 
49  // Timer used to trigger exiting the semaphore.
50  private readonly Timer _exitTimer;
51 
52  // Whether this instance is disposed.
53  private bool _isDisposed;
54 
55  /// <summary>
56  /// Number of occurrences allowed per unit of time.
57  /// </summary>
58  public int Occurrences
59  {
60  get; private set;
61  }
62 
63  /// <summary>
64  /// The length of the time unit, in milliseconds.
65  /// </summary>
66  public int TimeUnitMilliseconds
67  {
68  get; private set;
69  }
70 
71  /// <summary>
72  /// Flag indicating we are currently being rate limited
73  /// </summary>
74  public bool IsRateLimited
75  {
76  get { return !WaitToProceed(0); }
77  }
78 
79  /// <summary>
80  /// Initializes a <see cref="RateGate"/> with a rate of <paramref name="occurrences"/>
81  /// per <paramref name="timeUnit"/>.
82  /// </summary>
83  /// <param name="occurrences">Number of occurrences allowed per unit of time.</param>
84  /// <param name="timeUnit">Length of the time unit.</param>
85  /// <exception cref="ArgumentOutOfRangeException">
86  /// If <paramref name="occurrences"/> or <paramref name="timeUnit"/> is negative.
87  /// </exception>
88  public RateGate(int occurrences, TimeSpan timeUnit)
89  {
90  // Check the arguments.
91  if (occurrences <= 0)
92  throw new ArgumentOutOfRangeException(nameof(occurrences), "Number of occurrences must be a positive integer");
93  if (timeUnit != timeUnit.Duration())
94  throw new ArgumentOutOfRangeException(nameof(timeUnit), "Time unit must be a positive span of time");
95  if (timeUnit >= TimeSpan.FromMilliseconds(UInt32.MaxValue))
96  throw new ArgumentOutOfRangeException(nameof(timeUnit), "Time unit must be less than 2^32 milliseconds");
97 
98  Occurrences = occurrences;
99  TimeUnitMilliseconds = (int)timeUnit.TotalMilliseconds;
100 
101  // Create the semaphore, with the number of occurrences as the maximum count.
102  _semaphore = new SemaphoreSlim(Occurrences, Occurrences);
103 
104  // Create a queue to hold the semaphore exit times.
105  _exitTimes = new ();
106 
107  // Create a timer to exit the semaphore. Use the time unit as the original
108  // interval length because that's the earliest we will need to exit the semaphore.
109  _exitTimer = new Timer(ExitTimerCallback, null, TimeUnitMilliseconds, -1);
110  }
111 
112  // Callback for the exit timer that exits the semaphore based on exit times
113  // in the queue and then sets the timer for the nextexit time.
114  // Credit to Jim: http://www.jackleitch.net/2010/10/better-rate-limiting-with-dot-net/#comment-3620
115  // for providing the code below, fixing issue #3499 - https://github.com/QuantConnect/Lean/issues/3499
116  private void ExitTimerCallback(object state)
117  {
118  try
119  {
120  // While there are exit times that are passed due still in the queue,
121  // exit the semaphore and dequeue the exit time.
122  var exitTime = 0;
123  var exitTimeValid = false;
124  var tickCount = Environment.TickCount;
125  lock (_exitTimes)
126  {
127  exitTimeValid = _exitTimes.TryPeek(out exitTime);
128  while (exitTimeValid)
129  {
130  if (unchecked(exitTime - tickCount) > 0)
131  {
132  break;
133  }
134  _semaphore.Release();
135  _exitTimes.Dequeue();
136  exitTimeValid = _exitTimes.TryPeek(out exitTime);
137  }
138  }
139  // we are already holding the next item from the queue, do not peek again
140  // although this exit time may have already pass by this stmt.
141  var timeUntilNextCheck = exitTimeValid
142  ? Math.Min(TimeUnitMilliseconds, Math.Max(0, exitTime - tickCount))
144 
145  _exitTimer.Change(timeUntilNextCheck, -1);
146  }
147  catch (Exception)
148  {
149  // can throw if called when disposing
150  }
151  }
152 
153  /// <summary>
154  /// Blocks the current thread until allowed to proceed or until the
155  /// specified timeout elapses.
156  /// </summary>
157  /// <param name="millisecondsTimeout">Number of milliseconds to wait, or -1 to wait indefinitely.</param>
158  /// <returns>true if the thread is allowed to proceed, or false if timed out</returns>
159  public bool WaitToProceed(int millisecondsTimeout)
160  {
161  // Check the arguments.
162  if (millisecondsTimeout < -1)
163  throw new ArgumentOutOfRangeException(nameof(millisecondsTimeout));
164 
165  CheckDisposed();
166 
167  // Block until we can enter the semaphore or until the timeout expires.
168  var entered = _semaphore.Wait(millisecondsTimeout);
169 
170  // If we entered the semaphore, compute the corresponding exit time
171  // and add it to the queue.
172  if (entered)
173  {
174  var timeToExit = unchecked(Environment.TickCount + TimeUnitMilliseconds);
175  lock(_exitTimes)
176  {
177  _exitTimes.Enqueue(timeToExit);
178  }
179  }
180 
181  return entered;
182  }
183 
184  /// <summary>
185  /// Blocks the current thread until allowed to proceed or until the
186  /// specified timeout elapses.
187  /// </summary>
188  /// <param name="timeout"></param>
189  /// <returns>true if the thread is allowed to proceed, or false if timed out</returns>
190  public bool WaitToProceed(TimeSpan timeout)
191  {
192  return WaitToProceed((int)timeout.TotalMilliseconds);
193  }
194 
195  /// <summary>
196  /// Blocks the current thread indefinitely until allowed to proceed.
197  /// </summary>
198  public void WaitToProceed()
199  {
200  WaitToProceed(Timeout.Infinite);
201  }
202 
203  // Throws an ObjectDisposedException if this object is disposed.
204  private void CheckDisposed()
205  {
206  if (_isDisposed)
207  throw new ObjectDisposedException("RateGate is already disposed");
208  }
209 
210  /// <summary>
211  /// Releases unmanaged resources held by an instance of this class.
212  /// </summary>
213  public void Dispose()
214  {
215  Dispose(true);
216  GC.SuppressFinalize(this);
217  }
218 
219  /// <summary>
220  /// Releases unmanaged resources held by an instance of this class.
221  /// </summary>
222  /// <param name="isDisposing">Whether this object is being disposed.</param>
223  protected virtual void Dispose(bool isDisposing)
224  {
225  if (!_isDisposed)
226  {
227  if (isDisposing)
228  {
229  // The semaphore and timer both implement IDisposable and
230  // therefore must be disposed.
231  _semaphore.Dispose();
232  _exitTimer.Dispose();
233 
234  _isDisposed = true;
235  }
236  }
237  }
238  }
239 }