DuplexPipeStreamAdapter.cs 1.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
  1. using System;
  2. using System.IO;
  3. using System.IO.Pipelines;
  4. using System.Threading.Tasks;
  5. namespace FastGithub.FlowAnalyze
  6. {
  /// <summary>
  /// A <see cref="DuplexPipeStream"/> built over the two ends of an existing
  /// <see cref="IDuplexPipe"/>, which additionally exposes a caller-created
  /// <typeparamref name="TStream"/> wrapper and a reader/writer pair created on top of
  /// that wrapper.
  /// </summary>
  /// <typeparam name="TStream">Type of the stream produced by the factory delegate.</typeparam>
  class DuplexPipeStreamAdapter<TStream> : DuplexPipeStream, IDuplexPipe where TStream : Stream
  {
      // Guards the check-and-set of _disposed in DisposeAsync.
      private readonly object _disposeLock = new object();
      private bool _disposed;

      /// <summary>The stream returned by the factory delegate passed to the constructor.</summary>
      public TStream Stream { get; }

      /// <summary>Reader created over <see cref="Stream"/>.</summary>
      public PipeReader Input { get; }

      /// <summary>Writer created over <see cref="Stream"/>.</summary>
      public PipeWriter Output { get; }

      /// <summary>
      /// Creates the adapter using reader and writer options constructed with
      /// <c>leaveOpen: true</c>.
      /// </summary>
      /// <param name="duplexPipe">Pipe whose input and output are handed to the base stream.</param>
      /// <param name="createStream">Factory that receives this adapter (as a stream) and returns the wrapper.</param>
      public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, Func<Stream, TStream> createStream)
          : this(
              duplexPipe,
              new StreamPipeReaderOptions(leaveOpen: true),
              new StreamPipeWriterOptions(leaveOpen: true),
              createStream)
      {
      }

      /// <summary>
      /// Creates the adapter: the base stream is constructed from the pipe's input and output,
      /// then <paramref name="createStream"/> is invoked with this instance and the resulting
      /// stream backs <see cref="Stream"/>, <see cref="Input"/> and <see cref="Output"/>.
      /// </summary>
      /// <param name="duplexPipe">Pipe whose input and output are handed to the base stream.</param>
      /// <param name="readerOptions">Options passed to <c>PipeReader.Create</c>.</param>
      /// <param name="writerOptions">Options passed to <c>PipeWriter.Create</c>.</param>
      /// <param name="createStream">Factory that receives this adapter (as a stream) and returns the wrapper.</param>
      public DuplexPipeStreamAdapter(
          IDuplexPipe duplexPipe,
          StreamPipeReaderOptions readerOptions,
          StreamPipeWriterOptions writerOptions,
          Func<Stream, TStream> createStream)
          : base(duplexPipe.Input, duplexPipe.Output)
      {
          // The factory is handed this adapter itself, so the wrapper sits on top of the base stream.
          TStream wrapped = createStream(this);

          Stream = wrapped;
          Input = PipeReader.Create(wrapped, readerOptions);
          Output = PipeWriter.Create(wrapped, writerOptions);
      }

      /// <summary>
      /// Completes <see cref="Input"/> and then <see cref="Output"/>. Only the first call does
      /// any work; later calls return immediately.
      /// </summary>
      public override async ValueTask DisposeAsync()
      {
          bool firstCall;
          lock (_disposeLock)
          {
              // Read and set the flag under the lock so only one caller proceeds.
              firstCall = !_disposed;
              _disposed = true;
          }

          if (!firstCall)
          {
              return;
          }

          await Input.CompleteAsync();
          await Output.CompleteAsync();
      }

      /// <summary>
      /// Always throws; this type does not implement the synchronous dispose path.
      /// NOTE(review): presumably callers are expected to use <see cref="DisposeAsync"/> instead — confirm.
      /// </summary>
      /// <param name="disposing">Unused.</param>
      /// <exception cref="NotSupportedException">Thrown on every call.</exception>
      protected override void Dispose(bool disposing) => throw new NotSupportedException();
  }
  44. }