FlowAnalyzer.cs 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Linq;
  4. namespace FastGithub.FlowAnalyze
  5. {
  6. sealed class FlowAnalyzer : IFlowAnalyzer
  7. {
  8. private const int INTERVAL_SECONDS = 5;
  9. private readonly ConcurrentQueue<QueueItem> readQueue = new();
  10. private readonly ConcurrentQueue<QueueItem> writeQueue = new();
  11. private record QueueItem(long Ticks, int Length);
  12. /// <summary>
  13. /// 收到数据
  14. /// </summary>
  15. /// <param name="flowType"></param>
  16. /// <param name="length"></param>
  17. public void OnFlow(FlowType flowType, int length)
  18. {
  19. if (flowType == FlowType.Read)
  20. {
  21. Add(this.readQueue, length);
  22. }
  23. else
  24. {
  25. Add(this.writeQueue, length);
  26. }
  27. }
  28. private static void Add(ConcurrentQueue<QueueItem> quques, int length)
  29. {
  30. var ticks = Flush(quques);
  31. quques.Enqueue(new QueueItem(ticks, length));
  32. }
  33. /// <summary>
  34. /// 刷新队列
  35. /// </summary>
  36. /// <param name="quques"></param>
  37. /// <returns></returns>
  38. private static long Flush(ConcurrentQueue<QueueItem> quques)
  39. {
  40. var ticks = Environment.TickCount64;
  41. while (quques.TryPeek(out var item))
  42. {
  43. if (ticks - item.Ticks < INTERVAL_SECONDS * 1000)
  44. {
  45. break;
  46. }
  47. else
  48. {
  49. quques.TryDequeue(out _);
  50. }
  51. }
  52. return ticks;
  53. }
  54. /// <summary>
  55. /// 获取速率
  56. /// </summary>
  57. /// <returns></returns>
  58. public FlowRate GetFlowRate()
  59. {
  60. Flush(this.readQueue);
  61. var readRate = (double)this.readQueue.Sum(item => item.Length) / INTERVAL_SECONDS;
  62. Flush(this.writeQueue);
  63. var writeRate = (double)this.writeQueue.Sum(item => item.Length) / INTERVAL_SECONDS;
  64. return new FlowRate { ReadRate = readRate, WriteRate = writeRate };
  65. }
  66. }
  67. }