This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c3a64aa0b06 [UT](exchanger) Add UT case for shuffle exchanger (#47598)
c3a64aa0b06 is described below
commit c3a64aa0b06e8d1c32fcb483ab9bc1c127e18112
Author: Gabriel <[email protected]>
AuthorDate: Sat Feb 8 10:00:53 2025 +0800
[UT](exchanger) Add UT case for shuffle exchanger (#47598)
---
.../local_exchange_sink_operator.cpp | 13 +-
.../local_exchange/local_exchange_sink_operator.h | 5 +-
be/src/pipeline/local_exchange/local_exchanger.cpp | 22 +-
be/src/pipeline/local_exchange/local_exchanger.h | 4 +-
be/test/pipeline/local_exchanger_test.cpp | 286 +++++++++++++++++++++
5 files changed, 305 insertions(+), 25 deletions(-)
diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
index b22ee9fd77e..76e92428766 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
@@ -41,21 +41,17 @@ Status LocalExchangeSinkOperatorX::init(ExchangeType type,
const int num_buckets
_name = "LOCAL_EXCHANGE_SINK_OPERATOR (" + get_exchange_type_name(type) +
")";
_type = type;
if (_type == ExchangeType::HASH_SHUFFLE) {
+ _shuffle_idx_to_instance_idx.clear();
_use_global_shuffle = use_global_hash_shuffle;
// For shuffle join, if data distribution has been broken by previous
operator, we
// should use a HASH_SHUFFLE local exchanger to shuffle data again. To
be mentioned,
// we should use map shuffle idx to instance idx because all instances
will be
// distributed to all BEs. Otherwise, we should use shuffle idx
directly.
if (use_global_hash_shuffle) {
- std::for_each(shuffle_idx_to_instance_idx.begin(),
shuffle_idx_to_instance_idx.end(),
- [&](const auto& item) {
- DCHECK(item.first != -1);
-
_shuffle_idx_to_instance_idx.push_back({item.first, item.second});
- });
+ _shuffle_idx_to_instance_idx = shuffle_idx_to_instance_idx;
} else {
- _shuffle_idx_to_instance_idx.resize(_num_partitions);
for (int i = 0; i < _num_partitions; i++) {
- _shuffle_idx_to_instance_idx[i] = {i, i};
+ _shuffle_idx_to_instance_idx[i] = i;
}
}
_partitioner.reset(new
vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>(
@@ -147,7 +143,8 @@ Status LocalExchangeSinkOperatorX::sink(RuntimeState*
state, vectorized::Block*
RETURN_IF_ERROR(local_state._exchanger->sink(
state, in_block, eos,
{local_state._compute_hash_value_timer,
local_state._distribute_timer, nullptr},
- {&local_state._channel_id, local_state._partitioner.get(),
&local_state}));
+ {&local_state._channel_id, local_state._partitioner.get(),
&local_state,
+ &_shuffle_idx_to_instance_idx}));
// If all exchange sources ended due to limit reached, current task should
also finish
if (local_state._exchanger->_running_source_operators == 0) {
diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
index c067f023c8d..7ef053af6f7 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
@@ -91,7 +91,7 @@ public:
_num_partitions(num_partitions),
_texprs(texprs),
_partitioned_exprs_num(texprs.size()),
- _bucket_seq_to_instance_idx(bucket_seq_to_instance_idx) {}
+ _shuffle_idx_to_instance_idx(bucket_seq_to_instance_idx) {}
Status init(const TPlanNode& tnode, RuntimeState* state) override {
return Status::InternalError("{} should not init with TPlanNode",
Base::_name);
@@ -116,8 +116,7 @@ private:
const std::vector<TExpr>& _texprs;
const size_t _partitioned_exprs_num;
std::unique_ptr<vectorized::PartitionerBase> _partitioner;
- const std::map<int, int> _bucket_seq_to_instance_idx;
- std::vector<std::pair<int, int>> _shuffle_idx_to_instance_idx;
+ std::map<int, int> _shuffle_idx_to_instance_idx;
bool _use_global_shuffle = false;
};
diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp
b/be/src/pipeline/local_exchange/local_exchanger.cpp
index 06843e18d86..8b55bd6b440 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -72,11 +72,7 @@ bool
Exchanger<BlockType>::_dequeue_data(LocalExchangeSourceLocalState* local_st
BlockType& block, bool* eos,
vectorized::Block* data_block,
int channel_id) {
if (local_state == nullptr) {
- if (!_dequeue_data(block, eos, data_block, channel_id)) {
- throw Exception(ErrorCode::INTERNAL_ERROR, "Exchanger has no data:
{}",
- data_queue_debug_string(channel_id));
- }
- return true;
+ return _dequeue_data(block, eos, data_block, channel_id);
}
bool all_finished = _running_sink_operators == 0;
if (_data_queue[channel_id].try_dequeue(block)) {
@@ -160,7 +156,8 @@ Status ShuffleExchanger::sink(RuntimeState* state,
vectorized::Block* in_block,
{
SCOPED_TIMER(profile.distribute_timer);
RETURN_IF_ERROR(_split_rows(state,
sink_info.partitioner->get_channel_ids().get<uint32_t>(),
- in_block, *sink_info.channel_id,
sink_info.local_state));
+ in_block, *sink_info.channel_id,
sink_info.local_state,
+ sink_info.shuffle_idx_to_instance_idx));
}
return Status::OK();
@@ -214,7 +211,8 @@ Status ShuffleExchanger::get_block(RuntimeState* state,
vectorized::Block* block
Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t*
__restrict channel_ids,
vectorized::Block* block, int channel_id,
- LocalExchangeSinkLocalState* local_state)
{
+ LocalExchangeSinkLocalState* local_state,
+ std::map<int, int>*
shuffle_idx_to_instance_idx) {
if (local_state == nullptr) {
return _split_rows(state, channel_ids, block, channel_id);
}
@@ -249,8 +247,6 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state,
const uint32_t* __rest
}
local_state->_shared_state->add_total_mem_usage(new_block_wrapper->data_block.allocated_bytes(),
channel_id);
- auto bucket_seq_to_instance_idx =
-
local_state->_parent->cast<LocalExchangeSinkOperatorX>()._bucket_seq_to_instance_idx;
if (get_type() == ExchangeType::HASH_SHUFFLE) {
/**
* If type is `HASH_SHUFFLE`, data are hash-shuffled and distributed
to all instances of
@@ -258,8 +254,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state,
const uint32_t* __rest
* For example, row 1 get a hash value 1 which means we should
distribute to instance 1 on
* BE 1 and row 2 get a hash value 2 which means we should distribute
to instance 1 on BE 3.
*/
- const auto& map =
local_state->_parent->cast<LocalExchangeSinkOperatorX>()
- ._shuffle_idx_to_instance_idx;
+ DCHECK(shuffle_idx_to_instance_idx &&
shuffle_idx_to_instance_idx->size() > 0);
+ const auto& map = *shuffle_idx_to_instance_idx;
new_block_wrapper->ref(cast_set<int>(map.size()));
for (const auto& it : map) {
DCHECK(it.second >= 0 && it.second < _num_partitions)
@@ -274,13 +270,13 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state,
const uint32_t* __rest
}
}
} else {
- DCHECK(!bucket_seq_to_instance_idx.empty());
+ DCHECK(shuffle_idx_to_instance_idx &&
shuffle_idx_to_instance_idx->size() > 0);
new_block_wrapper->ref(_num_partitions);
for (int i = 0; i < _num_partitions; i++) {
uint32_t start = partition_rows_histogram[i];
uint32_t size = partition_rows_histogram[i + 1] - start;
if (size > 0) {
- _enqueue_data_and_set_ready(bucket_seq_to_instance_idx[i],
local_state,
+ _enqueue_data_and_set_ready((*shuffle_idx_to_instance_idx)[i],
local_state,
{new_block_wrapper, {row_idx,
start, size}});
} else {
new_block_wrapper->unref(local_state->_shared_state,
channel_id);
diff --git a/be/src/pipeline/local_exchange/local_exchanger.h
b/be/src/pipeline/local_exchange/local_exchanger.h
index d5d53e04170..4aace54e6e3 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/local_exchange/local_exchanger.h
@@ -41,6 +41,7 @@ struct SinkInfo {
int* channel_id;
vectorized::PartitionerBase* partitioner;
LocalExchangeSinkLocalState* local_state;
+ std::map<int, int>* shuffle_idx_to_instance_idx;
};
struct SourceInfo {
@@ -262,7 +263,8 @@ public:
protected:
Status _split_rows(RuntimeState* state, const uint32_t* __restrict
channel_ids,
vectorized::Block* block, int channel_id,
- LocalExchangeSinkLocalState* local_state);
+ LocalExchangeSinkLocalState* local_state,
+ std::map<int, int>* shuffle_idx_to_instance_idx);
Status _split_rows(RuntimeState* state, const uint32_t* __restrict
channel_ids,
vectorized::Block* block, int channel_id);
std::vector<std::vector<uint32_t>> _partition_rows_histogram;
diff --git a/be/test/pipeline/local_exchanger_test.cpp
b/be/test/pipeline/local_exchanger_test.cpp
new file mode 100644
index 00000000000..3db2375866c
--- /dev/null
+++ b/be/test/pipeline/local_exchanger_test.cpp
@@ -0,0 +1,286 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "common/status.h"
+#include "pipeline/dependency.h"
+#include "pipeline/exec/exchange_source_operator.h"
+#include "pipeline/exec/hashjoin_build_sink.h"
+#include "pipeline/local_exchange/local_exchange_sink_operator.h"
+#include "pipeline/local_exchange/local_exchange_source_operator.h"
+#include "thrift_builder.h"
+#include "vec/columns/column.h"
+#include "vec/columns/column_vector.h"
+#include "vec/data_types/data_type.h"
+#include "vec/data_types/data_type_number.h"
+#include "vec/exprs/vslot_ref.h"
+
+namespace doris::pipeline {
+
+class LocalExchangerTest : public testing::Test {
+public:
+ LocalExchangerTest() = default;
+ ~LocalExchangerTest() override = default;
+ void SetUp() override {
+ _query_options = TQueryOptionsBuilder()
+ .set_enable_local_exchange(true)
+ .set_enable_local_shuffle(true)
+ .set_runtime_filter_max_in_num(15)
+ .build();
+ auto fe_address = TNetworkAddress();
+ fe_address.hostname = LOCALHOST;
+ fe_address.port = DUMMY_PORT;
+ _query_ctx =
+ QueryContext::create(_query_id, ExecEnv::GetInstance(),
_query_options, fe_address,
+ true, fe_address,
QuerySource::INTERNAL_FRONTEND);
+ _query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
+ TRuntimeFilterParamsBuilder().build());
+ _runtime_state = RuntimeState::create_unique(_query_id, _fragment_id,
_query_options,
+ _query_ctx->query_globals,
+ ExecEnv::GetInstance(),
_query_ctx.get());
+ }
+ void TearDown() override {}
+
+private:
+ std::unique_ptr<RuntimeState> _runtime_state;
+ TUniqueId _query_id = TUniqueId();
+ int _fragment_id = 0;
+ TQueryOptions _query_options;
+ std::shared_ptr<QueryContext> _query_ctx;
+
+ const std::string LOCALHOST = BackendOptions::get_localhost();
+ const int DUMMY_PORT = config::brpc_port;
+};
+
+TEST_F(LocalExchangerTest, ShuffleExchanger) {
+ int num_sink = 4;
+ int num_sources = 4;
+ int num_partitions = 4;
+ int free_block_limit = 0;
+ std::map<int, int> shuffle_idx_to_instance_idx;
+ for (int i = 0; i < num_partitions; i++) {
+ shuffle_idx_to_instance_idx[i] = i;
+ }
+
+ std::vector<std::pair<std::vector<uint32_t>, int>> hash_vals_and_value;
+ std::vector<std::unique_ptr<LocalExchangeSinkLocalState>>
_sink_local_states;
+ std::vector<std::unique_ptr<LocalExchangeSourceLocalState>> _local_states;
+ _sink_local_states.resize(num_sink);
+ _local_states.resize(num_sources);
+ auto profile = std::make_shared<RuntimeProfile>("");
+ auto shared_state =
LocalExchangeSharedState::create_shared(num_partitions);
+ shared_state->exchanger = ShuffleExchanger::create_unique(num_sink,
num_sources, num_partitions,
+
free_block_limit);
+ auto sink_dep = std::make_shared<Dependency>(0, 0,
"LOCAL_EXCHANGE_SINK_DEPENDENCY", true);
+ sink_dep->set_shared_state(shared_state.get());
+ shared_state->sink_deps.push_back(sink_dep);
+ shared_state->create_dependencies(0);
+
+ auto* exchanger = (ShuffleExchanger*)shared_state->exchanger.get();
+ for (size_t i = 0; i < num_sink; i++) {
+ auto compute_hash_value_timer =
+ ADD_TIMER(profile, "ComputeHashValueTime" + std::to_string(i));
+ auto distribute_timer = ADD_TIMER(profile, "distribute_timer" +
std::to_string(i));
+ _sink_local_states[i].reset(new LocalExchangeSinkLocalState(nullptr,
nullptr));
+ _sink_local_states[i]->_exchanger = shared_state->exchanger.get();
+ _sink_local_states[i]->_compute_hash_value_timer =
compute_hash_value_timer;
+ _sink_local_states[i]->_distribute_timer = distribute_timer;
+ _sink_local_states[i]->_partitioner.reset(
+ new
vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>(
+ num_partitions));
+ _sink_local_states[i]->_channel_id = i;
+ _sink_local_states[i]->_shared_state = shared_state.get();
+ _sink_local_states[i]->_dependency = sink_dep.get();
+ }
+ for (size_t i = 0; i < num_sources; i++) {
+ auto get_block_failed_counter =
+ ADD_TIMER(profile, "_get_block_failed_counter" +
std::to_string(i));
+ auto copy_data_timer = ADD_TIMER(profile, "_copy_data_timer" +
std::to_string(i));
+ _local_states[i].reset(new LocalExchangeSourceLocalState(nullptr,
nullptr));
+ _local_states[i]->_exchanger = shared_state->exchanger.get();
+ _local_states[i]->_get_block_failed_counter = get_block_failed_counter;
+ _local_states[i]->_copy_data_timer = copy_data_timer;
+ _local_states[i]->_channel_id = i;
+ _local_states[i]->_shared_state = shared_state.get();
+ _local_states[i]->_dependency =
shared_state->get_dep_by_channel_id(i).front().get();
+ _local_states[i]->_memory_used_counter =
profile->AddHighWaterMarkCounter(
+ "MemoryUsage" + std::to_string(i), TUnit::BYTES, "", 1);
+ shared_state->mem_counters[i] = _local_states[i]->_memory_used_counter;
+ }
+
+ {
+ // Enqueue 2 blocks with 10 rows for each data queue.
+ for (size_t i = 0; i < num_partitions; i++) {
+ hash_vals_and_value.push_back({std::vector<uint32_t> {}, i});
+ for (size_t j = 0; j < 2; j++) {
+ vectorized::Block in_block;
+ vectorized::DataTypePtr int_type =
std::make_shared<vectorized::DataTypeInt32>();
+ auto int_col0 = vectorized::ColumnInt32::create();
+ int_col0->insert_many_vals(hash_vals_and_value.back().second,
10);
+
+ auto pre_size = hash_vals_and_value.back().first.size();
+ hash_vals_and_value.back().first.resize(pre_size + 10);
+ std::fill(hash_vals_and_value.back().first.begin() + pre_size,
+ hash_vals_and_value.back().first.end(), 0);
+
int_col0->update_crcs_with_value(hash_vals_and_value.back().first.data() +
pre_size,
+ PrimitiveType::TYPE_INT,
+
cast_set<uint32_t>(int_col0->size()), 0, nullptr);
+ in_block.insert({std::move(int_col0), int_type,
"test_int_col0"});
+ bool in_eos = false;
+ auto texpr =
+ TExprNodeBuilder(
+ TExprNodeType::SLOT_REF,
+ TTypeDescBuilder()
+ .set_types(TTypeNodeBuilder()
+
.set_type(TTypeNodeType::SCALAR)
+
.set_scalar_type(TPrimitiveType::INT)
+ .build())
+ .build(),
+ 0)
+ .set_slot_ref(TSlotRefBuilder(0, 0).build())
+ .build();
+ auto slot = doris::vectorized::VSlotRef::create_shared(texpr);
+ slot->_column_id = 0;
+
((vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>*)
+ _sink_local_states[i]
+ ->_partitioner.get())
+ ->_partition_expr_ctxs.push_back(
+
std::make_shared<doris::vectorized::VExprContext>(slot));
+ EXPECT_EQ(exchanger->sink(
+ _runtime_state.get(), &in_block, in_eos,
+
{_sink_local_states[i]->_compute_hash_value_timer,
+ _sink_local_states[i]->_distribute_timer,
nullptr},
+ {&_sink_local_states[i]->_channel_id,
+ _sink_local_states[i]->_partitioner.get(),
+ _sink_local_states[i].get(),
&shuffle_idx_to_instance_idx}),
+ Status::OK());
+ EXPECT_EQ(_sink_local_states[i]->_channel_id, i);
+ }
+ }
+ }
+
+ {
+ int64_t mem_usage = 0;
+ for (const auto& it : hash_vals_and_value) {
+ auto channel_id = it.first.back() % num_partitions;
+ EXPECT_GT(shared_state->mem_counters[channel_id]->value(), 0);
+ mem_usage += shared_state->mem_counters[channel_id]->value();
+ EXPECT_EQ(_local_states[channel_id]->_dependency->ready(), true);
+ }
+ EXPECT_EQ(shared_state->mem_usage, mem_usage);
+ // Dequeue from data queue and accumulate rows if rows is smaller than
batch_size.
+ for (const auto& it : hash_vals_and_value) {
+ bool eos = false;
+ auto channel_id = it.first.back() % num_partitions;
+ vectorized::Block block;
+ EXPECT_EQ(exchanger->get_block(
+ _runtime_state.get(), &block, &eos,
+ {nullptr, nullptr,
_local_states[channel_id]->_copy_data_timer},
+
{cast_set<int>(_local_states[channel_id]->_channel_id),
+ _local_states[channel_id].get()}),
+ Status::OK());
+ EXPECT_EQ(block.rows(), 20);
+ EXPECT_EQ(eos, false);
+ EXPECT_EQ(_local_states[channel_id]->_dependency->ready(), false);
+ }
+ EXPECT_EQ(shared_state->mem_usage, 0);
+ }
+ for (size_t i = 0; i < num_sources; i++) {
+ EXPECT_EQ(exchanger->_data_queue[i].eos, false);
+ EXPECT_EQ(exchanger->_data_queue[i].data_queue.size_approx(), 0);
+ }
+ for (size_t i = 0; i < num_sink; i++) {
+ shared_state->sub_running_sink_operators();
+ }
+ for (size_t i = 0; i < num_sources; i++) {
+ bool eos = false;
+ vectorized::Block block;
+ EXPECT_EQ(exchanger->get_block(
+ _runtime_state.get(), &block, &eos,
+ {nullptr, nullptr,
_local_states[i]->_copy_data_timer},
+ {cast_set<int>(_local_states[i]->_channel_id),
_local_states[i].get()}),
+ Status::OK());
+ EXPECT_EQ(block.rows(), 0);
+ EXPECT_EQ(eos, true);
+ EXPECT_EQ(_local_states[i]->_dependency->ready(), true);
+ }
+ for (size_t i = 0; i < num_sources; i++) {
+ exchanger->close({cast_set<int>(i), nullptr});
+ }
+ for (size_t i = 0; i < num_sources; i++) {
+ shared_state->sub_running_source_operators();
+ }
+ for (size_t i = 0; i < num_sources; i++) {
+ EXPECT_EQ(exchanger->_data_queue[i].eos, true);
+ EXPECT_EQ(exchanger->_data_queue[i].data_queue.size_approx(), 0);
+ }
+
+ {
+ // After exchanger closed, data will never push into data queue again.
+ hash_vals_and_value.clear();
+ for (size_t i = 0; i < num_partitions; i++) {
+ hash_vals_and_value.push_back({std::vector<uint32_t> {}, i});
+ vectorized::Block in_block;
+ vectorized::DataTypePtr int_type =
std::make_shared<vectorized::DataTypeInt32>();
+ auto int_col0 = vectorized::ColumnInt32::create();
+ int_col0->insert_many_vals(hash_vals_and_value.back().second, 10);
+
+ auto pre_size = hash_vals_and_value.back().first.size();
+ hash_vals_and_value.back().first.resize(pre_size + 10);
+ std::fill(hash_vals_and_value.back().first.begin() + pre_size,
+ hash_vals_and_value.back().first.end(), 0);
+
int_col0->update_crcs_with_value(hash_vals_and_value.back().first.data() +
pre_size,
+ PrimitiveType::TYPE_INT,
+
cast_set<uint32_t>(int_col0->size()), 0, nullptr);
+ in_block.insert({std::move(int_col0), int_type, "test_int_col0"});
+ bool in_eos = false;
+ auto texpr = TExprNodeBuilder(
+ TExprNodeType::SLOT_REF,
+ TTypeDescBuilder()
+ .set_types(TTypeNodeBuilder()
+
.set_type(TTypeNodeType::SCALAR)
+
.set_scalar_type(TPrimitiveType::INT)
+ .build())
+ .build(),
+ 0)
+ .set_slot_ref(TSlotRefBuilder(0, 0).build())
+ .build();
+ auto slot = doris::vectorized::VSlotRef::create_shared(texpr);
+ slot->_column_id = 0;
+
((vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>*)_sink_local_states[i]
+ ->_partitioner.get())
+ ->_partition_expr_ctxs.push_back(
+
std::make_shared<doris::vectorized::VExprContext>(slot));
+ EXPECT_EQ(exchanger->sink(_runtime_state.get(), &in_block, in_eos,
+
{_sink_local_states[i]->_compute_hash_value_timer,
+
_sink_local_states[i]->_distribute_timer, nullptr},
+ {&_sink_local_states[i]->_channel_id,
+
_sink_local_states[i]->_partitioner.get(),
+ _sink_local_states[i].get(),
&shuffle_idx_to_instance_idx}),
+ Status::OK());
+ EXPECT_EQ(_sink_local_states[i]->_channel_id, i);
+ }
+ for (size_t i = 0; i < num_sources; i++) {
+ EXPECT_EQ(exchanger->_data_queue[i].eos, true);
+ EXPECT_EQ(exchanger->_data_queue[i].data_queue.size_approx(), 0);
+ }
+ }
+}
+
+} // namespace doris::pipeline
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]