Lean  $LEAN_TAG$
BrokerageTransactionHandler.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14 */
15 
16 using System;
17 using System.Collections.Concurrent;
18 using System.Collections.Generic;
19 using System.Linq;
20 using System.Runtime.CompilerServices;
21 using System.Threading;
29 using QuantConnect.Logging;
30 using QuantConnect.Orders;
34 using QuantConnect.Util;
35 
37 {
38  /// <summary>
39  /// Transaction handler for all brokerages
40  /// </summary>
42  {
43  private IAlgorithm _algorithm;
44  private SignalExportManager _signalExport;
45  private IBrokerage _brokerage;
46  private bool _brokerageIsBacktesting;
47  private bool _loggedFeeAdjustmentWarning;
48 
49  // Counter to keep track of total amount of processed orders
50  private int _totalOrderCount;
51 
52  // this bool is used to check if the warning message for the rounding of order quantity has been displayed for the first time
53  private bool _firstRoundOffMessage = false;
54 
55  // this value is used for determining how confident we are in our cash balance update
56  private long _lastFillTimeTicks;
57 
58  private const int MaxCashSyncAttempts = 5;
59  private int _failedCashSyncAttempts;
60 
61  /// <summary>
62  /// OrderQueue holds the newly updated orders from the user algorithm waiting to be processed. Once
63  /// orders are processed they are moved into the Orders queue awaiting the brokerage response.
64  /// </summary>
66 
67  private Thread _processingThread;
68  private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
69 
70  private readonly ConcurrentQueue<OrderEvent> _orderEvents = new ConcurrentQueue<OrderEvent>();
71 
72  /// <summary>
73  /// The _completeOrders dictionary holds all orders.
74  /// Once the transaction thread has worked on them they get put here while witing for fill updates.
75  /// </summary>
76  private readonly ConcurrentDictionary<int, Order> _completeOrders = new ConcurrentDictionary<int, Order>();
77 
78  /// <summary>
79  /// The orders dictionary holds orders which are open. Status: New, Submitted, PartiallyFilled, None, CancelPending
80  /// Once the transaction thread has worked on them they get put here while witing for fill updates.
81  /// </summary>
82  private readonly ConcurrentDictionary<int, Order> _openOrders = new ConcurrentDictionary<int, Order>();
83 
84  /// <summary>
85  /// The _openOrderTickets dictionary holds open order tickets that the algorithm can use to reference a specific order. This
86  /// includes invoking update and cancel commands. In the future, we can add more features to the ticket, such as events
87  /// and async events (such as run this code when this order fills)
88  /// </summary>
89  private readonly ConcurrentDictionary<int, OrderTicket> _openOrderTickets = new ConcurrentDictionary<int, OrderTicket>();
90 
91  /// <summary>
92  /// The _completeOrderTickets dictionary holds all order tickets that the algorithm can use to reference a specific order. This
93  /// includes invoking update and cancel commands. In the future, we can add more features to the ticket, such as events
94  /// and async events (such as run this code when this order fills)
95  /// </summary>
96  private readonly ConcurrentDictionary<int, OrderTicket> _completeOrderTickets = new ConcurrentDictionary<int, OrderTicket>();
97 
98  /// <summary>
99  /// Cache collection of price adjustment modes for each symbol
100  /// </summary>
101  private readonly Dictionary<Symbol, DataNormalizationMode> _priceAdjustmentModes = new Dictionary<Symbol, DataNormalizationMode>();
102 
103  /// <summary>
104  /// The _cancelPendingOrders instance will help to keep track of CancelPending orders and their Status
105  /// </summary>
107 
108  private IResultHandler _resultHandler;
109 
110  private readonly object _lockHandleOrderEvent = new object();
111 
112  /// <summary>
113  /// Event fired when there is a new <see cref="OrderEvent"/>
114  /// </summary>
115  public event EventHandler<OrderEvent> NewOrderEvent;
116 
117  /// <summary>
118  /// Gets the permanent storage for all orders
119  /// </summary>
120  public ConcurrentDictionary<int, Order> Orders
121  {
122  get
123  {
124  return _completeOrders;
125  }
126  }
127 
128  /// <summary>
129  /// Gets all order events
130  /// </summary>
131  public IEnumerable<OrderEvent> OrderEvents => _orderEvents;
132 
133  /// <summary>
134  /// Gets the permanent storage for all order tickets
135  /// </summary>
136  public ConcurrentDictionary<int, OrderTicket> OrderTickets
137  {
138  get
139  {
140  return _completeOrderTickets;
141  }
142  }
143 
144  /// <summary>
145  /// Gets the current number of orders that have been processed
146  /// </summary>
147  public int OrdersCount => _totalOrderCount;
148 
149  /// <summary>
150  /// Creates a new BrokerageTransactionHandler to process orders using the specified brokerage implementation
151  /// </summary>
152  /// <param name="algorithm">The algorithm instance</param>
153  /// <param name="brokerage">The brokerage implementation to process orders and fire fill events</param>
154  /// <param name="resultHandler"></param>
155  public virtual void Initialize(IAlgorithm algorithm, IBrokerage brokerage, IResultHandler resultHandler)
156  {
157  if (brokerage == null)
158  {
159  throw new ArgumentNullException(nameof(brokerage));
160  }
161  // multi threaded queue, used for live deployments
163  // we don't need to do this today because we just initialized/synced
164  _resultHandler = resultHandler;
165 
166  _brokerage = brokerage;
167  _brokerageIsBacktesting = brokerage is BacktestingBrokerage;
168 
169  _brokerage.OrdersStatusChanged += (sender, orderEvents) =>
170  {
171  HandleOrderEvents(orderEvents);
172  };
173 
174  _brokerage.AccountChanged += (sender, account) =>
175  {
176  HandleAccountChanged(account);
177  };
178 
179  _brokerage.OptionPositionAssigned += (sender, fill) =>
180  {
181  HandlePositionAssigned(fill);
182  };
183 
184  _brokerage.OptionNotification += (sender, e) =>
185  {
186  HandleOptionNotification(e);
187  };
188 
189  _brokerage.NewBrokerageOrderNotification += (sender, e) =>
190  {
191  HandleNewBrokerageSideOrder(e);
192  };
193 
194  _brokerage.DelistingNotification += (sender, e) =>
195  {
196  HandleDelistingNotification(e);
197  };
198 
199  _brokerage.OrderIdChanged += (sender, e) =>
200  {
201  HandlerBrokerageOrderIdChangedEvent(e);
202  };
203 
204  _brokerage.OrderUpdated += (sender, e) =>
205  {
206  HandleOrderUpdated(e);
207  };
208 
209  IsActive = true;
210 
211  _algorithm = algorithm;
212 
213  _signalExport = _algorithm is QCAlgorithm
214  ? (_algorithm as QCAlgorithm).SignalExport
215  : (_algorithm as AlgorithmPythonWrapper).SignalExport;
216 
217  NewOrderEvent += (s, e) => _signalExport.OnOrderEvent(e);
219  }
220 
221  /// <summary>
222  /// Create and start the transaction thread, who will be in charge of processing
223  /// the order requests
224  /// </summary>
225  protected virtual void InitializeTransactionThread()
226  {
227  _processingThread = new Thread(Run) { IsBackground = true, Name = "Transaction Thread" };
228  _processingThread.Start();
229  }
230 
231  /// <summary>
232  /// Boolean flag indicating the Run thread method is busy.
233  /// False indicates it is completely finished processing and ready to be terminated.
234  /// </summary>
235  public bool IsActive { get; private set; }
236 
237  #region Order Request Processing
238 
239  /// <summary>
240  /// Adds the specified order to be processed
241  /// </summary>
242  /// <param name="request">The order to be processed</param>
244  {
245  if (_algorithm.LiveMode)
246  {
247  Log.Trace("BrokerageTransactionHandler.Process(): " + request);
248 
249  _algorithm.Portfolio.LogMarginInformation(request);
250  }
251 
252  switch (request.OrderRequestType)
253  {
254  case OrderRequestType.Submit:
255  return AddOrder((SubmitOrderRequest)request);
256 
257  case OrderRequestType.Update:
258  return UpdateOrder((UpdateOrderRequest)request);
259 
260  case OrderRequestType.Cancel:
261  return CancelOrder((CancelOrderRequest)request);
262 
263  default:
264  throw new ArgumentOutOfRangeException();
265  }
266  }
267 
268  /// <summary>
269  /// Add an order to collection and return the unique order id or negative if an error.
270  /// </summary>
271  /// <param name="request">A request detailing the order to be submitted</param>
272  /// <returns>New unique, increasing orderid</returns>
274  {
275  var response = !_algorithm.IsWarmingUp
276  ? OrderResponse.Success(request)
277  : OrderResponse.WarmingUp(request);
278 
279  var shortable = true;
280  if (request.Quantity < 0)
281  {
282  shortable = _algorithm.Shortable(request.Symbol, request.Quantity);
283  }
284 
285  if (!shortable)
286  {
287  var message = GetShortableErrorMessage(request.Symbol, request.Quantity);
288  if (_algorithm.LiveMode)
289  {
290  // in live mode we send a warning but we wont block the order being sent to the brokerage
291  _algorithm.Debug($"Warning: {message}");
292  }
293  else
294  {
295  response = OrderResponse.Error(request, OrderResponseErrorCode.ExceedsShortableQuantity, message);
296  }
297  }
298 
299  request.SetResponse(response);
300  var ticket = new OrderTicket(_algorithm.Transactions, request);
301 
302  Interlocked.Increment(ref _totalOrderCount);
303  // send the order to be processed after creating the ticket
304  if (response.IsSuccess)
305  {
306  _openOrderTickets.TryAdd(ticket.OrderId, ticket);
307  _completeOrderTickets.TryAdd(ticket.OrderId, ticket);
308  _orderRequestQueue.Add(request);
309 
310  // wait for the transaction handler to set the order reference into the new order ticket,
311  // so we can ensure the order has already been added to the open orders,
312  // before returning the ticket to the algorithm.
313  WaitForOrderSubmission(ticket);
314  }
315  else
316  {
317  // add it to the orders collection for recall later
318  var order = Order.CreateOrder(request);
319  var orderTag = response.ErrorCode == OrderResponseErrorCode.AlgorithmWarmingUp
320  ? "Algorithm warming up."
321  : response.ErrorMessage;
322 
323  // ensure the order is tagged with a currency
324  var security = _algorithm.Securities[order.Symbol];
325  order.PriceCurrency = security.SymbolProperties.QuoteCurrency;
326 
327  order.Status = OrderStatus.Invalid;
328  order.Tag = orderTag;
329  ticket.SetOrder(order);
330  _completeOrderTickets.TryAdd(ticket.OrderId, ticket);
331  _completeOrders.TryAdd(order.Id, order);
332 
333  HandleOrderEvent(new OrderEvent(order,
334  _algorithm.UtcTime,
335  OrderFee.Zero,
336  orderTag));
337  }
338  return ticket;
339  }
340 
341  /// <summary>
342  /// Wait for the order to be handled by the <see cref="_processingThread"/>
343  /// </summary>
344  /// <param name="ticket">The <see cref="OrderTicket"/> expecting to be submitted</param>
345  protected virtual void WaitForOrderSubmission(OrderTicket ticket)
346  {
347  var orderSetTimeout = Time.OneSecond;
348  if (!ticket.OrderSet.WaitOne(orderSetTimeout))
349  {
350  Log.Error("BrokerageTransactionHandler.WaitForOrderSubmission(): " +
351  $"The order request (Id={ticket.OrderId}) was not submitted within {orderSetTimeout.TotalSeconds} second(s).");
352  }
353  }
354 
355  /// <summary>
356  /// Update an order yet to be filled such as stop or limit orders.
357  /// </summary>
358  /// <param name="request">Request detailing how the order should be updated</param>
359  /// <remarks>Does not apply if the order is already fully filled</remarks>
361  {
362  OrderTicket ticket;
363  if (!_completeOrderTickets.TryGetValue(request.OrderId, out ticket))
364  {
365  return OrderTicket.InvalidUpdateOrderId(_algorithm.Transactions, request);
366  }
367 
368  ticket.AddUpdateRequest(request);
369 
370  try
371  {
372  //Update the order from the behaviour
373  var order = GetOrderByIdInternal(request.OrderId);
374  var orderQuantity = request.Quantity ?? ticket.Quantity;
375 
376  var shortable = true;
377  if (order?.Direction == OrderDirection.Sell || orderQuantity < 0)
378  {
379  shortable = _algorithm.Shortable(ticket.Symbol, orderQuantity, order.Id);
380 
381  if (_algorithm.LiveMode && !shortable)
382  {
383  // let's override and just send warning
384  shortable = true;
385 
386  _algorithm.Debug($"Warning: {GetShortableErrorMessage(ticket.Symbol, ticket.Quantity)}");
387  }
388  }
389 
390  if (order == null)
391  {
392  // can't update an order that doesn't exist!
393  Log.Error("BrokerageTransactionHandler.Update(): Cannot update a null order");
394  request.SetResponse(OrderResponse.UnableToFindOrder(request));
395  }
396  else if (order.Status == OrderStatus.New)
397  {
398  // can't update a pending submit order
399  Log.Error("BrokerageTransactionHandler.Update(): Cannot update a pending submit order with status " + order.Status);
400  request.SetResponse(OrderResponse.InvalidNewStatus(request, order));
401  }
402  else if (order.Status.IsClosed() && !request.IsAllowedForClosedOrder())
403  {
404  // can't update a completed order
405  Log.Error("BrokerageTransactionHandler.Update(): Cannot update closed order with status " + order.Status);
406  request.SetResponse(OrderResponse.InvalidStatus(request, order));
407  }
408  else if (request.Quantity.HasValue && request.Quantity.Value == 0)
409  {
410  request.SetResponse(OrderResponse.ZeroQuantity(request));
411  }
412  else if (_algorithm.IsWarmingUp)
413  {
414  request.SetResponse(OrderResponse.WarmingUp(request));
415  }
416  else if (!shortable)
417  {
418  var shortableResponse = OrderResponse.Error(request, OrderResponseErrorCode.ExceedsShortableQuantity,
419  GetShortableErrorMessage(ticket.Symbol, ticket.Quantity));
420 
421  request.SetResponse(shortableResponse);
422  }
423  else
424  {
425  request.SetResponse(OrderResponse.Success(request), OrderRequestStatus.Processing);
426  _orderRequestQueue.Add(request);
427  }
428  }
429  catch (Exception err)
430  {
431  Log.Error(err);
432  request.SetResponse(OrderResponse.Error(request, OrderResponseErrorCode.ProcessingError, err.Message));
433  }
434 
435  return ticket;
436  }
437 
438  /// <summary>
439  /// Remove this order from outstanding queue: user is requesting a cancel.
440  /// </summary>
441  /// <param name="request">Request containing the specific order id to remove</param>
443  {
444  OrderTicket ticket;
445  if (!_completeOrderTickets.TryGetValue(request.OrderId, out ticket))
446  {
447  Log.Error("BrokerageTransactionHandler.CancelOrder(): Unable to locate ticket for order.");
448  return OrderTicket.InvalidCancelOrderId(_algorithm.Transactions, request);
449  }
450 
451  try
452  {
453  // if we couldn't set this request as the cancellation then another thread/someone
454  // else is already doing it or it in fact has already been cancelled
455  if (!ticket.TrySetCancelRequest(request))
456  {
457  // the ticket has already been cancelled
458  request.SetResponse(OrderResponse.Error(request, OrderResponseErrorCode.InvalidRequest, "Cancellation is already in progress."));
459  return ticket;
460  }
461 
462  //Error check
463  var order = GetOrderByIdInternal(request.OrderId);
464  if (order != null && request.Tag != null)
465  {
466  order.Tag = request.Tag;
467  }
468  if (order == null)
469  {
470  Log.Error("BrokerageTransactionHandler.CancelOrder(): Cannot find this id.");
471  request.SetResponse(OrderResponse.UnableToFindOrder(request));
472  }
473  else if (order.Status == OrderStatus.New)
474  {
475  Log.Error("BrokerageTransactionHandler.CancelOrder(): Cannot cancel order with status: " + order.Status);
476  request.SetResponse(OrderResponse.InvalidNewStatus(request, order));
477  }
478  else if (order.Status.IsClosed())
479  {
480  Log.Error("BrokerageTransactionHandler.CancelOrder(): Cannot cancel order already " + order.Status);
481  request.SetResponse(OrderResponse.InvalidStatus(request, order));
482  }
483  else if (_algorithm.IsWarmingUp)
484  {
485  request.SetResponse(OrderResponse.WarmingUp(request));
486  }
487  else
488  {
489  _cancelPendingOrders.Set(order.Id, order.Status);
490  // update the order status
491  order.Status = OrderStatus.CancelPending;
492 
493  // notify the algorithm with an order event
494  HandleOrderEvent(new OrderEvent(order,
495  _algorithm.UtcTime,
496  OrderFee.Zero));
497 
498  // send the request to be processed
499  request.SetResponse(OrderResponse.Success(request), OrderRequestStatus.Processing);
500  _orderRequestQueue.Add(request);
501  }
502  }
503  catch (Exception err)
504  {
505  Log.Error(err);
506  request.SetResponse(OrderResponse.Error(request, OrderResponseErrorCode.ProcessingError, err.Message));
507  }
508 
509  return ticket;
510  }
511 
512  /// <summary>
513  /// Gets and enumerable of <see cref="OrderTicket"/> matching the specified <paramref name="filter"/>
514  /// </summary>
515  /// <param name="filter">The filter predicate used to find the required order tickets</param>
516  /// <returns>An enumerable of <see cref="OrderTicket"/> matching the specified <paramref name="filter"/></returns>
517  public IEnumerable<OrderTicket> GetOrderTickets(Func<OrderTicket, bool> filter = null)
518  {
519  return _completeOrderTickets.Select(x => x.Value).Where(filter ?? (x => true));
520  }
521 
522  /// <summary>
523  /// Gets and enumerable of opened <see cref="OrderTicket"/> matching the specified <paramref name="filter"/>
524  /// </summary>
525  /// <param name="filter">The filter predicate used to find the required order tickets</param>
526  /// <returns>An enumerable of opened <see cref="OrderTicket"/> matching the specified <paramref name="filter"/></returns>
527  public IEnumerable<OrderTicket> GetOpenOrderTickets(Func<OrderTicket, bool> filter = null)
528  {
529  return _openOrderTickets.Select(x => x.Value).Where(filter ?? (x => true));
530  }
531 
532  /// <summary>
533  /// Gets the order ticket for the specified order id. Returns null if not found
534  /// </summary>
535  /// <param name="orderId">The order's id</param>
536  /// <returns>The order ticket with the specified id, or null if not found</returns>
537  public OrderTicket GetOrderTicket(int orderId)
538  {
539  OrderTicket ticket;
540  _completeOrderTickets.TryGetValue(orderId, out ticket);
541  return ticket;
542  }
543 
544  #endregion
545 
546  /// <summary>
547  /// Get the order by its id
548  /// </summary>
549  /// <param name="orderId">Order id to fetch</param>
550  /// <returns>A clone of the order with the specified id, or null if no match is found</returns>
551  public Order GetOrderById(int orderId)
552  {
553  Order order = GetOrderByIdInternal(orderId);
554  return order?.Clone();
555  }
556 
557  private Order GetOrderByIdInternal(int orderId)
558  {
559  Order order;
560  return _completeOrders.TryGetValue(orderId, out order) ? order : null;
561  }
562 
563  /// <summary>
564  /// Gets the order by its brokerage id
565  /// </summary>
566  /// <param name="brokerageId">The brokerage id to fetch</param>
567  /// <returns>The first order matching the brokerage id, or null if no match is found</returns>
568  public List<Order> GetOrdersByBrokerageId(string brokerageId)
569  {
570  var openOrders = GetOrdersByBrokerageId(brokerageId, _openOrders);
571 
572  if (openOrders.Count > 0
573  // if it's part of a group, some leg could be filled already, not part of open orders
574  && (openOrders[0].GroupOrderManager == null || openOrders[0].GroupOrderManager.Count == openOrders.Count))
575  {
576  return openOrders;
577  }
578 
579  return GetOrdersByBrokerageId(brokerageId, _completeOrders);
580  }
581 
582  private static List<Order> GetOrdersByBrokerageId(string brokerageId, ConcurrentDictionary<int, Order> orders)
583  {
584  return orders
585  .Where(x => x.Value.BrokerId.Contains(brokerageId))
586  .Select(kvp => kvp.Value.Clone())
587  .ToList();
588  }
589 
590  /// <summary>
591  /// Gets all orders matching the specified filter. Specifying null will return an enumerable
592  /// of all orders.
593  /// </summary>
594  /// <param name="filter">Delegate used to filter the orders</param>
595  /// <returns>All orders this order provider currently holds by the specified filter</returns>
596  public IEnumerable<Order> GetOrders(Func<Order, bool> filter = null)
597  {
598  if (filter != null)
599  {
600  // return a clone to prevent object reference shenanigans, you must submit a request to change the order
601  return _completeOrders.Select(x => x.Value).Where(filter).Select(x => x.Clone());
602  }
603  return _completeOrders.Select(x => x.Value).Select(x => x.Clone());
604  }
605 
606  /// <summary>
607  /// Gets open orders matching the specified filter
608  /// </summary>
609  /// <param name="filter">Delegate used to filter the orders</param>
610  /// <returns>All open orders this order provider currently holds</returns>
611  public List<Order> GetOpenOrders(Func<Order, bool> filter = null)
612  {
613  if (filter != null)
614  {
615  // return a clone to prevent object reference shenanigans, you must submit a request to change the order
616  return _openOrders.Select(x => x.Value).Where(filter).Select(x => x.Clone()).ToList();
617  }
618  return _openOrders.Select(x => x.Value).Select(x => x.Clone()).ToList();
619  }
620 
621  /// <summary>
622  /// Primary thread entry point to launch the transaction thread.
623  /// </summary>
624  protected void Run()
625  {
626  try
627  {
628  foreach (var request in _orderRequestQueue.GetConsumingEnumerable(_cancellationTokenSource.Token))
629  {
630  HandleOrderRequest(request);
632  }
633  }
634  catch (Exception err)
635  {
636  // unexpected error, we need to close down shop
637  _algorithm.SetRuntimeError(err, "HandleOrderRequest");
638  }
639 
640  if (_processingThread != null)
641  {
642  Log.Trace("BrokerageTransactionHandler.Run(): Ending Thread...");
643  IsActive = false;
644  }
645  }
646 
647  /// <summary>
648  /// Processes asynchronous events on the transaction handler's thread
649  /// </summary>
650  public virtual void ProcessAsynchronousEvents()
651  {
652  // NOP
653  }
654 
655  /// <summary>
656  /// Processes all synchronous events that must take place before the next time loop for the algorithm
657  /// </summary>
658  public virtual void ProcessSynchronousEvents()
659  {
660  // how to do synchronous market orders for real brokerages?
661 
662  // in backtesting we need to wait for orders to be removed from the queue and finished processing
663  if (!_algorithm.LiveMode)
664  {
665  if (_orderRequestQueue.IsBusy && !_orderRequestQueue.WaitHandle.WaitOne(Time.OneSecond, _cancellationTokenSource.Token))
666  {
667  Log.Error("BrokerageTransactionHandler.ProcessSynchronousEvents(): Timed out waiting for request queue to finish processing.");
668  }
669  return;
670  }
671 
672  _signalExport.Flush(CurrentTimeUtc);
673 
674  // check if the brokerage should perform cash sync now
675  if (!_algorithm.IsWarmingUp && _brokerage.ShouldPerformCashSync(CurrentTimeUtc))
676  {
677  // only perform cash syncs if we haven't had a fill for at least 10 seconds
678  if (TimeSinceLastFill > TimeSpan.FromSeconds(10))
679  {
680  if (!_brokerage.PerformCashSync(_algorithm, CurrentTimeUtc, () => TimeSinceLastFill))
681  {
682  if (++_failedCashSyncAttempts >= MaxCashSyncAttempts)
683  {
684  throw new Exception("The maximum number of attempts for brokerage cash sync has been reached.");
685  }
686  }
687  }
688  }
689 
690  // we want to remove orders older than 10k records, but only in live mode
691  const int maxOrdersToKeep = 10000;
692  if (_completeOrders.Count < maxOrdersToKeep + 1)
693  {
694  return;
695  }
696 
697  Log.Debug("BrokerageTransactionHandler.ProcessSynchronousEvents(): Start removing old orders...");
698  var max = _completeOrders.Max(x => x.Key);
699  var lowestOrderIdToKeep = max - maxOrdersToKeep;
700  foreach (var item in _completeOrders.Where(x => x.Key <= lowestOrderIdToKeep))
701  {
702  Order value;
703  OrderTicket ticket;
704  _completeOrders.TryRemove(item.Key, out value);
705  _completeOrderTickets.TryRemove(item.Key, out ticket);
706  }
707 
708  Log.Debug($"BrokerageTransactionHandler.ProcessSynchronousEvents(): New order count {_completeOrders.Count}. Exit");
709  }
710 
711  /// <summary>
712  /// Register an already open Order
713  /// </summary>
714  public void AddOpenOrder(Order order, IAlgorithm algorithm)
715  {
716  if (order.Status == OrderStatus.New || order.Status == OrderStatus.None)
717  {
718  // make sure we have a valid order status
719  order.Status = OrderStatus.Submitted;
720  }
721 
722  order.Id = algorithm.Transactions.GetIncrementOrderId();
723 
724  if (order.GroupOrderManager != null && order.GroupOrderManager.Id == 0)
725  {
727  }
728 
729  var orderTicket = order.ToOrderTicket(algorithm.Transactions);
730 
731  SetPriceAdjustmentMode(order, algorithm);
732 
733  _openOrders.AddOrUpdate(order.Id, order, (i, o) => order);
734  _completeOrders.AddOrUpdate(order.Id, order, (i, o) => order);
735  _openOrderTickets.AddOrUpdate(order.Id, orderTicket);
736  _completeOrderTickets.AddOrUpdate(order.Id, orderTicket);
737 
738  Interlocked.Increment(ref _totalOrderCount);
739  }
740 
741 
742  /// <summary>
743  /// Signal a end of thread request to stop monitoring the transactions.
744  /// </summary>
745  public void Exit()
746  {
747  var timeout = TimeSpan.FromSeconds(60);
748  if (_processingThread != null)
749  {
750  // only wait if the processing thread is running
751  if (_orderRequestQueue.IsBusy && !_orderRequestQueue.WaitHandle.WaitOne(timeout))
752  {
753  Log.Error("BrokerageTransactionHandler.Exit(): Exceed timeout: " + (int)(timeout.TotalSeconds) + " seconds.");
754  }
755  }
756 
757  _processingThread?.StopSafely(timeout, _cancellationTokenSource);
758  IsActive = false;
759  _cancellationTokenSource.DisposeSafely();
760  }
761 
762  /// <summary>
763  /// Handles a generic order request
764  /// </summary>
765  /// <param name="request"><see cref="OrderRequest"/> to be handled</param>
766  /// <returns><see cref="OrderResponse"/> for request</returns>
767  public void HandleOrderRequest(OrderRequest request)
768  {
769  OrderResponse response;
770  switch (request.OrderRequestType)
771  {
772  case OrderRequestType.Submit:
773  response = HandleSubmitOrderRequest((SubmitOrderRequest)request);
774  break;
775  case OrderRequestType.Update:
776  response = HandleUpdateOrderRequest((UpdateOrderRequest)request);
777  break;
778  case OrderRequestType.Cancel:
779  response = HandleCancelOrderRequest((CancelOrderRequest)request);
780  break;
781  default:
782  throw new ArgumentOutOfRangeException();
783  }
784 
785  // mark request as processed
786  request.SetResponse(response, OrderRequestStatus.Processed);
787  }
788 
789  /// <summary>
790  /// Handles a request to submit a new order
791  /// </summary>
792  private OrderResponse HandleSubmitOrderRequest(SubmitOrderRequest request)
793  {
794  OrderTicket ticket;
795  var order = Order.CreateOrder(request);
796 
797  // ensure the order is tagged with a currency
798  var security = _algorithm.Securities[order.Symbol];
799  order.PriceCurrency = security.SymbolProperties.QuoteCurrency;
800  if (string.IsNullOrEmpty(order.Tag))
801  {
802  order.Tag = order.GetDefaultTag();
803  }
804 
805  // rounds off the order towards 0 to the nearest multiple of lot size
806  order.Quantity = RoundOffOrder(order, security);
807 
808  if (!_openOrders.TryAdd(order.Id, order) || !_completeOrders.TryAdd(order.Id, order))
809  {
810  Log.Error("BrokerageTransactionHandler.HandleSubmitOrderRequest(): Unable to add new order, order not processed.");
811  return OrderResponse.Error(request, OrderResponseErrorCode.OrderAlreadyExists, "Cannot process submit request because order with id {0} already exists");
812  }
813  if (!_completeOrderTickets.TryGetValue(order.Id, out ticket))
814  {
815  Log.Error("BrokerageTransactionHandler.HandleSubmitOrderRequest(): Unable to retrieve order ticket, order not processed.");
816  return OrderResponse.UnableToFindOrder(request);
817  }
818 
819  var comboIsReady = order.TryGetGroupOrders(TryGetOrder, out var orders);
820  var comboSecuritiesFound = orders.TryGetGroupOrdersSecurities(_algorithm.Portfolio, out var securities);
821 
822  // rounds the order prices
823  RoundOrderPrices(order, security, comboIsReady, securities);
824 
825  // save current security prices
826  order.OrderSubmissionData = new OrderSubmissionData(security.BidPrice, security.AskPrice, security.Close);
827 
828  // Set order price adjustment mode
829  SetPriceAdjustmentMode(order, _algorithm);
830 
831  // update the ticket's internal storage with this new order reference
832  ticket.SetOrder(order);
833 
834  if (!comboIsReady)
835  {
836  // an Order of the group is missing
837  return OrderResponse.Success(request);
838  }
839 
840  if (orders.Any(o => o.Quantity == 0))
841  {
842  var response = OrderResponse.ZeroQuantity(request);
843  _algorithm.Error(response.ErrorMessage);
844 
845  InvalidateOrders(orders, response.ErrorMessage);
846  return response;
847  }
848 
849  if (!comboSecuritiesFound)
850  {
851  var response = OrderResponse.MissingSecurity(request);
852  _algorithm.Error(response.ErrorMessage);
853 
854  InvalidateOrders(orders, response.ErrorMessage);
855  return response;
856  }
857 
858  // check to see if we have enough money to place the order
859  if (!HasSufficientBuyingPowerForOrders(order, request, out var validationResult, orders, securities))
860  {
861  return validationResult;
862  }
863 
864  // verify that our current brokerage can actually take the order
865  foreach (var kvp in securities)
866  {
867  if (!_algorithm.BrokerageModel.CanSubmitOrder(kvp.Value, kvp.Key, out var message))
868  {
869  var errorMessage = $"BrokerageModel declared unable to submit order: [{string.Join(",", orders.Select(o => o.Id))}]";
870 
871  // if we couldn't actually process the order, mark it as invalid and bail
872  message ??= new BrokerageMessageEvent(BrokerageMessageType.Warning, "InvalidOrder", string.Empty);
873  var response = OrderResponse.Error(request, OrderResponseErrorCode.BrokerageModelRefusedToSubmitOrder, $"{errorMessage} {message}");
874 
875  InvalidateOrders(orders, response.ErrorMessage);
876  _algorithm.Error(response.ErrorMessage);
877  return response;
878  }
879  }
880 
881  // set the order status based on whether or not we successfully submitted the order to the market
882  bool orderPlaced;
883  try
884  {
885  orderPlaced = orders.All(o => _brokerage.PlaceOrder(o));
886  }
887  catch (Exception err)
888  {
889  Log.Error(err);
890  orderPlaced = false;
891  }
892 
893  if (!orderPlaced)
894  {
895  // we failed to submit the order, invalidate it
896  var errorMessage = $"Brokerage failed to place orders: [{string.Join(",", orders.Select(o => o.Id))}]";
897 
898  InvalidateOrders(orders, errorMessage);
899  _algorithm.Error(errorMessage);
900  return OrderResponse.Error(request, OrderResponseErrorCode.BrokerageFailedToSubmitOrder, errorMessage);
901  }
902 
903  return OrderResponse.Success(request);
904  }
905 
906  /// <summary>
907  /// Handles a request to update order properties
908  /// </summary>
909  private OrderResponse HandleUpdateOrderRequest(UpdateOrderRequest request)
910  {
911  Order order;
912  OrderTicket ticket;
913  if (!_completeOrders.TryGetValue(request.OrderId, out order) || !_completeOrderTickets.TryGetValue(request.OrderId, out ticket))
914  {
915  Log.Error("BrokerageTransactionHandler.HandleUpdateOrderRequest(): Unable to update order with ID " + request.OrderId);
916  return OrderResponse.UnableToFindOrder(request);
917  }
918 
919  if (order.Status == OrderStatus.New)
920  {
921  return OrderResponse.InvalidNewStatus(request, order);
922  }
923 
924  var isClosedOrderUpdate = false;
925 
926  if (order.Status.IsClosed())
927  {
928  if (!request.IsAllowedForClosedOrder())
929  {
930  return OrderResponse.InvalidStatus(request, order);
931  }
932 
933  isClosedOrderUpdate = true;
934  }
935 
936  // rounds off the order towards 0 to the nearest multiple of lot size
937  var security = _algorithm.Securities[order.Symbol];
938  order.Quantity = RoundOffOrder(order, security);
939 
940  // verify that our current brokerage can actually update the order
941  BrokerageMessageEvent message;
942  if (!_algorithm.LiveMode && !_algorithm.BrokerageModel.CanUpdateOrder(_algorithm.Securities[order.Symbol], order, request, out message))
943  {
944  if (message == null) message = new BrokerageMessageEvent(BrokerageMessageType.Warning, "InvalidRequest", "BrokerageModel declared unable to update order: " + order.Id);
945  var response = OrderResponse.Error(request, OrderResponseErrorCode.BrokerageModelRefusedToUpdateOrder, "OrderID: " + order.Id + " " + message);
946  _algorithm.Error(response.ErrorMessage);
947  HandleOrderEvent(new OrderEvent(order,
948  _algorithm.UtcTime,
949  OrderFee.Zero,
950  "BrokerageModel declared unable to update order"));
951  return response;
952  }
953 
954  // If the order is not part of a ComboLegLimit update, validate sufficient buying power
955  if (order.GroupOrderManager == null)
956  {
957  var updatedOrder = order.Clone();
958  updatedOrder.ApplyUpdateOrderRequest(request);
959  if (!HasSufficientBuyingPowerForOrders(updatedOrder, request, out var validationResult))
960  {
961  return validationResult;
962  }
963  }
964 
965  // modify the values of the order object
966  order.ApplyUpdateOrderRequest(request);
967 
968  // rounds the order prices
969  RoundOrderPrices(order, security);
970 
971  ticket.SetOrder(order);
972 
973  bool orderUpdated;
974  if (isClosedOrderUpdate)
975  {
976  orderUpdated = true;
977  }
978  else
979  {
980  try
981  {
982  orderUpdated = _brokerage.UpdateOrder(order);
983  }
984  catch (Exception err)
985  {
986  Log.Error(err);
987  orderUpdated = false;
988  }
989  }
990 
991  if (!orderUpdated)
992  {
993  // we failed to update the order for some reason
994  var errorMessage = "Brokerage failed to update order with id " + request.OrderId;
995  _algorithm.Error(errorMessage);
996  HandleOrderEvent(new OrderEvent(order,
997  _algorithm.UtcTime,
998  OrderFee.Zero,
999  "Brokerage failed to update order"));
1000  return OrderResponse.Error(request, OrderResponseErrorCode.BrokerageFailedToUpdateOrder, errorMessage);
1001  }
1002 
1003  return OrderResponse.Success(request);
1004  }
1005 
1006  /// <summary>
1007  /// Handles a request to cancel an order
1008  /// </summary>
1009  private OrderResponse HandleCancelOrderRequest(CancelOrderRequest request)
1010  {
1011  Order order;
1012  OrderTicket ticket;
1013  if (!_completeOrders.TryGetValue(request.OrderId, out order) || !_completeOrderTickets.TryGetValue(request.OrderId, out ticket))
1014  {
1015  Log.Error("BrokerageTransactionHandler.HandleCancelOrderRequest(): Unable to cancel order with ID " + request.OrderId + ".");
1017  return OrderResponse.UnableToFindOrder(request);
1018  }
1019 
1020  if (order.Status == OrderStatus.New)
1021  {
1023  return OrderResponse.InvalidNewStatus(request, order);
1024  }
1025 
1026  if (order.Status.IsClosed())
1027  {
1029  return OrderResponse.InvalidStatus(request, order);
1030  }
1031 
1032  ticket.SetOrder(order);
1033 
1034  bool orderCanceled;
1035  try
1036  {
1037  orderCanceled = _brokerage.CancelOrder(order);
1038  }
1039  catch (Exception err)
1040  {
1041  Log.Error(err);
1042  orderCanceled = false;
1043  }
1044 
1045  if (!orderCanceled)
1046  {
1047  // failed to cancel the order
1048  var message = "Brokerage failed to cancel order with id " + order.Id;
1049  _algorithm.Error(message);
1051  return OrderResponse.Error(request, OrderResponseErrorCode.BrokerageFailedToCancelOrder, message);
1052  }
1053 
1054  if (request.Tag != null)
1055  {
1056  // update the tag, useful for 'why' we canceled the order
1057  order.Tag = request.Tag;
1058  }
1059 
1060  return OrderResponse.Success(request);
1061  }
1062 
1063  /// <summary>
1064  /// Validates if there is sufficient buying power for the given order(s).
1065  /// Returns an error response if validation fails or an exception occurs.
1066  /// Returns null if validation passes.
1067  /// </summary>
1068  private bool HasSufficientBuyingPowerForOrders(Order order, OrderRequest request, out OrderResponse response, List<Order> orders = null, Dictionary<Order, Security> securities = null)
1069  {
1070  response = null;
1071  HasSufficientBuyingPowerForOrderResult hasSufficientBuyingPowerResult;
1072  try
1073  {
1074  hasSufficientBuyingPowerResult = _algorithm.Portfolio.HasSufficientBuyingPowerForOrder(orders ?? [order]);
1075  }
1076  catch (Exception err)
1077  {
1078  Log.Error(err);
1079  _algorithm.Error($"Order Error: id: {order.Id.ToStringInvariant()}, Error executing margin models: {err.Message}");
1080  HandleOrderEvent(new OrderEvent(order, _algorithm.UtcTime, OrderFee.Zero, "Error executing margin models"));
1081 
1082  response = OrderResponse.Error(request, OrderResponseErrorCode.ProcessingError, "An error occurred while checking sufficient buying power for the orders.");
1083  return false;
1084  }
1085 
1086  if (!hasSufficientBuyingPowerResult.IsSufficient)
1087  {
1088  var errorMessage = securities != null
1089  ? securities.GetErrorMessage(hasSufficientBuyingPowerResult)
1090  : $"Brokerage failed to update order with id: {order.Id.ToStringInvariant()}, Symbol: {order.Symbol.Value}, Insufficient buying power to complete order, Reason: {hasSufficientBuyingPowerResult.Reason}.";
1091 
1092  _algorithm.Error(errorMessage);
1093 
1094  if (request is UpdateOrderRequest)
1095  {
1096  HandleOrderEvent(new OrderEvent(order, _algorithm.UtcTime, OrderFee.Zero, errorMessage));
1097  response = OrderResponse.Error(request, OrderResponseErrorCode.BrokerageFailedToUpdateOrder, errorMessage);
1098  }
1099  else
1100  {
1101  InvalidateOrders(orders, errorMessage);
1102  response = OrderResponse.Error(request, OrderResponseErrorCode.InsufficientBuyingPower, errorMessage);
1103  }
1104  return false;
1105  }
1106 
1107  return true;
1108  }
1109 
1110  private void HandleOrderEvents(List<OrderEvent> orderEvents)
1111  {
1112  lock (_lockHandleOrderEvent)
1113  {
1114  // Get orders and tickets
1115  var orders = new List<Order>(orderEvents.Count);
1116 
1117  for (var i = 0; i < orderEvents.Count; i++)
1118  {
1119  var orderEvent = orderEvents[i];
1120 
1121  if (orderEvent.Status.IsClosed() && _openOrders.TryRemove(orderEvent.OrderId, out var order))
1122  {
1123  _completeOrders[orderEvent.OrderId] = order;
1124  }
1125  else if (!_completeOrders.TryGetValue(orderEvent.OrderId, out order))
1126  {
1127  Log.Error("BrokerageTransactionHandler.HandleOrderEvents(): Unable to locate open Combo Order with id " + orderEvent.OrderId);
1128  LogOrderEvent(orderEvent);
1129  return;
1130  }
1131  orders.Add(order);
1132 
1133  if (orderEvent.Status.IsClosed() && _openOrderTickets.TryRemove(orderEvent.OrderId, out var ticket))
1134  {
1135  _completeOrderTickets[orderEvent.OrderId] = ticket;
1136  }
1137  else if (!_completeOrderTickets.TryGetValue(orderEvent.OrderId, out ticket))
1138  {
1139  Log.Error("BrokerageTransactionHandler.HandleOrderEvents(): Unable to resolve open ticket: " + orderEvent.OrderId);
1140  LogOrderEvent(orderEvent);
1141  return;
1142  }
1143  orderEvent.Ticket = ticket;
1144  }
1145 
1146  var fillsToProcess = new List<OrderEvent>(orderEvents.Count);
1147 
1148  // now lets update the orders
1149  for (var i = 0; i < orderEvents.Count; i++)
1150  {
1151  var orderEvent = orderEvents[i];
1152  var order = orders[i];
1153  var ticket = orderEvent.Ticket;
1154 
1155  _cancelPendingOrders.UpdateOrRemove(order.Id, orderEvent.Status);
1156  // set the status of our order object based on the fill event except if the order status is filled/cancelled and the event is invalid
1157  // in live trading it can happen that we submit an update which get's rejected by the brokerage because the order is already filled
1158  // so we don't want the invalid update event to set the order status to invalid if it's already filled
1159  if (order.Status != OrderStatus.Filled && order.Status != OrderStatus.Canceled || orderEvent.Status != OrderStatus.Invalid)
1160  {
1161  order.Status = orderEvent.Status;
1162  }
1163 
1164  orderEvent.Id = order.GetNewId();
1165 
1166  // set the modified time of the order to the fill's timestamp
1167  switch (orderEvent.Status)
1168  {
1169  case OrderStatus.Canceled:
1170  order.CanceledTime = orderEvent.UtcTime;
1171  break;
1172 
1173  case OrderStatus.PartiallyFilled:
1174  case OrderStatus.Filled:
1175  order.LastFillTime = orderEvent.UtcTime;
1176  break;
1177 
1178  case OrderStatus.UpdateSubmitted:
1179  case OrderStatus.Submitted:
1180  // submit events after the initial submission are all order updates
1181  if (ticket.UpdateRequests.Count > 0)
1182  {
1183  order.LastUpdateTime = orderEvent.UtcTime;
1184  }
1185  break;
1186  }
1187 
1188  // lets always set current Quantity, Limit and Stop prices in the order event so that it's easier for consumers
1189  // to know the current state and detect any update
1190  orderEvent.Quantity = order.Quantity;
1191  switch (order.Type)
1192  {
1193  case OrderType.Limit:
1194  var limit = order as LimitOrder;
1195  orderEvent.LimitPrice = limit.LimitPrice;
1196  break;
1197  case OrderType.ComboLegLimit:
1198  var legLimitOrder = order as ComboLegLimitOrder;
1199  orderEvent.LimitPrice = legLimitOrder.LimitPrice;
1200  break;
1201  case OrderType.StopMarket:
1202  var marketOrder = order as StopMarketOrder;
1203  orderEvent.StopPrice = marketOrder.StopPrice;
1204  break;
1205  case OrderType.StopLimit:
1206  var stopLimitOrder = order as StopLimitOrder;
1207  orderEvent.LimitPrice = stopLimitOrder.LimitPrice;
1208  orderEvent.StopPrice = stopLimitOrder.StopPrice;
1209  break;
1210  case OrderType.TrailingStop:
1211  var trailingStopOrder = order as TrailingStopOrder;
1212  orderEvent.StopPrice = trailingStopOrder.StopPrice;
1213  orderEvent.TrailingAmount = trailingStopOrder.TrailingAmount;
1214  break;
1215  case OrderType.LimitIfTouched:
1216  var limitIfTouchedOrder = order as LimitIfTouchedOrder;
1217  orderEvent.LimitPrice = limitIfTouchedOrder.LimitPrice;
1218  orderEvent.TriggerPrice = limitIfTouchedOrder.TriggerPrice;
1219  break;
1220  }
1221 
1222  // check if the fill currency and the order currency match the symbol currency
1223  if (orderEvent.Status == OrderStatus.Filled || orderEvent.Status == OrderStatus.PartiallyFilled)
1224  {
1225  fillsToProcess.Add(orderEvent);
1226  Interlocked.Exchange(ref _lastFillTimeTicks, CurrentTimeUtc.Ticks);
1227 
1228  var security = _algorithm.Securities[orderEvent.Symbol];
1229 
1230  if (orderEvent.Symbol.SecurityType == SecurityType.Crypto
1231  && order.Direction == OrderDirection.Buy
1232  && CurrencyPairUtil.TryDecomposeCurrencyPair(orderEvent.Symbol, out var baseCurrency, out var quoteCurrency)
1233  && orderEvent.OrderFee.Value.Currency == baseCurrency)
1234  {
1235  // fees are in the base currency, so we have to subtract them from the filled quantity
1236  // else the virtual position will bigger than the real size and we might no be able to liquidate
1237  orderEvent.FillQuantity -= orderEvent.OrderFee.Value.Amount;
1238  orderEvent.OrderFee = new ModifiedFillQuantityOrderFee(orderEvent.OrderFee.Value, quoteCurrency, security.SymbolProperties.ContractMultiplier);
1239 
1240  if (!_loggedFeeAdjustmentWarning)
1241  {
1242  _loggedFeeAdjustmentWarning = true;
1243  const string message = "When buying currency pairs, using Cash account types, fees in base currency" +
1244  " will be deducted from the filled quantity so virtual positions reflect actual holdings.";
1245  Log.Trace($"BrokerageTransactionHandler.HandleOrderEvent(): {message}");
1246  _algorithm.Debug(message);
1247  }
1248  }
1249  }
1250  }
1251 
1252  //Apply the filled orders to our portfolio:
1253  try
1254  {
1255  _algorithm.Portfolio.ProcessFills(fillsToProcess);
1256  }
1257  catch (Exception err)
1258  {
1259  Log.Error(err);
1260  _algorithm.Error($"Fill error: error in TradeBuilder.ProcessFill: {err.Message}");
1261  }
1262 
1263  // Apply the filled orders to the trade builder
1264  for (var i = 0; i < orderEvents.Count; i++)
1265  {
1266  var orderEvent = orderEvents[i];
1267 
1268  if (orderEvent.Status == OrderStatus.Filled || orderEvent.Status == OrderStatus.PartiallyFilled)
1269  {
1270  var security = _algorithm.Securities[orderEvent.Symbol];
1271 
1272  var multiplier = security.SymbolProperties.ContractMultiplier;
1273  var securityConversionRate = security.QuoteCurrency.ConversionRate;
1274  var feeInAccountCurrency = _algorithm.Portfolio.CashBook
1275  .ConvertToAccountCurrency(orderEvent.OrderFee.Value).Amount;
1276 
1277  try
1278  {
1279  _algorithm.TradeBuilder.ProcessFill(
1280  orderEvent,
1281  securityConversionRate,
1282  feeInAccountCurrency,
1283  multiplier);
1284  }
1285  catch (Exception err)
1286  {
1287  Log.Error(err);
1288  }
1289  }
1290 
1291  // update the ticket after we've processed the fill, but before the event, this way everything is ready for user code
1292  orderEvent.Ticket.AddOrderEvent(orderEvent);
1293  }
1294  }
1295 
1296  //We have the events! :) Orders filled, send them in to be handled by algorithm portfolio.
1297  for (var i = 0; i < orderEvents.Count; i++)
1298  {
1299  var orderEvent = orderEvents[i];
1300 
1301  if (orderEvent.Status != OrderStatus.None) //order.Status != OrderStatus.Submitted
1302  {
1303  _orderEvents.Enqueue(orderEvent);
1304 
1305  //Create new order event:
1306  _resultHandler.OrderEvent(orderEvent);
1307 
1308  NewOrderEvent?.Invoke(this, orderEvent);
1309 
1310  try
1311  {
1312  //Trigger our order event handler
1313  _algorithm.OnOrderEvent(orderEvent);
1314  }
1315  catch (Exception err)
1316  {
1317  // unexpected error, we need to close down shop
1318  _algorithm.SetRuntimeError(err, "Order Event Handler");
1319  }
1320  }
1321 
1322  LogOrderEvent(orderEvent);
1323  }
1324  }
1325 
1326  private void HandleOrderEvent(OrderEvent orderEvent)
1327  {
1328  HandleOrderEvents(new List<OrderEvent> { orderEvent });
1329  }
1330 
1331  private void HandleOrderUpdated(OrderUpdateEvent e)
1332  {
1333  if (!_completeOrders.TryGetValue(e.OrderId, out var order))
1334  {
1335  Log.Error("BrokerageTransactionHandler.HandleOrderUpdated(): Unable to locate open order with id " + e.OrderId);
1336  return;
1337  }
1338 
1339  switch (order.Type)
1340  {
1341  case OrderType.TrailingStop:
1342  ((TrailingStopOrder)order).StopPrice = e.TrailingStopPrice;
1343  break;
1344 
1345  case OrderType.StopLimit:
1346  ((StopLimitOrder)order).StopTriggered = e.StopTriggered;
1347  break;
1348  }
1349  }
1350 
1351  /// <summary>
1352  /// Gets the price adjustment mode for the specified symbol from its subscription configurations
1353  /// </summary>
1354  private void SetPriceAdjustmentMode(Order order, IAlgorithm algorithm)
1355  {
1356  if (algorithm.LiveMode)
1357  {
1358  // live trading always uses raw prices
1360  return;
1361  }
1362 
1363  if (!_priceAdjustmentModes.TryGetValue(order.Symbol, out var mode))
1364  {
1365  var configs = algorithm.SubscriptionManager.SubscriptionDataConfigService
1366  .GetSubscriptionDataConfigs(order.Symbol, includeInternalConfigs: true);
1367  if (configs.Count == 0)
1368  {
1369  throw new InvalidOperationException($"Unable to locate subscription data config for {order.Symbol}");
1370  }
1371 
1372  mode = configs[0].DataNormalizationMode;
1373  _priceAdjustmentModes[order.Symbol] = mode;
1374  }
1375 
1376  order.PriceAdjustmentMode = mode;
1377  }
1378 
1379  /// <summary>
1380  /// Debug logging helper method, called after HandleOrderEvent has finished updating status, price and quantity
1381  /// </summary>
1382  /// <param name="e">The order event</param>
1383  private static void LogOrderEvent(OrderEvent e)
1384  {
1385  if (Log.DebuggingEnabled)
1386  {
1387  Log.Debug("BrokerageTransactionHandler.LogOrderEvent(): " + e);
1388  }
1389  }
1390 
1391  /// <summary>
1392  /// Brokerages can send account updates, this include cash balance updates. Since it is of
1393  /// utmost important to always have an accurate picture of reality, we'll trust this information
1394  /// as truth
1395  /// </summary>
1396  private void HandleAccountChanged(AccountEvent account)
1397  {
1398  // how close are we?
1399  var existingCashBalance = _algorithm.Portfolio.CashBook[account.CurrencySymbol].Amount;
1400  if (existingCashBalance != account.CashBalance)
1401  {
1402  Log.Trace($"BrokerageTransactionHandler.HandleAccountChanged(): {account.CurrencySymbol} Cash Lean: {existingCashBalance} Brokerage: {account.CashBalance}. Will update: {_brokerage.AccountInstantlyUpdated}");
1403  }
1404 
1405  // maybe we don't actually want to do this, this data can be delayed. Must be explicitly supported by brokerage
1406  if (_brokerage.AccountInstantlyUpdated)
1407  {
1408  // override the current cash value so we're always guaranteed to be in sync with the brokerage's push updates
1409  _algorithm.Portfolio.CashBook[account.CurrencySymbol].SetAmount(account.CashBalance);
1410  }
1411  }
1412 
1413  /// <summary>
1414  /// Brokerage order id change is applied to the target order
1415  /// </summary>
1416  private void HandlerBrokerageOrderIdChangedEvent(BrokerageOrderIdChangedEvent brokerageOrderIdChangedEvent)
1417  {
1418  var originalOrder = GetOrderByIdInternal(brokerageOrderIdChangedEvent.OrderId);
1419 
1420  if (originalOrder == null)
1421  {
1422  // shouldn't happen but let's be careful
1423  Log.Error($"BrokerageTransactionHandler.HandlerBrokerageOrderIdChangedEvent(): Lean order id {brokerageOrderIdChangedEvent.OrderId} not found");
1424  return;
1425  }
1426 
1427  // we replace the whole collection
1428  originalOrder.BrokerId = brokerageOrderIdChangedEvent.BrokerId;
1429  }
1430 
1431  /// <summary>
1432  /// Option assignment/exercise event is received and propagated to the user algo
1433  /// </summary>
1434  private void HandlePositionAssigned(OrderEvent fill)
1435  {
1436  // informing user algorithm that option position has been assigned
1437  _algorithm.OnAssignmentOrderEvent(fill);
1438  }
1439 
1440  private void HandleDelistingNotification(DelistingNotificationEventArgs e)
1441  {
1442  if (_algorithm.Securities.TryGetValue(e.Symbol, out var security))
1443  {
1444  // only log always in live trading, in backtesting log if not 0 holdings
1445  if (_algorithm.LiveMode || security.Holdings.Quantity != 0)
1446  {
1447  Log.Trace(
1448  $"BrokerageTransactionHandler.HandleDelistingNotification(): UtcTime: {CurrentTimeUtc} clearing position for delisted holding: " +
1449  $"Symbol: {e.Symbol.Value}, " +
1450  $"Quantity: {security.Holdings.Quantity}");
1451  }
1452 
1453  // Only submit an order if we have holdings
1454  var quantity = -security.Holdings.Quantity;
1455  if (quantity != 0)
1456  {
1457  var tag = "Liquidate from delisting";
1458 
1459  // Create our order and add it
1460  var order = new MarketOrder(security.Symbol, quantity, _algorithm.UtcTime, tag);
1461  AddOpenOrder(order, _algorithm);
1462 
1463  // Create our fill with the latest price
1464  var fill = new OrderEvent(order, _algorithm.UtcTime, OrderFee.Zero)
1465  {
1466  FillPrice = security.Price,
1467  Status = OrderStatus.Filled,
1468  FillQuantity = order.Quantity
1469  };
1470 
1471  // Process this order event
1472  HandleOrderEvent(fill);
1473  }
1474  }
1475  }
1476 
1477  /// <summary>
1478  /// Option notification event is received and new order events are generated
1479  /// </summary>
1480  private void HandleOptionNotification(OptionNotificationEventArgs e)
1481  {
1482  if (_algorithm.Securities.TryGetValue(e.Symbol, out var security))
1483  {
1484  // let's take the order event lock, we will be looking at orders and security holdings
1485  // and we don't want them changing mid processing because of an order event coming in at the same time
1486  // for example: DateTime/decimal order attributes are not thread safe by nature!
1487  lock (_lockHandleOrderEvent)
1488  {
1490  {
1491  if (e.Position == 0)
1492  {
1493  // only log always in live trading, in backtesting log if not 0 holdings
1494  if (_algorithm.LiveMode || security.Holdings.Quantity != 0)
1495  {
1496  Log.Trace(
1497  $"BrokerageTransactionHandler.HandleOptionNotification(): UtcTime: {CurrentTimeUtc} clearing position for expired option holding: " +
1498  $"Symbol: {e.Symbol.Value}, " +
1499  $"Holdings: {security.Holdings.Quantity}");
1500  }
1501 
1502  var quantity = -security.Holdings.Quantity;
1503 
1504  // If the quantity is already 0 for Lean and the brokerage there is nothing else todo here
1505  if (quantity != 0)
1506  {
1507  var exerciseOrder = GenerateOptionExerciseOrder(security, quantity, e.Tag);
1508 
1509  EmitOptionNotificationEvents(security, exerciseOrder);
1510  }
1511  }
1512  else
1513  {
1514  Log.Error("BrokerageTransactionHandler.HandleOptionNotification(): " +
1515  $"unexpected position ({e.Position} instead of zero) " +
1516  $"for expired option contract: {e.Symbol.Value}");
1517  }
1518  }
1519  else
1520  {
1521  // if position is reduced, could be an early exercise or early assignment
1522  if (Math.Abs(e.Position) < security.Holdings.AbsoluteQuantity)
1523  {
1524  Log.Trace("BrokerageTransactionHandler.HandleOptionNotification(): " +
1525  $"Symbol {e.Symbol.Value} EventQuantity {e.Position} Holdings {security.Holdings.Quantity}");
1526 
1527  // if we are long the option and there is an open order, assume it's an early exercise
1528  if (security.Holdings.IsLong)
1529  {
1530  // we only care about open option exercise orders, if it's closed it means we already
1531  // processed it and we wouldn't have a need to handle it here
1532  if (GetOpenOrders(x =>
1533  x.Symbol == e.Symbol &&
1534  x.Type == OrderType.OptionExercise)
1535  .FirstOrDefault() is OptionExerciseOrder exerciseOrder)
1536  {
1537  EmitOptionNotificationEvents(security, exerciseOrder);
1538  }
1539  }
1540 
1541  // if we are short the option and there are no buy orders (open or recently closed), assume it's an early assignment
1542  else if (security.Holdings.IsShort)
1543  {
1544  var nowUtc = CurrentTimeUtc;
1545  // for some brokerages (like IB) there might be a race condition between getting an option
1546  // notification event and lean processing an order event. So if we are here, there are these options:
1547  // A) holdings -10 position 5
1548  // 1) WE just BOUGHT 15 and Lean doesn't know yet
1549  // 2) WE just SOLD 15 and this notification is old
1550  // B) holdings -10 position -5
1551  // 1) WE just BOUGHT 5 and Lean doesn't know yet
1552  // 2) WE just SOLD 5 more and this notification is old
1553  // - Seen this in production already
1554  // 3) Brokerage triggered an early assignment
1555 
1556  // so we get ALL orders for this symbol that were placed or got an update in the last 'orderWindowSeconds'
1557 
1558  const int orderWindowSeconds = 10;
1559  // NOTE: We do this checks for actual live trading only to handle the race condition stated above
1560  // for actual brokerages (excluding paper trading with PaperBrokerage).
1561  // TODO: If we confirm this race condition applies for IB only, we could move this to the brokerage itself.
1562  if (_brokerageIsBacktesting ||
1563  !GetOrders(x =>
1564  x.Symbol == e.Symbol
1565  && (x.Status.IsOpen() || x.Status.IsFill() &&
1566  (Math.Abs((x.Time - nowUtc).TotalSeconds) < orderWindowSeconds
1567  || (x.LastUpdateTime.HasValue && Math.Abs((x.LastUpdateTime.Value - nowUtc).TotalSeconds) < orderWindowSeconds)
1568  || (x.LastFillTime.HasValue && Math.Abs((x.LastFillTime.Value - nowUtc).TotalSeconds) < orderWindowSeconds)))).Any())
1569  {
1570  var quantity = e.Position - security.Holdings.Quantity;
1571 
1572  var exerciseOrder = GenerateOptionExerciseOrder(security, quantity, e.Tag);
1573 
1574  EmitOptionNotificationEvents(security, exerciseOrder);
1575  }
1576  }
1577  }
1578  }
1579  }
1580  }
1581  }
1582 
1583  /// <summary>
1584  /// New brokerage-side order event handler
1585  /// </summary>
1586  private void HandleNewBrokerageSideOrder(NewBrokerageOrderNotificationEventArgs e)
1587  {
1588  void onError(IReadOnlyCollection<SecurityType> supportedSecurityTypes) =>
1589  _algorithm.Debug($"Warning: New brokerage-side order could not be processed due to " +
1590  $"it's security not being supported. Supported security types are {string.Join(", ", supportedSecurityTypes)}");
1591 
1592  if (_algorithm.BrokerageMessageHandler.HandleOrder(e) &&
1593  _algorithm.GetOrAddUnrequestedSecurity(e.Order.Symbol, out _, onError))
1594  {
1595  AddOpenOrder(e.Order, _algorithm);
1596  }
1597  }
1598 
1599  private OptionExerciseOrder GenerateOptionExerciseOrder(Security security, decimal quantity, string tag)
1600  {
1601  // generate new exercise order and ticket for the option
1602  var order = new OptionExerciseOrder(security.Symbol, quantity, CurrentTimeUtc, tag);
1603 
1604  // save current security prices
1605  order.OrderSubmissionData = new OrderSubmissionData(security.BidPrice, security.AskPrice, security.Close);
1606  order.PriceCurrency = security.SymbolProperties.QuoteCurrency;
1607 
1608  AddOpenOrder(order, _algorithm);
1609  return order;
1610  }
1611 
1612  private void EmitOptionNotificationEvents(Security security, OptionExerciseOrder order)
1613  {
1614  // generate the order events reusing the option exercise model
1615  var option = (Option)security;
1616  var orderEvents = option.OptionExerciseModel.OptionExercise(option, order);
1617 
1618  foreach (var orderEvent in orderEvents)
1619  {
1620  HandleOrderEvent(orderEvent);
1621 
1622  if (orderEvent.IsAssignment)
1623  {
1624  orderEvent.Message = order.Tag;
1625  HandlePositionAssigned(orderEvent);
1626  }
1627  }
1628  }
1629 
1630  /// <summary>
1631  /// Gets the amount of time since the last call to algorithm.Portfolio.ProcessFill(fill)
1632  /// </summary>
1633  protected virtual TimeSpan TimeSinceLastFill =>
1634  CurrentTimeUtc - new DateTime(Interlocked.Read(ref _lastFillTimeTicks));
1635 
1636  /// <summary>
1637  /// Gets current time UTC. This is here to facilitate testing
1638  /// </summary>
1639  protected virtual DateTime CurrentTimeUtc => DateTime.UtcNow;
1640 
1641  /// <summary>
1642  /// Rounds off the order towards 0 to the nearest multiple of Lot Size
1643  /// </summary>
1644  public decimal RoundOffOrder(Order order, Security security)
1645  {
1646  var orderLotMod = order.Quantity % security.SymbolProperties.LotSize;
1647 
1648  if (orderLotMod != 0)
1649  {
1650  order.Quantity = order.Quantity - orderLotMod;
1651 
1652  if (!_firstRoundOffMessage)
1653  {
1654  _algorithm.Error("Warning: Due to brokerage limitations, orders will be rounded to " +
1655  $"the nearest lot size of {security.SymbolProperties.LotSize.ToStringInvariant()}"
1656  );
1657  _firstRoundOffMessage = true;
1658  }
1659  return order.Quantity;
1660  }
1661  else
1662  {
1663  return order.Quantity;
1664  }
1665  }
1666 
1667  /// <summary>
1668  /// Rounds the order prices to its security minimum price variation.
1669  /// <remarks>
1670  /// This procedure is needed to meet brokerage precision requirements.
1671  /// </remarks>
1672  /// </summary>
1673  protected void RoundOrderPrices(Order order, Security security)
1674  {
1675  var comboIsReady = order.TryGetGroupOrders(TryGetOrder, out var orders);
1676  orders.TryGetGroupOrdersSecurities(_algorithm.Portfolio, out var securities);
1677 
1678  RoundOrderPrices(order, security, comboIsReady, securities);
1679  }
1680 
1681  /// <summary>
1682  /// Rounds the order prices to its security minimum price variation.
1683  /// <remarks>
1684  /// This procedure is needed to meet brokerage precision requirements.
1685  /// </remarks>
1686  /// </summary>
1687  protected void RoundOrderPrices(Order order, Security security, bool comboIsReady, Dictionary<Order, Security> orders)
1688  {
1689  switch (order.Type)
1690  {
1691  case OrderType.Limit:
1692  {
1693  var limitOrder = (LimitOrder)order;
1694  RoundOrderPrice(security, limitOrder.LimitPrice, "LimitPrice", (roundedPrice) => limitOrder.LimitPrice = roundedPrice);
1695  }
1696  break;
1697 
1698  case OrderType.StopMarket:
1699  {
1700  var stopMarketOrder = (StopMarketOrder)order;
1701  RoundOrderPrice(security, stopMarketOrder.StopPrice, "StopPrice", (roundedPrice) => stopMarketOrder.StopPrice = roundedPrice);
1702  }
1703  break;
1704 
1705  case OrderType.StopLimit:
1706  {
1707  var stopLimitOrder = (StopLimitOrder)order;
1708  RoundOrderPrice(security, stopLimitOrder.LimitPrice, "LimitPrice", (roundedPrice) => stopLimitOrder.LimitPrice = roundedPrice);
1709  RoundOrderPrice(security, stopLimitOrder.StopPrice, "StopPrice", (roundedPrice) => stopLimitOrder.StopPrice = roundedPrice);
1710  }
1711  break;
1712 
1713  case OrderType.TrailingStop:
1714  {
1715  var trailingStopOrder = (TrailingStopOrder)order;
1716  RoundOrderPrice(security, trailingStopOrder.StopPrice, "StopPrice",
1717  (roundedPrice) => trailingStopOrder.StopPrice = roundedPrice);
1718 
1719  if (!trailingStopOrder.TrailingAsPercentage)
1720  {
1721  RoundOrderPrice(security, trailingStopOrder.TrailingAmount, "TrailingAmount",
1722  (roundedAmount) => trailingStopOrder.TrailingAmount = roundedAmount);
1723  }
1724  }
1725  break;
1726 
1727  case OrderType.LimitIfTouched:
1728  {
1729  var limitIfTouchedOrder = (LimitIfTouchedOrder)order;
1730  RoundOrderPrice(security, limitIfTouchedOrder.LimitPrice, "LimitPrice",
1731  (roundedPrice) => limitIfTouchedOrder.LimitPrice = roundedPrice);
1732  RoundOrderPrice(security, limitIfTouchedOrder.TriggerPrice, "TriggerPrice",
1733  (roundedPrice) => limitIfTouchedOrder.TriggerPrice = roundedPrice);
1734  }
1735  break;
1736 
1737  case OrderType.ComboLegLimit:
1738  {
1739  var comboLegOrder = (ComboLegLimitOrder)order;
1740  RoundOrderPrice(security, comboLegOrder.LimitPrice, "LimitPrice",
1741  (roundedPrice) => comboLegOrder.LimitPrice = roundedPrice);
1742  }
1743  break;
1744 
1745  case OrderType.ComboLimit:
1746  {
1747  if (comboIsReady)
1748  {
1749  // all orders in the combo have been received.
1750  // we can now round the limit price of the group order,
1751  // for which we need to find the smallest price variation from each leg security
1752  var groupOrderManager = order.GroupOrderManager;
1753  var increment = 0m;
1754  foreach (var (legOrder, legSecurity) in orders)
1755  {
1756  var legIncrement = legSecurity.PriceVariationModel.GetMinimumPriceVariation(
1757  new GetMinimumPriceVariationParameters(legSecurity, legOrder.Price));
1758  if (legIncrement > 0 && (increment == 0 || legIncrement < increment))
1759  {
1760  increment = legIncrement;
1761  }
1762  }
1763 
1764  RoundOrderPrice(groupOrderManager.LimitPrice, increment, "LimitPrice",
1765  (roundedPrice) => groupOrderManager.LimitPrice = roundedPrice);
1766  }
1767 
1768  }
1769  break;
1770  }
1771  }
1772 
1773  private void RoundOrderPrice(Security security, decimal price, string priceType, Action<decimal> setPrice)
1774  {
1775  var increment = security.PriceVariationModel.GetMinimumPriceVariation(new GetMinimumPriceVariationParameters(security, price));
1776  RoundOrderPrice(price, increment, priceType, setPrice);
1777  }
1778 
1779  [MethodImpl(MethodImplOptions.AggressiveInlining)]
1780  private void RoundOrderPrice(decimal price, decimal increment, string priceType, Action<decimal> setPrice)
1781  {
1782  if (increment > 0)
1783  {
1784  var roundedPrice = Math.Round(price / increment) * increment;
1785  setPrice(roundedPrice);
1786  SendWarningOnPriceChange(priceType, roundedPrice, price);
1787  }
1788  }
1789 
1790  private Order TryGetOrder(int orderId)
1791  {
1792  _completeOrders.TryGetValue(orderId, out var order);
1793  return order;
1794  }
1795 
1796  private void InvalidateOrders(List<Order> orders, string message)
1797  {
1798  for (var i = 0; i < orders.Count; i++)
1799  {
1800  var orderInGroup = orders[i];
1801  if (!orderInGroup.Status.IsClosed())
1802  {
1803  orderInGroup.Status = OrderStatus.Invalid;
1804  }
1805  HandleOrderEvents(new List<OrderEvent> { new OrderEvent(orderInGroup, _algorithm.UtcTime, OrderFee.Zero, message) });
1806  }
1807  }
1808 
1809  private void SendWarningOnPriceChange(string priceType, decimal priceRound, decimal priceOriginal)
1810  {
1811  if (!priceOriginal.Equals(priceRound))
1812  {
1813  _algorithm.Error(
1814  $"Warning: To meet brokerage precision requirements, order {priceType.ToStringInvariant()} was rounded to {priceRound.ToStringInvariant()} from {priceOriginal.ToStringInvariant()}"
1815  );
1816  }
1817  }
1818 
1819  private string GetShortableErrorMessage(Symbol symbol, decimal quantity)
1820  {
1821  var shortableQuantity = _algorithm.ShortableQuantity(symbol);
1822  return $"Order exceeds shortable quantity {shortableQuantity} for Symbol {symbol} requested {quantity})";
1823  }
1824  }
1825 }
1826