using System; using System.Collections.Concurrent; using System.Linq; namespace FastGithub.FlowAnalyze { sealed class FlowAnalyzer : IFlowAnalyzer { private const int INTERVAL_SECONDS = 5; private readonly ConcurrentQueue readQueue = new(); private readonly ConcurrentQueue writeQueue = new(); private record QueueItem(long Ticks, int Length); /// /// 收到数据 /// /// /// public void OnFlow(FlowType flowType, int length) { if (flowType == FlowType.Read) { Add(this.readQueue, length); } else { Add(this.writeQueue, length); } } private static void Add(ConcurrentQueue quques, int length) { var ticks = Flush(quques); quques.Enqueue(new QueueItem(ticks, length)); } /// /// 刷新队列 /// /// /// private static long Flush(ConcurrentQueue quques) { var ticks = Environment.TickCount64; while (quques.TryPeek(out var item)) { if (ticks - item.Ticks < INTERVAL_SECONDS * 1000) { break; } else { quques.TryDequeue(out _); } } return ticks; } /// /// 获取速率 /// /// public FlowRate GetFlowRate() { Flush(this.readQueue); var readRate = (double)this.readQueue.Sum(item => item.Length) / INTERVAL_SECONDS; Flush(this.writeQueue); var writeRate = (double)this.writeQueue.Sum(item => item.Length) / INTERVAL_SECONDS; return new FlowRate { ReadRate = readRate, WriteRate = writeRate }; } } }