This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 75d68f6ad64 [fix](core) Fix core caused by
VDataStreamMgr::transmit_block (#50560)
75d68f6ad64 is described below
commit 75d68f6ad64403b49af09ab32a336d7c4c97f74a
Author: Mryange <[email protected]>
AuthorDate: Wed Apr 30 19:42:33 2025 +0800
[fix](core) Fix core caused by VDataStreamMgr::transmit_block (#50560)
### What problem does this PR solve?
Related PR: https://github.com/apache/doris/pull/50113
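VDataStreamMgr::transmit_block should pass `done` to the receiver only
when adding the last block of a request.
Otherwise, an earlier block in the same request may hit the memory limit
and set `done` to null, so adding the next block fails the
`*done != nullptr` check shown below: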
```
F20250430 11:49:29.413803 2176884 vdata_stream_recvr.cpp:200] Check failed:
*done != nullptr
*** Check failure stack trace: ***
@ 0x558196be1956 google::LogMessage::SendToLog()
@ 0x558196bde3a0 google::LogMessage::Flush()
@ 0x558196be2199 google::LogMessageFatal::~LogMessageFatal()
@ 0x558193b1659c
doris::vectorized::VDataStreamRecvr::SenderQueue::add_block()
@ 0x558193b1f7b5 doris::vectorized::VDataStreamRecvr::add_block()
@ 0x558193af6cf8
doris::vectorized::VDataStreamMgr::transmit_block()
@ 0x558157f7a685
doris::pipeline::DataStreamRecvrTest_transmit_block_Test::TestBody()
@ 0x5581973c8c0b
testing::internal::HandleSehExceptionsInMethodIfSupported<>()
@ 0x5581973c2a69
testing::internal::HandleExceptionsInMethodIfSupported<>()
@ 0x55819739943a testing::Test::Run()
@ 0x558197399e5e testing::TestInfo::Run()
@ 0x55819739a71e testing::TestSuite::Run()
@ 0x5581973a9dde testing::internal::UnitTestImpl::RunAllTests()
@ 0x5581973c9a56
testing::internal::HandleSehExceptionsInMethodIfSupported<>()
@ 0x5581973c3a61
testing::internal::HandleExceptionsInMethodIfSupported<>()
@ 0x5581973a85d3 testing::UnitTest::Run()
@ 0x5581593d8653 RUN_ALL_TESTS()
```
### Release note
None
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [x] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [x] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [x] No.
- [ ] Yes. <!-- Add document PR link here, e.g.
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
---
be/src/vec/runtime/vdata_stream_mgr.cpp | 16 ++++++-
be/src/vec/runtime/vdata_stream_mgr.h | 8 ++--
be/src/vec/runtime/vdata_stream_recvr.cpp | 4 +-
be/src/vec/runtime/vdata_stream_recvr.h | 2 +-
be/test/pipeline/exec/vdata_stream_recvr_test.cpp | 56 ++++++++++++++++++++++-
5 files changed, 78 insertions(+), 8 deletions(-)
diff --git a/be/src/vec/runtime/vdata_stream_mgr.cpp
b/be/src/vec/runtime/vdata_stream_mgr.cpp
index 2a4f4e22861..c81fa21fa34 100644
--- a/be/src/vec/runtime/vdata_stream_mgr.cpp
+++ b/be/src/vec/runtime/vdata_stream_mgr.cpp
@@ -150,9 +150,23 @@ Status VDataStreamMgr::transmit_block(const
PTransmitDataParams* request,
for (int i = 0; i < request->blocks_size(); i++) {
std::unique_ptr<PBlock> pblock_ptr = std::make_unique<PBlock>();
pblock_ptr->Swap(const_cast<PBlock*>(&request->blocks(i)));
+ auto pass_done = [&]() -> ::google::protobuf::Closure** {
+ // If it is eos, no callback is needed, done can be nullptr
+ if (eos) {
+ return nullptr;
+ }
+ // If it is the last block, a callback is needed, pass done
+ if (i == request->blocks_size() - 1) {
+ return done;
+ } else {
+ // If it is not the last block, the blocks in the request
currently belong to the same queue,
+ // and the callback is handled by the done of the last
block
+ return nullptr;
+ }
+ };
RETURN_IF_ERROR(recvr->add_block(
std::move(pblock_ptr), request->sender_id(),
request->be_number(),
- request->packet_seq() - request->blocks_size() + i, eos ?
nullptr : done,
+ request->packet_seq() - request->blocks_size() + i,
pass_done(),
wait_for_worker, cpu_time_stop_watch.elapsed_time()));
}
}
diff --git a/be/src/vec/runtime/vdata_stream_mgr.h
b/be/src/vec/runtime/vdata_stream_mgr.h
index f9b5bbe5bcd..a9266d02d96 100644
--- a/be/src/vec/runtime/vdata_stream_mgr.h
+++ b/be/src/vec/runtime/vdata_stream_mgr.h
@@ -27,6 +27,7 @@
#include <unordered_map>
#include <utility>
+#include "common/be_mock_util.h"
#include "common/global_types.h"
#include "common/status.h"
#include "util/runtime_profile.h"
@@ -52,15 +53,16 @@ class VDataStreamRecvr;
class VDataStreamMgr {
public:
VDataStreamMgr();
- ~VDataStreamMgr();
+ MOCK_FUNCTION ~VDataStreamMgr();
std::shared_ptr<VDataStreamRecvr> create_recvr(
RuntimeState* state, RuntimeProfile::HighWaterMarkCounter*
memory_used_counter,
const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id,
int num_senders,
RuntimeProfile* profile, bool is_merging, size_t
data_queue_capacity);
- Status find_recvr(const TUniqueId& fragment_instance_id, PlanNodeId
node_id,
- std::shared_ptr<VDataStreamRecvr>* res, bool
acquire_lock = true);
+ MOCK_FUNCTION Status find_recvr(const TUniqueId& fragment_instance_id,
PlanNodeId node_id,
+ std::shared_ptr<VDataStreamRecvr>* res,
+ bool acquire_lock = true);
Status deregister_recvr(const TUniqueId& fragment_instance_id, PlanNodeId
node_id);
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 603270b7206..5bcd7dd1ef9 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -489,7 +489,9 @@ void VDataStreamRecvr::close() {
}
// Remove this receiver from the DataStreamMgr that created it.
// TODO: log error msg
- static_cast<void>(_mgr->deregister_recvr(fragment_instance_id(),
dest_node_id()));
+ if (_mgr) {
+ static_cast<void>(_mgr->deregister_recvr(fragment_instance_id(),
dest_node_id()));
+ }
_mgr = nullptr;
_merger.reset();
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h
b/be/src/vec/runtime/vdata_stream_recvr.h
index c311668ae82..9325f4ada3f 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -111,7 +111,7 @@ public:
// Careful: stream sender will call this function for a local receiver,
// accessing members of receiver that are allocated by Object pool
// in this function is not safe.
- bool exceeds_limit(size_t block_byte_size);
+ MOCK_FUNCTION bool exceeds_limit(size_t block_byte_size);
bool queue_exceeds_limit(size_t byte_size) const;
bool is_closed() const { return _is_closed; }
diff --git a/be/test/pipeline/exec/vdata_stream_recvr_test.cpp
b/be/test/pipeline/exec/vdata_stream_recvr_test.cpp
index 636772361c9..bed8be3e4f3 100644
--- a/be/test/pipeline/exec/vdata_stream_recvr_test.cpp
+++ b/be/test/pipeline/exec/vdata_stream_recvr_test.cpp
@@ -28,6 +28,7 @@
#include "testutil/column_helper.h"
#include "testutil/mock/mock_runtime_state.h"
#include "vec/data_types/data_type_number.h"
+#include "vec/runtime/vdata_stream_mgr.h"
namespace doris::pipeline {
using namespace vectorized;
@@ -37,6 +38,14 @@ struct MockVDataStreamRecvr : public VDataStreamRecvr {
RuntimeProfile* profile, int num_senders, bool
is_merging)
: VDataStreamRecvr(nullptr, counter, state, TUniqueId(), 0,
num_senders, is_merging,
profile, 1) {};
+
+ bool exceeds_limit(size_t block_byte_size) override {
+ if (always_exceeds_limit) {
+ return true;
+ }
+ return VDataStreamRecvr::exceeds_limit(block_byte_size);
+ }
+ bool always_exceeds_limit = false;
};
class DataStreamRecvrTest : public testing::Test {
@@ -50,12 +59,12 @@ public:
std::make_unique<RuntimeProfile::HighWaterMarkCounter>(TUnit::UNIT, 0, "test");
_mock_state = std::make_unique<MockRuntimeState>();
_mock_profile = std::make_unique<RuntimeProfile>("test");
- recvr = std::make_unique<MockVDataStreamRecvr>(_mock_state.get(),
_mock_counter.get(),
+ recvr = std::make_shared<MockVDataStreamRecvr>(_mock_state.get(),
_mock_counter.get(),
_mock_profile.get(),
num_senders,
is_merging);
}
- std::unique_ptr<MockVDataStreamRecvr> recvr;
+ std::shared_ptr<MockVDataStreamRecvr> recvr;
std::unique_ptr<RuntimeProfile::HighWaterMarkCounter> _mock_counter;
@@ -564,6 +573,49 @@ TEST_F(DataStreamRecvrTest, TestRemoteLocalMultiSender) {
input3.join();
output.join();
}
+
+struct MockVDataStreamMgr : public VDataStreamMgr {
+ ~MockVDataStreamMgr() override = default;
+ Status find_recvr(const TUniqueId& fragment_instance_id, PlanNodeId
node_id,
+ std::shared_ptr<VDataStreamRecvr>* res, bool
acquire_lock = true) override {
+ *res = recvr;
+ return Status::OK();
+ }
+
+ std::shared_ptr<VDataStreamRecvr> recvr;
+};
+
+TEST_F(DataStreamRecvrTest, transmit_block) {
+ create_recvr(1, true);
+ recvr->always_exceeds_limit = true;
+
+ MockVDataStreamMgr mgr;
+ mgr.recvr = recvr;
+
+ MockClosure closure;
+ closure._cb = [&]() { std::cout << "cb" << std::endl; };
+ google::protobuf::Closure* done = &closure;
+
+ PTransmitDataParams request;
+ {
+ auto* pblock = request.add_blocks();
+ auto block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4,
5});
+ to_pblock(block, pblock);
+ }
+
+ {
+ auto* pblock = request.add_blocks();
+ auto block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4,
5});
+ to_pblock(block, pblock);
+ }
+
+ {
+ auto st = mgr.transmit_block(&request, &done, 1000);
+ EXPECT_TRUE(st) << st.msg();
+ }
+ recvr->close();
+}
+
// ./run-be-ut.sh --run --filter=DataStreamRecvrTest.*
} // namespace doris::pipeline
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]