|
@@ -80,6 +80,30 @@ public:
|
|
|
QueueChannel(QueueChannel&&) = delete;
|
|
|
QueueChannel& operator=(QueueChannel&&) = delete;
|
|
|
|
|
|
+ template <typename FunctionIdT, typename SequenceIdT>
|
|
|
+ Error startSendMessage(const FunctionIdT &FnId, const SequenceIdT &SeqNo) {
|
|
|
+ ++InFlightOutgoingMessages;
|
|
|
+ return orc::rpc::RawByteChannel::startSendMessage(FnId, SeqNo);
|
|
|
+ }
|
|
|
+
|
|
|
+ Error endSendMessage() {
|
|
|
+ --InFlightOutgoingMessages;
|
|
|
+ ++CompletedOutgoingMessages;
|
|
|
+ return orc::rpc::RawByteChannel::endSendMessage();
|
|
|
+ }
|
|
|
+
|
|
|
+ template <typename FunctionIdT, typename SequenceNumberT>
|
|
|
+ Error startReceiveMessage(FunctionIdT &FnId, SequenceNumberT &SeqNo) {
|
|
|
+ ++InFlightIncomingMessages;
|
|
|
+ return orc::rpc::RawByteChannel::startReceiveMessage(FnId, SeqNo);
|
|
|
+ }
|
|
|
+
|
|
|
+ Error endReceiveMessage() {
|
|
|
+ --InFlightIncomingMessages;
|
|
|
+ ++CompletedIncomingMessages;
|
|
|
+ return orc::rpc::RawByteChannel::endReceiveMessage();
|
|
|
+ }
|
|
|
+
|
|
|
Error readBytes(char *Dst, unsigned Size) override {
|
|
|
std::unique_lock<std::mutex> Lock(InQueue->getMutex());
|
|
|
while (Size) {
|
|
@@ -112,7 +136,10 @@ public:
|
|
|
return Error::success();
|
|
|
}
|
|
|
|
|
|
- Error send() override { return Error::success(); }
|
|
|
+ Error send() override {
|
|
|
+ ++SendCalls;
|
|
|
+ return Error::success();
|
|
|
+ }
|
|
|
|
|
|
void close() {
|
|
|
auto ChannelClosed = []() { return make_error<QueueChannelClosedError>(); };
|
|
@@ -124,6 +151,11 @@ public:
|
|
|
|
|
|
uint64_t NumWritten = 0;
|
|
|
uint64_t NumRead = 0;
|
|
|
+ std::atomic<size_t> InFlightIncomingMessages{0};
|
|
|
+ std::atomic<size_t> CompletedIncomingMessages{0};
|
|
|
+ std::atomic<size_t> InFlightOutgoingMessages{0};
|
|
|
+ std::atomic<size_t> CompletedOutgoingMessages{0};
|
|
|
+ std::atomic<size_t> SendCalls{0};
|
|
|
|
|
|
private:
|
|
|
|