Lean  $LEAN_TAG$
DownloaderDataProvider.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15 */
16 
17 using System;
18 using NodaTime;
19 using System.IO;
20 using System.Linq;
21 using QuantConnect.Util;
22 using QuantConnect.Data;
23 using QuantConnect.Logging;
26 using System.Collections.Generic;
28 using System.Collections.Concurrent;
30 
32 {
33  /// <summary>
34  /// Data provider which downloads data using an <see cref="IDataDownloader"/> or <see cref="IBrokerage"/> implementation
35  /// </summary>
37  {
/// <summary>
/// Synchronizer in charge of guaranteeing a single operation per file path
/// </summary>
private static readonly KeyStringSynchronizer DiskSynchronizer = new KeyStringSynchronizer();

/// <summary>
/// Flipped to true the first time a custom (<see cref="SecurityType.Base"/>) data request is rejected,
/// so the "not supported" message is only logged once
/// </summary>
private bool _customDataDownloadError;

/// <summary>
/// Symbols whose missing market hours entry has already been logged; used as a set (only keys matter)
/// </summary>
private readonly ConcurrentDictionary<Symbol, Symbol> _marketHoursWarning = new ConcurrentDictionary<Symbol, Symbol>();

/// <summary>
/// Source of the data/exchange time zones and exchange hours of the requested symbols
/// </summary>
private readonly MarketHoursDatabase _marketHoursDatabase = MarketHoursDatabase.FromDataFolder();

/// <summary>
/// The downloader every data request is delegated to
/// </summary>
private readonly IDataDownloader _dataDownloader;

/// <summary>
/// Cache provider handed to the data writer; shares <see cref="DiskSynchronizer"/> for per-path locking
/// </summary>
private readonly IDataCacheProvider _dataCacheProvider = new DiskDataCacheProvider(DiskSynchronizer);

/// <summary>
/// Map file provider used to expand a request over all the mapped symbols of a security
/// </summary>
private readonly IMapFileProvider _mapFileProvider = Composer.Instance.GetPart<IMapFileProvider>();
49 
/// <summary>
/// Creates a new instance using the <see cref="IDataDownloader"/> whose type name is given by the
/// 'data-downloader' configuration value
/// </summary>
/// <exception cref="ArgumentException">Thrown when 'data-downloader' is not set</exception>
public DownloaderDataProvider()
{
    var dataDownloaderConfig = Config.Get("data-downloader");
    if (string.IsNullOrEmpty(dataDownloaderConfig))
    {
        throw new ArgumentException("DownloaderDataProvider(): requires 'data-downloader' to be set with a valid type name");
    }

    _dataDownloader = Composer.Instance.GetExportedValueByTypeName<IDataDownloader>(dataDownloaderConfig);
}
65 
/// <summary>
/// Creates a new instance that delegates all downloads to the supplied <paramref name="dataDownloader"/>;
/// intended for testing
/// </summary>
/// <param name="dataDownloader">The downloader to use for every data request</param>
public DownloaderDataProvider(IDataDownloader dataDownloader) => _dataDownloader = dataDownloader;
73 
/// <summary>
/// Retrieves the data stored under <paramref name="key"/>, handing the base class's <c>DownloadOnce</c> an action
/// that downloads the file's data through the configured <see cref="IDataDownloader"/> and writes it to disk.
/// </summary>
/// <remarks>
/// Custom data (<see cref="SecurityType.Base"/>), symbols without a market hours entry and intraday days that are not
/// fully in the past yet are skipped. Download failures are logged, not thrown.
/// </remarks>
/// <param name="key">A string representing where the data is stored</param>
/// <returns>A <see cref="Stream"/> of the data requested</returns>
public override Stream Fetch(string key)
{
    return DownloadOnce(key, _ =>
    {
        if (!LeanData.TryParsePath(key, out var symbol, out var date, out var resolution, out var tickType, out var dataType))
        {
            return;
        }

        if (symbol.SecurityType == SecurityType.Base)
        {
            if (!_customDataDownloadError)
            {
                _customDataDownloadError = true;
                // lean data writer doesn't support it
                Log.Trace($"DownloaderDataProvider.Get(): custom data is not supported, requested: {symbol}");
            }
            return;
        }

        MarketHoursDatabase.Entry entry;
        try
        {
            entry = _marketHoursDatabase.GetEntry(symbol.ID.Market, symbol, symbol.SecurityType);
        }
        catch
        {
            // this could happen for some sources using the data provider but with not market hours data base entry, like interest rates
            if (_marketHoursWarning.TryAdd(symbol, symbol))
            {
                // log once
                Log.Trace($"DownloaderDataProvider.Get(): failed to find market hours for {symbol}, skipping");
            }
            // this shouldn't happen for data we want can download
            return;
        }

        var dataTimeZone = entry.DataTimeZone;
        var exchangeTimeZone = entry.ExchangeHours.TimeZone;

        if (!TryGetDownloadTimeRangeUtc(symbol, date, resolution, dataTimeZone, out var startTimeUtc, out var endTimeUtc))
        {
            // we are at the limit, avoid getting partial data
            return;
        }

        try
        {
            if (dataType == typeof(OptionUniverse))
            {
                var processingDate = date.ConvertToUtc(dataTimeZone);
                UniverseExtensions.RunUniverseDownloader(_dataDownloader, new DataUniverseDownloaderGetParameters(symbol, processingDate, processingDate.AddDays(1), entry.ExchangeHours));
                return;
            }

            LeanDataWriter writer = null;
            var getParams = new DataDownloaderGetParameters(symbol, resolution, startTimeUtc, endTimeUtc, tickType);

            var downloaderDataParameters = getParams.GetDataDownloaderParameterForAllMappedSymbols(_mapFileProvider, exchangeTimeZone);

            var downloadedData = GetDownloadedData(downloaderDataParameters, symbol, exchangeTimeZone, dataTimeZone, dataType);

            foreach (var dataPerSymbol in downloadedData)
            {
                // only create the writer once there is something to write
                writer ??= new LeanDataWriter(resolution, symbol, Globals.DataFolder, tickType, mapSymbol: true, dataCacheProvider: _dataCacheProvider);
                // Save the data
                writer.Write(dataPerSymbol);
            }
        }
        catch (Exception e)
        {
            Log.Error(e);
        }
    });
}

/// <summary>
/// Computes the UTC range to request from the downloader for the file identified by <paramref name="symbol"/>,
/// <paramref name="date"/> and <paramref name="resolution"/>. Downloads never go past yesterday (UTC) so partial
/// data is never written.
/// </summary>
/// <param name="symbol">The symbol the file belongs to</param>
/// <param name="date">The date parsed from the file path</param>
/// <param name="resolution">The resolution parsed from the file path</param>
/// <param name="dataTimeZone">The time zone the data is stored in</param>
/// <param name="startTimeUtc">The start of the range to download</param>
/// <param name="endTimeUtc">The end of the range to download</param>
/// <returns>False when the requested intraday day is not fully in the past yet, meaning nothing should be downloaded</returns>
private static bool TryGetDownloadTimeRangeUtc(Symbol symbol, DateTime date, Resolution resolution, DateTimeZone dataTimeZone,
    out DateTime startTimeUtc, out DateTime endTimeUtc)
{
    // we will download until yesterday so we are sure we don't get partial data
    var endTimeUtcLimit = DateTime.UtcNow.Date.AddDays(-1);
    if (resolution < Resolution.Hour)
    {
        // intraday files hold a single day, taken from the path: get the whole day
        startTimeUtc = date.ConvertToUtc(dataTimeZone);
        endTimeUtc = date.AddDays(1).ConvertToUtc(dataTimeZone);
        return endTimeUtc <= endTimeUtcLimit;
    }

    // since hourly & daily are a single file we fetch the whole file
    endTimeUtc = endTimeUtcLimit;
    try
    {
        // we don't really know when Futures, FutureOptions, Cryptos, etc, start date so let's give it a good guess
        if (symbol.SecurityType == SecurityType.Crypto)
        {
            // bitcoin start
            startTimeUtc = new DateTime(2009, 1, 1);
        }
        else if (symbol.SecurityType.IsOption() && symbol.SecurityType != SecurityType.FutureOption)
        {
            // For options, an hourly or daily file contains a year of data, so we need to get the year of the date
            startTimeUtc = new DateTime(date.Year, 1, 1);
            endTimeUtc = startTimeUtc.AddYears(1);
        }
        else
        {
            startTimeUtc = symbol.ID.Date;
        }
    }
    catch (InvalidOperationException)
    {
        startTimeUtc = Time.Start;
    }

    if (startTimeUtc < Time.Start)
    {
        startTimeUtc = Time.Start;
    }

    if (endTimeUtc > endTimeUtcLimit)
    {
        endTimeUtc = endTimeUtcLimit;
    }
    return true;
}
203 
/// <summary>
/// Downloads data through the configured <see cref="IDataDownloader"/> for each parameter set and returns it
/// filtered and grouped by symbol.
/// </summary>
/// <param name="downloaderDataParameters">Parameters specifying the data to be retrieved. Enumerated exactly once.</param>
/// <param name="symbol">The symbol the data was requested for; custom data symbols skip the data type filter.</param>
/// <param name="exchangeTimeZone">The time zone of the exchange where the symbol is traded.</param>
/// <param name="dataTimeZone">The time zone in which the data is represented.</param>
/// <param name="dataType">The type of data to be retrieved. (e.g. <see cref="Data.Market.TradeBar"/>)</param>
/// <returns>
/// An IEnumerable containing groups of data grouped by symbol. Each group contains data related to a specific symbol.
/// Downloads are deferred: each parameter set is downloaded as the result is enumerated. Parameter sets the
/// downloader does not support (null result) are skipped.
/// </returns>
/// <exception cref="ArgumentException">
/// Thrown when this method is called (not when the result is enumerated) if the downloaderDataParameters collection is null or empty.
/// </exception>
public IEnumerable<IGrouping<Symbol, BaseData>> GetDownloadedData(
    IEnumerable<DataDownloaderGetParameters> downloaderDataParameters,
    Symbol symbol,
    DateTimeZone exchangeTimeZone,
    DateTimeZone dataTimeZone,
    Type dataType)
{
    // materialize once: the sequence may be lazily computed and is both validated and iterated
    var parameters = downloaderDataParameters?.ToList();
    if (parameters == null || parameters.Count == 0)
    {
        throw new ArgumentException($"{nameof(DownloaderDataProvider)}.{nameof(GetDownloadedData)}: DataDownloaderGetParameters are empty or equal to null.");
    }

    // validation above runs eagerly; an iterator method would postpone it until the first MoveNext
    return DownloadAndGroupData(parameters, symbol, exchangeTimeZone, dataTimeZone, dataType);
}

/// <summary>
/// Lazily downloads each parameter set and yields its data filtered and grouped by symbol.
/// </summary>
private IEnumerable<IGrouping<Symbol, BaseData>> DownloadAndGroupData(
    List<DataDownloaderGetParameters> parameters,
    Symbol symbol,
    DateTimeZone exchangeTimeZone,
    DateTimeZone dataTimeZone,
    Type dataType)
{
    foreach (var downloaderDataParameter in parameters)
    {
        var downloadedData = _dataDownloader.Get(downloaderDataParameter);

        if (downloadedData == null)
        {
            // doesn't support this download request, that's okay
            continue;
        }

        var groupedData = FilterAndGroupDownloadDataBySymbol(
            downloadedData,
            symbol,
            dataType,
            exchangeTimeZone,
            dataTimeZone,
            downloaderDataParameter.StartUtc,
            downloaderDataParameter.EndUtc);

        foreach (var data in groupedData)
        {
            yield return data;
        }
    }
}
251 
/// <summary>
/// Gets the stream for a given file path
/// </summary>
/// <param name="key">The file path being requested</param>
/// <returns>The data stream, or null when the base provider has no data for <paramref name="key"/></returns>
protected override Stream GetStream(string key)
{
    var requiresInMemoryCopy = LeanData.TryParsePath(key, out var symbol, out _, out var resolution, out _)
        && resolution > Resolution.Minute
        && symbol.RequiresMapping();

    if (!requiresInMemoryCopy)
    {
        return base.Fetch(key);
    }

    // because the file could be updated even after it's created because of symbol mapping we can't stream from disk:
    // copy it into memory under the per-path synchronizer (presumably the same lock the disk cache provider writes under)
    return DiskSynchronizer.Execute(key, () =>
    {
        // disposed even if copying fails
        using var baseStream = base.Fetch(key);
        if (baseStream == null)
        {
            return null;
        }

        var result = new MemoryStream();
        baseStream.CopyTo(result);
        // move position back to the start
        result.Position = 0;
        return result;
    });
}
279 
/// <summary>
/// Decides whether <paramref name="filePath"/> should be downloaded. Null paths and fine fundamental, map file,
/// factor file and future margin paths are never downloaded; anything else is downloaded when it is missing
/// on disk or out of date.
/// </summary>
/// <param name="filePath">File we are looking at</param>
/// <returns>True if the file should be downloaded</returns>
protected override bool NeedToDownload(string filePath)
{
    // Ignore null requests
    if (filePath == null)
    {
        return false;
    }

    bool PathContains(string fragment) => filePath.Contains(fragment, StringComparison.InvariantCultureIgnoreCase);

    // Ignore data this provider can't produce
    var isFineFundamental = PathContains("fine") && PathContains("fundamental");
    var isFutureMargins = PathContains("margins") && PathContains("future");
    var isUnsupported = isFineFundamental
        || PathContains("map_files")
        || PathContains("factor_files")
        || isFutureMargins;
    if (isUnsupported)
    {
        return false;
    }

    // Only download if it doesn't exist or is out of date.
    // Files are only "out of date" for non date based files (hour, daily, margins, etc.) because this data is stored all in one file
    var existsAndCurrent = File.Exists(filePath) && !filePath.IsOutOfDate();
    return !existsAndCurrent;
}
301 
/// <summary>
/// Filters and groups the provided download data by symbol, based on specified criteria.
/// Kept data points have their <c>Time</c>/<c>EndTime</c> converted in place from the exchange time zone to the data time zone.
/// </summary>
/// <param name="downloadData">The collection of download data to process. Enumerated once, when this method is called.</param>
/// <param name="symbol">The symbol the data was requested for; for custom data (<see cref="SecurityType.Base"/>) the data type is not filtered.</param>
/// <param name="dataType">The type of data to filter for.</param>
/// <param name="exchangeTimeZone">The time zone of the exchange.</param>
/// <param name="dataTimeZone">The desired time zone for the data.</param>
/// <param name="downloaderStartTimeUtc">The start time of data downloading in UTC.</param>
/// <param name="downloaderEndTimeUtc">The end time of data downloading in UTC.</param>
/// <returns>
/// An enumerable collection of groupings of download data, grouped by symbol. It can be enumerated repeatedly
/// without converting the timestamps again.
/// </returns>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="downloadData"/> is null.</exception>
public static IEnumerable<IGrouping<Symbol, BaseData>> FilterAndGroupDownloadDataBySymbol(
    IEnumerable<BaseData> downloadData,
    Symbol symbol,
    Type dataType,
    DateTimeZone exchangeTimeZone,
    DateTimeZone dataTimeZone,
    DateTime downloaderStartTimeUtc,
    DateTime downloaderEndTimeUtc)
{
    if (downloadData == null)
    {
        throw new ArgumentNullException(nameof(downloadData));
    }

    var startDateTimeInExchangeTimeZone = downloaderStartTimeUtc.ConvertFromUtc(exchangeTimeZone);
    var endDateTimeInExchangeTimeZone = downloaderEndTimeUtc.ConvertFromUtc(exchangeTimeZone);

    // Filter and convert into a list once: the conversion mutates the data points, so doing it inside a deferred
    // LINQ predicate would convert them again every time the grouping is re-enumerated
    var selectedData = new List<BaseData>();
    foreach (var baseData in downloadData)
    {
        // Sometimes, external Downloader provider returns excess data
        if (baseData.Time < startDateTimeInExchangeTimeZone || baseData.Time > endDateTimeInExchangeTimeZone)
        {
            continue;
        }

        if (symbol.SecurityType != SecurityType.Base && baseData.GetType() != dataType)
        {
            continue;
        }

        // we need to store the data in data time zone.
        // Read both timestamps before writing either: EndTime may be derived from Time (e.g. a bar's Time + Period),
        // in which case reading it after Time was converted would apply the time zone offset twice
        var time = baseData.Time;
        var endTime = baseData.EndTime;
        baseData.Time = time.ConvertTo(exchangeTimeZone, dataTimeZone);
        baseData.EndTime = endTime.ConvertTo(exchangeTimeZone, dataTimeZone);
        selectedData.Add(baseData);
    }

    // for canonical symbols, downloader will return data for all of the chain
    return selectedData.GroupBy(baseData => baseData.Symbol);
}
348  }
349 }