2
0

DelegatingDuplexPipe.cs 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344
  1. using System;
  2. using System.IO;
  3. using System.IO.Pipelines;
  4. using System.Threading.Tasks;
  namespace FastGithub.FlowAnalyze
  {
      /// <summary>
      /// An <see cref="IDuplexPipe"/> whose <see cref="Input"/> and <see cref="Output"/> are both
      /// layered over a single <typeparamref name="TDelegatingStream"/>. That stream is produced by a
      /// caller-supplied factory from the stream view of the wrapped pipe, so all traffic read from
      /// or written to this pipe passes through the delegating stream.
      /// </summary>
      /// <typeparam name="TDelegatingStream">The delegating stream type created by the factory.</typeparam>
      class DelegatingDuplexPipe<TDelegatingStream> : IDuplexPipe, IAsyncDisposable where TDelegatingStream : DelegatingStream
      {
          private bool disposed;
          private readonly object syncRoot = new();

          /// <summary>Reader created over the delegating stream.</summary>
          public PipeReader Input { get; }

          /// <summary>Writer created over the delegating stream.</summary>
          public PipeWriter Output { get; }

          /// <summary>
          /// Wraps <paramref name="duplexPipe"/> using reader and writer options constructed with
          /// <c>leaveOpen: true</c>.
          /// </summary>
          /// <param name="duplexPipe">The pipe to wrap.</param>
          /// <param name="delegatingStreamFactory">Creates the delegating stream from the wrapped pipe's stream view.</param>
          /// <exception cref="ArgumentNullException">
          /// <paramref name="duplexPipe"/> or <paramref name="delegatingStreamFactory"/> is <c>null</c>.
          /// </exception>
          public DelegatingDuplexPipe(IDuplexPipe duplexPipe, Func<Stream, TDelegatingStream> delegatingStreamFactory) :
              this(duplexPipe, new StreamPipeReaderOptions(leaveOpen: true), new StreamPipeWriterOptions(leaveOpen: true), delegatingStreamFactory)
          {
          }

          /// <summary>
          /// Wraps <paramref name="duplexPipe"/> with explicit options for the stream-backed reader and writer.
          /// </summary>
          /// <param name="duplexPipe">The pipe to wrap.</param>
          /// <param name="readerOptions">Options passed to <see cref="PipeReader.Create(Stream, StreamPipeReaderOptions)"/>.</param>
          /// <param name="writerOptions">Options passed to <see cref="PipeWriter.Create(Stream, StreamPipeWriterOptions)"/>.</param>
          /// <param name="delegatingStreamFactory">Creates the delegating stream from the wrapped pipe's stream view.</param>
          /// <exception cref="ArgumentNullException">
          /// <paramref name="duplexPipe"/> or <paramref name="delegatingStreamFactory"/> is <c>null</c>.
          /// </exception>
          public DelegatingDuplexPipe(IDuplexPipe duplexPipe, StreamPipeReaderOptions readerOptions, StreamPipeWriterOptions writerOptions, Func<Stream, TDelegatingStream> delegatingStreamFactory)
          {
              // Fail with a named argument instead of an anonymous NullReferenceException further down.
              if (duplexPipe is null)
              {
                  throw new ArgumentNullException(nameof(duplexPipe));
              }
              if (delegatingStreamFactory is null)
              {
                  throw new ArgumentNullException(nameof(delegatingStreamFactory));
              }

              // One delegating stream backs both directions.
              var delegatingStream = delegatingStreamFactory(duplexPipe.AsStream());
              this.Input = PipeReader.Create(delegatingStream, readerOptions);
              this.Output = PipeWriter.Create(delegatingStream, writerOptions);
          }

          /// <summary>
          /// Completes <see cref="Input"/> and then <see cref="Output"/>. Only the first call does
          /// any work; subsequent calls return immediately.
          /// </summary>
          /// <returns>A task that finishes once both sides have been completed.</returns>
          public virtual async ValueTask DisposeAsync()
          {
              // The check-and-set runs under the lock so exactly one caller proceeds past it.
              lock (this.syncRoot)
              {
                  if (this.disposed)
                  {
                      return;
                  }
                  this.disposed = true;
              }

              try
              {
                  await this.Input.CompleteAsync();
              }
              finally
              {
                  // The disposed flag is already set, so a later DisposeAsync call would not retry.
                  // Complete the writer here even when completing the reader threw.
                  await this.Output.CompleteAsync();
              }
          }
      }
  }