This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7d4da7a [fix](rpc) fix BE crash in SendRpcResponse when high
concurrency (#7413)
7d4da7a is described below
commit 7d4da7af5cc55764e52e5701c87f370e5fd990cd
Author: Xinyi Zou <[email protected]>
AuthorDate: Thu Dec 16 20:27:24 2021 +0800
[fix](rpc) fix BE crash in SendRpcResponse when high concurrency (#7413)
The response is accessed when done->Run is called in transmit_data(),
give response a default value to avoid null pointers in high concurrency.
---
be/src/service/internal_service.cpp | 8 ++++++--
be/src/util/proto_util.h | 12 +++++-------
gensrc/proto/data.proto | 1 -
gensrc/proto/internal_service.proto | 4 ++++
4 files changed, 15 insertions(+), 10 deletions(-)
diff --git a/be/src/service/internal_service.cpp
b/be/src/service/internal_service.cpp
index 90d1488..ee6cf7e 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -62,14 +62,18 @@ void
PInternalServiceImpl<T>::transmit_data(google::protobuf::RpcController* cnt
<< " node=" << request->node_id();
brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
attachment_transfer_request_row_batch<PTransmitDataParams>(request, cntl);
- auto st = _exec_env->stream_mgr()->transmit_data(request, &done);
+ // The response is accessed when done->Run is called in transmit_data(),
+ // give response a default value to avoid null pointers in high
concurrency.
+ Status st;
+ st.to_protobuf(response->mutable_status());
+ st = _exec_env->stream_mgr()->transmit_data(request, &done);
if (!st.ok()) {
LOG(WARNING) << "transmit_data failed, message=" << st.get_error_msg()
<< ", fragment_instance_id=" <<
print_id(request->finst_id())
<< ", node=" << request->node_id();
}
- st.to_protobuf(response->mutable_status());
if (done != nullptr) {
+ st.to_protobuf(response->mutable_status());
done->Run();
}
}
diff --git a/be/src/util/proto_util.h b/be/src/util/proto_util.h
index a9da2e1..9677a82 100644
--- a/be/src/util/proto_util.h
+++ b/be/src/util/proto_util.h
@@ -27,11 +27,11 @@ inline void request_row_batch_transfer_attachment(Params*
brpc_request, Closure*
if (brpc_request->has_row_batch() &&
config::transfer_data_by_brpc_attachment == true) {
butil::IOBuf attachment;
auto row_batch = brpc_request->mutable_row_batch();
- row_batch->set_transfer_by_attachment(true);
attachment.append(row_batch->tuple_data());
row_batch->clear_tuple_data();
row_batch->set_tuple_data("");
closure->cntl.request_attachment().swap(attachment);
+ brpc_request->set_transfer_by_attachment(true);
}
}
@@ -40,13 +40,11 @@ template <typename Params>
inline void attachment_transfer_request_row_batch(const Params* brpc_request,
brpc::Controller* cntl) {
Params* req = const_cast<Params*>(brpc_request);
- if (req->has_row_batch()) {
+ if (req->has_row_batch() && req->transfer_by_attachment()) {
auto rb = req->mutable_row_batch();
- if (rb->transfer_by_attachment()) {
- DCHECK(cntl->request_attachment().size() > 0);
- const butil::IOBuf& io_buf = cntl->request_attachment();
- io_buf.copy_to(rb->mutable_tuple_data(), io_buf.size(), 0);
- }
+ DCHECK(cntl->request_attachment().size() > 0);
+ const butil::IOBuf& io_buf = cntl->request_attachment();
+ io_buf.copy_to(rb->mutable_tuple_data(), io_buf.size(), 0);
}
}
diff --git a/gensrc/proto/data.proto b/gensrc/proto/data.proto
index 547afff..6e4ae7d 100644
--- a/gensrc/proto/data.proto
+++ b/gensrc/proto/data.proto
@@ -40,7 +40,6 @@ message PRowBatch {
repeated int32 tuple_offsets = 3;
required bytes tuple_data = 4;
required bool is_compressed = 5;
- optional bool transfer_by_attachment = 6 [default = false];
}
message PColumn {
diff --git a/gensrc/proto/internal_service.proto
b/gensrc/proto/internal_service.proto
index b88b01d..ef43625 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -45,6 +45,8 @@ message PTransmitDataParams {
optional PQueryStatistics query_statistics = 8;
optional PBlock block = 9;
+ // transfer the RowBatch to the Controller Attachment
+ optional bool transfer_by_attachment = 10 [default = false];
};
message PTransmitDataResult {
@@ -96,6 +98,8 @@ message PTabletWriterAddBatchRequest {
repeated int64 partition_ids = 8;
// the backend which send this request
optional int64 backend_id = 9 [default = -1];
+ // transfer the RowBatch to the Controller Attachment
+ optional bool transfer_by_attachment = 10 [default = false];
};
message PTabletWriterAddBatchResult {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]