FlowAnalyzeStream.cs 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. using System;
  2. using System.IO;
  3. using System.Threading;
  4. using System.Threading.Tasks;
  5. namespace FastGithub.FlowAnalyze
  6. {
  7. sealed class FlowAnalyzeStream : Stream
  8. {
  9. private readonly Stream inner;
  10. private readonly IFlowAnalyzer flowAnalyzer;
  11. public FlowAnalyzeStream(Stream inner, IFlowAnalyzer flowAnalyzer)
  12. {
  13. this.inner = inner;
  14. this.flowAnalyzer = flowAnalyzer;
  15. }
  16. public override bool CanRead
  17. {
  18. get
  19. {
  20. return inner.CanRead;
  21. }
  22. }
  23. public override bool CanSeek
  24. {
  25. get
  26. {
  27. return inner.CanSeek;
  28. }
  29. }
  30. public override bool CanWrite
  31. {
  32. get
  33. {
  34. return inner.CanWrite;
  35. }
  36. }
  37. public override long Length
  38. {
  39. get
  40. {
  41. return inner.Length;
  42. }
  43. }
  44. public override long Position
  45. {
  46. get
  47. {
  48. return inner.Position;
  49. }
  50. set
  51. {
  52. inner.Position = value;
  53. }
  54. }
  55. public override void Flush()
  56. {
  57. inner.Flush();
  58. }
  59. public override Task FlushAsync(CancellationToken cancellationToken)
  60. {
  61. return inner.FlushAsync(cancellationToken);
  62. }
  63. public override int Read(byte[] buffer, int offset, int count)
  64. {
  65. int read = inner.Read(buffer, offset, count);
  66. this.flowAnalyzer.OnFlow(FlowType.Read, read);
  67. return read;
  68. }
  69. public override int Read(Span<byte> destination)
  70. {
  71. int read = inner.Read(destination);
  72. this.flowAnalyzer.OnFlow(FlowType.Read, read);
  73. return read;
  74. }
  75. public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
  76. {
  77. int read = await inner.ReadAsync(buffer.AsMemory(offset, count), cancellationToken);
  78. this.flowAnalyzer.OnFlow(FlowType.Read, read);
  79. return read;
  80. }
  81. public override async ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default)
  82. {
  83. int read = await inner.ReadAsync(destination, cancellationToken);
  84. this.flowAnalyzer.OnFlow(FlowType.Read, read);
  85. return read;
  86. }
  87. public override long Seek(long offset, SeekOrigin origin)
  88. {
  89. return inner.Seek(offset, origin);
  90. }
  91. public override void SetLength(long value)
  92. {
  93. inner.SetLength(value);
  94. }
  95. public override void Write(byte[] buffer, int offset, int count)
  96. {
  97. this.flowAnalyzer.OnFlow(FlowType.Wirte, count);
  98. inner.Write(buffer, offset, count);
  99. }
  100. public override void Write(ReadOnlySpan<byte> source)
  101. {
  102. this.flowAnalyzer.OnFlow(FlowType.Wirte, source.Length);
  103. inner.Write(source);
  104. }
  105. public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
  106. {
  107. this.flowAnalyzer.OnFlow(FlowType.Wirte, count);
  108. return inner.WriteAsync(buffer, offset, count, cancellationToken);
  109. }
  110. public override ValueTask WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default)
  111. {
  112. this.flowAnalyzer.OnFlow(FlowType.Wirte, source.Length);
  113. return inner.WriteAsync(source, cancellationToken);
  114. }
  115. // The below APM methods call the underlying Read/WriteAsync methods which will still be logged.
  116. public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
  117. {
  118. return TaskToApm.Begin(ReadAsync(buffer, offset, count), callback, state);
  119. }
  120. public override int EndRead(IAsyncResult asyncResult)
  121. {
  122. return TaskToApm.End<int>(asyncResult);
  123. }
  124. public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
  125. {
  126. return TaskToApm.Begin(WriteAsync(buffer, offset, count), callback, state);
  127. }
  128. public override void EndWrite(IAsyncResult asyncResult)
  129. {
  130. TaskToApm.End(asyncResult);
  131. }
  132. }
  133. }