using System; using System.Collections.Concurrent; using System.Linq; namespace FastGithub.FlowAnalyze { sealed class FlowAnalyzer : IFlowAnalyzer { private const int INTERVAL_SECONDS = 5; private readonly ConcurrentQueue readQueue = new(); private readonly ConcurrentQueue writeQueue = new(); private record QueueItem(long Ticks, int Length); /// /// 收到数据 /// /// /// public void OnFlow(FlowType flowType, int length) { if (flowType == FlowType.Read) { Add(this.readQueue, length); } else { Add(this.writeQueue, length); } } private static void Add(ConcurrentQueue quques, int length) { var ticks = Environment.TickCount64; while (quques.TryPeek(out var item)) { if (ticks - item.Ticks < INTERVAL_SECONDS * 1000) { break; } else { quques.TryDequeue(out _); } } quques.Enqueue(new QueueItem(ticks, length)); } /// /// 获取速率 /// /// public FlowRate GetFlowRate() { var readRate = (double)this.readQueue.Sum(item => item.Length) / INTERVAL_SECONDS; var writeRate = (double)this.writeQueue.Sum(item => item.Length) / INTERVAL_SECONDS; return new FlowRate { ReadRate = readRate, WriteRate = writeRate }; } } }