Lean  $LEAN_TAG$
FakeDataQueue.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15 */
16 
17 using System;
18 using QuantConnect.Data;
19 using QuantConnect.Util;
20 using QuantConnect.Logging;
21 using QuantConnect.Packets;
25 using System.Collections.Generic;
26 using Timer = System.Timers.Timer;
28 
30 {
31  /// <summary>
32  /// This is an implementation of <see cref="IDataQueueHandler"/> used for testing. <see cref="FakeHistoryProvider"/>
33  /// </summary>
35  {
/// <summary>
/// Running total of ticks handed to the aggregator; the timer callback samples it to log throughput
/// </summary>
private int _count;

/// <summary>
/// Source of the random trade quantities and quote sizes
/// </summary>
private readonly Random _random = new Random();

/// <summary>
/// Number of data point iterations performed for each subscribed symbol on every timer cycle.
/// Only assigned by the constructor, hence readonly
/// </summary>
private readonly int _dataPointsPerSecondPerSymbol;

/// <summary>
/// Single shot timer driving the emission cycle; it is re-armed at the end of each cycle
/// </summary>
private readonly Timer _timer;

/// <summary>
/// Provider used to resolve option contracts in <c>LookupSymbols</c>
/// </summary>
private readonly IOptionChainProvider _optionChainProvider;

/// <summary>
/// Keeps track of the currently subscribed symbols
/// </summary>
private readonly EventBasedDataQueueHandlerSubscriptionManager _subscriptionManager;

/// <summary>
/// Aggregator receiving every generated tick
/// </summary>
private readonly IDataAggregator _aggregator;

/// <summary>
/// Market hours database used to look up the exchange time zone of a symbol
/// </summary>
private readonly MarketHoursDatabase _marketHoursDatabase;

/// <summary>
/// Cache of time zone offset providers, keyed by symbol
/// </summary>
private readonly Dictionary<Symbol, TimeZoneOffsetProvider> _symbolExchangeTimeZones;

/// <summary>
/// Continuous UTC time provider
/// </summary>
protected virtual ITimeProvider TimeProvider { get; } = RealTimeProvider.Instance;
51 
52 
/// <summary>
/// Creates a new <see cref="FakeDataQueue"/> that randomly emits data for each symbol, using the
/// <see cref="IDataAggregator"/> exported under the <see cref="AggregationManager"/> type name
/// </summary>
public FakeDataQueue()
    : this(Composer.Instance.GetExportedValueByTypeName<IDataAggregator>(nameof(AggregationManager)))
{
}
61 
/// <summary>
/// Initializes a new instance of the <see cref="FakeDataQueue"/> class to randomly emit data for each symbol
/// </summary>
/// <param name="dataAggregator">Aggregator that receives every generated tick</param>
/// <param name="dataPointsPerSecondPerSymbol">Number of data point iterations performed for each
/// subscribed symbol on every timer cycle</param>
public FakeDataQueue(IDataAggregator dataAggregator, int dataPointsPerSecondPerSymbol = 500000)
{
    _aggregator = dataAggregator;
    _dataPointsPerSecondPerSymbol = dataPointsPerSecondPerSymbol;

    var mapFileProvider = Composer.Instance.GetPart<IMapFileProvider>();
    // prefer the history provider manager, falling back to any registered history provider
    var historyManager = (IHistoryProvider)Composer.Instance.GetPart<HistoryProviderManager>()
        ?? Composer.Instance.GetPart<IHistoryProvider>();
    var optionChainProvider = new LiveOptionChainProvider();
    optionChainProvider.Initialize(new(mapFileProvider, historyManager));
    _optionChainProvider = optionChainProvider;

    _marketHoursDatabase = MarketHoursDatabase.FromDataFolder();
    _symbolExchangeTimeZones = new Dictionary<Symbol, TimeZoneOffsetProvider>();
    _subscriptionManager = new EventBasedDataQueueHandlerSubscriptionManager();
    _subscriptionManager.SubscribeImpl += (s, t) => true;
    _subscriptionManager.UnsubscribeImpl += (s, t) => true;

    // single shot timer: each cycle re-arms it once the queue has been populated
    _timer = new Timer
    {
        AutoReset = false,
        Interval = 1000,
    };

    var lastCount = 0;
    var lastTime = DateTime.UtcNow;
    _timer.Elapsed += (sender, args) =>
    {
        // sample the clock once so consecutive measurement windows are contiguous
        var now = DateTime.UtcNow;
        var ticksPerSecond = (_count - lastCount) / (now - lastTime).TotalSeconds;
        Log.Trace("TICKS PER SECOND:: " + ticksPerSecond.ToStringInvariant("000000.0") + " ITEMS IN QUEUE:: " + 0);
        lastCount = _count;
        lastTime = now;

        try
        {
            PopulateQueue();
        }
        catch (Exception exception)
        {
            // a failure here must not skip the reset below: with AutoReset disabled
            // the timer would never fire again and the queue would silently stop emitting
            Log.Error(exception);
        }

        try
        {
            _timer.Reset();
        }
        catch (ObjectDisposedException)
        {
            // the queue was disposed while this cycle was running, nothing left to re-arm
        }
    };

    // start only after the handler is attached so the single shot can never elapse unobserved
    _timer.Start();
}
113 
/// <summary>
/// Registers the given configuration with the aggregator and the subscription manager
/// </summary>
/// <param name="dataConfig">Configuration describing the data feed to subscribe to</param>
/// <param name="newDataAvailableHandler">Handler passed to the aggregator together with the configuration</param>
/// <returns>The enumerator the aggregator created for this configuration</returns>
public IEnumerator<BaseData> Subscribe(SubscriptionDataConfig dataConfig, EventHandler newDataAvailableHandler)
{
    // the aggregator is registered first, then the symbol is added to the subscription manager
    var aggregatedData = _aggregator.Add(dataConfig, newDataAvailableHandler);
    _subscriptionManager.Subscribe(dataConfig);
    return aggregatedData;
}
127 
/// <summary>
/// Sets the job we're subscribing for. This implementation ignores the job
/// </summary>
/// <param name="job">Job we're subscribing for</param>
public void SetJob(LiveNodePacket job)
{
    // intentionally empty
}
135 
/// <summary>
/// Removes the given configuration from the subscription manager and from the aggregator
/// </summary>
/// <param name="dataConfig">Subscription config to be removed</param>
public void Unsubscribe(SubscriptionDataConfig dataConfig)
{
    // the symbol is dropped from the subscription manager before the aggregator releases the config
    _subscriptionManager.Unsubscribe(dataConfig);
    _aggregator.Remove(dataConfig);
}
145 
/// <summary>
/// Gets whether the data provider is connected
/// </summary>
/// <value>Always true for this implementation</value>
public bool IsConnected
{
    get { return true; }
}
151 
/// <summary>
/// Stops the emission timer and releases it
/// </summary>
public void Dispose()
{
    // halt the timer first, then dispose of it
    _timer.Stop();
    _timer.DisposeSafely();
}
160 
/// <summary>
/// Pumps a bunch of ticks into the queue: for every subscribed symbol, skipping canonical and
/// universe symbols, pushes trade and/or quote ticks to the aggregator depending on the default
/// data types of the symbol's security type
/// </summary>
private void PopulateQueue()
{
    // both values are loop invariant: resolve them once instead of per symbol / per data point
    var defaultDataTypes = SubscriptionManager.DefaultDataTypes();
    var maximumSize = (int)_timer.Interval;

    foreach (var symbol in _subscriptionManager.GetSubscribedSymbols())
    {
        if (symbol.IsCanonical() || symbol.Contains("UNIVERSE"))
        {
            continue;
        }

        var offsetProvider = GetTimeZoneOffsetProvider(symbol);
        var tickTypes = defaultDataTypes[symbol.SecurityType];
        var trades = tickTypes.Contains(TickType.Trade);
        var quotes = tickTypes.Contains(TickType.Quote);
        if (!trades && !quotes)
        {
            // nothing would be emitted for this symbol, don't spin through the loop
            continue;
        }

        // emits the configured number of data points per symbol (500k by default)
        for (var i = 0; i < _dataPointsPerSecondPerSymbol; i++)
        {
            var now = TimeProvider.GetUtcNow();
            var exchangeTime = offsetProvider.ConvertFromUtc(now);
            // price stays within [100, 101], driven by the current time of day
            var lastTrade = 100 + (decimal)Math.Abs(Math.Sin(now.TimeOfDay.TotalMilliseconds));
            if (trades)
            {
                _count++;
                _aggregator.Update(new Tick
                {
                    Time = exchangeTime,
                    Symbol = symbol,
                    Value = lastTrade,
                    TickType = TickType.Trade,
                    Quantity = _random.Next(10, maximumSize)
                });
            }

            if (quotes)
            {
                _count++;
                var bidPrice = lastTrade * 0.95m;
                var askPrice = lastTrade * 1.05m;
                var bidSize = _random.Next(10, maximumSize);
                var askSize = _random.Next(10, maximumSize);
                _aggregator.Update(new Tick(exchangeTime, symbol, "", "", bidSize: bidSize, bidPrice: bidPrice, askPrice: askPrice, askSize: askSize));
            }
        }
    }
}
210 
/// <summary>
/// Gets the time zone offset provider for the symbol, creating and caching it on first use
/// </summary>
/// <param name="symbol">Symbol whose exchange time zone is required</param>
/// <returns>The cached or newly created offset provider</returns>
private TimeZoneOffsetProvider GetTimeZoneOffsetProvider(Symbol symbol)
{
    if (_symbolExchangeTimeZones.TryGetValue(symbol, out var cachedProvider))
    {
        return cachedProvider;
    }

    // read the exchange time zone from market-hours-database
    var exchangeHours = _marketHoursDatabase.GetExchangeHours(symbol.ID.Market, symbol, symbol.SecurityType);
    var createdProvider = new TimeZoneOffsetProvider(exchangeHours.TimeZone, TimeProvider.GetUtcNow(), Time.EndOfTime);
    _symbolExchangeTimeZones[symbol] = createdProvider;
    return createdProvider;
}
222 
/// <summary>
/// Returns the symbols available at the data source for the given symbol. Only option, index
/// option and future option symbols produce results: the contract list reported by the option
/// chain provider for the current UTC date. Any other security type yields nothing
/// </summary>
/// <param name="symbol">Symbol to lookup</param>
/// <param name="includeExpired">Include expired contracts (not used by this implementation)</param>
/// <param name="securityCurrency">Expected security currency, if any (not used by this implementation)</param>
/// <returns>Enumerable of Symbols, that are associated with the provided Symbol</returns>
public IEnumerable<Symbol> LookupSymbols(Symbol symbol, bool includeExpired, string securityCurrency = null)
{
    var securityType = symbol.SecurityType;
    var isOptionType = securityType == SecurityType.Option
        || securityType == SecurityType.IndexOption
        || securityType == SecurityType.FutureOption;
    if (!isOptionType)
    {
        yield break;
    }

    foreach (var contract in _optionChainProvider.GetOptionContractList(symbol, DateTime.UtcNow.Date))
    {
        yield return contract;
    }
}
246 
/// <summary>
/// Indicates whether the FakeDataQueue can perform selection
/// </summary>
/// <returns>Always true</returns>
public bool CanPerformSelection() => true;
254  }
255 }