123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960 |
- using System;
- using System.Collections.Concurrent;
- using System.Linq;
namespace FastGithub.FlowAnalyze
{
    /// <summary>
    /// Estimates read and write throughput from the data lengths reported
    /// through <see cref="OnFlow"/> during the most recent sliding window.
    /// </summary>
    sealed class FlowAnalyzer : IFlowAnalyzer
    {
        /// <summary>Length of the sliding window, in seconds.</summary>
        private const int INTERVAL_SECONDS = 5;

        /// <summary>
        /// Length of the sliding window in milliseconds, the unit of
        /// <see cref="Environment.TickCount64"/>.
        /// </summary>
        private const long INTERVAL_MILLISECONDS = INTERVAL_SECONDS * 1000L;

        private readonly ConcurrentQueue<QueueItem> readQueue = new();
        private readonly ConcurrentQueue<QueueItem> writeQueue = new();

        /// <summary>
        /// One reported sample: the tick count at which it was recorded and its length.
        /// </summary>
        private record QueueItem(long Ticks, int Length);

        /// <summary>
        /// Records that data has been received.
        /// </summary>
        /// <param name="flowType">
        /// Direction of the data; <c>FlowType.Read</c> goes to the read window,
        /// any other value goes to the write window.
        /// </param>
        /// <param name="length">Amount of data in this sample (presumably bytes; confirm against callers).</param>
        public void OnFlow(FlowType flowType, int length)
        {
            var queue = flowType == FlowType.Read ? this.readQueue : this.writeQueue;
            Add(queue, length);
        }

        /// <summary>
        /// Evicts samples that have left the window from the head of
        /// <paramref name="queue"/>, then appends a new sample stamped with the current tick count.
        /// </summary>
        /// <param name="queue">The window to update.</param>
        /// <param name="length">Length of the new sample.</param>
        private static void Add(ConcurrentQueue<QueueItem> queue, int length)
        {
            var ticks = Environment.TickCount64;

            // Samples are enqueued in (roughly) chronological order, so eviction can stop
            // at the first sample that is still inside the window.
            // NOTE(review): TryPeek and TryDequeue are separate calls, so with concurrent
            // callers the dequeued item is not guaranteed to be the one that was peeked.
            while (queue.TryPeek(out var item) && ticks - item.Ticks >= INTERVAL_MILLISECONDS)
            {
                queue.TryDequeue(out _);
            }

            queue.Enqueue(new QueueItem(ticks, length));
        }

        /// <summary>
        /// Gets the current read and write rates.
        /// </summary>
        /// <returns>
        /// A <c>FlowRate</c> whose <c>ReadRate</c> and <c>WriteRate</c> are the total length
        /// recorded within the last <see cref="INTERVAL_SECONDS"/> seconds, divided by
        /// <see cref="INTERVAL_SECONDS"/>.
        /// </returns>
        public FlowRate GetFlowRate()
        {
            var now = Environment.TickCount64;
            return new FlowRate
            {
                ReadRate = GetRate(this.readQueue, now),
                WriteRate = GetRate(this.writeQueue, now),
            };
        }

        /// <summary>
        /// Computes the average rate over the window for a single queue.
        /// </summary>
        /// <param name="queue">The window to measure.</param>
        /// <param name="now">Tick count to measure the window against.</param>
        /// <returns>Total in-window length divided by <see cref="INTERVAL_SECONDS"/>.</returns>
        private static double GetRate(ConcurrentQueue<QueueItem> queue, long now)
        {
            // Expired samples are only evicted when new data arrives, so they must be
            // filtered here as well; otherwise the rate would never fall back to zero
            // once traffic stops.
            // Summing as long avoids the OverflowException that the checked int overload
            // of Sum throws when the window total exceeds int.MaxValue.
            var total = queue
                .Where(item => now - item.Ticks < INTERVAL_MILLISECONDS)
                .Sum(item => (long)item.Length);

            return (double)total / INTERVAL_SECONDS;
        }
    }
}
|