2
0
陈国伟 3 жил өмнө
parent
commit
4f6d0d0010

+ 44 - 0
FastGithub.FlowAnalyze/DelegatingDuplexPipe.cs

@@ -0,0 +1,44 @@
+using System;
+using System.IO;
+using System.IO.Pipelines;
+using System.Threading.Tasks;
+
+namespace FastGithub.FlowAnalyze
+{
+    class DelegatingDuplexPipe<TDelegatingStream> : IDuplexPipe, IAsyncDisposable where TDelegatingStream : DelegatingStream
+    {
+        private bool disposed;
+        private readonly object syncRoot = new();
+
+        public PipeReader Input { get; }
+
+        public PipeWriter Output { get; }
+
+        public DelegatingDuplexPipe(IDuplexPipe duplexPipe, Func<Stream, TDelegatingStream> delegatingStreamFactory) :
+            this(duplexPipe, new StreamPipeReaderOptions(leaveOpen: true), new StreamPipeWriterOptions(leaveOpen: true), delegatingStreamFactory)
+        {
+        }
+
+        public DelegatingDuplexPipe(IDuplexPipe duplexPipe, StreamPipeReaderOptions readerOptions, StreamPipeWriterOptions writerOptions, Func<Stream, TDelegatingStream> delegatingStreamFactory)
+        {
+            var delegatingStream = delegatingStreamFactory(duplexPipe.AsStream());
+            this.Input = PipeReader.Create(delegatingStream, readerOptions);
+            this.Output = PipeWriter.Create(delegatingStream, writerOptions);
+        }
+
+        public virtual async ValueTask DisposeAsync()
+        {
+            lock (this.syncRoot)
+            {
+                if (this.disposed == true)
+                {
+                    return;
+                }
+                this.disposed = true;
+            }
+
+            await this.Input.CompleteAsync();
+            await this.Output.CompleteAsync();
+        }
+    }
+}

+ 142 - 0
FastGithub.FlowAnalyze/DelegatingStream.cs

@@ -0,0 +1,142 @@
+using System;
+using System.IO;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace FastGithub.FlowAnalyze
+{
+    abstract class DelegatingStream : Stream
+    {
+        public Stream Inner { get; }
+
+        public DelegatingStream(Stream inner)
+        {
+            this.Inner = inner;
+        }
+
+        public override bool CanRead
+        {
+            get
+            {
+                return this.Inner.CanRead;
+            }
+        }
+
+        public override bool CanSeek
+        {
+            get
+            {
+                return this.Inner.CanSeek;
+            }
+        }
+
+        public override bool CanWrite
+        {
+            get
+            {
+                return this.Inner.CanWrite;
+            }
+        }
+
+        public override long Length
+        {
+            get
+            {
+                return this.Inner.Length;
+            }
+        }
+
+        public override long Position
+        {
+            get
+            {
+                return this.Inner.Position;
+            }
+
+            set
+            {
+                this.Inner.Position = value;
+            }
+        }
+
+        public override void Flush()
+        {
+            this.Inner.Flush();
+        }
+
+        public override Task FlushAsync(CancellationToken cancellationToken)
+        {
+            return this.Inner.FlushAsync(cancellationToken);
+        }
+
+        public override int Read(byte[] buffer, int offset, int count)
+        {
+            return this.Inner.Read(buffer, offset, count);
+        }
+
+        public override int Read(Span<byte> destination)
+        {
+            return this.Inner.Read(destination);
+        }
+
+        public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+        {
+            return this.Inner.ReadAsync(buffer, offset, count, cancellationToken);
+        }
+
+        public override ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default)
+        {
+            return this.Inner.ReadAsync(destination, cancellationToken);
+        }
+
+        public override long Seek(long offset, SeekOrigin origin)
+        {
+            return this.Inner.Seek(offset, origin);
+        }
+
+        public override void SetLength(long value)
+        {
+            this.Inner.SetLength(value);
+        }
+
+        public override void Write(byte[] buffer, int offset, int count)
+        {
+            this.Inner.Write(buffer, offset, count);
+        }
+
+        public override void Write(ReadOnlySpan<byte> source)
+        {
+            this.Inner.Write(source);
+        }
+
+        public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+        {
+            return this.Inner.WriteAsync(buffer, offset, count, cancellationToken);
+        }
+
+        public override ValueTask WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default)
+        {
+            return this.Inner.WriteAsync(source, cancellationToken);
+        }
+
+        public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
+        {
+            return TaskToApm.Begin(ReadAsync(buffer, offset, count), callback, state);
+        }
+
+        public override int EndRead(IAsyncResult asyncResult)
+        {
+            return TaskToApm.End<int>(asyncResult);
+        }
+
+        public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
+        {
+            return TaskToApm.Begin(WriteAsync(buffer, offset, count), callback, state);
+        }
+
+        public override void EndWrite(IAsyncResult asyncResult)
+        {
+            TaskToApm.End(asyncResult);
+        }
+    }
+}

+ 0 - 167
FastGithub.FlowAnalyze/DuplexPipeStream.cs

@@ -1,167 +0,0 @@
-using System;
-using System.Buffers;
-using System.IO;
-using System.IO.Pipelines;
-using System.Runtime.CompilerServices;
-using System.Threading;
-using System.Threading.Tasks;
-
-namespace FastGithub.FlowAnalyze
-{
-    class DuplexPipeStream : Stream
-    {
-        private readonly PipeReader _input;
-        private readonly PipeWriter _output;
-        private readonly bool _throwOnCancelled;
-        private volatile bool _cancelCalled;
-
-        public DuplexPipeStream(PipeReader input, PipeWriter output, bool throwOnCancelled = false)
-        {
-            _input = input;
-            _output = output;
-            _throwOnCancelled = throwOnCancelled;
-        }
-
-        public void CancelPendingRead()
-        {
-            _cancelCalled = true;
-            _input.CancelPendingRead();
-        }
-
-        public override bool CanRead => true;
-
-        public override bool CanSeek => false;
-
-        public override bool CanWrite => true;
-
-        public override long Length
-        {
-            get
-            {
-                throw new NotSupportedException();
-            }
-        }
-
-        public override long Position
-        {
-            get
-            {
-                throw new NotSupportedException();
-            }
-            set
-            {
-                throw new NotSupportedException();
-            }
-        }
-
-        public override long Seek(long offset, SeekOrigin origin)
-        {
-            throw new NotSupportedException();
-        }
-
-        public override void SetLength(long value)
-        {
-            throw new NotSupportedException();
-        }
-
-        public override int Read(byte[] buffer, int offset, int count)
-        {
-            ValueTask<int> vt = ReadAsyncInternal(new Memory<byte>(buffer, offset, count), default);
-            return vt.IsCompleted ?
-                vt.Result :
-                vt.AsTask().GetAwaiter().GetResult();
-        }
-
-        public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken = default)
-        {
-            return ReadAsyncInternal(new Memory<byte>(buffer, offset, count), cancellationToken).AsTask();
-        }
-
-        public override ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default)
-        {
-            return ReadAsyncInternal(destination, cancellationToken);
-        }
-
-        public override void Write(byte[] buffer, int offset, int count)
-        {
-            WriteAsync(buffer, offset, count).GetAwaiter().GetResult();
-        }
-
-        public override async  Task WriteAsync(byte[]? buffer, int offset, int count, CancellationToken cancellationToken)
-        {
-            await  _output.WriteAsync(buffer.AsMemory(offset, count), cancellationToken) ;
-        }
-
-        public override async ValueTask WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default)
-        {
-            await _output.WriteAsync(source, cancellationToken);
-        }
-
-        public override void Flush()
-        {
-            FlushAsync(CancellationToken.None).GetAwaiter().GetResult();
-        }
-
-        public override async Task FlushAsync(CancellationToken cancellationToken)
-        {
-            await _output.FlushAsync(cancellationToken);
-        }
-
-        [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder<>))]
-        private async ValueTask<int> ReadAsyncInternal(Memory<byte> destination, CancellationToken cancellationToken)
-        {
-            while (true)
-            {
-                var result = await _input.ReadAsync(cancellationToken);
-                var readableBuffer = result.Buffer;
-                try
-                {
-                    if (_throwOnCancelled && result.IsCanceled && _cancelCalled)
-                    {
-                        // Reset the bool
-                        _cancelCalled = false;
-                        throw new OperationCanceledException();
-                    }
-
-                    if (!readableBuffer.IsEmpty)
-                    {
-                        // buffer.Count is int
-                        var count = (int)Math.Min(readableBuffer.Length, destination.Length);
-                        readableBuffer = readableBuffer.Slice(0, count);
-                        readableBuffer.CopyTo(destination.Span);
-                        return count;
-                    }
-
-                    if (result.IsCompleted)
-                    {
-                        return 0;
-                    }
-                }
-                finally
-                {
-                    _input.AdvanceTo(readableBuffer.End, readableBuffer.End);
-                }
-            }
-        }
-
-        public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
-        {
-            return TaskToApm.Begin(ReadAsync(buffer, offset, count), callback, state);
-        }
-
-        public override int EndRead(IAsyncResult asyncResult)
-        {
-            return TaskToApm.End<int>(asyncResult);
-        }
-
-        public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
-        {
-            return TaskToApm.Begin(WriteAsync(buffer, offset, count), callback, state);
-        }
-
-        public override void EndWrite(IAsyncResult asyncResult)
-        {
-            TaskToApm.End(asyncResult);
-        }
-    }
-}

+ 0 - 53
FastGithub.FlowAnalyze/DuplexPipeStreamAdapter.cs

@@ -1,53 +0,0 @@
-using System;
-using System.IO;
-using System.IO.Pipelines;
-using System.Threading.Tasks;
-
-namespace FastGithub.FlowAnalyze
-{
-    class DuplexPipeStreamAdapter<TStream> : DuplexPipeStream, IDuplexPipe where TStream : Stream
-    {
-        private bool _disposed;
-        private readonly object _disposeLock = new object();
-
-        public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, Func<Stream, TStream> createStream) :
-            this(duplexPipe, new StreamPipeReaderOptions(leaveOpen: true), new StreamPipeWriterOptions(leaveOpen: true), createStream)
-        {
-        }
-
-        public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, StreamPipeReaderOptions readerOptions, StreamPipeWriterOptions writerOptions, Func<Stream, TStream> createStream) :
-            base(duplexPipe.Input, duplexPipe.Output)
-        {
-            var stream = createStream(this);
-            Stream = stream;
-            Input = PipeReader.Create(stream, readerOptions);
-            Output = PipeWriter.Create(stream, writerOptions);
-        }
-
-        public TStream Stream { get; }
-
-        public PipeReader Input { get; }
-
-        public PipeWriter Output { get; }
-
-        public override async ValueTask DisposeAsync()
-        {
-            lock (_disposeLock)
-            {
-                if (_disposed)
-                {
-                    return;
-                }
-                _disposed = true;
-            }
-
-            await Input.CompleteAsync();
-            await Output.CompleteAsync();
-        }
-
-        protected override void Dispose(bool disposing)
-        {
-            throw new NotSupportedException();
-        }
-    }
-}

+ 175 - 0
FastGithub.FlowAnalyze/DuplexPipeStreamExtensions.cs

@@ -0,0 +1,175 @@
+using System;
+using System.Buffers;
+using System.IO;
+using System.IO.Pipelines;
+using System.Runtime.CompilerServices;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace FastGithub.FlowAnalyze
+{
+    static class DuplexPipeStreamExtensions
+    {
+        public static Stream AsStream(this IDuplexPipe duplexPipe, bool throwOnCancelled = false)
+        {
+            return new DuplexPipeStream(duplexPipe, throwOnCancelled);
+        }
+
+        private class DuplexPipeStream : Stream
+        {
+            private readonly PipeReader input;
+            private readonly PipeWriter output;
+            private readonly bool throwOnCancelled;
+            private volatile bool cancelCalled;
+
+            public DuplexPipeStream(IDuplexPipe duplexPipe, bool throwOnCancelled = false)
+            {
+                this.input = duplexPipe.Input;
+                this.output = duplexPipe.Output;
+                this.throwOnCancelled = throwOnCancelled;
+            }
+
+            public void CancelPendingRead()
+            {
+                this.cancelCalled = true;
+                this.input.CancelPendingRead();
+            }
+
+            public override bool CanRead => true;
+
+            public override bool CanSeek => false;
+
+            public override bool CanWrite => true;
+
+            public override long Length
+            {
+                get
+                {
+                    throw new NotSupportedException();
+                }
+            }
+
+            public override long Position
+            {
+                get
+                {
+                    throw new NotSupportedException();
+                }
+                set
+                {
+                    throw new NotSupportedException();
+                }
+            }
+
+            public override long Seek(long offset, SeekOrigin origin)
+            {
+                throw new NotSupportedException();
+            }
+
+            public override void SetLength(long value)
+            {
+                throw new NotSupportedException();
+            }
+
+            public override int Read(byte[] buffer, int offset, int count)
+            {
+                ValueTask<int> vt = ReadAsyncInternal(new Memory<byte>(buffer, offset, count), default);
+                return vt.IsCompleted ?
+                    vt.Result :
+                    vt.AsTask().GetAwaiter().GetResult();
+            }
+
+            public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken = default)
+            {
+                return ReadAsyncInternal(new Memory<byte>(buffer, offset, count), cancellationToken).AsTask();
+            }
+
+            public override ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default)
+            {
+                return ReadAsyncInternal(destination, cancellationToken);
+            }
+
+            public override void Write(byte[] buffer, int offset, int count)
+            {
+                WriteAsync(buffer, offset, count).GetAwaiter().GetResult();
+            }
+
+            public override async Task WriteAsync(byte[]? buffer, int offset, int count, CancellationToken cancellationToken)
+            {
+                await this.output.WriteAsync(buffer.AsMemory(offset, count), cancellationToken);
+            }
+
+            public override async ValueTask WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default)
+            {
+                await this.output.WriteAsync(source, cancellationToken);
+            }
+
+            public override void Flush()
+            {
+                FlushAsync(CancellationToken.None).GetAwaiter().GetResult();
+            }
+
+            public override async Task FlushAsync(CancellationToken cancellationToken)
+            {
+                await this.output.FlushAsync(cancellationToken);
+            }
+
+            [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder<>))]
+            private async ValueTask<int> ReadAsyncInternal(Memory<byte> destination, CancellationToken cancellationToken)
+            {
+                while (true)
+                {
+                    var result = await this.input.ReadAsync(cancellationToken);
+                    var readableBuffer = result.Buffer;
+                    try
+                    {
+                        if (this.throwOnCancelled && result.IsCanceled && this.cancelCalled)
+                        {
+                            // Reset the bool
+                            this.cancelCalled = false;
+                            throw new OperationCanceledException();
+                        }
+
+                        if (!readableBuffer.IsEmpty)
+                        {
+                            // buffer.Count is int
+                            var count = (int)Math.Min(readableBuffer.Length, destination.Length);
+                            readableBuffer = readableBuffer.Slice(0, count);
+                            readableBuffer.CopyTo(destination.Span);
+                            return count;
+                        }
+
+                        if (result.IsCompleted)
+                        {
+                            return 0;
+                        }
+                    }
+                    finally
+                    {
+                        this.input.AdvanceTo(readableBuffer.End, readableBuffer.End);
+                    }
+                }
+            }
+
+            public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
+            {
+                return TaskToApm.Begin(ReadAsync(buffer, offset, count), callback, state);
+            }
+
+            public override int EndRead(IAsyncResult asyncResult)
+            {
+                return TaskToApm.End<int>(asyncResult);
+            }
+
+            public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
+            {
+                return TaskToApm.Begin(WriteAsync(buffer, offset, count), callback, state);
+            }
+
+            public override void EndWrite(IAsyncResult asyncResult)
+            {
+                TaskToApm.End(asyncResult);
+            }
+        }
+    }
+}

+ 3 - 3
FastGithub.FlowAnalyze/FlowAnalyzeDuplexPipe.cs

@@ -2,10 +2,10 @@
 
 namespace FastGithub.FlowAnalyze
 {
-    sealed class FlowAnalyzeDuplexPipe : DuplexPipeStreamAdapter<FlowAnalyzeStream>
+    sealed class FlowAnalyzeDuplexPipe : DelegatingDuplexPipe<FlowAnalyzeStream>
     {
-        public FlowAnalyzeDuplexPipe(IDuplexPipe transport, IFlowAnalyzer flowAnalyzer) :
-            base(transport, stream => new FlowAnalyzeStream(stream, flowAnalyzer))
+        public FlowAnalyzeDuplexPipe(IDuplexPipe duplexPipe, IFlowAnalyzer flowAnalyzer) :
+            base(duplexPipe, stream => new FlowAnalyzeStream(stream, flowAnalyzer))
         {
         }
     }

+ 10 - 98
FastGithub.FlowAnalyze/FlowAnalyzeStream.cs

@@ -5,155 +5,67 @@ using System.Threading.Tasks;
 
 namespace FastGithub.FlowAnalyze
 {
-
-    sealed class FlowAnalyzeStream : Stream
+    sealed class FlowAnalyzeStream : DelegatingStream
     {
-        private readonly Stream inner;
         private readonly IFlowAnalyzer flowAnalyzer;
 
         public FlowAnalyzeStream(Stream inner, IFlowAnalyzer flowAnalyzer)
+            : base(inner)
         {
-            this.inner = inner;
             this.flowAnalyzer = flowAnalyzer;
         }
 
-        public override bool CanRead
-        {
-            get
-            {
-                return inner.CanRead;
-            }
-        }
-
-        public override bool CanSeek
-        {
-            get
-            {
-                return inner.CanSeek;
-            }
-        }
-
-        public override bool CanWrite
-        {
-            get
-            {
-                return inner.CanWrite;
-            }
-        }
-
-        public override long Length
-        {
-            get
-            {
-                return inner.Length;
-            }
-        }
-
-        public override long Position
-        {
-            get
-            {
-                return inner.Position;
-            }
-
-            set
-            {
-                inner.Position = value;
-            }
-        }
-
-        public override void Flush()
-        {
-            inner.Flush();
-        }
-
-        public override Task FlushAsync(CancellationToken cancellationToken)
-        {
-            return inner.FlushAsync(cancellationToken);
-        }
-
         public override int Read(byte[] buffer, int offset, int count)
         {
-            int read = inner.Read(buffer, offset, count);
+            int read = base.Read(buffer, offset, count);
             this.flowAnalyzer.OnFlow(FlowType.Read, read);
             return read;
         }
 
         public override int Read(Span<byte> destination)
         {
-            int read = inner.Read(destination);
+            int read = base.Read(destination);
             this.flowAnalyzer.OnFlow(FlowType.Read, read);
             return read;
         }
 
         public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
         {
-            int read = await inner.ReadAsync(buffer.AsMemory(offset, count), cancellationToken);
+            int read = await base.ReadAsync(buffer.AsMemory(offset, count), cancellationToken);
             this.flowAnalyzer.OnFlow(FlowType.Read, read);
             return read;
         }
 
         public override async ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default)
         {
-            int read = await inner.ReadAsync(destination, cancellationToken);
+            int read = await base.ReadAsync(destination, cancellationToken);
             this.flowAnalyzer.OnFlow(FlowType.Read, read);
             return read;
         }
 
-        public override long Seek(long offset, SeekOrigin origin)
-        {
-            return inner.Seek(offset, origin);
-        }
-
-        public override void SetLength(long value)
-        {
-            inner.SetLength(value);
-        }
 
         public override void Write(byte[] buffer, int offset, int count)
         {
             this.flowAnalyzer.OnFlow(FlowType.Wirte, count);
-            inner.Write(buffer, offset, count);
+            base.Write(buffer, offset, count);
         }
 
         public override void Write(ReadOnlySpan<byte> source)
         {
             this.flowAnalyzer.OnFlow(FlowType.Wirte, source.Length);
-            inner.Write(source);
+            base.Write(source);
         }
 
         public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
         {
             this.flowAnalyzer.OnFlow(FlowType.Wirte, count);
-            return inner.WriteAsync(buffer, offset, count, cancellationToken);
+            return base.WriteAsync(buffer, offset, count, cancellationToken);
         }
 
         public override ValueTask WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default)
         {
             this.flowAnalyzer.OnFlow(FlowType.Wirte, source.Length);
-            return inner.WriteAsync(source, cancellationToken);
-        }
-
-
-        // The below APM methods call the underlying Read/WriteAsync methods which will still be logged.
-        public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
-        {
-            return TaskToApm.Begin(ReadAsync(buffer, offset, count), callback, state);
-        }
-
-        public override int EndRead(IAsyncResult asyncResult)
-        {
-            return TaskToApm.End<int>(asyncResult);
-        }
-
-        public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
-        {
-            return TaskToApm.Begin(WriteAsync(buffer, offset, count), callback, state);
-        }
-
-        public override void EndWrite(IAsyncResult asyncResult)
-        {
-            TaskToApm.End(asyncResult);
+            return base.WriteAsync(source, cancellationToken);
         }
     }
 }