Slontia opened a new issue, #2772: URL: https://github.com/apache/brpc/issues/2772
**Describe the bug (描述bug)** 使用 stream RPC,从 server 端向 client 端发送消息时,在 client 端遇到了 check failed,原因是解析 message 失败,没有从 header 中解析出预期的协议名称(解析逻辑在 `ParseRpcMessage` 里)。随后一段时间后,RPC 报错超时。 因为 example 中只包括了 client 给 server 发送 stream message 的例子,我不是很确定我这里的写法是否正确。 这是日志里发现的 check failed: ``` 14473 F0926 11:37:47.618417 7fb109dfe700 /my_workspace/incubator-brpc/incubator-brpc-1.0.0/src/brpc/stream.cpp:617] Check failed: false. 14474 #0 0x0000062f041e brpc::Stream::HandleRpcResponse() 14475 #1 0x0000062f0701 brpc::Stream::Consume() 14476 #2 0x000006144db0 bthread::ExecutionQueueBase::_execute() 14477 #3 0x000006147458 bthread::ExecutionQueueBase::_execute_tasks() 14478 #4 0x000006160fd7 bthread::TaskGroup::task_runner() 14479 #5 0x0000061443d1 bthread_make_fcontext ``` 我尝试在此处加了一些日志,观察到 header 里并未包含「PRPC」。 ``` C++ ParseResult ParseRpcMessage(butil::IOBuf* source, Socket* socket, bool /*read_eof*/, const void*) { char header_buf[12]; const size_t n = source->copy_to(header_buf, sizeof(header_buf)); if (n >= 4) { void* dummy = header_buf; if (*(const uint32_t*)dummy != *(const uint32_t*)"PRPC") { LOG(ERROR) << "header is not PRPC, it is " << header_buf; // 踩到了这里 return MakeParseError(PARSE_ERROR_TRY_OTHERS); } } else { if (memcmp(header_buf, "PRPC", n) != 0) { LOG(ERROR) << "header is not PRPC, it is " << header_buf; return MakeParseError(PARSE_ERROR_TRY_OTHERS); } } ... } ``` 日志报错: ``` 14453 E0926 11:37:43.566200 7fb10c9fe700 /my_workspace/incubator-brpc/incubator-brpc-1.0.0/src/brpc/policy/baidu_rpc_protocol.cpp:99] header is not PRPC, it is 14454 ^R^H<8b><80><80>¨ï<9a>½3^P ``` **To Reproduce (复现方法)** 这是我 server 端的代码,做了一定程度的简化。目前的实现的顺序是 accept stream -> 回复 response -> write stream。 ``` C++ void MyServer::MyMethod(::google::protobuf::RpcController *cntl_base, const MyRequest *request, MyResponse *response, ::google::protobuf::Closure *done) { brpc::ClosureGuard done_guard(done); brpc::StreamId stream_id; int brpc_ret = 0; if (TD_UNLIKELY(0 != (brpc_ret = brpc::StreamAccept(&stream_id, *static_cast<brpc::Controller *>(cntl_base), nullptr)))) { response->set_success(false); return; } done_guard.reset(nullptr); // 回包 butil::IOBuf serialized_message_iobuf = GenerateData(); // 准备数据 if (TD_UNLIKELY(0 != (brpc_ret = brpc::StreamWrite(stream_id, serialized_message_iobuf)))) { brpc::StreamClose(stream_id); } } ``` 这是我 client 端的简化代码,将 closure 和 stream handler 集成到了一个类里。 ``` C++ class ClientClosure : public google::protobuf::Closure, public brpc::StreamInputHandler { enum DestructBit { k_stream_closed = 0x1, k_rpc_responded = 0x2 }; public: void Run() { // We do not retry RPC for failure because we will perform deadlock detection again in next term. const auto destruct_guard = create_scope_guard([this] { Destruct_(DestructBit::k_rpc_responded); }); if (TD_UNLIKELY(cntl_.Failed())) { brpc::StreamClose(stream_id_); return; } close_stream_guard.rollback(); } int on_received_messages(const brpc::StreamId id, butil::IOBuf *const messages[], size_t size) final { assert(id == stream_id_); // handle messages ... brpc::StreamClose(stream_id_); return 0; } void on_idle_timeout(const brpc::StreamId id) final { assert(id == stream_id_); brpc::StreamClose(stream_id_); } void on_closed(const brpc::StreamId id) final { assert(id == stream_id_); Destruct_(DestructBit::k_stream_closed); } void Send(const MyRequest &request) { brpc::StreamOptions stream_options; stream_options.handler = this; int brpc_ret = 0; if (0 != (brpc_ret = brpc::StreamCreate(&stream_id_, cntl_, &stream_options))) { delete this; } else { MyService_Stub(channel_.get()).MyMethod(&cntl_, &request, &response_, this); } } private: // The closure will not be destroyed until both the RPC is finished and the stream is closed. void Destruct_(const DestructBit bit) { if ((k_stream_closed | k_rpc_responded) == (destruct_bitset_ |= bit)) { delete this; } } bthread::CountdownEvent &latch_; Channel channel_; MyResponse response_; brpc::Controller cntl_; brpc::StreamId stream_id_{brpc::INVALID_STREAM_ID}; std::atomic<uint8_t> destruct_bitset_{0}; }; ``` **Expected behavior (期望行为)** 期望是 RPC 不超时,client 端能从 server 端接收到 response 和 stream message。 **Versions (各种版本)** OS: CentOS Linux release 7.2 (Final) Compiler: g++ (GCC) 10.2.1 20210130 (Red Hat 10.2.1-11) brpc: protobuf: **Additional context/screenshots (更多上下文/截图)** -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org For additional commands, e-mail: dev-h...@brpc.apache.org