17 using System.Collections.Generic;
19 using System.Threading;
38 private int _dataPointCount;
51 var frontier = DateTime.MinValue;
53 var universeSelectionData =
new Dictionary<Universe, BaseDataCollection>();
57 var earlyBirdTicks =
long.MaxValue;
58 var data =
new List<DataFeedPacket>();
59 foreach (var subscription
in subscriptions.Where(subscription => !subscription.EndOfStream))
61 if (subscription.Current ==
null && !subscription.MoveNext())
69 while (subscription.Current.EmitTimeUtc <= frontier)
74 packet =
new DataFeedPacket(subscription.Security, subscription.Configuration);
80 packet.
Add(subscription.Current.Data);
81 Interlocked.Increment(ref _dataPointCount);
82 if (!subscription.MoveNext())
88 if (subscription.Current !=
null)
91 earlyBirdTicks = Math.Min(earlyBirdTicks, subscription.Current.EmitTimeUtc.Ticks);
98 yield
return timeSliceFactory.Create(frontier, data,
SecurityChanges.
None, universeSelectionData).Slice;
102 if (earlyBirdTicks ==
long.MaxValue)
break;
104 frontier =
new DateTime(Math.Max(earlyBirdTicks, frontier.Ticks), DateTimeKind.Utc);
108 foreach (var subscription
in subscriptions)
110 subscription.Dispose();
119 var config = request.ToSubscriptionDataConfig();
130 var reader = history.GetEnumerator();