Lean  $LEAN_TAG$
BusyCollection.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15 */
16 
17 using System;
18 using System.Collections.Concurrent;
19 using System.Collections.Generic;
20 using System.Threading;
22 
23 namespace QuantConnect.Util
24 {
25  /// <summary>
26  /// A non blocking <see cref="IBusyCollection{T}"/> implementation
27  /// </summary>
28  /// <typeparam name="T">The item type being processed</typeparam>
29  public class BusyCollection<T> : IBusyCollection<T>
30  {
31  private readonly ConcurrentQueue<T> _collection = new ConcurrentQueue<T>();
32  private readonly ManualResetEventSlim _processingCompletedEvent = new ManualResetEventSlim(true);
33  private bool _completedAdding;
34 
35  /// <summary>
36  /// Gets a wait handle that can be used to wait until this instance is done
37  /// processing all of it's item
38  /// </summary>
39  public WaitHandle WaitHandle => _processingCompletedEvent.WaitHandle;
40 
41  /// <summary>
42  /// Gets the number of items held within this collection
43  /// </summary>
44  public int Count => _collection.Count;
45 
46  /// <summary>
47  /// Returns true if processing, false otherwise
48  /// </summary>
49  public bool IsBusy => !_collection.IsEmpty || !_processingCompletedEvent.IsSet;
50 
51  /// <summary>
52  /// Adds the items to this collection
53  /// </summary>
54  /// <param name="item">The item to be added</param>
55  public void Add(T item)
56  {
57  Add(item, CancellationToken.None);
58  }
59 
60  /// <summary>
61  /// Adds the items to this collection
62  /// </summary>
63  /// <param name="item">The item to be added</param>
64  /// <param name="cancellationToken">A cancellation token to observer</param>
65  public void Add(T item, CancellationToken cancellationToken)
66  {
67  if (_completedAdding)
68  {
69  throw new InvalidOperationException("Collection has already been marked as not " +
70  $"accepting more additions, see {nameof(CompleteAdding)}");
71  }
72 
73  // locking to avoid race condition with GetConsumingEnumerable()
74  lock (_processingCompletedEvent)
75  {
76  // we're adding work to be done, mark us as busy
77  _processingCompletedEvent.Reset();
78  _collection.Enqueue(item);
79  }
80  }
81 
82  /// <summary>
83  /// Provides a consuming enumerable for items in this collection.
84  /// </summary>
85  /// <returns>An enumerable that removes and returns items from the collection</returns>
86  public IEnumerable<T> GetConsumingEnumerable()
87  {
88  return GetConsumingEnumerable(CancellationToken.None);
89  }
90 
91  /// <summary>
92  /// Provides a consuming enumerable for items in this collection.
93  /// </summary>
94  /// <param name="cancellationToken">A cancellation token to observer</param>
95  /// <returns>An enumerable that removes and returns items from the collection</returns>
96  public IEnumerable<T> GetConsumingEnumerable(CancellationToken cancellationToken)
97  {
98  T item;
99  while (_collection.TryDequeue(out item))
100  {
101  yield return item;
102  }
103 
104  // locking to avoid race condition with Add()
105  lock (_processingCompletedEvent)
106  {
107  if (!_collection.TryPeek(out item))
108  {
109  _processingCompletedEvent.Set();
110  }
111  }
112  }
113 
114  /// <summary>
115  /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
116  /// </summary>
117  /// <filterpriority>2</filterpriority>
118  public void Dispose()
119  {
120  _collection.Clear();
121  _processingCompletedEvent.Dispose();
122  }
123 
124  /// <summary>
125  /// Marks the collection as not accepting any more additions
126  /// </summary>
127  public void CompleteAdding()
128  {
129  _completedAdding = true;
130  }
131  }
132 }