// DuplexPipeStream.cs
  1. using System;
  2. using System.Buffers;
  3. using System.IO;
  4. using System.IO.Pipelines;
  5. using System.Runtime.CompilerServices;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. namespace FastGithub.FlowAnalyze
  9. {
  /// <summary>
  /// Presents a <see cref="PipeReader"/> / <see cref="PipeWriter"/> pair as a single
  /// readable, writable, non-seekable <see cref="Stream"/>.
  /// </summary>
  internal class DuplexPipeStream : Stream
  {
    private readonly PipeReader _input;
    private readonly PipeWriter _output;
    private readonly bool _throwOnCancelled;

    // Raised by CancelPendingRead(); lowered again by the read loop when it
    // turns the cancellation into an OperationCanceledException.
    private volatile bool _cancelCalled;

    /// <summary>
    /// Creates the stream over the given pipe ends.
    /// </summary>
    /// <param name="input">Reader that every read operation pulls from.</param>
    /// <param name="output">Writer that every write and flush operation is forwarded to.</param>
    /// <param name="throwOnCancelled">
    /// When <c>true</c>, a read that comes back cancelled after <see cref="CancelPendingRead"/>
    /// was called throws <see cref="OperationCanceledException"/>.
    /// </param>
    public DuplexPipeStream(PipeReader input, PipeWriter output, bool throwOnCancelled = false)
    {
      (_input, _output, _throwOnCancelled) = (input, output, throwOnCancelled);
    }

    /// <summary>Always <c>true</c>.</summary>
    public override bool CanRead => true;

    /// <summary>Always <c>false</c>; every seek-related member throws.</summary>
    public override bool CanSeek => false;

    /// <summary>Always <c>true</c>.</summary>
    public override bool CanWrite => true;

    /// <summary>Not supported.</summary>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public override long Length => throw new NotSupportedException();

    /// <summary>Not supported.</summary>
    /// <exception cref="NotSupportedException">Always thrown, on both get and set.</exception>
    public override long Position
    {
      get => throw new NotSupportedException();
      set => throw new NotSupportedException();
    }

    /// <summary>
    /// Records that a cancellation was requested and then cancels the pending read on the input reader.
    /// </summary>
    public void CancelPendingRead()
    {
      // The flag is set first so the read loop can already see it when the cancelled result arrives.
      _cancelCalled = true;
      _input.CancelPendingRead();
    }

    /// <summary>Not supported.</summary>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();

    /// <summary>Not supported.</summary>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public override void SetLength(long value) => throw new NotSupportedException();

    /// <summary>
    /// Synchronous read. Uses the result directly when the underlying read has already
    /// completed; otherwise blocks the calling thread until it does.
    /// </summary>
    /// <returns>The number of bytes copied into <paramref name="buffer"/>.</returns>
    public override int Read(byte[] buffer, int offset, int count)
    {
      ValueTask<int> pending = ReadAsyncInternal(new Memory<byte>(buffer, offset, count), default);
      if (pending.IsCompleted)
      {
        return pending.Result;
      }

      return pending.AsTask().GetAwaiter().GetResult();
    }

    /// <summary>Array-based asynchronous read; wraps the array segment and delegates to the shared read loop.</summary>
    public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken = default)
      => ReadAsyncInternal(new Memory<byte>(buffer, offset, count), cancellationToken).AsTask();

    /// <summary>Memory-based asynchronous read; delegates to the shared read loop.</summary>
    public override ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default)
      => ReadAsyncInternal(destination, cancellationToken);

    /// <summary>Synchronous write; blocks the calling thread on the asynchronous write.</summary>
    public override void Write(byte[] buffer, int offset, int count)
      => WriteAsync(buffer, offset, count).GetAwaiter().GetResult();

    /// <summary>
    /// Forwards the given array segment to the output writer. The writer's result value is discarded.
    /// </summary>
    public override async Task WriteAsync(byte[]? buffer, int offset, int count, CancellationToken cancellationToken)
    {
      ReadOnlyMemory<byte> payload = buffer.AsMemory(offset, count);
      await _output.WriteAsync(payload, cancellationToken);
    }

    /// <summary>
    /// Forwards <paramref name="source"/> to the output writer. The writer's result value is discarded.
    /// </summary>
    public override async ValueTask WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default)
    {
      await _output.WriteAsync(source, cancellationToken);
    }

    /// <summary>Synchronous flush; blocks the calling thread on <see cref="FlushAsync(CancellationToken)"/>.</summary>
    public override void Flush()
      => FlushAsync(CancellationToken.None).GetAwaiter().GetResult();

    /// <summary>Flushes the output writer. The writer's result value is discarded.</summary>
    public override async Task FlushAsync(CancellationToken cancellationToken)
    {
      await _output.FlushAsync(cancellationToken);
    }

    /// <summary>
    /// Shared read loop behind every read overload.
    /// </summary>
    /// <param name="destination">Memory that receives the bytes.</param>
    /// <param name="cancellationToken">Token handed to the input reader's read call.</param>
    /// <returns>
    /// The number of bytes copied (never more than <c>destination.Length</c>), or <c>0</c>
    /// when the reader returns an empty buffer and reports completion.
    /// </returns>
    /// <exception cref="OperationCanceledException">
    /// The stream was created with <c>throwOnCancelled</c>, the read result is flagged as
    /// cancelled, and <see cref="CancelPendingRead"/> had been called.
    /// </exception>
    [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder<>))]
    private async ValueTask<int> ReadAsyncInternal(Memory<byte> destination, CancellationToken cancellationToken)
    {
      while (true)
      {
        ReadResult readResult = await _input.ReadAsync(cancellationToken);
        ReadOnlySequence<byte> available = readResult.Buffer;
        try
        {
          if (_throwOnCancelled && readResult.IsCanceled && _cancelCalled)
          {
            // Lower the flag so the next read starts clean. The finally block
            // below still runs and advances the reader past the whole buffer.
            _cancelCalled = false;
            throw new OperationCanceledException();
          }

          if (available.IsEmpty)
          {
            if (readResult.IsCompleted)
            {
              return 0;
            }

            // Nothing to hand out yet and the reader is not finished: ask again.
            continue;
          }

          // The stream contract reports an int, so clamp to the destination size.
          int bytesCopied = (int)Math.Min(available.Length, destination.Length);

          // Narrow the sequence to what is actually handed out; the finally
          // block then advances only as far as the end of this slice.
          available = available.Slice(0, bytesCopied);
          available.CopyTo(destination.Span);
          return bytesCopied;
        }
        finally
        {
          // Same position for consumed and examined: the end of the (possibly sliced) buffer.
          SequencePosition end = available.End;
          _input.AdvanceTo(end, end);
        }
      }
    }

    /// <summary>APM-style begin-read, bridged onto the task-based read through <c>TaskToApm</c>.</summary>
    public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
      => TaskToApm.Begin(ReadAsync(buffer, offset, count), callback, state);

    /// <summary>Completes a read started with <see cref="BeginRead"/> and returns its byte count.</summary>
    public override int EndRead(IAsyncResult asyncResult)
      => TaskToApm.End<int>(asyncResult);

    /// <summary>APM-style begin-write, bridged onto the task-based write through <c>TaskToApm</c>.</summary>
    public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
      => TaskToApm.Begin(WriteAsync(buffer, offset, count), callback, state);

    /// <summary>Completes a write started with <see cref="BeginWrite"/>.</summary>
    public override void EndWrite(IAsyncResult asyncResult)
      => TaskToApm.End(asyncResult);
  }
  142. }