Lean  $LEAN_TAG$
DataQueueHandlerSubscriptionManager.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14 */
15 
17 using QuantConnect.Logging;
18 using System;
19 using System.Collections.Concurrent;
20 using System.Collections.Generic;
21 using System.Linq;
22 
23 namespace QuantConnect.Data
24 {
25  /// <summary>
26  /// Count number of subscribers for each channel (Symbol, Socket) pair
27  /// </summary>
28  public abstract class DataQueueHandlerSubscriptionManager : IDisposable
29  {
30  /// <summary>
31  /// Counter
32  /// </summary>
33  protected ConcurrentDictionary<Channel, int> SubscribersByChannel = new ConcurrentDictionary<Channel, int>();
34 
35  /// <summary>
36  /// Increment number of subscribers for current <see cref="TickType"/>
37  /// </summary>
38  /// <param name="dataConfig">defines the subscription configuration data.</param>
39  public void Subscribe(SubscriptionDataConfig dataConfig)
40  {
41  try
42  {
43  var channel = GetChannel(dataConfig);
44  int count;
45  if (SubscribersByChannel.TryGetValue(channel, out count))
46  {
47  SubscribersByChannel.TryUpdate(channel, count + 1, count);
48  return;
49  }
50 
51  if (Subscribe(new[] { dataConfig.Symbol }, dataConfig.TickType))
52  {
53  SubscribersByChannel.AddOrUpdate(channel, 1);
54  }
55  }
56  catch (Exception exception)
57  {
58  Log.Error(exception);
59  throw;
60  }
61  }
62 
63  /// <summary>
64  /// Decrement number of subscribers for current <see cref="TickType"/>
65  /// </summary>
66  /// <param name="dataConfig">defines the subscription configuration data.</param>
67  public void Unsubscribe(SubscriptionDataConfig dataConfig)
68  {
69  try
70  {
71  var channel = GetChannel(dataConfig);
72  int count;
73  if (SubscribersByChannel.TryGetValue(channel, out count))
74  {
75  if (count > 1)
76  {
77  SubscribersByChannel.TryUpdate(channel, count - 1, count);
78  return;
79  }
80 
81  if (Unsubscribe(new[] { dataConfig.Symbol }, dataConfig.TickType))
82  {
83  SubscribersByChannel.TryRemove(channel, out count);
84  }
85  }
86  }
87  catch (Exception exception)
88  {
89  Log.Error(exception);
90  throw;
91  }
92  }
93 
94  /// <summary>
95  /// Returns subscribed symbols
96  /// </summary>
97  /// <returns>list of <see cref="Symbol"/> currently subscribed</returns>
98  public IEnumerable<Symbol> GetSubscribedSymbols()
99  {
100  return SubscribersByChannel.Keys
101  .Select(c => c.Symbol)
102  .Distinct();
103  }
104 
105  /// <summary>
106  /// Checks if there is existing subscriber for current channel
107  /// </summary>
108  /// <param name="symbol">Symbol</param>
109  /// <param name="tickType">Type of tick data</param>
110  /// <returns>return true if there is one subscriber at least; otherwise false</returns>
111  public bool IsSubscribed(Symbol symbol, TickType tickType)
112  {
113  return SubscribersByChannel.ContainsKey(GetChannel(
114  symbol,
115  tickType));
116  }
117 
118  /// <summary>
119  /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
120  /// </summary>
121  public virtual void Dispose()
122  {
123  }
124 
125  /// <summary>
126  /// Describes the way <see cref="IDataQueueHandler"/> implements subscription
127  /// </summary>
128  /// <param name="symbols">Symbols to subscribe</param>
129  /// <param name="tickType">Type of tick data</param>
130  /// <returns>Returns true if subsribed; otherwise false</returns>
131  protected abstract bool Subscribe(IEnumerable<Symbol> symbols, TickType tickType);
132 
133  /// <summary>
134  /// Describes the way <see cref="IDataQueueHandler"/> implements unsubscription
135  /// </summary>
136  /// <param name="symbols">Symbols to unsubscribe</param>
137  /// <param name="tickType">Type of tick data</param>
138  /// <returns>Returns true if unsubsribed; otherwise false</returns>
139  protected abstract bool Unsubscribe(IEnumerable<Symbol> symbols, TickType tickType);
140 
141  /// <summary>
142  /// Brokerage maps <see cref="TickType"/> to real socket/api channel
143  /// </summary>
144  /// <param name="tickType">Type of tick data</param>
145  /// <returns></returns>
146  protected abstract string ChannelNameFromTickType(TickType tickType);
147 
148  private Channel GetChannel(SubscriptionDataConfig dataConfig) => GetChannel(dataConfig.Symbol, dataConfig.TickType);
149 
150  private Channel GetChannel(Symbol symbol, TickType tickType)
151  {
152  return new Channel(
153  ChannelNameFromTickType(tickType),
154  symbol);
155  }
156  }
157 }