Lean  $LEAN_TAG$
LocalObjectStore.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14 */
15 
16 using System;
17 using System.Collections;
18 using System.Collections.Concurrent;
19 using System.Collections.Generic;
20 using System.IO;
21 using System.Linq;
22 using System.Text.RegularExpressions;
23 using System.Threading;
26 using QuantConnect.Logging;
27 using QuantConnect.Packets;
28 using QuantConnect.Util;
29 
31 {
32  /// <summary>
33  /// A local disk implementation of <see cref="IObjectStore"/>.
34  /// </summary>
36  {
/// <summary>
/// No read permissions error message
/// </summary>
protected const string NoReadPermissionsError = "The current user does not have permission to read from the organization Object Store." +
    " Please contact your organization administrator to request permission.";

/// <summary>
/// No write permissions error message
/// </summary>
protected const string NoWritePermissionsError = "The current user does not have permission to write to the organization Object Store." +
    " Please contact your organization administrator to request permission.";

/// <summary>
/// Event raised each time there's an error
/// </summary>
public event EventHandler<ObjectStoreErrorRaisedEventArgs> ErrorRaised;

/// <summary>
/// Gets the default object store location: the full path of the 'object-store-root' setting
/// </summary>
public static string DefaultObjectStore => Path.GetFullPath(Config.Get("object-store-root", "./storage"));

/// <summary>
/// Flag indicating the state of this object storage has changed since the last <see cref="Persist"/> invocation
/// </summary>
private volatile bool _dirty;

// Timer that triggers Persist(); may be null, both Persist() and Dispose() check for that
private Timer _persistenceTimer;

// Key format accepted by SaveBytes: an optional leading dot, one or more letters, digits, slashes, backslashes,
// '_', '#', '-', '$', '=' or spaces, then an optional dot followed by an alphanumeric extension.
// Static and read-only so the compiled pattern is built once instead of once per store instance.
private static readonly Regex _pathRegex = new (@"^\.?[a-zA-Z0-9\\/_#\-\$= ]+\.?[a-zA-Z0-9]*$", RegexOptions.Compiled);

// In-memory view of the store, keyed by normalized path; an entry with null data is known but not loaded
private readonly ConcurrentDictionary<string, ObjectStoreEntry> _storage = new();

// Serializes Persist() runs and, on request, the enumeration of the store entries
private readonly object _persistLock = new object();

/// <summary>
/// Provides access to the controls governing behavior of this instance, such as the persistence interval
/// </summary>
protected Controls Controls { get; private set; }

/// <summary>
/// The root storage folder for the algorithm
/// </summary>
protected string AlgorithmStorageRoot { get; private set; }

/// <summary>
/// The file handler instance to use
/// </summary>
protected FileHandler FileHandler { get; set; } = new ();
83 
/// <summary>
/// Initializes the object store: resolves and creates the storage root folder, keeps the job controls
/// and, when a positive persistence interval is configured, schedules the first background persistence run
/// </summary>
/// <param name="userId">The user id (not used by this implementation)</param>
/// <param name="projectId">The project id (not used by this implementation)</param>
/// <param name="userToken">The user token (not used by this implementation)</param>
/// <param name="controls">The job controls instance</param>
public virtual void Initialize(int userId, int projectId, string userToken, Controls controls)
{
    // resolve the root first, every path handed out by this store is built on top of it
    AlgorithmStorageRoot = StorageRoot();

    // create the root path if it does not exist
    // NOTE(review): assumes FileHandler.CreateDirectory returns the directory info whose FullName is logged below - confirm
    var directoryInfo = FileHandler.CreateDirectory(AlgorithmStorageRoot);

    Controls = controls;

    // if <= 0 we disable periodic persistence and make it synchronous
    if (Controls.PersistenceIntervalSeconds > 0)
    {
        // one-shot timer (infinite period): Persist() re-arms it once each run has finished
        _persistenceTimer = new Timer(_ => Persist(), null, Controls.PersistenceIntervalSeconds * 1000, Timeout.Infinite);
    }

    Log.Trace($"LocalObjectStore.Initialize(): Storage Root: {directoryInfo.FullName}. StorageFileCount {controls.StorageFileCount}. StorageLimit {BytesToMb(controls.StorageLimit)}MB");
}
108 
/// <summary>
/// Gets the storage root path, <see cref="DefaultObjectStore"/> unless a derived class overrides it
/// </summary>
protected virtual string StorageRoot() => DefaultObjectStore;
116 
/// <summary>
/// Lazily enumerates the object store entries: first the ones already held in memory, then the files
/// found under the AlgorithmStorageRoot which still have to be registered or loaded
/// </summary>
/// <param name="loadContent">True to yield only entries holding data, reading files from disk when the cache
/// has no data for them; false to yield every known path without reading any file content</param>
/// <param name="takePersistLock">True to hold the persist lock while the enumeration is running</param>
/// <returns>The entries, or an empty sequence when the store has no read permissions</returns>
private IEnumerable<ObjectStoreEntry> GetObjectStoreEntries(bool loadContent, bool takePersistLock = true)
{
    if (!Controls.StoragePermissions.HasFlag(FileAccess.Read))
    {
        yield break;
    }

    // Holding the persist lock avoids yielding the same value twice, just in case.
    // When the caller opts out we lock on a throwaway object nobody else can contend for.
    var gate = takePersistLock ? _persistLock : new object();
    lock (gate)
    {
        // serve what we already have in memory first, it might include files which are not on disk yet
        foreach (var cached in _storage)
        {
            if (!loadContent || cached.Value.Data != null)
            {
                yield return cached.Value;
            }
        }

        foreach (var file in FileHandler.EnumerateFiles(AlgorithmStorageRoot, "*", SearchOption.AllDirectories, out var rootFolder))
        {
            var key = NormalizePath(file.FullName.RemoveFromStart(rootFolder));

            if (!loadContent)
            {
                if (!_storage.ContainsKey(key))
                {
                    // only the name is registered here, the content gets read on demand
                    var placeholder = new ObjectStoreEntry(key, null);
                    _storage[key] = placeholder;
                    yield return placeholder;
                }
                continue;
            }

            // the in-memory version has priority, the file is only read when there is no data cached for it
            var needsLoading = !_storage.TryGetValue(key, out var known) || known.Data == null;
            if (needsLoading && TryCreateObjectStoreEntry(file.FullName, key, out var loaded))
            {
                _storage[key] = loaded;
                yield return loaded;
            }
        }
    }
}
164 
/// <summary>
/// Gets the paths of all the objects in the store. File contents are not read, which makes this
/// useful to inspect the object store without loading it into memory
/// </summary>
public ICollection<string> Keys
{
    get
    {
        var paths = new List<string>();
        foreach (var entry in GetObjectStoreEntries(loadContent: false))
        {
            paths.Add(entry.Path);
        }
        return paths;
    }
}
175 
/// <summary>
/// Clears the in-memory state cache of the object store after writing pending changes to disk.
/// This is useful when the object store is used concurrently by nodes which want to share information
/// </summary>
public void Clear()
{
    // flush first so dropping the cache does not discard anything still waiting to be written
    Persist();

    // forget every cached entry
    _storage.Clear();
}
186 
/// <summary>
/// Determines whether the store contains data for the specified path, looking at the
/// in-memory cache first and at the file on disk second
/// </summary>
/// <param name="path">The object path</param>
/// <returns>True if the key was found</returns>
/// <exception cref="ArgumentNullException">The path is null</exception>
/// <exception cref="InvalidOperationException">The store has no read permissions</exception>
public bool ContainsKey(string path)
{
    if (path == null)
    {
        throw new ArgumentNullException(nameof(path));
    }
    if (!Controls.StoragePermissions.HasFlag(FileAccess.Read))
    {
        throw new InvalidOperationException($"LocalObjectStore.ContainsKey(): {NoReadPermissionsError}");
    }

    var key = NormalizePath(path);
    if (_storage.ContainsKey(key))
    {
        return true;
    }

    if (!FileHandler.Exists(PathForKey(key)))
    {
        return false;
    }

    // we did not know about the file but it exists: be friendly and register it, content is loaded on demand
    _storage[key] = new ObjectStoreEntry(key, null);
    return true;
}
218 
/// <summary>
/// Returns the object data for the specified path, loading it from disk when it is not cached yet
/// </summary>
/// <param name="path">The object path</param>
/// <returns>A byte array containing the data, null when no data could be loaded for the key</returns>
/// <exception cref="KeyNotFoundException">The store holds no object for the path</exception>
public byte[] ReadBytes(string path)
{
    // ContainsKey also takes care of the null path and the read permission checks
    if (!ContainsKey(path))
    {
        throw new KeyNotFoundException($"Object with path '{path}' was not found in the current project. " +
            "Please use ObjectStore.ContainsKey(key) to check if an object exists before attempting to read."
        );
    }

    var key = NormalizePath(path);
    if (_storage.TryGetValue(key, out var cached) && cached.Data != null)
    {
        // the data is already in memory
        return cached.Data;
    }

    // there is no data in the cache, if the file exists on disk let's load it and remember it
    if (TryCreateObjectStoreEntry(PathForKey(key), key, out var loaded))
    {
        _storage[key] = loaded;
    }
    return loaded?.Data;
}
246 
/// <summary>
/// Saves the object data for the specified path
/// </summary>
/// <remarks>Null contents only register the key in memory: nothing is flagged for persistence
/// and the method returns false</remarks>
/// <param name="path">The object path</param>
/// <param name="contents">The object data</param>
/// <returns>True if non-null contents were stored, false when the contents are null or the storage limits rejected the save</returns>
/// <exception cref="ArgumentNullException">The path is null</exception>
/// <exception cref="InvalidOperationException">The store has no write permissions</exception>
/// <exception cref="ArgumentException">The path has an unsupported format or more than 100 separators of a kind</exception>
public bool SaveBytes(string path, byte[] contents)
{
    if (path == null)
    {
        throw new ArgumentNullException(nameof(path));
    }
    if (!Controls.StoragePermissions.HasFlag(FileAccess.Write))
    {
        throw new InvalidOperationException($"LocalObjectStore.SaveBytes(): {NoWritePermissionsError}");
    }
    // the separator count is capped just in case, on top of the format check
    if (!_pathRegex.IsMatch(path)
        || path.Count(c => c == '/') > 100
        || path.Count(c => c == '\\') > 100)
    {
        throw new ArgumentException($"LocalObjectStore: path is not supported: '{path}'");
    }

    // normalize only after the regex check, which validates the path as the caller provided it
    path = NormalizePath(path);

    // only persist if we actually stored some new data, else can skip
    if (!InternalSaveBytes(path, contents) || contents == null)
    {
        return false;
    }

    _dirty = true;
    // if <= 0 we disable periodic persistence and make it synchronous
    if (Controls.PersistenceIntervalSeconds <= 0)
    {
        Persist();
    }
    return true;
}
291 
/// <summary>
/// Stores the contents in memory as a dirty entry, provided the storage limits allow it.
/// Won't trigger persist nor will check storage write permissions, useful on initialization since it allows
/// read only permissions to load the object store
/// </summary>
/// <param name="path">The normalized object path</param>
/// <param name="contents">The object data</param>
/// <returns>True if the entry was stored, false if the storage limits rejected it</returns>
protected bool InternalSaveBytes(string path, byte[] contents)
{
    if (IsWithinStorageLimit(path, contents, takePersistLock: true))
    {
        // replace whatever was stored for this path and flag the new entry as pending to be written
        var stored = new ObjectStoreEntry(path, contents);
        _storage[path] = stored;
        stored.SetDirty();
        return true;
    }

    return false;
}
307 
/// <summary>
/// Validates the file count and total size limits of the controls would still be respected after saving
/// the given contents under the given path
/// </summary>
/// <param name="path">The normalized path about to be saved</param>
/// <param name="contents">The data about to be saved, may be null</param>
/// <param name="takePersistLock">True to hold the persist lock while the existing entries are enumerated</param>
/// <returns>True if the save respects both limits, false otherwise (the reason gets logged)</returns>
protected virtual bool IsWithinStorageLimit(string path, byte[] contents, bool takePersistLock)
{
    // the file about to be saved is accounted for up front
    var totalFiles = 1;
    long totalBytes = contents?.Length ?? 0L;

    foreach (var entry in GetObjectStoreEntries(loadContent: false, takePersistLock: takePersistLock))
    {
        if (path.Equals(entry.Path))
        {
            // already counted above: if this key was in storage it will be replaced
            continue;
        }

        totalFiles++;
        if (entry.Data == null)
        {
            // not loaded in memory, ask the file handler for the size on disk
            totalBytes += FileHandler.TryGetFileLength(PathForKey(entry.Path));
        }
        else
        {
            // if the data is in memory use it
            totalBytes += entry.Data.Length;
        }
    }

    // Verify we are within FileCount limit
    if (totalFiles > Controls.StorageFileCount)
    {
        var message = $"LocalObjectStore.InternalSaveBytes(): You have reached the ObjectStore limit for files it can save: {totalFiles}. Unable to save the new file: '{path}'";
        Log.Error(message);
        return false;
    }

    // Verify we are within Storage limit
    if (totalBytes > Controls.StorageLimit)
    {
        var message = $"LocalObjectStore.InternalSaveBytes(): at storage capacity: {BytesToMb(totalBytes)}MB/{BytesToMb(Controls.StorageLimit)}MB. Unable to save: '{path}'";
        Log.Error(message);
        return false;
    }

    return true;
}
359 
/// <summary>
/// Deletes the object data for the specified path, both from the in-memory cache and from disk
/// </summary>
/// <param name="path">The object path</param>
/// <returns>True if the file was deleted from disk; otherwise whether the key was present in the cache</returns>
/// <exception cref="ArgumentNullException">The path is null</exception>
/// <exception cref="InvalidOperationException">The store has no write permissions</exception>
public bool Delete(string path)
{
    if (path == null)
    {
        throw new ArgumentNullException(nameof(path));
    }
    if (!Controls.StoragePermissions.HasFlag(FileAccess.Write))
    {
        throw new InvalidOperationException($"LocalObjectStore.Delete(): {NoWritePermissionsError}");
    }

    var key = NormalizePath(path);
    var removedFromCache = _storage.TryRemove(key, out _);

    var filePath = PathForKey(key);
    if (!FileHandler.Exists(filePath))
    {
        return removedFromCache;
    }

    try
    {
        FileHandler.Delete(filePath);
        return true;
    }
    catch
    {
        // deliberately best effort: PersistData() may delete the same file concurrently,
        // losing that race is not an error
    }

    return removedFromCache;
}
396 
/// <summary>
/// Returns the file path for the specified path
/// </summary>
/// <remarks>If the key is not already inserted it will just return a path associated with it
/// and add the key with null value. If the key is known, pending changes are persisted first</remarks>
/// <param name="path">The object path</param>
/// <returns>The path for the file</returns>
public virtual string GetFilePath(string path)
{
    if (ContainsKey(path))
    {
        // known key: persist to ensure our files are up to date
        Persist();
    }
    else
    {
        // unknown key: add it with a null value to tell Persist() not to delete the file created in the path
        // associated with this key and not to update it with the value associated with the key (null)
        SaveBytes(path, null);
    }

    return PathForKey(path);
}
422 
/// <summary>
/// Stops the persistence timer, writes any pending changes to disk and disposes the timer.
/// Nothing is done when no persistence timer exists. Failures are logged and never propagated.
/// </summary>
public virtual void Dispose()
{
    try
    {
        if (_persistenceTimer != null)
        {
            // cancel the scheduled run so it does not compete with the final flush
            _persistenceTimer.Change(Timeout.Infinite, Timeout.Infinite);

            // final flush of whatever is still pending
            Persist();

            _persistenceTimer.DisposeSafely();
        }
    }
    catch (Exception err)
    {
        // nothing is deleted here, the message states what this method actually does
        Log.Error(err, "LocalObjectStore.Dispose(): Error persisting and disposing the object store.");
    }
}
444 
/// <summary>Returns an enumerator that iterates through the path/data pairs of the store, loading file contents as required.</summary>
/// <returns>A <see cref="T:System.Collections.Generic.IEnumerator`1" /> that can be used to iterate through the collection.</returns>
public IEnumerator<KeyValuePair<string, byte[]>> GetEnumerator()
{
    var entries = GetObjectStoreEntries(loadContent: true);
    var pairs = entries.Select(entry => new KeyValuePair<string, byte[]>(entry.Path, entry.Data));
    return pairs.GetEnumerator();
}
452 
/// <summary>Returns the non-generic view of the enumerator provided by the generic GetEnumerator.</summary>
/// <returns>An <see cref="T:System.Collections.IEnumerator" /> object that can be used to iterate through the collection.</returns>
IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
460 
/// <summary>
/// Gets the file path for a given object path by combining the algorithm storage root with the normalized path.
/// Internal use only because it does not guarantee the existence of the file.
/// </summary>
/// <param name="path">The object path</param>
/// <returns>The path of the file backing the object</returns>
protected string PathForKey(string path)
{
    // NOTE(review): nothing here confines the combined path to the storage root; only SaveBytes
    // validates the key format - confirm keys reaching the other callers are trusted
    var relativePath = NormalizePath(path);
    return Path.Combine(AlgorithmStorageRoot, relativePath);
}
469 
/// <summary>
/// Persists the object store's contents if they changed since the last successful run.
/// Invoked periodically by the persistence timer and on demand by the store operations;
/// runs are serialized through the persist lock and failures are logged and raised as error events
/// </summary>
private void Persist()
{
    // Acquire the persist lock
    lock (_persistLock)
    {
        try
        {
            // If there are no changes we are fine; the flag is only cleared when the write succeeded
            if (_dirty && PersistData())
            {
                _dirty = false;
            }
        }
        catch (Exception err)
        {
            // exception first, like the other call sites, so the exception itself gets logged
            Log.Error(err, "LocalObjectStore.Persist()");
            OnErrorRaised(err);
        }
        finally
        {
            try
            {
                // restart timer following end of persistence (it is a one-shot timer)
                _persistenceTimer?.Change(Time.GetSecondUnevenWait(Controls.PersistenceIntervalSeconds * 1000), Timeout.Infinite);
            }
            catch (ObjectDisposedException)
            {
                // ignored: the timer was disposed, there is nothing left to schedule
            }
        }
    }
}
513 
/// <summary>
/// Overridable persistence function: writes every dirty entry holding data to its file
/// </summary>
/// <returns>True if persistence was successful, otherwise false</returns>
protected virtual bool PersistData()
{
    try
    {
        foreach (var pair in _storage)
        {
            var entry = pair.Value;

            // Skip the keys associated with null values, they are not linked to a file yet or not loaded.
            // Also skip files which are not flagged as dirty
            if (entry.Data == null || !entry.IsDirty)
            {
                continue;
            }

            var filePath = PathForKey(pair.Key);

            // directory might not exist for custom prefix
            var parentDirectory = Path.GetDirectoryName(filePath);
            if (!FileHandler.DirectoryExists(parentDirectory))
            {
                FileHandler.CreateDirectory(parentDirectory);
            }
            FileHandler.WriteAllBytes(filePath, entry.Data);

            // clear the dirty flag
            entry.SetClean();

            if (_storage.Contains(pair))
            {
                continue;
            }

            // the pair is no longer in storage, it could have been deleted by the Delete() method
            // while we were writing: remove the file we just wrote
            try
            {
                FileHandler.Delete(filePath);
            }
            catch
            {
                // best effort, Delete() may be removing the very same file concurrently
            }
        }

        return true;
    }
    catch (Exception err)
    {
        Log.Error(err, "LocalObjectStore.PersistData()");
        OnErrorRaised(err);
        return false;
    }
}
565 
/// <summary>
/// Event invocator for the <see cref="ErrorRaised"/> event
/// </summary>
/// <param name="error">The error to hand over to the subscribers</param>
protected virtual void OnErrorRaised(Exception error)
{
    var handler = ErrorRaised;
    if (handler != null)
    {
        // the event args are only built when somebody is listening
        handler(this, new ObjectStoreErrorRaisedEventArgs(error));
    }
}
573 
/// <summary>
/// Converts a number of bytes to megabytes (1 MB = 1024 * 1024 bytes) as it's more human legible
/// </summary>
/// <param name="bytes">The number of bytes</param>
/// <returns>The equivalent amount of megabytes</returns>
private static double BytesToMb(long bytes)
{
    // dividing once by 1024 * 1024 yields exactly the same double as dividing by 1024 twice
    const double bytesPerMegabyte = 1024.0 * 1024.0;
    return bytes / bytesPerMegabyte;
}
581 
/// <summary>
/// Normalizes an object path: leading dots and then leading slashes or backslashes are removed
/// and every backslash is replaced by a forward slash. Null and empty paths are returned as they are
/// </summary>
/// <param name="path">The object path</param>
/// <returns>The normalized path</returns>
private static string NormalizePath(string path)
{
    if (string.IsNullOrEmpty(path))
    {
        return path;
    }

    var withoutLeadingDots = path.TrimStart('.');
    var withoutLeadingSeparators = withoutLeadingDots.TrimStart('/', '\\');
    return withoutLeadingSeparators.Replace('\\', '/');
}
590 
/// <summary>
/// Tries to build an entry from the contents of a file, retrying when reading it fails
/// </summary>
/// <remarks>Up to four attempts are made, sleeping 250 ms after each of the first three failures;
/// the exception of the fourth failed attempt is rethrown</remarks>
/// <param name="filePath">The path of the file to read</param>
/// <param name="path">The object path the entry is created for</param>
/// <param name="objectStoreEntry">The loaded entry, null when the file does not exist</param>
/// <returns>True if the file exists and was read, false if it does not exist</returns>
private bool TryCreateObjectStoreEntry(string filePath, string path, out ObjectStoreEntry objectStoreEntry)
{
    for (var attempt = 1; ; attempt++)
    {
        try
        {
            var exists = FileHandler.Exists(filePath);
            objectStoreEntry = exists ? new ObjectStoreEntry(path, FileHandler.ReadAllBytes(filePath)) : null;
            return exists;
        }
        catch (Exception)
        {
            if (attempt > 3)
            {
                throw;
            }

            // let's be resilient and retry, avoid race conditions, someone updating it or just random io failure
            Thread.Sleep(250);
        }
    }
}
621 
/// <summary>
/// Helper class to hold the state of an object store file: its path, its data (null while not loaded)
/// and whether the data still has to be written
/// </summary>
private class ObjectStoreEntry
{
    // 0 means clean, 1 means dirty; always accessed through Interlocked
    private long _isDirty;

    public string Path { get; }

    public byte[] Data { get; }

    public bool IsDirty => Interlocked.Read(ref _isDirty) != 0;

    public ObjectStoreEntry(string path, byte[] data)
    {
        Path = path;
        Data = data;
    }

    // the flag only ever holds 0 or 1, so an unconditional exchange ends in the same state
    public void SetDirty() => Interlocked.Exchange(ref _isDirty, 1);

    public void SetClean() => Interlocked.Exchange(ref _isDirty, 0);
}
646  }
647 }