Lean  $LEAN_TAG$
LeanDataWriter.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14 */
15 
16 using System;
17 using System.IO;
18 using System.Linq;
19 using System.Text;
20 using QuantConnect.Util;
21 using System.Globalization;
22 using QuantConnect.Logging;
23 using System.Threading.Tasks;
26 using System.Collections.Generic;
29 
30 namespace QuantConnect.Data
31 {
32  /// <summary>
33  /// Data writer for saving an IEnumerable of BaseData into the LEAN data directory.
34  /// </summary>
35  public class LeanDataWriter
36  {
// Shared by every writer instance: serializes work that targets the same output file path
private static readonly KeyStringSynchronizer _keySynchronizer = new();

/// <summary>
/// The map file provider instance to use
/// </summary>
/// <remarks>Public for testing. The provider is resolved by a factory on first access rather than
/// while this type is being initialized, so a failure to compose it surfaces where the provider
/// is first used instead of as a <see cref="TypeInitializationException"/>.</remarks>
public static Lazy<IMapFileProvider> MapFileProvider { get; set; } = new(
    () => Composer.Instance.GetExportedValueByTypeName<IMapFileProvider>(Config.Get("map-file-provider", "LocalDiskMapFileProvider"), forceTypeNameOnExisting: false)
);

// Symbol supplied at construction; remains null when the writer is created from a security type only
private readonly Symbol _symbol;
// Whether symbols are mapped through the map file provider before the output path is resolved
private readonly bool _mapSymbol;
// Base directory under which the zip files are written
private readonly string _dataDirectory;
private readonly TickType _tickType;
private readonly Resolution _resolution;
// How new data is combined with an existing file: append, merge or overwrite
private readonly WritePolicy _writePolicy;
private readonly SecurityType _securityType;
// Used to read existing zip entries and to store the final entry contents
private readonly IDataCacheProvider _dataCacheProvider;
55 
/// <summary>
/// Creates a writer bound to a specific symbol that saves data under the given base data directory.
/// </summary>
/// <param name="resolution">Resolution of the desired output data</param>
/// <param name="symbol">The symbol being written; its security type becomes the writer's security type</param>
/// <param name="dataDirectory">Base data directory</param>
/// <param name="tickType">The tick type; replaced by <see cref="TickType.Quote"/> for forex and CFD symbols</param>
/// <param name="dataCacheProvider">The data cache provider to use</param>
/// <param name="writePolicy">The file write policy to use</param>
/// <param name="mapSymbol">True if the symbol should be mapped while writing the data</param>
/// <exception cref="NotImplementedException">The symbol's security type is not supported by this writer</exception>
public LeanDataWriter(Resolution resolution, Symbol symbol, string dataDirectory, TickType tickType = TickType.Trade,
    IDataCacheProvider dataCacheProvider = null, WritePolicy? writePolicy = null, bool mapSymbol = false)
    : this(dataDirectory, resolution, symbol.ID.SecurityType, tickType, dataCacheProvider, writePolicy)
{
    _symbol = symbol;
    _mapSymbol = mapSymbol;

    switch (_securityType)
    {
        case SecurityType.Forex:
        case SecurityType.Cfd:
            // All fx data is quote data.
            _tickType = TickType.Quote;
            break;

        case SecurityType.Equity:
        case SecurityType.Crypto:
        case SecurityType.Future:
        case SecurityType.Option:
        case SecurityType.FutureOption:
        case SecurityType.Index:
        case SecurityType.IndexOption:
        case SecurityType.CryptoFuture:
            // Supported as-is, the requested tick type is kept
            break;

        default:
            throw new NotImplementedException($"Sorry this security type is not yet supported by the LEAN data writer: {_securityType}");
    }
}
91 
/// <summary>
/// Creates a writer for a security type (no specific symbol) that saves data under the given base data directory.
/// </summary>
/// <param name="dataDirectory">Base data directory</param>
/// <param name="resolution">Resolution of the desired output data</param>
/// <param name="securityType">The security type</param>
/// <param name="tickType">The tick type</param>
/// <param name="dataCacheProvider">The data cache provider to use; a <see cref="DiskDataCacheProvider"/> is created when null</param>
/// <param name="writePolicy">The file write policy to use; when null it is derived from the resolution</param>
public LeanDataWriter(string dataDirectory, Resolution resolution, SecurityType securityType, TickType tickType,
    IDataCacheProvider dataCacheProvider = null, WritePolicy? writePolicy = null)
{
    _tickType = tickType;
    _resolution = resolution;
    _securityType = securityType;
    _dataDirectory = dataDirectory;
    _dataCacheProvider = dataCacheProvider ?? new DiskDataCacheProvider();

    // Without an explicit policy, resolutions of Hour and above merge into existing files, the rest overwrite
    _writePolicy = writePolicy ?? (resolution >= Resolution.Hour ? WritePolicy.Merge : WritePolicy.Overwrite);
}
118 
/// <summary>
/// Given the constructor parameters, write out the data in LEAN format.
/// </summary>
/// <remarks>Data is grouped per output zip file. Completed groups are written on background tasks
/// while enumeration continues; the last group is written on the calling thread. The method only
/// returns once every background write has finished.</remarks>
/// <param name="source">IEnumerable source of the data: sorted from oldest to newest.</param>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null</exception>
/// <exception cref="Exception">The data is not sorted from oldest to newest</exception>
/// <exception cref="AggregateException">One or more background file writes failed</exception>
public void Write(IEnumerable<BaseData> source)
{
    if (source == null)
    {
        throw new ArgumentNullException(nameof(source));
    }

    var lastTime = DateTime.MinValue;
    var outputFile = string.Empty;
    Symbol symbol = null;
    var currentFileData = new List<TimedLine>();
    var writeTasks = new List<Task>();

    foreach (var data in source)
    {
        // Ensure the data is sorted as a safety check
        if (data.Time < lastTime)
        {
            throw new Exception("The data must be pre-sorted from oldest to newest");
        }

        // Only re-evaluate the output file on a date change: no data zip covers less than a day, so this saves time
        if (data.Time.Date != lastTime.Date)
        {
            var mappedSymbol = GetMappedSymbol(data.Time, data.Symbol);

            // If the file name changed we have entered a new file: hand the collected data over to a write task
            var latestOutputFile = GetZipOutputFileName(_dataDirectory, data.Time, mappedSymbol);
            if (string.IsNullOrEmpty(outputFile) || outputFile != latestOutputFile)
            {
                if (currentFileData.Count > 0)
                {
                    // Copy to locals so the task captures this file's state, not the variables reassigned below
                    var file = outputFile;
                    var fileData = currentFileData;
                    var fileSymbol = symbol;
                    writeTasks.Add(Task.Run(() => WriteFile(file, fileData, fileSymbol)));
                }

                // Start collecting for the new output file
                currentFileData = new List<TimedLine>();
                outputFile = latestOutputFile;
                symbol = mappedSymbol;
            }
        }

        // Add the formatted line to the data set of the current file
        var line = LeanData.GenerateLine(data, _securityType, _resolution);
        currentFileData.Add(new TimedLine(data.Time, line));

        lastTime = data.Time;
    }

    // Finish off by processing the last file as well
    if (currentFileData.Count > 0)
    {
        // we want to finish ASAP so let's do it ourselves
        WriteFile(outputFile, currentFileData, symbol);
    }

    // Wait for every write task, even when one of them failed, so no write is still running
    // (and no failure is left unobserved) once this method returns or throws
    Task.WaitAll(writeTasks.ToArray());
}
187 
/// <summary>
/// Downloads historical data from the brokerage and saves it in LEAN format.
/// </summary>
/// <remarks>Each symbol is written by its own writer, which reuses this writer's data directory,
/// resolution, tick type, data cache provider and write policy.</remarks>
/// <param name="brokerage">The brokerage from where to fetch the data</param>
/// <param name="symbols">The list of symbols; all must share this writer's security type and the same root ticker</param>
/// <param name="startTimeUtc">The starting date/time (UTC)</param>
/// <param name="endTimeUtc">The ending date/time (UTC)</param>
/// <exception cref="ArgumentNullException"><paramref name="symbols"/> or <paramref name="brokerage"/> is null</exception>
/// <exception cref="ArgumentException">The symbol list is empty or inconsistent, or the tick type is neither Trade nor Quote</exception>
public void DownloadAndSave(IBrokerage brokerage, List<Symbol> symbols, DateTime startTimeUtc, DateTime endTimeUtc)
{
    if (symbols == null)
    {
        throw new ArgumentNullException(nameof(symbols));
    }

    if (symbols.Count == 0)
    {
        throw new ArgumentException("DownloadAndSave(): The symbol list cannot be empty.");
    }

    if (_tickType != TickType.Trade && _tickType != TickType.Quote)
    {
        throw new ArgumentException("DownloadAndSave(): The tick type must be Trade or Quote.");
    }

    if (symbols.Any(x => x.SecurityType != _securityType))
    {
        throw new ArgumentException($"DownloadAndSave(): All symbols must have {_securityType} security type.");
    }

    if (symbols.DistinctBy(x => x.ID.Symbol).Count() > 1)
    {
        throw new ArgumentException("DownloadAndSave(): All symbols must have the same root ticker.");
    }

    // Checked after the symbol validations so their exceptions keep precedence
    if (brokerage == null)
    {
        throw new ArgumentNullException(nameof(brokerage));
    }

    var dataType = LeanData.GetDataType(_resolution, _tickType);

    var marketHoursDatabase = MarketHoursDatabase.FromDataFolder();

    // All symbols share the root ticker, so the first one is enough to build the canonical symbol
    var firstSymbol = symbols[0];
    var canonicalSymbol = Symbol.Create(firstSymbol.ID.Symbol, _securityType, firstSymbol.ID.Market);

    var exchangeHours = marketHoursDatabase.GetExchangeHours(canonicalSymbol.ID.Market, canonicalSymbol, _securityType);
    var dataTimeZone = marketHoursDatabase.GetDataTimeZone(canonicalSymbol.ID.Market, canonicalSymbol, _securityType);

    foreach (var symbol in symbols)
    {
        // NOTE(review): verify this argument list against the HistoryRequest constructor; a data
        // normalization mode argument may be expected between the two flags and the tick type - confirm
        var historyRequest = new HistoryRequest(
            startTimeUtc,
            endTimeUtc,
            dataType,
            symbol,
            _resolution,
            exchangeHours,
            dataTimeZone,
            _resolution,
            true,
            false,
            _tickType
        );

        var history = brokerage.GetHistory(historyRequest);
        if (history == null)
        {
            // Nothing returned for this symbol, move on to the next one
            continue;
        }

        var converted = new List<BaseData>();
        foreach (var point in history)
        {
            // Convert to the data timezone before we write it
            point.Time = point.Time.ConvertTo(exchangeHours.TimeZone, dataTimeZone);
            converted.Add(point);
        }

        // Generate a writer for this symbol that honors this writer's cache provider and write policy.
        // Symbol mapping is left at its default (off), as before.
        var writer = new LeanDataWriter(_resolution, symbol, _dataDirectory, _tickType, _dataCacheProvider, _writePolicy);
        writer.Write(converted);
    }
}
266 
/// <summary>
/// Reads an existing Lean zip entry into a dictionary of lines keyed and ordered by their parsed time
/// </summary>
/// <param name="fileName">Path of the zip file to read</param>
/// <param name="entryName">Name of the entry inside the zip file</param>
/// <param name="date">Date handed to the line time parser together with the writer's resolution</param>
/// <param name="rows">The loaded lines by time; empty when no stream could be fetched</param>
/// <returns>True if the data cache provider returned a stream for the entry, false otherwise</returns>
private bool TryLoadFile(string fileName, string entryName, DateTime date, out SortedDictionary<DateTime, string> rows)
{
    rows = new SortedDictionary<DateTime, string>();

    using var stream = _dataCacheProvider.Fetch($"{fileName}#{entryName}");
    if (stream == null)
    {
        return false;
    }

    using var reader = new StreamReader(stream);
    for (var line = reader.ReadLine(); line != null; line = reader.ReadLine())
    {
        // A later line that parses to the same time replaces the earlier one
        rows[LeanData.ParseTime(line, date, _resolution)] = line;
    }

    return true;
}
293 
/// <summary>
/// Write this file to disk with the given data.
/// </summary>
/// <param name="filePath">The full path to the new file</param>
/// <param name="data">The data to write as a list of dates and strings; nothing happens when null or empty</param>
/// <param name="symbol">The symbol associated with this data</param>
/// <remarks>The data is a list of (time, line) pairs to support a generic write that works for all
/// resolutions: merging needs the time of each line to combine it with the existing rows, while
/// writing ticks needs to allow two data points with the same time. So neither a plain list of
/// strings nor a sorted dictionary of times and strings would do.</remarks>
private void WriteFile(string filePath, List<TimedLine> data, Symbol symbol)
{
    filePath = FileExtension.ToNormalizedPath(filePath);
    if (data == null || data.Count == 0)
    {
        return;
    }

    // because we read & write the same file we need to take a lock per file path so we don't read something that might get outdated
    // by someone writing to the same path at the same time
    _keySynchronizer.Execute(filePath, singleExecution: false, () =>
    {
        var date = data[0].Time;
        // Generate this csv entry name
        var entryName = LeanData.GenerateZipEntryName(symbol, date, _resolution, _tickType);

        // Check disk once for this file ahead of time, reuse where possible
        var fileExists = File.Exists(filePath);

        // If our file doesn't exist it's possible the directory doesn't exist either, make sure it does
        if (!fileExists)
        {
            // A path without a directory component yields null/empty here, which CreateDirectory rejects
            var directory = Path.GetDirectoryName(filePath);
            if (!string.IsNullOrEmpty(directory))
            {
                Directory.CreateDirectory(directory);
            }
        }

        // Stays null for the append policy, which streams straight into the zip entry
        string finalData = null;
        if (_writePolicy == WritePolicy.Append)
        {
            var streamWriter = new ZipStreamWriter(filePath, entryName);
            try
            {
                foreach (var timedLine in data)
                {
                    streamWriter.WriteLine(timedLine.Line);
                }
            }
            finally
            {
                // Release the zip writer even when a write fails part-way through
                streamWriter.DisposeSafely();
            }
        }
        else if (_writePolicy == WritePolicy.Merge && fileExists && TryLoadFile(filePath, entryName, date, out var rows))
        {
            // Merge into the loaded rows: new lines replace existing ones with the same time
            foreach (var timedLine in data)
            {
                rows[timedLine.Time] = timedLine.Line;
            }

            // Final merged data product
            finalData = string.Join("\n", rows.Values);
        }
        else
        {
            // Otherwise just extract the data from the given list.
            finalData = string.Join("\n", data.Select(x => x.Line));
        }

        if (finalData != null)
        {
            var bytes = Encoding.UTF8.GetBytes(finalData);
            _dataCacheProvider.Store($"{filePath}#{entryName}", bytes);
        }

        if (Log.DebuggingEnabled)
        {
            var from = data[0].Time.Date.ToString(DateFormat.EightCharacter, CultureInfo.InvariantCulture);
            var to = data[data.Count - 1].Time.Date.ToString(DateFormat.EightCharacter, CultureInfo.InvariantCulture);
            Log.Debug($"LeanDataWriter.Write({symbol.ID}): Appended: {filePath} @ {entryName} {from}->{to}");
        }
    });
}
373 
/// <summary>
/// Builds the path of the zip file that data for the given symbol and time is written to,
/// using this writer's resolution and tick type
/// </summary>
/// <param name="baseDirectory">Base output directory for the zip file</param>
/// <param name="time">Date/time for the data we're writing</param>
/// <param name="symbol">The associated symbol. For example for options/futures it will be different than the canonical at <see cref="_symbol"/></param>
/// <returns>The full path to the output zip file</returns>
private string GetZipOutputFileName(string baseDirectory, DateTime time, Symbol symbol)
    => LeanData.GenerateZipFilePath(baseDirectory, symbol, time, _resolution, _tickType);
385 
/// <summary>
/// Helper method to map a symbol if required at the given date
/// </summary>
/// <param name="time">The time of the data point, used to look up the mapped ticker</param>
/// <param name="symbol">The symbol of the data point; the writer's own symbol is used when this is null</param>
/// <returns>The data point's symbol, updated with its mapped ticker when mapping is enabled,
/// the symbol requires mapping and the map file yields a ticker for <paramref name="time"/></returns>
private Symbol GetMappedSymbol(DateTime time, Symbol symbol)
{
    // Prefer the data point's own symbol: for options/futures it differs from the writer's canonical
    // symbol, and the writer's symbol is null when it was created from a security type only
    symbol ??= _symbol;

    if (!_mapSymbol || !symbol.RequiresMapping())
    {
        return symbol;
    }

    var mapFileResolver = MapFileProvider.Value.Get(AuxiliaryDataKey.Create(symbol.ID));
    var mapFile = mapFileResolver.ResolveMapFile(symbol);
    var mappedTicker = mapFile.GetMappedSymbol(time);

    // only update if we got something to map to
    return string.IsNullOrEmpty(mappedTicker) ? symbol : symbol.UpdateMappedSymbol(mappedTicker);
}
409 
/// <summary>
/// Read-only pairing of a formatted output line with the time of the data point it came from
/// </summary>
private class TimedLine
{
    /// <summary>
    /// The time of the data point
    /// </summary>
    public DateTime Time { get; }

    /// <summary>
    /// The formatted line to write
    /// </summary>
    public string Line { get; }

    /// <summary>
    /// Creates a new timed line
    /// </summary>
    /// <param name="time">The time of the data point</param>
    /// <param name="line">The formatted line to write</param>
    public TimedLine(DateTime time, string line)
    {
        Time = time;
        Line = line;
    }
}
420  }
421 }