This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 21ef82206a4 [test](beut) add VDataStreamRecvr beut (#48188)
21ef82206a4 is described below
commit 21ef82206a4c44685e7d360fe1bafce0fb3fb07b
Author: Mryange <[email protected]>
AuthorDate: Tue Feb 25 12:51:41 2025 +0800
[test](beut) add VDataStreamRecvr beut (#48188)
### What problem does this PR solve?
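Add backend unit tests (BE UT) for VDataStreamRecvr. To support the tests, the receiver drops its `row_desc` constructor parameter, `SenderQueue::try_set_dep_ready_without_lock()` is replaced by `set_source_ready(std::lock_guard<std::mutex>&)`, and lock acquisitions are wrapped in `INJECT_MOCK_SLEEP`.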
### Release note
None
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [x] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [x] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [x] No.
- [ ] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [x] Confirm the release note
- [x] Confirm test cases
- [x] Confirm document
- [x] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
---
be/src/pipeline/exec/exchange_source_operator.cpp | 4 +-
be/src/vec/runtime/vdata_stream_mgr.cpp | 9 +-
be/src/vec/runtime/vdata_stream_mgr.h | 5 +-
be/src/vec/runtime/vdata_stream_recvr.cpp | 80 ++-
be/src/vec/runtime/vdata_stream_recvr.h | 20 +-
be/test/pipeline/exec/vdata_stream_recvr_test.cpp | 569 +++++++++++++++++++++
be/test/pipeline/pipeline_test.cpp | 5 +-
.../{mock_runtime_state.h => mock_query_context.h} | 31 +-
be/test/testutil/mock/mock_runtime_state.h | 7 +-
9 files changed, 643 insertions(+), 87 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp
index bbec3d92703..2aefdf8cd3d 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -65,8 +65,8 @@ Status ExchangeLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
SCOPED_TIMER(_init_timer);
auto& p = _parent->cast<ExchangeSourceOperatorX>();
stream_recvr = state->exec_env()->vstream_mgr()->create_recvr(
- state, _memory_used_counter, p.input_row_desc(),
state->fragment_instance_id(),
- p.node_id(), p.num_senders(), profile(), p.is_merging(),
+ state, _memory_used_counter, state->fragment_instance_id(),
p.node_id(),
+ p.num_senders(), profile(), p.is_merging(),
std::max(20480, config::exchg_node_buffer_size_bytes /
(p.is_merging() ? p.num_senders() : 1)));
const auto& queues = stream_recvr->sender_queues();
diff --git a/be/src/vec/runtime/vdata_stream_mgr.cpp b/be/src/vec/runtime/vdata_stream_mgr.cpp
index 16d38ce2ea0..e31a2fbfea5 100644
--- a/be/src/vec/runtime/vdata_stream_mgr.cpp
+++ b/be/src/vec/runtime/vdata_stream_mgr.cpp
@@ -66,15 +66,14 @@ inline uint32_t VDataStreamMgr::get_hash_value(const
TUniqueId& fragment_instanc
std::shared_ptr<VDataStreamRecvr> VDataStreamMgr::create_recvr(
RuntimeState* state, RuntimeProfile::HighWaterMarkCounter*
memory_used_counter,
- const RowDescriptor& row_desc, const TUniqueId& fragment_instance_id,
- PlanNodeId dest_node_id, int num_senders, RuntimeProfile* profile,
bool is_merging,
- size_t data_queue_capacity) {
+ const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int
num_senders,
+ RuntimeProfile* profile, bool is_merging, size_t data_queue_capacity) {
DCHECK(profile != nullptr);
VLOG_FILE << "creating receiver for fragment=" <<
print_id(fragment_instance_id)
<< ", node=" << dest_node_id;
std::shared_ptr<VDataStreamRecvr> recvr(new VDataStreamRecvr(
- this, memory_used_counter, state, row_desc, fragment_instance_id,
dest_node_id,
- num_senders, is_merging, profile, data_queue_capacity));
+ this, memory_used_counter, state, fragment_instance_id,
dest_node_id, num_senders,
+ is_merging, profile, data_queue_capacity));
uint32_t hash_value = get_hash_value(fragment_instance_id, dest_node_id);
std::unique_lock l(_lock);
_fragment_stream_set.insert(std::make_pair(fragment_instance_id,
dest_node_id));
diff --git a/be/src/vec/runtime/vdata_stream_mgr.h b/be/src/vec/runtime/vdata_stream_mgr.h
index 9bf54e94cb8..f9b5bbe5bcd 100644
--- a/be/src/vec/runtime/vdata_stream_mgr.h
+++ b/be/src/vec/runtime/vdata_stream_mgr.h
@@ -56,9 +56,8 @@ public:
std::shared_ptr<VDataStreamRecvr> create_recvr(
RuntimeState* state, RuntimeProfile::HighWaterMarkCounter*
memory_used_counter,
- const RowDescriptor& row_desc, const TUniqueId&
fragment_instance_id,
- PlanNodeId dest_node_id, int num_senders, RuntimeProfile* profile,
bool is_merging,
- size_t data_queue_capacity);
+ const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id,
int num_senders,
+ RuntimeProfile* profile, bool is_merging, size_t
data_queue_capacity);
Status find_recvr(const TUniqueId& fragment_instance_id, PlanNodeId
node_id,
std::shared_ptr<VDataStreamRecvr>* res, bool
acquire_lock = true);
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 252104031e0..603270b7206 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -44,7 +44,7 @@ namespace doris::vectorized {
#include "common/compile_check_begin.h"
VDataStreamRecvr::SenderQueue::SenderQueue(
- VDataStreamRecvr* parent_recvr, int num_senders, RuntimeProfile*
profile,
+ VDataStreamRecvr* parent_recvr, int num_senders,
std::shared_ptr<pipeline::Dependency> local_channel_dependency)
: _recvr(parent_recvr),
_is_cancelled(false),
@@ -80,7 +80,7 @@ Status VDataStreamRecvr::SenderQueue::get_batch(Block* block,
bool* eos) {
#endif
BlockItem block_item;
{
- std::lock_guard<std::mutex> l(_lock);
+ INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
//check and get block_item from data_queue
if (_is_cancelled) {
RETURN_IF_ERROR(_cancel_status);
@@ -104,7 +104,7 @@ Status VDataStreamRecvr::SenderQueue::get_batch(Block*
block, bool* eos) {
COUNTER_UPDATE(_recvr->_decompress_timer, block->get_decompress_time());
COUNTER_UPDATE(_recvr->_decompress_bytes, block->get_decompressed_bytes());
_recvr->_memory_used_counter->update(-(int64_t)block_byte_size);
- std::lock_guard<std::mutex> l(_lock);
+ INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
sub_blocks_memory_usage(block_byte_size);
_record_debug_info();
if (_block_queue.empty() && _source_dependency) {
@@ -131,7 +131,9 @@ Status VDataStreamRecvr::SenderQueue::get_batch(Block*
block, bool* eos) {
return Status::OK();
}
-void VDataStreamRecvr::SenderQueue::try_set_dep_ready_without_lock() {
+void
VDataStreamRecvr::SenderQueue::set_source_ready(std::lock_guard<std::mutex>&) {
+ // Here, it is necessary to check if _source_dependency is not nullptr.
+ // This is because the queue might be closed before setting the source dependency.
if (!_source_dependency) {
return;
}
@@ -147,7 +149,7 @@ Status
VDataStreamRecvr::SenderQueue::add_block(std::unique_ptr<PBlock> pblock,
const int64_t wait_for_worker,
const uint64_t
time_to_find_recvr) {
{
- std::lock_guard<std::mutex> l(_lock);
+ INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
if (_is_cancelled) {
return Status::OK();
}
@@ -171,7 +173,7 @@ Status
VDataStreamRecvr::SenderQueue::add_block(std::unique_ptr<PBlock> pblock,
}
}
- std::lock_guard<std::mutex> l(_lock);
+ INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
if (_is_cancelled) {
return Status::OK();
}
@@ -189,7 +191,7 @@ Status
VDataStreamRecvr::SenderQueue::add_block(std::unique_ptr<PBlock> pblock,
_block_queue.emplace_back(std::move(pblock), block_byte_size);
COUNTER_UPDATE(_recvr->_remote_bytes_received_counter, block_byte_size);
_record_debug_info();
- try_set_dep_ready_without_lock();
+ set_source_ready(l);
// if done is nullptr, this function can't delay this response
if (done != nullptr && _recvr->exceeds_limit(block_byte_size)) {
@@ -209,10 +211,14 @@ void VDataStreamRecvr::SenderQueue::add_block(Block*
block, bool use_move) {
return;
}
{
- std::unique_lock<std::mutex> l(_lock);
+ INJECT_MOCK_SLEEP(std::unique_lock<std::mutex> l(_lock));
if (_is_cancelled) {
return;
}
+ DCHECK(_num_remaining_senders >= 0);
+ if (_num_remaining_senders == 0) {
+ return;
+ }
}
BlockUPtr nblock =
Block::create_unique(block->get_columns_with_type_and_name());
@@ -230,13 +236,13 @@ void VDataStreamRecvr::SenderQueue::add_block(Block*
block, bool use_move) {
auto block_mem_size = nblock->allocated_bytes();
{
- std::unique_lock<std::mutex> l(_lock);
+ INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
if (_is_cancelled) {
return;
}
_block_queue.emplace_back(std::move(nblock), block_mem_size);
_record_debug_info();
- try_set_dep_ready_without_lock();
+ set_source_ready(l);
COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_mem_size);
_recvr->_memory_used_counter->update(block_mem_size);
add_blocks_memory_usage(block_mem_size);
@@ -244,7 +250,7 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block,
bool use_move) {
}
void VDataStreamRecvr::SenderQueue::decrement_senders(int be_number) {
- std::lock_guard<std::mutex> l(_lock);
+ INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
if (_sender_eos_set.end() != _sender_eos_set.find(be_number)) {
return;
}
@@ -256,25 +262,25 @@ void VDataStreamRecvr::SenderQueue::decrement_senders(int
be_number) {
<< print_id(_recvr->fragment_instance_id()) << " node_id=" <<
_recvr->dest_node_id()
<< " #senders=" << _num_remaining_senders;
if (_num_remaining_senders == 0) {
- try_set_dep_ready_without_lock();
+ set_source_ready(l);
}
}
void VDataStreamRecvr::SenderQueue::cancel(Status cancel_status) {
{
- std::lock_guard<std::mutex> l(_lock);
+ INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
if (_is_cancelled) {
return;
}
_is_cancelled = true;
_cancel_status = cancel_status;
- try_set_dep_ready_without_lock();
+ set_source_ready(l);
VLOG_QUERY << "cancelled stream: _fragment_instance_id="
<< print_id(_recvr->fragment_instance_id())
<< " node_id=" << _recvr->dest_node_id();
}
{
- std::lock_guard<std::mutex> l(_lock);
+ INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
for (auto closure_pair : _pending_closures) {
closure_pair.first->Run();
int64_t elapse_time = closure_pair.second.elapsed_time();
@@ -287,34 +293,30 @@ void VDataStreamRecvr::SenderQueue::cancel(Status
cancel_status) {
}
void VDataStreamRecvr::SenderQueue::close() {
- {
- // If _is_cancelled is not set to true, there may be concurrent send
- // which add batch to _block_queue. The batch added after _block_queue
- // is clear will be memory leak
- std::lock_guard<std::mutex> l(_lock);
- _is_cancelled = true;
- try_set_dep_ready_without_lock();
+ // If _is_cancelled is not set to true, there may be concurrent send
+ // which add batch to _block_queue. The batch added after _block_queue
+ // is clear will be memory leak
+ INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
+ _is_cancelled = true;
+ set_source_ready(l);
- for (auto closure_pair : _pending_closures) {
- closure_pair.first->Run();
- int64_t elapse_time = closure_pair.second.elapsed_time();
- if (_recvr->_max_wait_to_process_time->value() < elapse_time) {
- _recvr->_max_wait_to_process_time->set(elapse_time);
- }
+ for (auto closure_pair : _pending_closures) {
+ closure_pair.first->Run();
+ int64_t elapse_time = closure_pair.second.elapsed_time();
+ if (_recvr->_max_wait_to_process_time->value() < elapse_time) {
+ _recvr->_max_wait_to_process_time->set(elapse_time);
}
- _pending_closures.clear();
}
-
+ _pending_closures.clear();
// Delete any batches queued in _block_queue
_block_queue.clear();
}
VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr,
RuntimeProfile::HighWaterMarkCounter*
memory_used_counter,
- RuntimeState* state, const RowDescriptor&
row_desc,
- const TUniqueId& fragment_instance_id,
PlanNodeId dest_node_id,
- int num_senders, bool is_merging,
RuntimeProfile* profile,
- size_t data_queue_capacity)
+ RuntimeState* state, const TUniqueId&
fragment_instance_id,
+ PlanNodeId dest_node_id, int num_senders,
bool is_merging,
+ RuntimeProfile* profile, size_t
data_queue_capacity)
: HasTaskExecutionCtx(state),
_mgr(stream_mgr),
_memory_used_counter(memory_used_counter),
@@ -322,7 +324,6 @@ VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr*
stream_mgr,
_query_context(state->get_query_ctx()->shared_from_this()),
_fragment_instance_id(fragment_instance_id),
_dest_node_id(dest_node_id),
- _row_desc(row_desc),
_is_merging(is_merging),
_is_closed(false),
_sender_queue_mem_limit(data_queue_capacity),
@@ -344,7 +345,7 @@ VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr*
stream_mgr,
int num_sender_per_queue = is_merging ? 1 : num_senders;
for (int i = 0; i < num_queues; ++i) {
SenderQueue* queue = nullptr;
- queue = _sender_queue_pool.add(new SenderQueue(this,
num_sender_per_queue, profile,
+ queue = _sender_queue_pool.add(new SenderQueue(this,
num_sender_per_queue,
_sender_to_local_channel_dependency[i]));
_sender_queues.push_back(queue);
}
@@ -495,11 +496,8 @@ void VDataStreamRecvr::close() {
}
void VDataStreamRecvr::set_sink_dep_always_ready() const {
- for (auto* sender_queues : sender_queues()) {
- auto dep = sender_queues->local_channel_dependency();
- if (dep) {
- dep->set_always_ready();
- }
+ for (auto dep : _sender_to_local_channel_dependency) {
+ dep->set_always_ready();
}
}
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h
index 0d444dbb397..85cfe08005a 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -73,10 +73,9 @@ class VDataStreamRecvr : public HasTaskExecutionCtx {
public:
class SenderQueue;
VDataStreamRecvr(VDataStreamMgr* stream_mgr,
RuntimeProfile::HighWaterMarkCounter* counter,
- RuntimeState* state, const RowDescriptor& row_desc,
- const TUniqueId& fragment_instance_id, PlanNodeId
dest_node_id,
- int num_senders, bool is_merging, RuntimeProfile* profile,
- size_t data_queue_capacity);
+ RuntimeState* state, const TUniqueId&
fragment_instance_id,
+ PlanNodeId dest_node_id, int num_senders, bool is_merging,
+ RuntimeProfile* profile, size_t data_queue_capacity);
~VDataStreamRecvr() override;
@@ -97,7 +96,6 @@ public:
const TUniqueId& fragment_instance_id() const { return
_fragment_instance_id; }
PlanNodeId dest_node_id() const { return _dest_node_id; }
- const RowDescriptor& row_desc() const { return _row_desc; }
// Indicate that a particular sender is done. Delegated to the appropriate
// sender queue. Called from DataStreamMgr.
@@ -176,15 +174,11 @@ private:
class VDataStreamRecvr::SenderQueue {
public:
- SenderQueue(VDataStreamRecvr* parent_recvr, int num_senders,
RuntimeProfile* profile,
+ SenderQueue(VDataStreamRecvr* parent_recvr, int num_senders,
std::shared_ptr<pipeline::Dependency>
local_channel_dependency);
~SenderQueue();
- std::shared_ptr<pipeline::Dependency> local_channel_dependency() {
- return _local_channel_dependency;
- }
-
Status get_batch(Block* next_block, bool* eos);
Status add_block(std::unique_ptr<PBlock> pblock, int be_number, int64_t
packet_seq,
@@ -203,15 +197,15 @@ public:
_source_dependency = dependency;
}
+protected:
void add_blocks_memory_usage(int64_t size);
void sub_blocks_memory_usage(int64_t size);
bool exceeds_limit();
-
-protected:
friend class pipeline::ExchangeLocalState;
- void try_set_dep_ready_without_lock();
+
+ void set_source_ready(std::lock_guard<std::mutex>&);
// To record information about several variables in the event of a DCHECK failure.
// DCHECK(_is_cancelled || !_block_queue.empty() || _num_remaining_senders == 0)
diff --git a/be/test/pipeline/exec/vdata_stream_recvr_test.cpp b/be/test/pipeline/exec/vdata_stream_recvr_test.cpp
new file mode 100644
index 00000000000..636772361c9
--- /dev/null
+++ b/be/test/pipeline/exec/vdata_stream_recvr_test.cpp
@@ -0,0 +1,569 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/runtime/vdata_stream_recvr.h"
+
+#include <gtest/gtest.h>
+
+#include <memory>
+#include <thread>
+#include <vector>
+
+#include "pipeline/dependency.h"
+#include "pipeline/exec/multi_cast_data_streamer.h"
+#include "testutil/column_helper.h"
+#include "testutil/mock/mock_runtime_state.h"
+#include "vec/data_types/data_type_number.h"
+
+namespace doris::pipeline {
+using namespace vectorized;
+
+struct MockVDataStreamRecvr : public VDataStreamRecvr {
+ MockVDataStreamRecvr(RuntimeState* state,
RuntimeProfile::HighWaterMarkCounter* counter,
+ RuntimeProfile* profile, int num_senders, bool
is_merging)
+ : VDataStreamRecvr(nullptr, counter, state, TUniqueId(), 0,
num_senders, is_merging,
+ profile, 1) {};
+};
+
+class DataStreamRecvrTest : public testing::Test {
+public:
+ DataStreamRecvrTest() = default;
+ ~DataStreamRecvrTest() override = default;
+ void SetUp() override {}
+
+ void create_recvr(int num_senders, bool is_merging) {
+ _mock_counter =
+
std::make_unique<RuntimeProfile::HighWaterMarkCounter>(TUnit::UNIT, 0, "test");
+ _mock_state = std::make_unique<MockRuntimeState>();
+ _mock_profile = std::make_unique<RuntimeProfile>("test");
+ recvr = std::make_unique<MockVDataStreamRecvr>(_mock_state.get(),
_mock_counter.get(),
+ _mock_profile.get(),
num_senders,
+ is_merging);
+ }
+
+ std::unique_ptr<MockVDataStreamRecvr> recvr;
+
+ std::unique_ptr<RuntimeProfile::HighWaterMarkCounter> _mock_counter;
+
+ std::unique_ptr<MockRuntimeState> _mock_state;
+
+ std::unique_ptr<RuntimeProfile> _mock_profile;
+};
+
+TEST_F(DataStreamRecvrTest, TestCreateSenderQueue) {
+ {
+ create_recvr(3, false);
+ EXPECT_EQ(recvr->sender_queues().size(), 1);
+ EXPECT_EQ(recvr->sender_queues().back()->_num_remaining_senders, 3);
+ }
+
+ {
+ create_recvr(3, true);
+ EXPECT_EQ(recvr->sender_queues().size(), 3);
+ for (auto& queue : recvr->sender_queues()) {
+ EXPECT_EQ(queue->_num_remaining_senders, 1);
+ }
+ }
+}
+
+TEST_F(DataStreamRecvrTest, TestSender) {
+ create_recvr(3, false);
+ EXPECT_EQ(recvr->sender_queues().size(), 1);
+ EXPECT_EQ(recvr->sender_queues().back()->_num_remaining_senders, 3);
+
+ auto* sender = recvr->sender_queues().back();
+
+ auto sink_dep = sender->_local_channel_dependency;
+ auto source_dep = std::make_shared<Dependency>(0, 0, "test", false);
+ sender->set_dependency(source_dep);
+
+ EXPECT_EQ(sink_dep->ready(), true);
+ EXPECT_EQ(source_dep->ready(), false);
+
+ EXPECT_EQ(sender->_num_remaining_senders, 3);
+ EXPECT_EQ(sender->_block_queue.size(), 0);
+
+ {
+ auto block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4,
5});
+ sender->add_block(&block, false);
+ }
+
+ EXPECT_EQ(sink_dep->ready(), false);
+
+ {
+ EXPECT_EQ(sender->_block_queue.size(), 1);
+ Block block;
+ bool eos = false;
+ auto st = sender->get_batch(&block, &eos);
+ EXPECT_TRUE(st) << st.msg();
+ EXPECT_TRUE(ColumnHelper::block_equal(
+ block, ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4,
5})));
+ EXPECT_FALSE(eos);
+ }
+
+ {
+ sender->decrement_senders(1);
+ EXPECT_EQ(sender->_num_remaining_senders, 2);
+ sender->decrement_senders(2);
+ EXPECT_EQ(sender->_num_remaining_senders, 1);
+ sender->decrement_senders(3);
+ EXPECT_EQ(sender->_num_remaining_senders, 0);
+
+ EXPECT_EQ(sender->_block_queue.size(), 0);
+ auto block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4,
5});
+ sender->add_block(&block, false);
+ EXPECT_EQ(sender->_block_queue.size(), 0);
+ }
+
+ {
+ EXPECT_EQ(sender->_block_queue.size(), 0);
+ Block block;
+ bool eos = false;
+ auto st = sender->get_batch(&block, &eos);
+ EXPECT_TRUE(st) << st.msg();
+ EXPECT_TRUE(eos);
+ }
+
+ sender->close();
+}
+
+TEST_F(DataStreamRecvrTest, TestSenderClose) {
+ create_recvr(3, false);
+ EXPECT_EQ(recvr->sender_queues().size(), 1);
+ EXPECT_EQ(recvr->sender_queues().back()->_num_remaining_senders, 3);
+
+ auto* sender = recvr->sender_queues().back();
+
+ auto sink_dep = sender->_local_channel_dependency;
+ sender->close();
+}
+
+TEST_F(DataStreamRecvrTest, TestRandomSender) {
+ create_recvr(3, false);
+ EXPECT_EQ(recvr->sender_queues().size(), 1);
+ EXPECT_EQ(recvr->sender_queues().back()->_num_remaining_senders, 3);
+
+ auto* sender = recvr->sender_queues().back();
+
+ auto sink_dep = sender->_local_channel_dependency;
+ auto source_dep = std::make_shared<Dependency>(0, 0, "test", false);
+ sender->set_dependency(source_dep);
+
+ EXPECT_EQ(sink_dep->ready(), true);
+ EXPECT_EQ(source_dep->ready(), false);
+
+ auto input_func = [&](int id) {
+ mock_random_sleep();
+ int input_block = 0;
+ while (true) {
+ if (sink_dep->ready()) {
+ auto block = ColumnHelper::create_block<DataTypeInt32>({1, 2,
3, 4, 5});
+ sender->add_block(&block, false);
+ input_block++;
+ if (input_block == 100) {
+ sender->decrement_senders(id);
+ break;
+ }
+ }
+ }
+ };
+
+ auto outout_func = [&]() {
+ mock_random_sleep();
+ int output_block = 0;
+ while (true) {
+ if (source_dep->ready()) {
+ Block block;
+ bool eos = false;
+ auto st = sender->get_batch(&block, &eos);
+ EXPECT_TRUE(st) << st.msg();
+ if (!block.empty()) {
+ output_block++;
+ }
+ if (eos) {
+ EXPECT_EQ(output_block, 3 * 100);
+ break;
+ }
+ }
+ }
+ };
+
+ std::thread input1(input_func, 1);
+ std::thread input2(input_func, 2);
+ std::thread input3(input_func, 3);
+ std::thread output(outout_func);
+
+ input1.join();
+ input2.join();
+ input3.join();
+ output.join();
+}
+
+TEST_F(DataStreamRecvrTest, TestRandomCloseSender) {
+ create_recvr(3, false);
+ EXPECT_EQ(recvr->sender_queues().size(), 1);
+ EXPECT_EQ(recvr->sender_queues().back()->_num_remaining_senders, 3);
+
+ auto* sender = recvr->sender_queues().back();
+
+ auto sink_dep = sender->_local_channel_dependency;
+ auto source_dep = std::make_shared<Dependency>(0, 0, "test", false);
+ sender->set_dependency(source_dep);
+
+ EXPECT_EQ(sink_dep->ready(), true);
+ EXPECT_EQ(source_dep->ready(), false);
+
+ auto input_func = [&](int id) {
+ mock_random_sleep();
+ int input_block = 0;
+ while (true) {
+ if (sink_dep->ready()) {
+ auto block = ColumnHelper::create_block<DataTypeInt32>({1, 2,
3, 4, 5});
+ sender->add_block(&block, false);
+ input_block++;
+ if (input_block == 100) {
+ sender->decrement_senders(id);
+ break;
+ }
+ }
+ }
+ std::cout << "input func : " << id << " end " << std::endl;
+ };
+
+ auto outout_func = [&]() {
+ mock_random_sleep();
+ try {
+ while (true) {
+ if (source_dep->ready()) {
+ Block block;
+ bool eos = false;
+ auto st = sender->get_batch(&block, &eos);
+
+ if (!st.ok()) {
+ std::cout << "get_batch error: " << st.msg() <<
std::endl;
+ break;
+ }
+ if (eos) {
+ break;
+ }
+ }
+ }
+ } catch (std::exception& e) {
+ std::cout << "exception: " << e.what() << std::endl;
+ }
+ std::cout << "output func end" << std::endl;
+ };
+
+ auto close_func = [&]() {
+ try {
+ mock_random_sleep();
+ std::cout << "close_func start" << std::endl;
+ recvr->close();
+ std::cout << "close_func end" << std::endl;
+ } catch (const std::exception& e) {
+ std::cout << "close exception: " << e.what() << std::endl;
+ }
+ };
+
+ std::vector<std::thread> threads;
+ threads.emplace_back(input_func, 1);
+ threads.emplace_back(input_func, 2);
+ threads.emplace_back(input_func, 3);
+ threads.emplace_back(outout_func);
+ threads.emplace_back(close_func);
+
+ for (auto& t : threads) {
+ if (t.joinable()) {
+ try {
+ t.join();
+ } catch (const std::system_error& e) {
+ std::cout << "Thread join error: " << e.what() << std::endl;
+ }
+ }
+ }
+}
+
+class MockClosure : public google::protobuf::Closure {
+ MockClosure() = default;
+
+ ~MockClosure() override = default;
+
+ void Run() override { _cb(); }
+ std::function<void()> _cb;
+};
+
+void to_pblock(Block& block, PBlock* pblock) {
+ size_t uncompressed_bytes = 0;
+ size_t compressed_bytes = 0;
+ EXPECT_TRUE(block.serialize(BeExecVersionManager::get_newest_version(),
pblock,
+ &uncompressed_bytes, &compressed_bytes,
+
segment_v2::CompressionTypePB::NO_COMPRESSION));
+}
+
+TEST_F(DataStreamRecvrTest, TestRemoteSender) {
+ create_recvr(3, false);
+ EXPECT_EQ(recvr->sender_queues().size(), 1);
+ EXPECT_EQ(recvr->sender_queues().back()->_num_remaining_senders, 3);
+
+ auto* sender = recvr->sender_queues().back();
+
+ auto sink_dep = sender->_local_channel_dependency;
+ auto source_dep = std::make_shared<Dependency>(0, 0, "test", false);
+ sender->set_dependency(source_dep);
+
+ EXPECT_EQ(sink_dep->ready(), true);
+ EXPECT_EQ(source_dep->ready(), false);
+
+ EXPECT_EQ(sender->_num_remaining_senders, 3);
+ EXPECT_EQ(sender->_block_queue.size(), 0);
+
+ {
+ auto block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4,
5});
+ auto pblock = std::make_unique<PBlock>();
+ to_pblock(block, pblock.get());
+ MockClosure closure;
+ closure._cb = [&]() { std::cout << "cb" << std::endl; };
+ google::protobuf::Closure* done = &closure;
+ auto st = sender->add_block(std::move(pblock), 1, 1, &done, 0, 0);
+ if (done != nullptr) {
+ done->Run();
+ }
+ }
+ sender->close();
+}
+
+TEST_F(DataStreamRecvrTest, TestRemoteMemLimitSender) {
+ create_recvr(3, false);
+ EXPECT_EQ(recvr->sender_queues().size(), 1);
+ EXPECT_EQ(recvr->sender_queues().back()->_num_remaining_senders, 3);
+
+ auto* sender = recvr->sender_queues().back();
+
+ auto sink_dep = sender->_local_channel_dependency;
+ auto source_dep = std::make_shared<Dependency>(0, 0, "test", false);
+ sender->set_dependency(source_dep);
+
+ EXPECT_EQ(sink_dep->ready(), true);
+ EXPECT_EQ(source_dep->ready(), false);
+
+ EXPECT_EQ(sender->_num_remaining_senders, 3);
+ EXPECT_EQ(sender->_block_queue.size(), 0);
+
+ config::exchg_node_buffer_size_bytes = 1;
+
+ Defer set_([&]() { config::exchg_node_buffer_size_bytes = 20485760; });
+
+ {
+ auto block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4,
5});
+ auto pblock = std::make_unique<PBlock>();
+ to_pblock(block, pblock.get());
+ MockClosure closure;
+ bool flag = false;
+ closure._cb = [&]() {
+ std::cout << "cb" << std::endl;
+ EXPECT_TRUE(flag);
+ };
+ google::protobuf::Closure* done = &closure;
+ auto st = sender->add_block(std::move(pblock), 1, 1, &done, 0, 0);
+ EXPECT_EQ(done, nullptr);
+ flag = true;
+
+ {
+ Block block;
+ bool eos = false;
+ auto st = sender->get_batch(&block, &eos);
+ }
+ }
+ sender->close();
+}
+
+TEST_F(DataStreamRecvrTest, TestRemoteMultiSender) {
+ create_recvr(3, false);
+ EXPECT_EQ(recvr->sender_queues().size(), 1);
+ EXPECT_EQ(recvr->sender_queues().back()->_num_remaining_senders, 3);
+
+ auto* sender = recvr->sender_queues().back();
+
+ auto sink_dep = sender->_local_channel_dependency;
+ auto source_dep = std::make_shared<Dependency>(0, 0, "test", false);
+ sender->set_dependency(source_dep);
+
+ EXPECT_EQ(sink_dep->ready(), true);
+ EXPECT_EQ(source_dep->ready(), false);
+
+ std::vector<std::shared_ptr<MockClosure>> closures {10};
+
+ for (auto i = 0; i < 10; i++) {
+ closures[i] = std::make_shared<MockClosure>();
+ }
+
+ auto input_func = [&](int id) {
+ mock_random_sleep();
+ int input_block = 0;
+ auto closure = closures[id];
+ std::atomic_bool cb_flag = true;
+ closure->_cb = [&]() { cb_flag = true; };
+ while (true) {
+ if (sink_dep->ready() && cb_flag) {
+ auto block = ColumnHelper::create_block<DataTypeInt32>({1, 2,
3, 4, 5});
+ auto pblock = std::make_unique<PBlock>();
+ to_pblock(block, pblock.get());
+ google::protobuf::Closure* done = closure.get();
+ cb_flag = false;
+ auto st = sender->add_block(std::move(pblock), id,
input_block, &done, 0, 0);
+ EXPECT_TRUE(st) << st.msg();
+ input_block++;
+ if (done != nullptr) {
+ done->Run();
+ }
+ if (input_block == 100) {
+ sender->decrement_senders(id);
+ break;
+ }
+ }
+ }
+ std::cout << "input func : " << id << " end "
+ << "input_block : " << input_block << std::endl;
+ };
+
+ auto outout_func = [&]() {
+ mock_random_sleep();
+ int output_block = 0;
+ while (true) {
+ if (source_dep->ready()) {
+ Block block;
+ bool eos = false;
+ auto st = sender->get_batch(&block, &eos);
+ EXPECT_TRUE(st) << st.msg();
+ if (!block.empty()) {
+ output_block++;
+ }
+ if (eos) {
+ EXPECT_EQ(output_block, 3 * 100);
+ break;
+ }
+ }
+ }
+ };
+
+ std::thread input1(input_func, 1);
+ std::thread input2(input_func, 2);
+ std::thread input3(input_func, 3);
+ std::thread output(outout_func);
+
+ input1.join();
+ input2.join();
+ input3.join();
+ output.join();
+}
+
+TEST_F(DataStreamRecvrTest, TestRemoteLocalMultiSender) {
+ create_recvr(3, false);
+ EXPECT_EQ(recvr->sender_queues().size(), 1);
+ EXPECT_EQ(recvr->sender_queues().back()->_num_remaining_senders, 3);
+
+ auto* sender = recvr->sender_queues().back();
+
+ auto sink_dep = sender->_local_channel_dependency;
+ auto source_dep = std::make_shared<Dependency>(0, 0, "test", false);
+ sender->set_dependency(source_dep);
+
+ EXPECT_EQ(sink_dep->ready(), true);
+ EXPECT_EQ(source_dep->ready(), false);
+
+ std::vector<std::shared_ptr<MockClosure>> closures {10};
+
+ for (auto i = 0; i < 10; i++) {
+ closures[i] = std::make_shared<MockClosure>();
+ }
+
+ auto input_remote_func = [&](int id) {
+ mock_random_sleep();
+ int input_block = 0;
+ auto closure = closures[id];
+ std::atomic_bool cb_flag = true;
+ closure->_cb = [&]() { cb_flag = true; };
+ while (true) {
+ if (sink_dep->ready() && cb_flag) {
+ auto block = ColumnHelper::create_block<DataTypeInt32>({1, 2,
3, 4, 5});
+ auto pblock = std::make_unique<PBlock>();
+ to_pblock(block, pblock.get());
+ google::protobuf::Closure* done = closure.get();
+ cb_flag = false;
+ auto st = sender->add_block(std::move(pblock), id,
input_block, &done, 0, 0);
+ EXPECT_TRUE(st) << st.msg();
+ input_block++;
+ if (done != nullptr) {
+ done->Run();
+ }
+ if (input_block == 100) {
+ sender->decrement_senders(id);
+ break;
+ }
+ }
+ }
+ };
+
+ auto input_local_func = [&](int id) {
+ mock_random_sleep();
+ int input_block = 0;
+ while (true) {
+ if (sink_dep->ready()) {
+ auto block = ColumnHelper::create_block<DataTypeInt32>({1, 2,
3, 4, 5});
+ sender->add_block(&block, false);
+ input_block++;
+ if (input_block == 100) {
+ sender->decrement_senders(id);
+ break;
+ }
+ }
+ }
+ };
+
+ auto outout_func = [&]() {
+ mock_random_sleep();
+ int output_block = 0;
+ while (true) {
+ if (source_dep->ready()) {
+ Block block;
+ bool eos = false;
+ auto st = sender->get_batch(&block, &eos);
+ EXPECT_TRUE(st) << st.msg();
+ if (!block.empty()) {
+ output_block++;
+ }
+ if (eos) {
+ EXPECT_EQ(output_block, 3 * 100);
+ break;
+ }
+ }
+ }
+ };
+
+ std::thread input1(input_remote_func, 1);
+ std::thread input2(input_local_func, 2);
+ std::thread input3(input_remote_func, 3);
+ std::thread output(outout_func);
+
+ input1.join();
+ input2.join();
+ input3.join();
+ output.join();
+}
+// ./run-be-ut.sh --run --filter=DataStreamRecvrTest.*
+
+} // namespace doris::pipeline
diff --git a/be/test/pipeline/pipeline_test.cpp b/be/test/pipeline/pipeline_test.cpp
index 1c76f2d3c9c..b915dba0fa9 100644
--- a/be/test/pipeline/pipeline_test.cpp
+++ b/be/test/pipeline/pipeline_test.cpp
@@ -309,7 +309,7 @@ TEST_F(PipelineTest, HAPPY_PATH) {
EXPECT_GT(block_mem_usage - 1, 0);
auto downstream_recvr = ExecEnv::GetInstance()->_vstream_mgr->create_recvr(
- downstream_runtime_state.get(), memory_used_counter,
op->row_desc(), dest0, 1, 1,
+ downstream_runtime_state.get(), memory_used_counter, dest0, 1, 1,
downstream_pipeline_profile.get(), false, block_mem_usage - 1);
std::vector<TScanRangeParams> scan_ranges;
EXPECT_EQ(_pipeline_tasks[cur_pipe->id()].back()->prepare(scan_ranges, 0,
tsink,
@@ -991,8 +991,7 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) {
auto* memory_used_counter =
downstream_pipeline_profile->AddHighWaterMarkCounter(
"MemoryUsage", TUnit::BYTES, "", 1);
downstream_recvr = ExecEnv::GetInstance()->_vstream_mgr->create_recvr(
- downstream_runtime_state.get(), memory_used_counter,
- _pipelines.front()->operators().back()->row_desc(),
dest_ins_id, dest_node_id,
+ downstream_runtime_state.get(), memory_used_counter,
dest_ins_id, dest_node_id,
parallelism, downstream_pipeline_profile.get(), false,
2048000);
}
for (size_t i = 0; i < _pipelines.size(); i++) {
diff --git a/be/test/testutil/mock/mock_runtime_state.h b/be/test/testutil/mock/mock_query_context.h
similarity index 56%
copy from be/test/testutil/mock/mock_runtime_state.h
copy to be/test/testutil/mock/mock_query_context.h
index 506fafc2105..bd6b47483e4 100644
--- a/be/test/testutil/mock/mock_runtime_state.h
+++ b/be/test/testutil/mock/mock_query_context.h
@@ -16,28 +16,21 @@
// under the License.
#pragma once
-#include "runtime/runtime_state.h"
+#include "runtime/query_context.h"
namespace doris {
-class MockContext : public TaskExecutionContext {};
-
-class MockRuntimeState : public RuntimeState {
-public:
- MockRuntimeState() { set_task_execution_context(_mock_context); };
-
- int batch_size() const override { return batsh_size; }
-
- bool enable_shared_exchange_sink_buffer() const override {
- return _enable_shared_exchange_sink_buffer;
- }
-
- bool enable_local_exchange() const override { return true; }
-
- // default batch size
- int batsh_size = 4096;
- bool _enable_shared_exchange_sink_buffer = true;
- std::shared_ptr<MockContext> _mock_context =
std::make_shared<MockContext>();
+inline TQueryOptions create_fake_query_options() {
+ TQueryOptions query_options;
+ query_options.query_type = TQueryType::EXTERNAL;
+ return query_options;
+}
+
+struct MockQueryContext : public QueryContext {
+ MockQueryContext()
+ : QueryContext(TUniqueId {}, ExecEnv::GetInstance(),
create_fake_query_options(),
+ TNetworkAddress {}, true, TNetworkAddress {},
+ QuerySource::GROUP_COMMIT_LOAD) {}
};
} // namespace doris
diff --git a/be/test/testutil/mock/mock_runtime_state.h b/be/test/testutil/mock/mock_runtime_state.h
index 506fafc2105..b62a29c45eb 100644
--- a/be/test/testutil/mock/mock_runtime_state.h
+++ b/be/test/testutil/mock/mock_runtime_state.h
@@ -16,6 +16,7 @@
// under the License.
#pragma once
+#include "mock_query_context.h"
#include "runtime/runtime_state.h"
namespace doris {
@@ -24,7 +25,10 @@ class MockContext : public TaskExecutionContext {};
class MockRuntimeState : public RuntimeState {
public:
- MockRuntimeState() { set_task_execution_context(_mock_context); };
+ MockRuntimeState() {
+ set_task_execution_context(_mock_context);
+ _query_ctx = _query_ctx_uptr.get();
+ }
int batch_size() const override { return batsh_size; }
@@ -38,6 +42,7 @@ public:
int batsh_size = 4096;
bool _enable_shared_exchange_sink_buffer = true;
std::shared_ptr<MockContext> _mock_context =
std::make_shared<MockContext>();
+ std::unique_ptr<MockQueryContext> _query_ctx_uptr =
std::make_unique<MockQueryContext>();
};
} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]