// FlowAnalyzer.cs 1.8 KB

  using System;
  using System.Collections.Concurrent;
  using System.Linq;
  using System.Threading;
  namespace FastGithub.FlowAnalyze
  {
      /// <summary>
      /// Tracks read and write flow over a sliding window of the most recent
      /// <see cref="INTERVAL_SECONDS"/> seconds and reports the average rate
      /// (recorded length per second) for each direction.
      /// </summary>
      sealed class FlowAnalyzer : IFlowAnalyzer
      {
          /// <summary>Length of the sliding window, in seconds.</summary>
          private const int INTERVAL_SECONDS = 5;

          /// <summary>
          /// Length of the sliding window in milliseconds, the unit of
          /// <see cref="Environment.TickCount64"/>.
          /// </summary>
          private const long IntervalMilliseconds = INTERVAL_SECONDS * 1000L;

          private readonly FlowWindow readQueue = new();
          private readonly FlowWindow writeQueue = new();

          /// <summary>A single flow sample: when it was recorded and how long it was.</summary>
          /// <param name="Ticks">Value of <see cref="Environment.TickCount64"/> when the sample was recorded.</param>
          /// <param name="Length">Length reported for the sample.</param>
          private record QueueItem(long Ticks, int Length);

          /// <summary>
          /// Records that data has been received or sent.
          /// </summary>
          /// <param name="flowType">
          /// Direction of the flow. <see cref="FlowType.Read"/> is counted as read flow;
          /// every other value is counted as write flow.
          /// </param>
          /// <param name="length">Length of the data that was transferred.</param>
          public void OnFlow(FlowType flowType, int length)
          {
              var window = flowType == FlowType.Read ? this.readQueue : this.writeQueue;
              window.Add(length);
          }

          /// <summary>
          /// Gets the current flow rates.
          /// </summary>
          /// <returns>
          /// The read and write rates, each computed as the total length recorded during the
          /// last <see cref="INTERVAL_SECONDS"/> seconds divided by <see cref="INTERVAL_SECONDS"/>.
          /// A direction with no samples inside the window reports a rate of 0.
          /// </returns>
          public FlowRate GetFlowRate()
          {
              var readRate = this.readQueue.GetRate();
              var writeRate = this.writeQueue.GetRate();
              return new FlowRate { ReadRate = readRate, WriteRate = writeRate };
          }

          /// <summary>
          /// Sliding window of samples for one flow direction.
          /// </summary>
          private sealed class FlowWindow
          {
              private readonly ConcurrentQueue<QueueItem> items = new();

              /// <summary>1 while some thread is evicting expired samples; otherwise 0.</summary>
              private int evicting;

              /// <summary>
              /// Drops expired samples and appends a new one stamped with the current tick count.
              /// </summary>
              /// <param name="length">Length of the new sample.</param>
              public void Add(int length)
              {
                  var now = Environment.TickCount64;
                  this.Evict(now);
                  this.items.Enqueue(new QueueItem(now, length));
              }

              /// <summary>
              /// Computes the average rate over the window.
              /// </summary>
              /// <returns>Total length of the samples still inside the window, divided by the window length in seconds.</returns>
              public double GetRate()
              {
                  var now = Environment.TickCount64;

                  // Evict here as well as in Add: otherwise the last burst of samples would be
                  // reported forever once traffic stops, because nothing else removes them.
                  this.Evict(now);

                  // The filter keeps the result exact even when eviction was skipped because
                  // another thread was already evicting. Summing as long avoids the
                  // OverflowException that the checked int overload of Sum throws on large totals.
                  var total = this.items
                      .Where(item => now - item.Ticks < IntervalMilliseconds)
                      .Sum(item => (long)item.Length);

                  return (double)total / INTERVAL_SECONDS;
              }

              /// <summary>
              /// Removes samples from the head of the queue that are at least one window old.
              /// </summary>
              /// <param name="now">Current value of <see cref="Environment.TickCount64"/>.</param>
              private void Evict(long now)
              {
                  // TryPeek followed by TryDequeue is not atomic. If two threads evicted at the
                  // same time, both could peek the same expired head and the second dequeue would
                  // then remove a different, possibly fresh, sample. Only one thread evicts at a
                  // time; the others skip, since the active evictor is already doing the work.
                  if (Interlocked.CompareExchange(ref this.evicting, 1, 0) != 0)
                  {
                      return;
                  }

                  try
                  {
                      while (this.items.TryPeek(out var head) && now - head.Ticks >= IntervalMilliseconds)
                      {
                          this.items.TryDequeue(out _);
                      }
                  }
                  finally
                  {
                      Interlocked.Exchange(ref this.evicting, 0);
                  }
              }
          }
      }
  }