Lean  $LEAN_TAG$
BrokerageSetupHandler.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14 */
15 
16 using System;
17 using System.Collections.Generic;
18 using System.Globalization;
19 using System.Linq;
20 using System.Reflection;
21 using Fasterflect;
25 using QuantConnect.Data;
31 using QuantConnect.Logging;
32 using QuantConnect.Packets;
34 using QuantConnect.Util;
35 
37 {
38  /// <summary>
39  /// Defines a set up handler that initializes the algorithm instance using values retrieved from the user's brokerage account
40  /// </summary>
42  {
43  /// <summary>
44  /// Max allocation limit configuration variable name; this key is looked up in the live job's brokerage data
45  /// </summary>
46  public static string MaxAllocationLimitConfig = "max-allocation-limit";
47 
48  /// <summary>
49  /// The worker thread instance the setup handler should use; it is handed to the algorithm loader
50  /// </summary>
51  public WorkerThread WorkerThread { get; set; }
52 
53  /// <summary>
54  /// Any errors from the initialization stored here; entries are appended through AddInitializationError
55  /// </summary>
56  public List<Exception> Errors { get; set; }
57 
58  /// <summary>
59  /// Get the maximum runtime for this algorithm job. Assigned once in the constructor.
60  /// </summary>
61  public TimeSpan MaximumRuntime { get; }
62 
63  /// <summary>
64  /// Algorithm starting capital for statistics calculations; set from the portfolio's total value at the end of Setup
65  /// </summary>
66  public decimal StartingPortfolioValue { get; private set; }
67 
68  /// <summary>
69  /// Start date for analysis loops to search for data; set at the end of Setup
70  /// </summary>
71  public DateTime StartingDate { get; private set; }
72 
73  /// <summary>
74  /// Maximum number of orders for the algorithm run -- applicable for backtests only.
75  /// </summary>
76  public int MaxOrders { get; }
77 
78  // brokerage factory resolved in CreateBrokerage, and the separately preloaded data queue handler brokerage (if any); both are released in Dispose
79  private IBrokerageFactory _factory;
80  private IBrokerage _dataQueueHandlerBrokerage;
81 
82  /// <summary>
83  /// Initializes a new BrokerageSetupHandler with an empty error list
84  /// </summary>
86  {
87  Errors = new List<Exception>();
     // roughly ten years
88  MaximumRuntime = TimeSpan.FromDays(10*365);
     // largest representable value, i.e. no practical order cap
89  MaxOrders = int.MaxValue;
90  }
91 
/// <summary>
/// Builds an algorithm instance from the assembly found at the given path.
/// </summary>
/// <param name="algorithmNodePacket">Job packet; supplies the language, algorithm id and RAM allocation</param>
/// <param name="assemblyPath">Location of the assembly that contains the algorithm</param>
/// <returns>The created <see cref="IAlgorithm"/></returns>
/// <exception cref="AlgorithmSetupException">Thrown when the loader reports that creation failed</exception>
public IAlgorithm CreateAlgorithmInstance(AlgorithmNodePacket algorithmNodePacket, string assemblyPath)
{
    // loading is bounded by BaseSetupHandler.AlgorithmCreationTimeout; picking the algorithm type is
    // delegated to SingleOrAlgorithmTypeName using the "algorithm-type-name" configuration value
    var algorithmLoader = new Loader(
        false,
        algorithmNodePacket.Language,
        BaseSetupHandler.AlgorithmCreationTimeout,
        typeNames => typeNames.SingleOrAlgorithmTypeName(Config.Get("algorithm-type-name", algorithmNodePacket.AlgorithmId)),
        WorkerThread);

    IAlgorithm instance;
    string failure;
    if (algorithmLoader.TryCreateAlgorithmInstanceWithIsolator(assemblyPath, algorithmNodePacket.RamAllocation, out instance, out failure))
    {
        return instance;
    }

    throw new AlgorithmSetupException($"During the algorithm initialization, the following exception has occurred: {failure}");
}
110 
/// <summary>
/// Resolves the brokerage factory named by the live job packet and uses it to create the brokerage.
/// </summary>
/// <param name="algorithmNodePacket">Job packet; must be a <see cref="LiveNodePacket"/></param>
/// <param name="uninitializedAlgorithm">The algorithm instance before Initialize has been called</param>
/// <param name="factory">Receives the resolved brokerage factory</param>
/// <returns>The brokerage created by the resolved factory</returns>
/// <exception cref="ArgumentException">Thrown when the packet is not a live node packet</exception>
public IBrokerage CreateBrokerage(AlgorithmNodePacket algorithmNodePacket, IAlgorithm uninitializedAlgorithm, out IBrokerageFactory factory)
{
    if (algorithmNodePacket is not LiveNodePacket livePacket)
    {
        throw new ArgumentException("BrokerageSetupHandler.CreateBrokerage requires a live node packet");
    }

    Log.Trace($"BrokerageSetupHandler.CreateBrokerage(): creating brokerage '{livePacket.Brokerage}'");

    // pick the single factory whose brokerage type matches the name in the job packet;
    // the instance is also kept in a field so Setup and Dispose can reach it
    factory = _factory = Composer.Instance.Single<IBrokerageFactory>(
        candidate => candidate.BrokerageType.MatchesTypeName(livePacket.Brokerage));

    // the data queue handler is preloaded before the trading brokerage itself is created
    PreloadDataQueueHandler(livePacket, uninitializedAlgorithm, factory);

    return _factory.CreateBrokerage(livePacket, uninitializedAlgorithm);
}
139 
140  /// <summary>
141  /// Primary entry point to setup a new algorithm: connects the brokerage, runs the algorithm's initialization
     /// under a time limit, then loads cash, open orders and holdings from the brokerage
142  /// </summary>
     /// <remarks>
     /// Failures raised inside the main try block are recorded in <see cref="Errors"/> instead of being thrown.
     /// The calls made before that try block (algorithm naming and BaseSetupHandler.Setup) are not guarded here.
     /// </remarks>
143  /// <param name="parameters">The parameters object to use; supplies the algorithm, brokerage, job packet and handlers</param>
144  /// <returns>True on successfully setting up the algorithm state, or false on error.</returns>
145  public bool Setup(SetupHandlerParameters parameters)
146  {
147  var algorithm = parameters.Algorithm;
148  var brokerage = parameters.Brokerage;
149  // verify we were given the correct job packet type
150  var liveJob = parameters.AlgorithmNodePacket as LiveNodePacket;
151  if (liveJob == null)
152  {
153  AddInitializationError("BrokerageSetupHandler requires a LiveNodePacket");
154  return false;
155  }
156 
157  algorithm.Name = liveJob.GetAlgorithmName();
158 
159  // verify the brokerage was specified
160  if (string.IsNullOrWhiteSpace(liveJob.Brokerage))
161  {
162  AddInitializationError("A brokerage must be specified");
163  return false;
164  }
165 
166  BaseSetupHandler.Setup(parameters);
167 
168  // attach to the message event to relay brokerage specific initialization messages
     // (only messages of type Error are recorded; they end up in Errors)
169  EventHandler<BrokerageMessageEvent> brokerageOnMessage = (sender, args) =>
170  {
171  if (args.Type == BrokerageMessageType.Error)
172  {
173  AddInitializationError($"Brokerage Error Code: {args.Code} - {args.Message}");
174  }
175  };
176 
177  try
178  {
179  // let the world know what we're doing since logging in can take a minute
180  parameters.ResultHandler.SendStatusUpdate(AlgorithmStatus.LoggingIn, "Logging into brokerage...");
181 
182  brokerage.Message += brokerageOnMessage;
183 
184  Log.Trace("BrokerageSetupHandler.Setup(): Connecting to brokerage...");
185  try
186  {
187  // this can fail for various reasons, such as already being logged in somewhere else
188  brokerage.Connect();
189  }
190  catch (Exception err)
191  {
192  Log.Error(err);
193  AddInitializationError(
194  $"Error connecting to brokerage: {err.Message}. " +
195  "This may be caused by incorrect login credentials or an unsupported account type.", err);
196  return false;
197  }
198 
199  if (!brokerage.IsConnected)
200  {
201  // if we're reporting that we're not connected, bail
202  AddInitializationError("Unable to connect to brokerage.");
203  return false;
204  }
205 
     // the message falls back to the algorithm's own account currency when the brokerage reports none
206  var message = $"{brokerage.Name} account base currency: {brokerage.AccountBaseCurrency ?? algorithm.AccountCurrency}";
207 
208 
209  var accountCurrency = brokerage.AccountBaseCurrency;
     // presence of the max allocation key in the brokerage data overrides the account currency with USD
210  if (liveJob.BrokerageData.ContainsKey(MaxAllocationLimitConfig))
211  {
212  accountCurrency = Currencies.USD;
213  message += ". Allocation limited, will use 'USD' account currency";
214  }
215 
216  Log.Trace($"BrokerageSetupHandler.Setup(): {message}");
217 
218  algorithm.Debug(message);
     // only switch the currency when one was resolved and it differs from the algorithm's current one
219  if (accountCurrency != null && accountCurrency != algorithm.AccountCurrency)
220  {
221  algorithm.SetAccountCurrency(accountCurrency);
222  }
223 
224  Log.Trace("BrokerageSetupHandler.Setup(): Initializing algorithm...");
225 
226  parameters.ResultHandler.SendStatusUpdate(AlgorithmStatus.Initializing, "Initializing algorithm...");
227 
228  //Execute the initialize code:
     // the delegate is given 300 seconds; a false result is reported further down as a timeout
229  var controls = liveJob.Controls;
230  var isolator = new Isolator();
231  var initializeComplete = isolator.ExecuteWithTimeLimit(TimeSpan.FromSeconds(300), () =>
232  {
233  try
234  {
235  //Set the default brokerage model before initialize
236  algorithm.SetBrokerageModel(_factory.GetBrokerageModel(algorithm.Transactions));
237 
238  //Margin calls are disabled by default in live mode
239  algorithm.Portfolio.MarginCallModel = MarginCallModel.Null;
240 
241  //Set our parameters
242  algorithm.SetParameters(liveJob.Parameters);
243  algorithm.SetAvailableDataTypes(BaseSetupHandler.GetConfiguredDataFeeds());
244 
245  //Algorithm is live, not backtesting:
246  algorithm.SetAlgorithmMode(liveJob.AlgorithmMode);
247 
248  //Initialize the algorithm's starting date
249  algorithm.SetDateTime(DateTime.UtcNow);
250 
251  //Set the source impl for the event scheduling
252  algorithm.Schedule.SetEventSchedule(parameters.RealTimeHandler);
253 
     // reuse an option chain provider already registered with the composer, otherwise build a cached live one and register it
254  var optionChainProvider = Composer.Instance.GetPart<IOptionChainProvider>();
255  if (optionChainProvider == null)
256  {
257  var baseOptionChainProvider = new LiveOptionChainProvider();
258  baseOptionChainProvider.Initialize(new(parameters.MapFileProvider, algorithm.HistoryProvider));
259  optionChainProvider = new CachingOptionChainProvider(baseOptionChainProvider);
260  Composer.Instance.AddPart(optionChainProvider);
261  }
262  // set the option chain provider
263  algorithm.SetOptionChainProvider(optionChainProvider);
264 
     // same lookup-or-create approach for the future chain provider
265  var futureChainProvider = Composer.Instance.GetPart<IFutureChainProvider>();
266  if (futureChainProvider == null)
267  {
268  var baseFutureChainProvider = new LiveFutureChainProvider();
269  baseFutureChainProvider.Initialize(new(parameters.MapFileProvider, algorithm.HistoryProvider));
270  futureChainProvider = new CachingFutureChainProvider(baseFutureChainProvider);
271  Composer.Instance.AddPart(futureChainProvider);
272  }
273  // set the future chain provider
274  algorithm.SetFutureChainProvider(futureChainProvider);
275 
276  //Initialise the algorithm, get the required data:
277  algorithm.Initialize();
278 
     // cash amounts are left untouched when the job's brokerage is "PaperBrokerage"
279  if (liveJob.Brokerage != "PaperBrokerage")
280  {
281  //Zero the CashBook - we'll populate directly from brokerage
282  foreach (var kvp in algorithm.Portfolio.CashBook)
283  {
284  kvp.Value.SetAmount(0);
285  }
286  }
287  }
288  catch (Exception err)
289  {
     // recorded rather than rethrown; the Errors check after the isolated call makes Setup return false
290  AddInitializationError(err.ToString(), err);
291  }
292  }, controls.RamAllocation,
293  sleepIntervalMillis: 100); // entire system is waiting on this, so be as fast as possible
294 
     // Errors may hold brokerage error messages relayed by brokerageOnMessage as well as exceptions caught in the delegate above
295  if (Errors.Count != 0)
296  {
297  // if we already got an error just exit right away
298  return false;
299  }
300 
301  if (!initializeComplete)
302  {
303  AddInitializationError("Initialization timed out.");
304  return false;
305  }
306 
     // both loaders record their own initialization error before returning false
307  if (!LoadCashBalance(brokerage, algorithm))
308  {
309  return false;
310  }
311 
312  if (!LoadExistingHoldingsAndOrders(brokerage, algorithm, parameters))
313  {
314  return false;
315  }
316 
317  // after algorithm was initialized, should set trading days per year for our great portfolio statistics
     // NOTE(review): no statement follows the comment above in the visible source - the call it describes appears to be missing; confirm against the original file
319 
     // the aggregator is optional: nothing happens when the composer has no IDataAggregator part
320  var dataAggregator = Composer.Instance.GetPart<IDataAggregator>();
321  dataAggregator?.Initialize(new () { AlgorithmSettings = algorithm.Settings });
322 
323  //Finalize Initialization
324  algorithm.PostInitialize();
325 
327 
328  if (algorithm.Portfolio.TotalPortfolioValue == 0)
329  {
330  algorithm.Debug("Warning: No cash balances or holdings were found in the brokerage account.");
331  }
332 
333  string maxCashLimitStr;
334  if (liveJob.BrokerageData.TryGetValue(MaxAllocationLimitConfig, out maxCashLimitStr))
335  {
     // parsed with the invariant culture; a malformed value throws and is recorded by the outer catch
336  var maxCashLimit = decimal.Parse(maxCashLimitStr, NumberStyles.Any, CultureInfo.InvariantCulture);
337 
338  // If allocation exceeded by more than $10,000; block deployment
339  if (algorithm.Portfolio.TotalPortfolioValue > (maxCashLimit + 10000m))
340  {
341  var exceptionMessage = $"TotalPortfolioValue '{algorithm.Portfolio.TotalPortfolioValue}' exceeds allocation limit '{maxCashLimit}'";
342  algorithm.Debug(exceptionMessage);
     // caught by the outer catch below and converted into an initialization error
343  throw new ArgumentException(exceptionMessage);
344  }
345  }
346 
347  //Set the starting portfolio value for the strategy to calculate performance:
348  StartingPortfolioValue = algorithm.Portfolio.TotalPortfolioValue;
     // NOTE(review): local time is used here while the algorithm clock above is set from DateTime.UtcNow - confirm this is intended
349  StartingDate = DateTime.Now;
350  }
351  catch (Exception err)
352  {
353  AddInitializationError(err.ToString(), err);
354  }
355  finally
356  {
     // always detach the relay handler, including on the early-return paths inside the try block
357  if (brokerage != null)
358  {
359  brokerage.Message -= brokerageOnMessage;
360  }
361  }
362 
363  return Errors.Count == 0;
364  }
365 
/// <summary>
/// Copies every cash balance reported by the brokerage into the algorithm's portfolio.
/// </summary>
/// <param name="brokerage">The brokerage to query for cash balances</param>
/// <param name="algorithm">The algorithm whose portfolio cash is set</param>
/// <returns>True when all balances were applied; false when an exception was caught and recorded as an initialization error</returns>
private bool LoadCashBalance(IBrokerage brokerage, IAlgorithm algorithm)
{
    Log.Trace("BrokerageSetupHandler.Setup(): Fetching cash balance from brokerage...");

    try
    {
        // one portfolio cash entry per currency reported by the brokerage
        foreach (var balance in brokerage.GetCashBalance())
        {
            Log.Trace($"BrokerageSetupHandler.Setup(): Setting {balance.Currency} cash to {balance.Amount}");

            algorithm.Portfolio.SetCash(balance.Currency, balance.Amount, 0);
        }

        return true;
    }
    catch (Exception failure)
    {
        Log.Error(failure);
        AddInitializationError($"Error getting cash balance from brokerage: {failure.Message}", failure);
        return false;
    }
}
388 
/// <summary>
/// Pulls the account's open orders and then its current holdings from the brokerage into the algorithm.
/// </summary>
/// <param name="brokerage">The brokerage to query</param>
/// <param name="algorithm">The algorithm receiving the securities, holdings and prices</param>
/// <param name="parameters">Setup parameters; supplies the result and transaction handlers used for open orders</param>
/// <returns>True when both steps completed; false when either step threw and the failure was recorded as an initialization error</returns>
protected bool LoadExistingHoldingsAndOrders(IBrokerage brokerage, IAlgorithm algorithm, SetupHandlerParameters parameters)
{
    Log.Trace("BrokerageSetupHandler.Setup(): Fetching open orders from brokerage...");
    try
    {
        GetOpenOrders(algorithm, parameters.ResultHandler, parameters.TransactionHandler, brokerage);
    }
    catch (Exception failure)
    {
        Log.Error(failure);
        AddInitializationError($"Error getting open orders from brokerage: {failure.Message}", failure);
        return false;
    }

    Log.Trace("BrokerageSetupHandler.Setup(): Fetching holdings from brokerage...");
    try
    {
        // a single timestamp is shared by every holding processed below
        var nowUtc = DateTime.UtcNow;

        // descending security type order: options are added first to ensure raw data
        // normalization mode is set on the equity underlyings
        var orderedHoldings = brokerage.GetAccountHoldings().OrderByDescending(x => x.Type);

        foreach (var position in orderedHoldings)
        {
            Log.Trace("BrokerageSetupHandler.Setup(): Has existing holding: " + position);

            // holdings whose security cannot be resolved are skipped (the helper records the error)
            Security security;
            var resolved = GetOrAddUnrequestedSecurity(algorithm, position.Symbol, position.Type, out security);
            if (!resolved)
            {
                continue;
            }

            var localExchangeTime = nowUtc.ConvertFromUtc(security.Exchange.TimeZone);

            security.Holdings.SetHoldings(position.AveragePrice, position.Quantity);

            // no price from the brokerage: fall back to the algorithm's last known price, or stay at zero
            if (position.MarketPrice == 0)
            {
                position.MarketPrice = algorithm.GetLastKnownPrice(security)?.Price ?? 0;
            }

            if (position.MarketPrice == 0)
            {
                continue;
            }

            // seed the security's market price with a flat, zero-volume bar
            var seedBar = new TradeBar
            {
                Time = localExchangeTime,
                Open = position.MarketPrice,
                High = position.MarketPrice,
                Low = position.MarketPrice,
                Close = position.MarketPrice,
                Volume = 0,
                Symbol = position.Symbol,
                DataType = MarketDataType.TradeBar
            };
            security.SetMarketPrice(seedBar);
        }
    }
    catch (Exception failure)
    {
        Log.Error(failure);
        AddInitializationError($"Error getting account holdings from brokerage: {failure.Message}", failure);
        return false;
    }

    return true;
}
461 
/// <summary>
/// Forwards to the algorithm's GetOrAddUnrequestedSecurity, recording an initialization error
/// when the error callback is invoked.
/// </summary>
/// <param name="algorithm">The algorithm to look the security up in</param>
/// <param name="symbol">Symbol of the security</param>
/// <param name="securityType">Security type, used only in the error message</param>
/// <param name="security">Receives the security</param>
/// <returns>The result of the algorithm's GetOrAddUnrequestedSecurity call</returns>
private bool GetOrAddUnrequestedSecurity(IAlgorithm algorithm, Symbol symbol, SecurityType securityType, out Security security)
{
    return algorithm.GetOrAddUnrequestedSecurity(
        symbol,
        out security,
        onError: supported =>
        {
            var supportedList = string.Join(",", supported);
            AddInitializationError(
                $"Found unsupported security type in existing brokerage holdings: {securityType}. " +
                $"QuantConnect currently supports the following security types: {supportedList}");
        });
}
469 
/// <summary>
/// Reads the open orders from a brokerage and registers each one with the transaction handler.
/// Adds <see cref="Orders.Order"/> and <see cref="Orders.OrderTicket"/> to the transaction handler
/// </summary>
/// <param name="algorithm">Algorithm instance</param>
/// <param name="resultHandler">The configured result handler; receives a debug message per open order</param>
/// <param name="transactionHandler">The configured transaction handler</param>
/// <param name="brokerage">The brokerage to query for open orders</param>
protected void GetOpenOrders(IAlgorithm algorithm, IResultHandler resultHandler, ITransactionHandler transactionHandler, IBrokerage brokerage)
{
    // descending security type order: options are added first to ensure raw data
    // normalization mode is set on the equity underlyings
    var pendingOrders = brokerage.GetOpenOrders().OrderByDescending(x => x.SecurityType);

    foreach (var openOrder in pendingOrders)
    {
        // orders whose security cannot be resolved are skipped (the helper records the error)
        Security security;
        var resolved = GetOrAddUnrequestedSecurity(algorithm, openOrder.Symbol, openOrder.SecurityType, out security);
        if (!resolved)
        {
            continue;
        }

        transactionHandler.AddOpenOrder(openOrder, algorithm);
        openOrder.PriceCurrency = security?.SymbolProperties.QuoteCurrency;

        Log.Trace($"BrokerageSetupHandler.Setup(): Has open order: {openOrder}");
        resultHandler.DebugMessage($"BrokerageSetupHandler.Setup(): Open order detected. Creating order tickets for open order {openOrder.Symbol.Value} with quantity {openOrder.Quantity}. Beware that this order ticket may not accurately reflect the quantity of the order if the open order is partially filled.");
    }
}
499 
/// <summary>
/// Wraps the message in an <see cref="AlgorithmSetupException"/> and appends it to <see cref="Errors"/>.
/// </summary>
/// <param name="message">The error message to be added</param>
/// <param name="inner">The inner exception being wrapped, if any</param>
private void AddInitializationError(string message, Exception inner = null)
{
    var wrapped = new AlgorithmSetupException(
        $"During the algorithm initialization, the following exception has occurred: {message}",
        inner);

    Errors.Add(wrapped);
}
509 
/// <summary>
/// Releases the brokerage factory and the data queue handler owned or located by this setup handler.
/// </summary>
/// <remarks>
/// When a data queue handler brokerage was preloaded it is disconnected (if connected) and disposed;
/// otherwise any <see cref="IDataQueueHandler"/> registered with the composer is disposed instead.
/// </remarks>
/// <filterpriority>2</filterpriority>
public void Dispose()
{
    _factory?.DisposeSafely();

    if (_dataQueueHandlerBrokerage != null)
    {
        // the preloaded brokerage gets a disconnect before disposal
        if (_dataQueueHandlerBrokerage.IsConnected)
        {
            _dataQueueHandlerBrokerage.Disconnect();
        }

        _dataQueueHandlerBrokerage.DisposeSafely();
        return;
    }

    // nothing was preloaded: fall back to whatever handler the composer holds
    var composedHandler = Composer.Instance.GetPart<IDataQueueHandler>();
    if (composedHandler == null)
    {
        Log.Trace("BrokerageSetupHandler.Setup(): did not find any data queue handler to dispose");
        return;
    }

    Log.Trace($"BrokerageSetupHandler.Setup(): Found data queue handler to dispose: {composedHandler.GetType()}");
    composedHandler.DisposeSafely();
}
540 
/// <summary>
/// Preloads the data queue handler when it is provided by a brokerage factory other than the
/// one used for trading.
/// </summary>
/// <remarks>
/// The assembly declaring <see cref="Brokerage"/> is scanned for a type whose full name ends with the
/// job's data queue handler name and that carries a <see cref="BrokerageFactoryAttribute"/>. If the
/// attribute names a factory type different from <paramref name="factory"/>, that factory is
/// instantiated, its brokerage data is merged into the job (existing keys win), and the brokerage it
/// creates is connected and kept so that Dispose can release it.
/// </remarks>
/// <param name="liveJob">The live job packet; its brokerage data may gain entries</param>
/// <param name="algorithm">The algorithm instance handed to the factory</param>
/// <param name="factory">The factory already resolved for the trading brokerage</param>
private void PreloadDataQueueHandler(LiveNodePacket liveJob, IAlgorithm algorithm, IBrokerageFactory factory)
{
    var handlerName = liveJob.DataQueueHandler;
    if (string.IsNullOrWhiteSpace(handlerName))
    {
        // no handler configured: a null suffix would throw and an empty suffix would match
        // every type name, preloading an arbitrary attributed type
        return;
    }

    // preload the data queue handler using custom BrokerageFactory attribute;
    // type names are identifiers, so the suffix match is ordinal rather than culture-sensitive
    var dataQueueHandlerType = Assembly.GetAssembly(typeof(Brokerage))
        .GetTypes()
        .FirstOrDefault(x =>
            x.FullName != null &&
            x.FullName.EndsWith(handlerName, StringComparison.Ordinal) &&
            x.HasAttribute(typeof(BrokerageFactoryAttribute)));

    if (dataQueueHandlerType == null)
    {
        return;
    }

    var attribute = dataQueueHandlerType.GetCustomAttribute<BrokerageFactoryAttribute>();

    // only load the data queue handler if the factory is different from our brokerage factory
    if (attribute.Type == factory.GetType())
    {
        return;
    }

    var brokerageFactory = (BrokerageFactory)Activator.CreateInstance(attribute.Type);

    // copy the brokerage data (usually credentials) without overwriting values the job already has
    foreach (var kvp in brokerageFactory.BrokerageData)
    {
        if (!liveJob.BrokerageData.ContainsKey(kvp.Key))
        {
            liveJob.BrokerageData.Add(kvp.Key, kvp.Value);
        }
    }

    // create the data queue handler brokerage and remember it for disposal
    _dataQueueHandlerBrokerage = brokerageFactory.CreateBrokerage(liveJob, algorithm);

    // open connection for subscriptions
    _dataQueueHandlerBrokerage.Connect();
}
577  }
578 }