2
0

FlowAnalyzer.cs 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Linq;
  4. using System.Threading;
  5. namespace FastGithub.FlowAnalyze
  6. {
  7. sealed class FlowAnalyzer : IFlowAnalyzer
  8. {
  9. private const int INTERVAL_SECONDS = 5;
  10. private long totalRead = 0;
  11. private long totalWrite = 0;
  12. private readonly ConcurrentQueue<QueueItem> readQueue = new();
  13. private readonly ConcurrentQueue<QueueItem> writeQueue = new();
  14. private record QueueItem(long Ticks, int Length);
  15. /// <summary>
  16. /// 收到数据
  17. /// </summary>
  18. /// <param name="flowType"></param>
  19. /// <param name="length"></param>
  20. public void OnFlow(FlowType flowType, int length)
  21. {
  22. if (flowType == FlowType.Read)
  23. {
  24. Interlocked.Add(ref this.totalRead, length);
  25. Add(this.readQueue, length);
  26. }
  27. else
  28. {
  29. Interlocked.Add(ref this.totalWrite, length);
  30. Add(this.writeQueue, length);
  31. }
  32. }
  33. private static void Add(ConcurrentQueue<QueueItem> quques, int length)
  34. {
  35. var ticks = Flush(quques);
  36. quques.Enqueue(new QueueItem(ticks, length));
  37. }
  38. /// <summary>
  39. /// 刷新队列
  40. /// </summary>
  41. /// <param name="quques"></param>
  42. /// <returns></returns>
  43. private static long Flush(ConcurrentQueue<QueueItem> quques)
  44. {
  45. var ticks = Environment.TickCount64;
  46. while (quques.TryPeek(out var item))
  47. {
  48. if (ticks - item.Ticks < INTERVAL_SECONDS * 1000)
  49. {
  50. break;
  51. }
  52. else
  53. {
  54. quques.TryDequeue(out _);
  55. }
  56. }
  57. return ticks;
  58. }
  59. /// <summary>
  60. /// 获取速率
  61. /// </summary>
  62. /// <returns></returns>
  63. public FlowRate GetFlowRate()
  64. {
  65. Flush(this.readQueue);
  66. var readRate = (double)this.readQueue.Sum(item => item.Length) / INTERVAL_SECONDS;
  67. Flush(this.writeQueue);
  68. var writeRate = (double)this.writeQueue.Sum(item => item.Length) / INTERVAL_SECONDS;
  69. return new FlowRate
  70. {
  71. TotalRead = this.totalRead,
  72. TotalWrite = this.totalWrite,
  73. ReadRate = readRate,
  74. WriteRate = writeRate
  75. };
  76. }
  77. }
  78. }