This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push:
new 815134fd33c [feat]CTE Spill (#44014)
815134fd33c is described below
commit 815134fd33c84004ff5d1824aa8ae96b9c8b31e9
Author: Jerry Hu <[email protected]>
AuthorDate: Sun Nov 17 00:26:09 2024 +0800
[feat]CTE Spill (#44014)
---
be/src/pipeline/dependency.cpp | 6 +-
be/src/pipeline/dependency.h | 10 +-
.../pipeline/exec/multi_cast_data_stream_sink.cpp | 30 ++-
be/src/pipeline/exec/multi_cast_data_stream_sink.h | 14 +-
.../exec/multi_cast_data_stream_source.cpp | 14 +-
.../pipeline/exec/multi_cast_data_stream_source.h | 9 +-
be/src/pipeline/exec/multi_cast_data_streamer.cpp | 275 +++++++++++++++++++--
be/src/pipeline/exec/multi_cast_data_streamer.h | 82 +++++-
.../exec/partitioned_hash_join_sink_operator.cpp | 16 +-
.../exec/partitioned_hash_join_sink_operator.h | 1 +
be/src/vec/spill/spill_reader.cpp | 7 +
be/src/vec/spill/spill_stream.cpp | 8 +
be/src/vec/spill/spill_stream.h | 5 +
13 files changed, 429 insertions(+), 48 deletions(-)
diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp
index 9ad0ff1b57f..a7198a97da4 100644
--- a/be/src/pipeline/dependency.cpp
+++ b/be/src/pipeline/dependency.cpp
@@ -392,9 +392,11 @@ void SpillSortSharedState::close() {
}
MultiCastSharedState::MultiCastSharedState(const RowDescriptor& row_desc,
ObjectPool* pool,
- int cast_sender_count)
+ int cast_sender_count, int node_id)
:
multi_cast_data_streamer(std::make_unique<pipeline::MultiCastDataStreamer>(
- row_desc, pool, cast_sender_count, true)) {}
+ row_desc, this, pool, cast_sender_count, node_id, true)) {}
+
+void MultiCastSharedState::update_spill_stream_profiles(RuntimeProfile*
source_profile) {}
int AggSharedState::get_slot_column_id(const vectorized::AggFnEvaluator*
evaluator) {
auto ctxs = evaluator->input_exprs_ctxs();
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index c9a16a88614..1d79331096c 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -570,10 +570,14 @@ public:
class MultiCastDataStreamer;
-struct MultiCastSharedState : public BasicSharedState {
-public:
- MultiCastSharedState(const RowDescriptor& row_desc, ObjectPool* pool, int
cast_sender_count);
+struct MultiCastSharedState : public BasicSharedState,
+ public BasicSpillSharedState,
+ public
std::enable_shared_from_this<MultiCastSharedState> {
+ MultiCastSharedState(const RowDescriptor& row_desc, ObjectPool* pool, int
cast_sender_count,
+ int node_id);
std::unique_ptr<pipeline::MultiCastDataStreamer> multi_cast_data_streamer;
+
+ void update_spill_stream_profiles(RuntimeProfile* source_profile) override;
};
struct BlockRowPos {
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp
b/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp
index eb72e9601e1..4af54ec221c 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp
@@ -23,9 +23,9 @@
namespace doris::pipeline {
#include "common/compile_check_begin.h"
std::string MultiCastDataStreamSinkLocalState::name_suffix() {
- auto& sinks =
static_cast<MultiCastDataStreamSinkOperatorX*>(_parent)->sink_node().sinks;
+ const auto& sinks =
static_cast<MultiCastDataStreamSinkOperatorX*>(_parent)->sink_node().sinks;
std::string id_name = " (dst id : ";
- for (auto& sink : sinks) {
+ for (const auto& sink : sinks) {
id_name += std::to_string(sink.dest_node_id) + ",";
}
id_name += ")";
@@ -34,19 +34,39 @@ std::string
MultiCastDataStreamSinkLocalState::name_suffix() {
std::shared_ptr<BasicSharedState>
MultiCastDataStreamSinkOperatorX::create_shared_state() const {
std::shared_ptr<BasicSharedState> ss =
- std::make_shared<MultiCastSharedState>(_row_desc, _pool,
_cast_sender_count);
+ std::make_shared<MultiCastSharedState>(_row_desc, _pool,
_cast_sender_count, _node_id);
ss->id = operator_id();
- for (auto& dest : dests_id()) {
+ for (const auto& dest : dests_id()) {
ss->related_op_ids.insert(dest);
}
return ss;
}
+std::vector<Dependency*> MultiCastDataStreamSinkLocalState::dependencies()
const {
+ auto dependencies = Base::dependencies();
+
dependencies.emplace_back(_shared_state->multi_cast_data_streamer->get_spill_dependency());
+ return dependencies;
+}
+
+Status MultiCastDataStreamSinkLocalState::open(RuntimeState* state) {
+ RETURN_IF_ERROR(Base::open(state));
+ _shared_state->multi_cast_data_streamer->set_sink_profile(profile());
+ _shared_state->setup_shared_profile(profile());
+ _shared_state->multi_cast_data_streamer->set_write_dependency(_dependency);
+ return Status::OK();
+}
+
+std::string MultiCastDataStreamSinkLocalState::debug_string(int
indentation_level) const {
+ fmt::memory_buffer debug_string_buffer;
+ fmt::format_to(debug_string_buffer, "{}, ",
Base::debug_string(indentation_level),
+ _shared_state->multi_cast_data_streamer->debug_string());
+ return fmt::to_string(debug_string_buffer);
+}
+
Status MultiCastDataStreamSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* in_block,
bool eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
- COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)in_block->rows());
if (in_block->rows() > 0 || eos) {
COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)in_block->rows());
auto st =
local_state._shared_state->multi_cast_data_streamer->push(state, in_block, eos);
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_sink.h
b/be/src/pipeline/exec/multi_cast_data_stream_sink.h
index 57b5974064b..e0c454d8f10 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_sink.h
+++ b/be/src/pipeline/exec/multi_cast_data_stream_sink.h
@@ -17,22 +17,32 @@
#pragma once
+#include <vector>
+
+#include "common/status.h"
#include "operator.h"
+#include "pipeline/exec/data_queue.h"
namespace doris::pipeline {
class MultiCastDataStreamSinkOperatorX;
class MultiCastDataStreamSinkLocalState final
- : public PipelineXSinkLocalState<MultiCastSharedState> {
+ : public PipelineXSpillSinkLocalState<MultiCastSharedState> {
ENABLE_FACTORY_CREATOR(MultiCastDataStreamSinkLocalState);
MultiCastDataStreamSinkLocalState(DataSinkOperatorXBase* parent,
RuntimeState* state)
: Base(parent, state) {}
friend class MultiCastDataStreamSinkOperatorX;
friend class DataSinkOperatorX<MultiCastDataStreamSinkLocalState>;
- using Base = PipelineXSinkLocalState<MultiCastSharedState>;
+ using Base = PipelineXSpillSinkLocalState<MultiCastSharedState>;
using Parent = MultiCastDataStreamSinkOperatorX;
std::string name_suffix() override;
+ Status open(RuntimeState* state) override;
+
+ std::vector<Dependency*> dependencies() const override;
+
+ std::string debug_string(int indentation_level) const override;
+
private:
std::shared_ptr<pipeline::MultiCastDataStreamer> _multi_cast_data_streamer;
};
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
index e45e59d17e2..61adfed7573 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
@@ -38,6 +38,8 @@ Status
MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalState
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_init_timer);
auto& p = _parent->cast<Parent>();
+ _shared_state->multi_cast_data_streamer->set_source_profile(p._consumer_id,
+
_runtime_profile.get());
_shared_state->multi_cast_data_streamer->set_dep_by_sender_idx(p._consumer_id,
_dependency);
_wait_for_rf_timer = ADD_TIMER(_runtime_profile, "WaitForRuntimeFilter");
_filter_timer = ADD_TIMER(_runtime_profile, "FilterTime");
@@ -50,6 +52,14 @@ Status
MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalState
return Status::OK();
}
+std::vector<Dependency*> MultiCastDataStreamSourceLocalState::dependencies()
const {
+ auto dependencies = Base::dependencies();
+ auto& p = _parent->cast<Parent>();
+ dependencies.emplace_back(
+
_shared_state->multi_cast_data_streamer->get_spill_read_dependency(p._consumer_id));
+ return dependencies;
+}
+
Status MultiCastDataStreamSourceLocalState::open(RuntimeState* state) {
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
@@ -92,9 +102,9 @@ Status
MultiCastDataStreamerSourceOperatorX::get_block(RuntimeState* state,
{
SCOPED_TIMER(local_state._get_data_timer);
RETURN_IF_ERROR(local_state._shared_state->multi_cast_data_streamer->pull(
- _consumer_id, output_block, eos));
+ state, _consumer_id, output_block, eos));
}
- if (!local_state._conjuncts.empty()) {
+ if (!local_state._conjuncts.empty() && !output_block->empty()) {
SCOPED_TIMER(local_state._filter_timer);
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts,
output_block,
output_block->columns()));
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h
b/be/src/pipeline/exec/multi_cast_data_stream_source.h
index 57410bf8d95..c1af8c5b21c 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.h
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h
@@ -37,11 +37,12 @@ namespace pipeline {
class MultiCastDataStreamer;
class MultiCastDataStreamerSourceOperatorX;
-class MultiCastDataStreamSourceLocalState final : public
PipelineXLocalState<MultiCastSharedState>,
- public RuntimeFilterConsumer
{
+class MultiCastDataStreamSourceLocalState final
+ : public PipelineXSpillLocalState<MultiCastSharedState>,
+ public RuntimeFilterConsumer {
public:
ENABLE_FACTORY_CREATOR(MultiCastDataStreamSourceLocalState);
- using Base = PipelineXLocalState<MultiCastSharedState>;
+ using Base = PipelineXSpillLocalState<MultiCastSharedState>;
using Parent = MultiCastDataStreamerSourceOperatorX;
MultiCastDataStreamSourceLocalState(RuntimeState* state, OperatorXBase*
parent);
Status init(RuntimeState* state, LocalStateInfo& info) override;
@@ -62,6 +63,8 @@ public:
return res;
}
+ std::vector<Dependency*> dependencies() const override;
+
private:
friend class MultiCastDataStreamerSourceOperatorX;
vectorized::VExprContextSPtrs _output_expr_contexts;
diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.cpp
b/be/src/pipeline/exec/multi_cast_data_streamer.cpp
index 3e629093e23..f1e399a3289 100644
--- a/be/src/pipeline/exec/multi_cast_data_streamer.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_streamer.cpp
@@ -17,9 +17,26 @@
#include "multi_cast_data_streamer.h"
+#include <fmt/format.h>
+#include <glog/logging.h>
+
+#include <iterator>
+#include <memory>
+#include <vector>
+
+#include "common/config.h"
+#include "common/exception.h"
+#include "common/logging.h"
+#include "common/status.h"
#include "pipeline/dependency.h"
#include "pipeline/exec/multi_cast_data_stream_source.h"
+#include "pipeline/exec/spill_utils.h"
+#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
+#include "util/pretty_printer.h"
+#include "util/uid_util.h"
+#include "vec/core/block.h"
+#include "vec/spill/spill_stream_manager.h"
namespace doris::pipeline {
#include "common/compile_check_begin.h"
@@ -30,37 +47,115 @@ MultiCastBlock::MultiCastBlock(vectorized::Block* block,
int used_count, int un_
block->clear();
}
-Status MultiCastDataStreamer::pull(int sender_idx, doris::vectorized::Block*
block, bool* eos) {
+Status MultiCastDataStreamer::pull(RuntimeState* state, int sender_idx,
vectorized::Block* block,
+ bool* eos) {
int* un_finish_copy = nullptr;
int use_count = 0;
+ size_t mem_size = 0;
+ bool spilled = false;
{
std::lock_guard l(_mutex);
+
+ if (!_cached_blocks[sender_idx].empty()) {
+ *block = std::move(_cached_blocks[sender_idx].front());
+
_cached_blocks[sender_idx].erase(_cached_blocks[sender_idx].begin());
+ return Status::OK();
+ }
+
+ for (auto it = _spill_readers[sender_idx].begin();
+ it != _spill_readers[sender_idx].end();) {
+ if ((*it)->all_data_read) {
+ it = _spill_readers[sender_idx].erase(it);
+ } else {
+ it++;
+ }
+ }
+
+ if (!_spill_readers[sender_idx].empty()) {
+ auto reader_item = _spill_readers[sender_idx].front();
+ if (!reader_item->stream->ready_for_reading()) {
+ return Status::OK();
+ }
+
+ auto& reader = reader_item->reader;
+ RETURN_IF_ERROR(reader->open());
+ if (reader_item->block_offset != 0) {
+ reader->seek(reader_item->block_offset);
+ reader_item->block_offset = 0;
+ }
+
+ auto spill_func = [this, reader_item, sender_idx]() {
+ vectorized::Block block;
+ bool spill_eos = false;
+ size_t read_size = 0;
+ while (!spill_eos) {
+ RETURN_IF_ERROR(reader_item->reader->read(&block,
&spill_eos));
+ if (!block.empty()) {
+ std::lock_guard l(_mutex);
+ read_size += block.allocated_bytes();
+
_cached_blocks[sender_idx].emplace_back(std::move(block));
+ if (_cached_blocks[sender_idx].size() >= 32 ||
+ read_size > 2 * 1024 * 1024) {
+ break;
+ }
+ }
+ }
+
+ if (spill_eos || !_cached_blocks[sender_idx].empty()) {
+ reader_item->all_data_read = spill_eos;
+ _set_ready_for_read(sender_idx);
+ }
+ return Status::OK();
+ };
+
+ auto catch_exception_func = [spill_func = std::move(spill_func)]()
{
+ RETURN_IF_CATCH_EXCEPTION(return spill_func(););
+ };
+
+ _spill_read_dependencies[sender_idx]->block();
+ auto spill_runnable = std::make_shared<SpillRecoverRunnable>(
+ state, _spill_read_dependencies[sender_idx],
_source_profiles[sender_idx],
+ _shared_state->shared_from_this(), catch_exception_func);
+ auto* thread_pool =
+
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
+ RETURN_IF_ERROR(thread_pool->submit(std::move(spill_runnable)));
+ return Status::OK();
+ }
+
auto& pos_to_pull = _sender_pos_to_read[sender_idx];
const auto end = _multi_cast_blocks.end();
- DCHECK(pos_to_pull != end);
+ if (pos_to_pull == end) {
+ _block_reading(sender_idx);
+ VLOG_DEBUG << "query: " << print_id(state->query_id())
+ << ", pos_to_pull end: " << (void*)(_write_dependency);
+ *eos = _eos;
+ return Status::OK();
+ }
*block = *pos_to_pull->_block;
- _cumulative_mem_size -= pos_to_pull->_mem_size;
-
pos_to_pull->_used_count--;
use_count = pos_to_pull->_used_count;
+ mem_size = pos_to_pull->_mem_size;
un_finish_copy = &pos_to_pull->_un_finish_copy;
pos_to_pull++;
if (pos_to_pull == end) {
_block_reading(sender_idx);
+ *eos = _eos;
+ RETURN_IF_ERROR(_trigger_spill_if_need(state, &spilled));
}
- *eos = _eos and pos_to_pull == end;
- }
-
- if (use_count == 0) {
- // will clear _multi_cast_blocks
- _wait_copy_block(block, *un_finish_copy);
- } else {
- _copy_block(block, *un_finish_copy);
+ if (use_count == 0) {
+ _cumulative_mem_size.fetch_sub(mem_size);
+ _multi_cast_blocks.pop_front();
+ _write_dependency->set_ready();
+ VLOG_DEBUG << "**** query: " << print_id(state->query_id())
+ << ", set ready: " << (void*)(_write_dependency);
+ } else {
+ _copy_block(block, *un_finish_copy);
+ }
}
return Status::OK();
@@ -71,13 +166,6 @@ void MultiCastDataStreamer::_copy_block(vectorized::Block*
block, int& un_finish
for (int i = 0; i < block->columns(); ++i) {
block->get_by_position(i).column =
block->get_by_position(i).column->clone_resized(rows);
}
-
- std::unique_lock l(_mutex);
- un_finish_copy--;
- if (un_finish_copy == 0) {
- l.unlock();
- _cv.notify_one();
- }
}
void MultiCastDataStreamer::_wait_copy_block(vectorized::Block* block, int&
un_finish_copy) {
@@ -86,16 +174,153 @@ void
MultiCastDataStreamer::_wait_copy_block(vectorized::Block* block, int& un_f
_multi_cast_blocks.pop_front();
}
+Status MultiCastDataStreamer::_trigger_spill_if_need(RuntimeState* state,
bool* triggered) {
+ vectorized::SpillStreamSPtr spill_stream;
+ *triggered = false;
+ if (_cumulative_mem_size.load() >= config::exchg_node_buffer_size_bytes &&
+ _multi_cast_blocks.size() >= 4) {
+ _write_dependency->block();
+
+ bool has_reached_end = false;
+ std::vector<int64_t> distances(_cast_sender_count);
+ size_t total_count = _multi_cast_blocks.size();
+ for (int i = 0; i < _sender_pos_to_read.size(); ++i) {
+ distances[i] = std::distance(_multi_cast_blocks.begin(),
_sender_pos_to_read[i]);
+ if (_sender_pos_to_read[i] == _multi_cast_blocks.end()) {
+ has_reached_end = true;
+ CHECK_EQ(distances[i], total_count);
+ }
+
+ if (!_spill_readers[i].empty()) {
+ CHECK_EQ(distances[i], 0);
+ }
+ }
+
+ if (has_reached_end) {
+
RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
+ state, spill_stream, print_id(state->query_id()),
"MultiCastSender", _node_id,
+ std::numeric_limits<int32_t>::max(),
std::numeric_limits<size_t>::max(),
+ _sink_profile));
+ for (int i = 0; i < _sender_pos_to_read.size(); ++i) {
+ if (distances[i] < total_count) {
+ auto reader = spill_stream->create_separate_reader();
+ reader->set_counters(_source_profiles[i]);
+ auto reader_item = std::make_shared<SpillingReader>(
+ std::move(reader), spill_stream, distances[i],
false);
+ _spill_readers[i].emplace_back(std::move(reader_item));
+ }
+
+ _block_reading(i);
+ }
+
+ RETURN_IF_ERROR(_submit_spill_task(state, spill_stream));
+ DCHECK_EQ(_multi_cast_blocks.size(), 0);
+
+ for (auto& pos : _sender_pos_to_read) {
+ pos = _multi_cast_blocks.end();
+ }
+ _cumulative_mem_size = 0;
+ *triggered = true;
+ }
+ }
+
+ return Status::OK();
+}
+
+Status MultiCastDataStreamer::_submit_spill_task(RuntimeState* state,
+ vectorized::SpillStreamSPtr
spill_stream) {
+ std::vector<vectorized::Block> blocks;
+ for (auto& block : _multi_cast_blocks) {
+ blocks.emplace_back(std::move(*block._block));
+ }
+
+ _multi_cast_blocks.clear();
+
+ auto spill_func = [state, blocks = std::move(blocks),
+ spill_stream = std::move(spill_stream)]() mutable {
+ const auto blocks_count = blocks.size();
+ while (!blocks.empty() && !state->is_cancelled()) {
+ auto block = std::move(blocks.front());
+ blocks.erase(blocks.begin());
+
+ RETURN_IF_ERROR(spill_stream->spill_block(state, block, false));
+ }
+ VLOG_DEBUG << "query: " << print_id(state->query_id()) << " multi cast
write "
+ << blocks_count << " blocks";
+ return spill_stream->spill_eof();
+ };
+
+ auto exception_catch_func = [spill_func = std::move(spill_func),
+ query_id = print_id(state->query_id()),
this]() mutable {
+ auto status = [&]() { RETURN_IF_CATCH_EXCEPTION(return spill_func());
}();
+ _write_dependency->set_ready();
+
+ if (!status.ok()) {
+ LOG(WARNING) << "query: " << query_id
+ << " multi cast write failed: " << status.to_string()
+ << ", dependency: " << (void*)_spill_dependency.get();
+ } else {
+ for (int i = 0; i < _sender_pos_to_read.size(); ++i) {
+ _set_ready_for_read(i);
+ }
+ }
+ return status;
+ };
+
+ auto spill_runnable = std::make_shared<SpillSinkRunnable>(
+ state, nullptr, _spill_dependency, _sink_profile,
_shared_state->shared_from_this(),
+ exception_catch_func);
+
+ _spill_dependency->block();
+
+ auto* thread_pool =
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
+ return thread_pool->submit(std::move(spill_runnable));
+}
+
Status MultiCastDataStreamer::push(RuntimeState* state,
doris::vectorized::Block* block, bool eos) {
auto rows = block->rows();
COUNTER_UPDATE(_process_rows, rows);
const auto block_mem_size = block->allocated_bytes();
- _cumulative_mem_size += block_mem_size;
- COUNTER_SET(_peak_mem_usage, std::max(_cumulative_mem_size,
_peak_mem_usage->value()));
+
+ if (!_shared_state->_spill_status.ok()) {
+ return _shared_state->_spill_status.status();
+ }
{
std::lock_guard l(_mutex);
+
+ if (_pending_block) {
+ const auto pending_size = _pending_block->allocated_bytes();
+ _cumulative_mem_size += pending_size;
+ _multi_cast_blocks.emplace_back(_pending_block.get(),
_cast_sender_count,
+ _cast_sender_count - 1,
pending_size);
+ _pending_block.reset();
+
+ auto end = std::prev(_multi_cast_blocks.end());
+ for (int i = 0; i < _sender_pos_to_read.size(); ++i) {
+ if (_sender_pos_to_read[i] == _multi_cast_blocks.end()) {
+ _sender_pos_to_read[i] = end;
+ _set_ready_for_read(i);
+ }
+ }
+ }
+
+ _cumulative_mem_size += block_mem_size;
+ COUNTER_SET(_peak_mem_usage,
+ std::max(_cumulative_mem_size.load(),
_peak_mem_usage->value()));
+
+ if (!eos) {
+ bool spilled = false;
+ RETURN_IF_ERROR(_trigger_spill_if_need(state, &spilled));
+ if (spilled) {
+ _pending_block =
+
vectorized::Block::create_unique(block->get_columns_with_type_and_name());
+ block->clear();
+ return Status::OK();
+ }
+ }
+
_multi_cast_blocks.emplace_back(block, _cast_sender_count,
_cast_sender_count - 1,
block_mem_size);
// last elem
@@ -106,6 +331,7 @@ Status MultiCastDataStreamer::push(RuntimeState* state,
doris::vectorized::Block
_set_ready_for_read(i);
}
}
+
_eos = eos;
}
@@ -135,4 +361,11 @@ void MultiCastDataStreamer::_block_reading(int sender_idx)
{
dep->block();
}
+std::string MultiCastDataStreamer::debug_string() const {
+ fmt::memory_buffer debug_string_buffer;
+ fmt::format_to(debug_string_buffer, "MemSize: {}",
+ PrettyPrinter::print_bytes(_cumulative_mem_size));
+ return fmt::to_string(debug_string_buffer);
+}
+
} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.h
b/be/src/pipeline/exec/multi_cast_data_streamer.h
index 07e64016363..079acb9c81f 100644
--- a/be/src/pipeline/exec/multi_cast_data_streamer.h
+++ b/be/src/pipeline/exec/multi_cast_data_streamer.h
@@ -17,11 +17,22 @@
#pragma once
+#include <atomic>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "pipeline/dependency.h"
+#include "util/runtime_profile.h"
+#include "vec/core/block.h"
#include "vec/sink/vdata_stream_sender.h"
+#include "vec/spill/spill_stream.h"
namespace doris::pipeline {
class Dependency;
+struct MultiCastSharedState;
+
struct MultiCastBlock {
MultiCastBlock(vectorized::Block* block, int used_count, int need_copy,
size_t mem_size);
@@ -31,30 +42,53 @@ struct MultiCastBlock {
size_t _mem_size;
};
+struct SpillingReader {
+ vectorized::SpillReaderUPtr reader;
+ vectorized::SpillStreamSPtr stream;
+ int64_t block_offset {0};
+ bool all_data_read {false};
+};
+
// TODO: MultiCastDataStreamer is the same as the data queue; maybe rethink the union and
// refactor the
// code
class MultiCastDataStreamer {
public:
- MultiCastDataStreamer(const RowDescriptor& row_desc, ObjectPool* pool, int
cast_sender_count,
+ MultiCastDataStreamer(const RowDescriptor& row_desc, MultiCastSharedState*
shared_state,
+ ObjectPool* pool, int cast_sender_count, int32_t
node_id,
bool with_dependencies = false)
: _row_desc(row_desc),
+ _shared_state(shared_state),
_profile(pool->add(new
RuntimeProfile("MultiCastDataStreamSink"))),
- _cast_sender_count(cast_sender_count) {
+ _cached_blocks(cast_sender_count),
+ _cast_sender_count(cast_sender_count),
+ _node_id(node_id),
+ _spill_readers(cast_sender_count),
+ _source_profiles(cast_sender_count) {
_sender_pos_to_read.resize(cast_sender_count,
_multi_cast_blocks.end());
if (with_dependencies) {
_dependencies.resize(cast_sender_count, nullptr);
}
+ _spill_dependency = Dependency::create_shared(_node_id, _node_id,
+
"MultiCastDataStreamerDependency", true);
+
+ for (int i = 0; i != cast_sender_count; ++i) {
+ _spill_read_dependencies.emplace_back(Dependency::create_shared(
+ node_id, node_id, "MultiCastReadSpillDependency", true));
+ }
_peak_mem_usage = ADD_COUNTER(profile(), "PeakMemUsage", TUnit::BYTES);
_process_rows = ADD_COUNTER(profile(), "ProcessRows", TUnit::UNIT);
};
- ~MultiCastDataStreamer() = default;
+ ~MultiCastDataStreamer() {
+ for (auto& item : _spill_readers) {
+ DCHECK(item.empty());
+ }
+ }
- Status pull(int sender_idx, vectorized::Block* block, bool* eos);
+ Status pull(RuntimeState* state, int sender_idx, vectorized::Block* block,
bool* eos);
Status push(RuntimeState* state, vectorized::Block* block, bool eos);
-
const RowDescriptor& row_desc() { return _row_desc; }
RuntimeProfile* profile() { return _profile; }
@@ -64,6 +98,22 @@ public:
_block_reading(sender_idx);
}
+ void set_write_dependency(Dependency* dependency) { _write_dependency =
dependency; }
+
+ Dependency* get_spill_dependency() const { return _spill_dependency.get();
}
+
+ Dependency* get_spill_read_dependency(int sender_idx) const {
+ return _spill_read_dependencies[sender_idx].get();
+ }
+
+ void set_sink_profile(RuntimeProfile* profile) { _sink_profile = profile; }
+
+ void set_source_profile(int sender_idx, RuntimeProfile* profile) {
+ _source_profiles[sender_idx] = profile;
+ }
+
+ std::string debug_string() const;
+
private:
void _set_ready_for_read(int sender_idx);
void _block_reading(int sender_idx);
@@ -72,19 +122,37 @@ private:
void _wait_copy_block(vectorized::Block* block, int& un_finish_copy);
+ Status _submit_spill_task(RuntimeState* state, vectorized::SpillStreamSPtr
spill_stream);
+
+ Status _trigger_spill_if_need(RuntimeState* state, bool* triggered);
+
const RowDescriptor& _row_desc;
+ MultiCastSharedState* _shared_state;
RuntimeProfile* _profile = nullptr;
std::list<MultiCastBlock> _multi_cast_blocks;
+ std::list<MultiCastBlock> _spilling_blocks;
+ std::vector<std::vector<vectorized::Block>> _cached_blocks;
std::vector<std::list<MultiCastBlock>::iterator> _sender_pos_to_read;
std::condition_variable _cv;
std::mutex _mutex;
bool _eos = false;
int _cast_sender_count = 0;
- int64_t _cumulative_mem_size = 0;
-
+ int _node_id;
+ std::atomic_int64_t _cumulative_mem_size = 0;
RuntimeProfile::Counter* _process_rows = nullptr;
RuntimeProfile::Counter* _peak_mem_usage = nullptr;
+ Dependency* _write_dependency;
std::vector<Dependency*> _dependencies;
+ std::shared_ptr<Dependency> _spill_dependency;
+
+ vectorized::BlockUPtr _pending_block;
+
+ std::vector<std::vector<std::shared_ptr<SpillingReader>>> _spill_readers;
+
+ std::vector<std::shared_ptr<Dependency>> _spill_read_dependencies;
+
+ RuntimeProfile* _sink_profile;
+ std::vector<RuntimeProfile*> _source_profiles;
};
} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index a8c2b495365..6f2f7c8bc15 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -21,6 +21,7 @@
#include <algorithm>
#include <memory>
+#include <mutex>
#include "common/logging.h"
#include "pipeline/exec/operator.h"
@@ -332,9 +333,12 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
if (spill_context) {
spill_context->on_task_finished();
}
+
+ std::lock_guard<std::mutex> lock(_spill_mutex);
_spill_dependency->set_ready();
return status;
};
+
for (size_t i = 0; i != _shared_state->partitioned_build_blocks.size();
++i) {
vectorized::SpillStreamSPtr& spilling_stream =
_shared_state->spilled_streams[i];
auto& mutable_block = _shared_state->partitioned_build_blocks[i];
@@ -390,9 +394,15 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
}
}
- if (_spilling_task_count > 0) {
- _spill_dependency->block();
- } else if (_child_eos) {
+ if (_spilling_task_count.load() > 0) {
+ std::lock_guard<std::mutex> lock(_spill_mutex);
+ if (_spilling_task_count.load() > 0) {
+ _spill_dependency->block();
+ return Status::OK();
+ }
+ }
+
+ if (_child_eos) {
VLOG_DEBUG << "query:" << print_id(state->query_id()) << ", hash join
sink "
<< _parent->node_id() << " set_ready_to_read"
<< ", task id: " << state->task_id();
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index 58b19004f33..374055a838f 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -68,6 +68,7 @@ protected:
friend class PartitionedHashJoinSinkOperatorX;
+ std::mutex _spill_mutex;
std::atomic<bool> _spilling_finished {false};
std::atomic_int32_t _spilling_task_count {0};
diff --git a/be/src/vec/spill/spill_reader.cpp
b/be/src/vec/spill/spill_reader.cpp
index c947081fcaf..014b83be23d 100644
--- a/be/src/vec/spill/spill_reader.cpp
+++ b/be/src/vec/spill/spill_reader.cpp
@@ -17,6 +17,8 @@
#include "vec/spill/spill_reader.h"
+#include <glog/logging.h>
+
#include <algorithm>
#include "common/cast_set.h"
@@ -99,6 +101,11 @@ Status SpillReader::open() {
return Status::OK();
}
+void SpillReader::seek(size_t block_index) {
+ DCHECK_LT(block_index, block_count_);
+ read_block_index_ = block_index;
+}
+
Status SpillReader::read(Block* block, bool* eos) {
DCHECK(file_reader_);
block->clear_column_data();
diff --git a/be/src/vec/spill/spill_stream.cpp
b/be/src/vec/spill/spill_stream.cpp
index 3e5b93a21d7..a27916a87a3 100644
--- a/be/src/vec/spill/spill_stream.cpp
+++ b/be/src/vec/spill/spill_stream.cpp
@@ -115,6 +115,10 @@ Status SpillStream::prepare() {
return writer_->open();
}
+SpillReaderUPtr SpillStream::create_separate_reader() const {
+ return std::make_unique<SpillReader>(stream_id_, writer_->get_file_path());
+}
+
const TUniqueId& SpillStream::query_id() const {
return query_id_;
}
@@ -144,6 +148,10 @@ Status SpillStream::spill_eof() {
auto status = writer_->close();
total_written_bytes_ = writer_->get_written_bytes();
writer_.reset();
+
+ if (status.ok()) {
+ _ready_for_reading = true;
+ }
return status;
}
diff --git a/be/src/vec/spill/spill_stream.h b/be/src/vec/spill/spill_stream.h
index 9682130aad0..525abbb7855 100644
--- a/be/src/vec/spill/spill_stream.h
+++ b/be/src/vec/spill/spill_stream.h
@@ -67,8 +67,12 @@ public:
void update_shared_profiles(RuntimeProfile* source_op_profile);
+ SpillReaderUPtr create_separate_reader() const;
+
const TUniqueId& query_id() const;
+ bool ready_for_reading() const { return _ready_for_reading; }
+
private:
friend class SpillStreamManager;
@@ -86,6 +90,7 @@ private:
size_t batch_bytes_;
int64_t total_written_bytes_ = 0;
+ std::atomic_bool _ready_for_reading = false;
std::atomic_bool _is_reading = false;
SpillWriterUPtr writer_;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]