Lean  $LEAN_TAG$
BusyBlockingCollection.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15 */
16 
17 using System;
18 using System.Collections.Concurrent;
19 using System.Collections.Generic;
20 using System.Threading;
22 
24 {
25  /// <summary>
26  /// A small wrapper around <see cref="BlockingCollection{T}"/> used to communicate busy state of the items
27  /// being processed
28  /// </summary>
29  /// <typeparam name="T">The item type being processed</typeparam>
31  {
32  private readonly BlockingCollection<T> _collection;
33  private readonly ManualResetEventSlim _processingCompletedEvent;
34  private readonly object _lock = new object();
35 
36  /// <summary>
37  /// Gets a wait handle that can be used to wait until this instance is done
38  /// processing all of it's item
39  /// </summary>
40  public WaitHandle WaitHandle
41  {
42  get { return _processingCompletedEvent.WaitHandle; }
43  }
44 
45  /// <summary>
46  /// Gets the number of items held within this collection
47  /// </summary>
48  public int Count
49  {
50  get { return _collection.Count; }
51  }
52 
53  /// <summary>
54  /// Returns true if processing, false otherwise
55  /// </summary>
56  public bool IsBusy
57  {
58  get
59  {
60  lock (_lock)
61  {
62  return _collection.Count > 0 || !_processingCompletedEvent.IsSet;
63  }
64  }
65  }
66 
67  /// <summary>
68  /// Initializes a new instance of the <see cref="BusyBlockingCollection{T}"/> class
69  /// with a bounded capacity of <see cref="int.MaxValue"/>
70  /// </summary>
72  : this(int.MaxValue)
73  {
74  }
75 
76  /// <summary>
77  /// Initializes a new instance of the <see cref="BusyBlockingCollection{T}"/> class
78  /// with the specified <paramref name="boundedCapacity"/>
79  /// </summary>
80  /// <param name="boundedCapacity">The maximum number of items allowed in the collection</param>
81  public BusyBlockingCollection(int boundedCapacity)
82  {
83  _collection = new BlockingCollection<T>(boundedCapacity);
84 
85  // initialize as not busy
86  _processingCompletedEvent = new ManualResetEventSlim(true);
87  }
88 
89  /// <summary>
90  /// Adds the items to this collection
91  /// </summary>
92  /// <param name="item">The item to be added</param>
93  public void Add(T item)
94  {
95  Add(item, CancellationToken.None);
96  }
97 
98  /// <summary>
99  /// Adds the items to this collection
100  /// </summary>
101  /// <param name="item">The item to be added</param>
102  /// <param name="cancellationToken">A cancellation token to observer</param>
103  public void Add(T item, CancellationToken cancellationToken)
104  {
105  bool added;
106  lock (_lock)
107  {
108  // we're adding work to be done, mark us as busy
109  _processingCompletedEvent.Reset();
110  added = _collection.TryAdd(item, 0, cancellationToken);
111  }
112 
113  if (!added)
114  {
115  _collection.Add(item, cancellationToken);
116  }
117  }
118 
119  /// <summary>
120  /// Marks the <see cref="BusyBlockingCollection{T}"/> as not accepting any more additions
121  /// </summary>
122  public void CompleteAdding()
123  {
124  _collection.CompleteAdding();
125  }
126 
127  /// <summary>
128  /// Provides a consuming enumerable for items in this collection.
129  /// </summary>
130  /// <returns>An enumerable that removes and returns items from the collection</returns>
131  public IEnumerable<T> GetConsumingEnumerable()
132  {
133  return GetConsumingEnumerable(CancellationToken.None);
134  }
135 
136  /// <summary>
137  /// Provides a consuming enumerable for items in this collection.
138  /// </summary>
139  /// <param name="cancellationToken">A cancellation token to observer</param>
140  /// <returns>An enumerable that removes and returns items from the collection</returns>
141  public IEnumerable<T> GetConsumingEnumerable(CancellationToken cancellationToken)
142  {
143  while (!_collection.IsCompleted)
144  {
145  T item;
146 
147  // check to see if something is immediately available
148  bool tookItem;
149 
150  try
151  {
152  tookItem = _collection.TryTake(out item, 0, cancellationToken);
153  }
154  catch (OperationCanceledException)
155  {
156  // if the operation was canceled, just bail on the enumeration
157  yield break;
158  }
159 
160  if (tookItem)
161  {
162  // something was immediately available, emit it
163  yield return item;
164  continue;
165  }
166 
167 
168  // we need to lock this with the Add method since we need to model the act of
169  // taking/flipping the switch and adding/flipping the switch as one operation
170  lock (_lock)
171  {
172  // double check that there's nothing in the collection within a lock, it's possible
173  // that between the TryTake above and this statement, the Add method was called, so we
174  // don't want to flip the switch if there's something in the collection
175  if (_collection.Count == 0)
176  {
177  // nothing was immediately available, mark us as idle
178  _processingCompletedEvent.Set();
179  }
180  }
181 
182  try
183  {
184  // now block until something is available
185  tookItem = _collection.TryTake(out item, Timeout.Infinite, cancellationToken);
186  }
187  catch (OperationCanceledException)
188  {
189  // if the operation was canceled, just bail on the enumeration
190  yield break;
191  }
192 
193  if (tookItem)
194  {
195  // emit the item we found
196  yield return item;
197  }
198  }
199 
200  // no more items to process
201  _processingCompletedEvent.Set();
202  }
203 
204  /// <summary>
205  /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
206  /// </summary>
207  /// <filterpriority>2</filterpriority>
208  public void Dispose()
209  {
210  _collection.Dispose();
211  _processingCompletedEvent.Dispose();
212  }
213  }
214 }