This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new e29d8cb1108 [feature](move-memtable) support pipelineX in sink v2 (#27067)
e29d8cb1108 is described below
commit e29d8cb110849637fc7beca5b5d76b103b9c95bf
Author: Kaijie Chen <[email protected]>
AuthorDate: Thu Nov 16 15:00:55 2023 +0800
[feature](move-memtable) support pipelineX in sink v2 (#27067)
---
be/src/exec/data_sink.cpp | 8 +-
..._operator.h => olap_table_sink_v2_operator.cpp} | 51 ++-
be/src/pipeline/exec/olap_table_sink_v2_operator.h | 74 ++-
be/src/pipeline/pipeline_x/operator.cpp | 3 +
.../pipeline_x/pipeline_x_fragment_context.cpp | 30 +-
.../pipeline_x/pipeline_x_fragment_context.h | 3 +
be/src/vec/sink/vtablet_sink_v2.cpp | 505 +--------------------
be/src/vec/sink/vtablet_sink_v2.h | 178 +-------
.../vtablet_writer_v2.cpp} | 129 +++---
.../vtablet_writer_v2.h} | 37 +-
10 files changed, 228 insertions(+), 790 deletions(-)
diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp
index f849dc84a25..970e7a3a18a 100644
--- a/be/src/exec/data_sink.cpp
+++ b/be/src/exec/data_sink.cpp
@@ -146,15 +146,13 @@ Status DataSink::create_data_sink(ObjectPool* pool, const
TDataSink& thrift_sink
break;
}
case TDataSinkType::OLAP_TABLE_SINK: {
- Status status = Status::OK();
DCHECK(thrift_sink.__isset.olap_table_sink);
if (state->query_options().enable_memtable_on_sink_node &&
!_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink)) {
- sink->reset(new vectorized::VOlapTableSinkV2(pool, row_desc,
output_exprs, &status));
+ sink->reset(new vectorized::VOlapTableSinkV2(pool, row_desc,
output_exprs, false));
} else {
sink->reset(new vectorized::VOlapTableSink(pool, row_desc,
output_exprs, false));
}
- RETURN_IF_ERROR(status);
break;
}
case TDataSinkType::GROUP_COMMIT_OLAP_TABLE_SINK: {
@@ -301,15 +299,13 @@ Status DataSink::create_data_sink(ObjectPool* pool, const
TDataSink& thrift_sink
break;
}
case TDataSinkType::OLAP_TABLE_SINK: {
- Status status = Status::OK();
DCHECK(thrift_sink.__isset.olap_table_sink);
if (state->query_options().enable_memtable_on_sink_node &&
!_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink)) {
- sink->reset(new vectorized::VOlapTableSinkV2(pool, row_desc,
output_exprs, &status));
+ sink->reset(new vectorized::VOlapTableSinkV2(pool, row_desc,
output_exprs, false));
} else {
sink->reset(new vectorized::VOlapTableSink(pool, row_desc,
output_exprs, false));
}
- RETURN_IF_ERROR(status);
break;
}
case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: {
diff --git a/be/src/pipeline/exec/olap_table_sink_v2_operator.h
b/be/src/pipeline/exec/olap_table_sink_v2_operator.cpp
similarity index 53%
copy from be/src/pipeline/exec/olap_table_sink_v2_operator.h
copy to be/src/pipeline/exec/olap_table_sink_v2_operator.cpp
index f280e856f0c..99efc1d752e 100644
--- a/be/src/pipeline/exec/olap_table_sink_v2_operator.h
+++ b/be/src/pipeline/exec/olap_table_sink_v2_operator.cpp
@@ -15,35 +15,36 @@
// specific language governing permissions and limitations
// under the License.
-#pragma once
+#include "olap_table_sink_v2_operator.h"
-#include "operator.h"
-#include "vec/sink/vtablet_sink_v2.h"
+#include "common/status.h"
-namespace doris {
-
-namespace pipeline {
-
-class OlapTableSinkV2OperatorBuilder final
- : public DataSinkOperatorBuilder<vectorized::VOlapTableSinkV2> {
-public:
- OlapTableSinkV2OperatorBuilder(int32_t id, DataSink* sink)
- : DataSinkOperatorBuilder(id, "OlapTableSinkV2Operator", sink) {}
-
- OperatorPtr build_operator() override;
-};
-
-class OlapTableSinkV2Operator final : public
DataSinkOperator<OlapTableSinkV2OperatorBuilder> {
-public:
- OlapTableSinkV2Operator(OperatorBuilderBase* operator_builder, DataSink*
sink)
- : DataSinkOperator(operator_builder, sink) {}
-
- bool can_write() override { return true; } // TODO: need use mem_limit
-};
+namespace doris::pipeline {
OperatorPtr OlapTableSinkV2OperatorBuilder::build_operator() {
return std::make_shared<OlapTableSinkV2Operator>(this, _sink);
}
-} // namespace pipeline
-} // namespace doris
\ No newline at end of file
+Status OlapTableSinkV2LocalState::init(RuntimeState* state,
LocalSinkStateInfo& info) {
+ RETURN_IF_ERROR(Base::init(state, info));
+ SCOPED_TIMER(exec_time_counter());
+ SCOPED_TIMER(_open_timer);
+ auto& p = _parent->cast<Parent>();
+ RETURN_IF_ERROR(_writer->init_properties(p._pool, p._group_commit));
+ return Status::OK();
+}
+
+Status OlapTableSinkV2LocalState::close(RuntimeState* state, Status
exec_status) {
+ if (Base::_closed) {
+ return Status::OK();
+ }
+ SCOPED_TIMER(_close_timer);
+ SCOPED_TIMER(exec_time_counter());
+ if (_closed) {
+ return _close_status;
+ }
+ _close_status = Base::close(state, exec_status);
+ return _close_status;
+}
+
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/olap_table_sink_v2_operator.h
b/be/src/pipeline/exec/olap_table_sink_v2_operator.h
index f280e856f0c..5fb8f64dd31 100644
--- a/be/src/pipeline/exec/olap_table_sink_v2_operator.h
+++ b/be/src/pipeline/exec/olap_table_sink_v2_operator.h
@@ -18,6 +18,7 @@
#pragma once
#include "operator.h"
+#include "pipeline/pipeline_x/operator.h"
#include "vec/sink/vtablet_sink_v2.h"
namespace doris {
@@ -41,9 +42,76 @@ public:
bool can_write() override { return true; } // TODO: need use mem_limit
};
-OperatorPtr OlapTableSinkV2OperatorBuilder::build_operator() {
- return std::make_shared<OlapTableSinkV2Operator>(this, _sink);
-}
+class OlapTableSinkV2OperatorX;
+
+class OlapTableSinkV2LocalState final
+ : public AsyncWriterSink<vectorized::VTabletWriterV2,
OlapTableSinkV2OperatorX> {
+public:
+ using Base = AsyncWriterSink<vectorized::VTabletWriterV2,
OlapTableSinkV2OperatorX>;
+ using Parent = OlapTableSinkV2OperatorX;
+ ENABLE_FACTORY_CREATOR(OlapTableSinkV2LocalState);
+ OlapTableSinkV2LocalState(DataSinkOperatorXBase* parent, RuntimeState*
state)
+ : Base(parent, state) {};
+ Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
+ Status open(RuntimeState* state) override {
+ SCOPED_TIMER(exec_time_counter());
+ SCOPED_TIMER(_open_timer);
+ return Base::open(state);
+ }
+
+ Status close(RuntimeState* state, Status exec_status) override;
+ friend class OlapTableSinkV2OperatorX;
+
+private:
+ Status _close_status = Status::OK();
+};
+
+class OlapTableSinkV2OperatorX final : public
DataSinkOperatorX<OlapTableSinkV2LocalState> {
+public:
+ using Base = DataSinkOperatorX<OlapTableSinkV2LocalState>;
+ OlapTableSinkV2OperatorX(ObjectPool* pool, int operator_id, const
RowDescriptor& row_desc,
+ const std::vector<TExpr>& t_output_expr, bool
group_commit)
+ : Base(operator_id, 0),
+ _row_desc(row_desc),
+ _t_output_expr(t_output_expr),
+ _group_commit(group_commit),
+ _pool(pool) {};
+
+ Status init(const TDataSink& thrift_sink) override {
+ RETURN_IF_ERROR(Base::init(thrift_sink));
+ // From the thrift expressions create the real exprs.
+ RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_output_expr,
_output_vexpr_ctxs));
+ return Status::OK();
+ }
+
+ Status prepare(RuntimeState* state) override {
+ RETURN_IF_ERROR(Base::prepare(state));
+ return vectorized::VExpr::prepare(_output_vexpr_ctxs, state,
_row_desc);
+ }
+
+ Status open(RuntimeState* state) override {
+ RETURN_IF_ERROR(Base::open(state));
+ return vectorized::VExpr::open(_output_vexpr_ctxs, state);
+ }
+
+ Status sink(RuntimeState* state, vectorized::Block* in_block,
+ SourceState source_state) override {
+ auto& local_state = get_local_state(state);
+ SCOPED_TIMER(local_state.exec_time_counter());
+ COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)in_block->rows());
+ return local_state.sink(state, in_block, source_state);
+ }
+
+private:
+ friend class OlapTableSinkV2LocalState;
+ template <typename Writer, typename Parent>
+ friend class AsyncWriterSink;
+ const RowDescriptor& _row_desc;
+ vectorized::VExprContextSPtrs _output_vexpr_ctxs;
+ const std::vector<TExpr>& _t_output_expr;
+ const bool _group_commit;
+ ObjectPool* _pool;
+};
} // namespace pipeline
} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/pipeline_x/operator.cpp
b/be/src/pipeline/pipeline_x/operator.cpp
index f8430a57159..9e6df06da01 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -46,6 +46,7 @@
#include "pipeline/exec/nested_loop_join_probe_operator.h"
#include "pipeline/exec/olap_scan_operator.h"
#include "pipeline/exec/olap_table_sink_operator.h"
+#include "pipeline/exec/olap_table_sink_v2_operator.h"
#include "pipeline/exec/partition_sort_sink_operator.h"
#include "pipeline/exec/partition_sort_source_operator.h"
#include "pipeline/exec/repeat_operator.h"
@@ -544,6 +545,7 @@ DECLARE_OPERATOR_X(ResultSinkLocalState)
DECLARE_OPERATOR_X(JdbcTableSinkLocalState)
DECLARE_OPERATOR_X(ResultFileSinkLocalState)
DECLARE_OPERATOR_X(OlapTableSinkLocalState)
+DECLARE_OPERATOR_X(OlapTableSinkV2LocalState)
DECLARE_OPERATOR_X(AnalyticSinkLocalState)
DECLARE_OPERATOR_X(SortSinkLocalState)
DECLARE_OPERATOR_X(LocalExchangeSinkLocalState)
@@ -624,5 +626,6 @@ template class PipelineXLocalState<LocalExchangeDependency>;
template class AsyncWriterSink<doris::vectorized::VFileResultWriter,
ResultFileSinkOperatorX>;
template class AsyncWriterSink<doris::vectorized::VJdbcTableWriter,
JdbcTableSinkOperatorX>;
template class AsyncWriterSink<doris::vectorized::VTabletWriter,
OlapTableSinkOperatorX>;
+template class AsyncWriterSink<doris::vectorized::VTabletWriterV2,
OlapTableSinkV2OperatorX>;
} // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 200ac23504e..7113989ee1e 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -62,6 +62,7 @@
#include "pipeline/exec/nested_loop_join_probe_operator.h"
#include "pipeline/exec/olap_scan_operator.h"
#include "pipeline/exec/olap_table_sink_operator.h"
+#include "pipeline/exec/olap_table_sink_v2_operator.h"
#include "pipeline/exec/partition_sort_sink_operator.h"
#include "pipeline/exec/partition_sort_source_operator.h"
#include "pipeline/exec/repeat_operator.h"
@@ -268,9 +269,10 @@ Status
PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
break;
}
case TDataSinkType::OLAP_TABLE_SINK: {
- if (state->query_options().enable_memtable_on_sink_node) {
- return Status::InternalError(
- "Unsuported OLAP_TABLE_SINK with
enable_memtable_on_sink_node ");
+ if (state->query_options().enable_memtable_on_sink_node &&
+
!_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink)) {
+ _sink.reset(new OlapTableSinkV2OperatorX(pool, next_operator_id(),
row_desc,
+ output_exprs, false));
} else {
_sink.reset(new OlapTableSinkOperatorX(pool, next_operator_id(),
row_desc, output_exprs,
false));
@@ -412,6 +414,9 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
_runtime_states[i]->set_per_fragment_instance_idx(local_params.sender_id);
_runtime_states[i]->set_num_per_fragment_instances(request.num_senders);
_runtime_states[i]->resize_op_id_to_local_state(max_operator_id());
+
_runtime_states[i]->set_load_stream_per_node(request.load_stream_per_node);
+ _runtime_states[i]->set_total_load_streams(request.total_load_streams);
+ _runtime_states[i]->set_num_local_sink(request.num_local_sink);
std::map<PipelineId, PipelineXTask*> pipeline_id_to_task;
for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
auto task = std::make_unique<PipelineXTask>(
@@ -1005,4 +1010,23 @@ Status PipelineXFragmentContext::send_report(bool done) {
std::placeholders::_2)},
shared_from_this());
}
+
+bool
PipelineXFragmentContext::_has_inverted_index_or_partial_update(TOlapTableSink
sink) {
+ OlapTableSchemaParam schema;
+ if (!schema.init(sink.schema).ok()) {
+ return false;
+ }
+ if (schema.is_partial_update()) {
+ return true;
+ }
+ for (const auto& index_schema : schema.indexes()) {
+ for (const auto& index : index_schema->indexes) {
+ if (index->index_type() == INVERTED) {
+ return true;
+ }
+ }
+ }
+ return false;
+}
+
} // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index 4d2a59277e9..6fa91aedf12 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -145,6 +145,9 @@ private:
const TPipelineFragmentParams& params, const
RowDescriptor& row_desc,
RuntimeState* state, DescriptorTbl& desc_tbl,
PipelineId cur_pipeline_id);
+
+ bool _has_inverted_index_or_partial_update(TOlapTableSink sink);
+
OperatorXPtr _root_op = nullptr;
// this is a [n * m] matrix. n is parallelism of pipeline engine and m is
the number of pipelines.
std::vector<std::vector<std::unique_ptr<PipelineXTask>>> _tasks;
diff --git a/be/src/vec/sink/vtablet_sink_v2.cpp
b/be/src/vec/sink/vtablet_sink_v2.cpp
index ac9be0e7fb7..9385bd93202 100644
--- a/be/src/vec/sink/vtablet_sink_v2.cpp
+++ b/be/src/vec/sink/vtablet_sink_v2.cpp
@@ -17,47 +17,22 @@
#include "vec/sink/vtablet_sink_v2.h"
-#include <brpc/uri.h>
-#include <bthread/bthread.h>
-#include <fmt/format.h>
#include <gen_cpp/DataSinks_types.h>
#include <gen_cpp/Descriptors_types.h>
-#include <gen_cpp/Metrics_types.h>
-#include <gen_cpp/Types_types.h>
-#include <gen_cpp/internal_service.pb.h>
-#include <algorithm>
-#include <execution>
-#include <mutex>
#include <ranges>
-#include <string>
#include <unordered_map>
#include "common/compiler_util.h" // IWYU pragma: keep
-#include "common/logging.h"
#include "common/object_pool.h"
-#include "common/signal_handler.h"
#include "common/status.h"
-#include "exec/tablet_info.h"
#include "olap/delta_writer_v2.h"
#include "runtime/descriptors.h"
-#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
-#include "runtime/thread_context.h"
-#include "service/brpc.h"
-#include "util/brpc_client_cache.h"
#include "util/doris_metrics.h"
-#include "util/network_util.h"
-#include "util/threadpool.h"
-#include "util/thrift_util.h"
-#include "util/uid_util.h"
-#include "vec/core/block.h"
-#include "vec/exprs/vexpr.h"
#include "vec/sink/delta_writer_v2_pool.h"
#include "vec/sink/load_stream_stub.h"
#include "vec/sink/load_stream_stub_pool.h"
-#include "vec/sink/vtablet_block_convertor.h"
-#include "vec/sink/vtablet_finder.h"
namespace doris {
class TExpr;
@@ -65,391 +40,16 @@ class TExpr;
namespace vectorized {
VOlapTableSinkV2::VOlapTableSinkV2(ObjectPool* pool, const RowDescriptor&
row_desc,
- const std::vector<TExpr>& texprs, Status*
status)
- : DataSink(row_desc), _pool(pool) {
- // From the thrift expressions create the real exprs.
- *status = vectorized::VExpr::create_expr_trees(texprs, _output_vexpr_ctxs);
- _name = "VOlapTableSinkV2";
-}
+ const std::vector<TExpr>& texprs, bool
group_commit)
+ : AsyncWriterSink<VTabletWriterV2, VOLAP_TABLE_SINK_V2>(row_desc,
texprs),
+ _pool(pool),
+ _group_commit(group_commit) {}
VOlapTableSinkV2::~VOlapTableSinkV2() = default;
-Status VOlapTableSinkV2::on_partitions_created(TCreatePartitionResult* result)
{
- // add new tablet locations. it will use by address. so add to pool
- auto* new_locations = _pool->add(new
std::vector<TTabletLocation>(result->tablets));
- _location->add_locations(*new_locations);
-
- // update new node info
- _nodes_info->add_nodes(result->nodes);
-
- // incremental open stream
- RETURN_IF_ERROR(_incremental_open_streams(result->partitions));
-
- return Status::OK();
-}
-
-static Status on_partitions_created(void* writer, TCreatePartitionResult*
result) {
- return
static_cast<VOlapTableSinkV2*>(writer)->on_partitions_created(result);
-}
-
-Status VOlapTableSinkV2::_incremental_open_streams(
- const std::vector<TOlapTablePartition>& partitions) {
- // do what we did in prepare() for partitions. indexes which don't change
when we create new partition is orthogonal to partitions.
- std::unordered_set<int64_t> known_indexes;
- std::unordered_set<int64_t> new_backends;
- for (const auto& t_partition : partitions) {
- VOlapTablePartition* partition = nullptr;
- RETURN_IF_ERROR(_vpartition->generate_partition_from(t_partition,
partition));
- for (const auto& index : partition->indexes) {
- for (const auto& tablet_id : index.tablets) {
- auto nodes = _location->find_tablet(tablet_id)->node_ids;
- for (auto& node : nodes) {
- PTabletID tablet;
- tablet.set_partition_id(partition->id);
- tablet.set_index_id(index.index_id);
- tablet.set_tablet_id(tablet_id);
- if (!_streams_for_node.contains(node)) {
- new_backends.insert(node);
- }
- _tablets_for_node[node].emplace(tablet_id, tablet);
- if (known_indexes.contains(index.index_id)) [[likely]] {
- continue;
- }
- _indexes_from_node[node].emplace_back(tablet);
- known_indexes.insert(index.index_id);
- }
- }
- }
- }
- for (int64_t node_id : new_backends) {
- auto load_streams =
ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(
- _load_id, _backend_id, node_id, _stream_per_node,
_num_local_sink);
- RETURN_IF_ERROR(_open_streams_to_backend(node_id, *load_streams));
- _streams_for_node[node_id] = load_streams;
- }
- return Status::OK();
-}
-
-Status VOlapTableSinkV2::_init_row_distribution() {
- VRowDistributionContext ctx;
-
- ctx.state = _state;
- ctx.block_convertor = _block_convertor.get();
- ctx.tablet_finder = _tablet_finder.get();
- ctx.vpartition = _vpartition;
- ctx.add_partition_request_timer = _add_partition_request_timer;
- ctx.txn_id = _txn_id;
- ctx.pool = _pool;
- ctx.location = _location;
- ctx.vec_output_expr_ctxs = &_output_vexpr_ctxs;
- ctx.on_partitions_created = &vectorized::on_partitions_created;
- ctx.caller = (void*)this;
- ctx.schema = _schema;
-
- _row_distribution.init(&ctx);
-
- RETURN_IF_ERROR(_row_distribution.open(_output_row_desc));
-
- return Status::OK();
-}
-
Status VOlapTableSinkV2::init(const TDataSink& t_sink) {
- DCHECK(t_sink.__isset.olap_table_sink);
- auto& table_sink = t_sink.olap_table_sink;
- _load_id.set_hi(table_sink.load_id.hi);
- _load_id.set_lo(table_sink.load_id.lo);
- _txn_id = table_sink.txn_id;
- _num_replicas = table_sink.num_replicas;
- _tuple_desc_id = table_sink.tuple_id;
- _write_file_cache = table_sink.write_file_cache;
- _schema.reset(new OlapTableSchemaParam());
- RETURN_IF_ERROR(_schema->init(table_sink.schema));
- _location = _pool->add(new OlapTableLocationParam(table_sink.location));
- _nodes_info = _pool->add(new DorisNodesInfo(table_sink.nodes_info));
-
- // if distributed column list is empty, we can ensure that tablet is with
random distribution info
- // and if load_to_single_tablet is set and set to true, we should find
only one tablet in one partition
- // for the whole olap table sink
- auto find_tablet_mode =
OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_ROW;
- if (table_sink.partition.distributed_columns.empty()) {
- if (table_sink.__isset.load_to_single_tablet &&
table_sink.load_to_single_tablet) {
- find_tablet_mode =
OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_SINK;
- } else {
- find_tablet_mode =
OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_BATCH;
- }
- }
- _vpartition = _pool->add(new doris::VOlapTablePartitionParam(_schema,
table_sink.partition));
- _tablet_finder = std::make_unique<OlapTabletFinder>(_vpartition,
find_tablet_mode);
- RETURN_IF_ERROR(_vpartition->init());
-
- return Status::OK();
-}
-
-Status VOlapTableSinkV2::prepare(RuntimeState* state) {
- RETURN_IF_ERROR(DataSink::prepare(state));
-
- _state = state;
-
- _sender_id = state->per_fragment_instance_idx();
- _num_senders = state->num_per_fragment_instances();
- _backend_id = state->backend_id();
- _stream_per_node = state->load_stream_per_node();
- _total_streams = state->total_load_streams();
- _num_local_sink = state->num_local_sink();
- DCHECK(_stream_per_node > 0) << "load stream per node should be greator
than 0";
- DCHECK(_total_streams > 0) << "total load streams should be greator than
0";
- DCHECK(_num_local_sink > 0) << "num local sink should be greator than 0";
- LOG(INFO) << "num senders: " << _num_senders << ", stream per node: " <<
_stream_per_node
- << ", total_streams " << _total_streams << ", num_local_sink: "
<< _num_local_sink;
- _is_high_priority =
- (state->execution_timeout() <=
config::load_task_high_priority_threshold_second);
-
- // profile must add to state's object pool
- _profile = state->obj_pool()->add(new RuntimeProfile("VOlapTableSinkV2"));
- _mem_tracker = std::make_shared<MemTracker>("VOlapTableSinkV2:" +
-
std::to_string(state->load_job_id()));
- SCOPED_TIMER(_profile->total_time_counter());
- SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
-
- // get table's tuple descriptor
- _output_tuple_desc =
state->desc_tbl().get_tuple_descriptor(_tuple_desc_id);
- if (_output_tuple_desc == nullptr) {
- return Status::InternalError("unknown destination tuple descriptor, id
= {}",
- _tuple_desc_id);
- }
- _block_convertor =
std::make_unique<OlapTableBlockConvertor>(_output_tuple_desc);
- _block_convertor->init_autoinc_info(_schema->db_id(), _schema->table_id(),
- _state->batch_size());
- _output_row_desc = _pool->add(new RowDescriptor(_output_tuple_desc,
false));
-
- // add all counter
- _input_rows_counter = ADD_COUNTER(_profile, "RowsRead", TUnit::UNIT);
- _output_rows_counter = ADD_COUNTER(_profile, "RowsReturned", TUnit::UNIT);
- _filtered_rows_counter = ADD_COUNTER(_profile, "RowsFiltered",
TUnit::UNIT);
- _send_data_timer = ADD_TIMER(_profile, "SendDataTime");
- _wait_mem_limit_timer = ADD_CHILD_TIMER(_profile, "WaitMemLimitTime",
"SendDataTime");
- _row_distribution_timer = ADD_CHILD_TIMER(_profile, "RowDistributionTime",
"SendDataTime");
- _write_memtable_timer = ADD_CHILD_TIMER(_profile, "WriteMemTableTime",
"SendDataTime");
- _validate_data_timer = ADD_TIMER(_profile, "ValidateDataTime");
- _open_timer = ADD_TIMER(_profile, "OpenTime");
- _close_timer = ADD_TIMER(_profile, "CloseWaitTime");
- _close_writer_timer = ADD_CHILD_TIMER(_profile, "CloseWriterTime",
"CloseWaitTime");
- _close_load_timer = ADD_CHILD_TIMER(_profile, "CloseLoadTime",
"CloseWaitTime");
-
- // Prepare the exprs to run.
- RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state,
_row_desc));
- if (config::share_delta_writers) {
- _delta_writer_for_tablet =
ExecEnv::GetInstance()->delta_writer_v2_pool()->get_or_create(
- _load_id, _num_local_sink);
- } else {
- _delta_writer_for_tablet =
std::make_shared<DeltaWriterV2Map>(_load_id);
- }
- return Status::OK();
-}
-
-Status VOlapTableSinkV2::open(RuntimeState* state) {
- // Prepare the exprs to run.
- RETURN_IF_ERROR(vectorized::VExpr::open(_output_vexpr_ctxs, state));
- SCOPED_TIMER(_profile->total_time_counter());
- SCOPED_TIMER(_open_timer);
- SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
- signal::set_signal_task_id(_load_id);
-
- _build_tablet_node_mapping();
- RETURN_IF_ERROR(_open_streams(_backend_id));
- RETURN_IF_ERROR(_init_row_distribution());
-
- return Status::OK();
-}
-
-Status VOlapTableSinkV2::_open_streams(int64_t src_id) {
- for (auto& [dst_id, _] : _tablets_for_node) {
- auto streams =
ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(
- _load_id, src_id, dst_id, _stream_per_node, _num_local_sink);
- RETURN_IF_ERROR(_open_streams_to_backend(dst_id, *streams));
- _streams_for_node[dst_id] = streams;
- }
- return Status::OK();
-}
-
-Status VOlapTableSinkV2::_open_streams_to_backend(int64_t dst_id,
-
::doris::stream_load::LoadStreams& streams) {
- auto node_info = _nodes_info->find_node(dst_id);
- if (node_info == nullptr) {
- return Status::InternalError("Unknown node {} in tablet location",
dst_id);
- }
- // get tablet schema from each backend only in the 1st stream
- for (auto& stream : streams.streams() | std::ranges::views::take(1)) {
- const std::vector<PTabletID>& tablets_for_schema =
_indexes_from_node[node_info->id];
-
RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(),
*node_info,
- _txn_id, *_schema, tablets_for_schema,
_total_streams,
- _state->enable_profile()));
- }
- // for the rest streams, open without getting tablet schema
- for (auto& stream : streams.streams() | std::ranges::views::drop(1)) {
-
RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(),
*node_info,
- _txn_id, *_schema, {}, _total_streams,
- _state->enable_profile()));
- }
- return Status::OK();
-}
-
-void VOlapTableSinkV2::_build_tablet_node_mapping() {
- std::unordered_set<int64_t> known_indexes;
- for (const auto& partition : _vpartition->get_partitions()) {
- for (const auto& index : partition->indexes) {
- for (const auto& tablet_id : index.tablets) {
- auto nodes = _location->find_tablet(tablet_id)->node_ids;
- for (auto& node : nodes) {
- PTabletID tablet;
- tablet.set_partition_id(partition->id);
- tablet.set_index_id(index.index_id);
- tablet.set_tablet_id(tablet_id);
- _tablets_for_node[node].emplace(tablet_id, tablet);
- if (known_indexes.contains(index.index_id)) [[likely]] {
- continue;
- }
- _indexes_from_node[node].emplace_back(tablet);
- known_indexes.insert(index.index_id);
- }
- }
- }
- }
-}
-
-void
VOlapTableSinkV2::_generate_rows_for_tablet(std::vector<RowPartTabletIds>&
row_part_tablet_ids,
- RowsForTablet&
rows_for_tablet) {
- for (int index_idx = 0; index_idx < row_part_tablet_ids.size();
index_idx++) {
- auto& row_ids = row_part_tablet_ids[index_idx].row_ids;
- auto& partition_ids = row_part_tablet_ids[index_idx].partition_ids;
- auto& tablet_ids = row_part_tablet_ids[index_idx].tablet_ids;
-
- for (int i = 0; i < row_ids.size(); i++) {
- auto& tablet_id = tablet_ids[i];
- auto it = rows_for_tablet.find(tablet_id);
- if (it == rows_for_tablet.end()) {
- Rows rows;
- rows.partition_id = partition_ids[i];
- rows.index_id = _schema->indexes()[index_idx]->index_id;
- rows.row_idxes.reserve(row_ids.size());
- auto [tmp_it, _] = rows_for_tablet.insert({tablet_id, rows});
- it = tmp_it;
- }
- it->second.row_idxes.push_back(row_ids[i]);
- _number_output_rows++;
- }
- }
-}
-
-Status VOlapTableSinkV2::_select_streams(int64_t tablet_id, int64_t
partition_id, int64_t index_id,
- Streams& streams) {
- auto location = _location->find_tablet(tablet_id);
- if (location == nullptr) {
- return Status::InternalError("unknown tablet location, tablet id =
{}", tablet_id);
- }
- for (auto& node_id : location->node_ids) {
- PTabletID tablet;
- tablet.set_partition_id(partition_id);
- tablet.set_index_id(index_id);
- tablet.set_tablet_id(tablet_id);
- _tablets_for_node[node_id].emplace(tablet_id, tablet);
-
streams.emplace_back(_streams_for_node.at(node_id)->streams().at(_stream_index));
- RETURN_IF_ERROR(streams[0]->wait_for_schema(partition_id, index_id,
tablet_id));
- }
- _stream_index = (_stream_index + 1) % _stream_per_node;
- return Status::OK();
-}
-
-Status VOlapTableSinkV2::send(RuntimeState* state, vectorized::Block*
input_block, bool eos) {
- SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
- Status status = Status::OK();
-
- if (state->query_options().dry_run_query) {
- return status;
- }
-
- auto input_rows = input_block->rows();
- auto input_bytes = input_block->bytes();
- if (UNLIKELY(input_rows == 0)) {
- return status;
- }
- SCOPED_TIMER(_profile->total_time_counter());
- _number_input_rows += input_rows;
- // update incrementally so that FE can get the progress.
- // the real 'num_rows_load_total' will be set when sink being closed.
- state->update_num_rows_load_total(input_rows);
- state->update_num_bytes_load_total(input_bytes);
- DorisMetrics::instance()->load_rows->increment(input_rows);
- DorisMetrics::instance()->load_bytes->increment(input_bytes);
-
- bool has_filtered_rows = false;
- int64_t filtered_rows = 0;
-
- SCOPED_RAW_TIMER(&_send_data_ns);
- // This is just for passing compilation.
- _row_distribution_watch.start();
-
- std::shared_ptr<vectorized::Block> block;
- RETURN_IF_ERROR(_row_distribution.generate_rows_distribution(
- *input_block, block, filtered_rows, has_filtered_rows,
_row_part_tablet_ids));
- RowsForTablet rows_for_tablet;
- _generate_rows_for_tablet(_row_part_tablet_ids, rows_for_tablet);
-
- _row_distribution_watch.stop();
-
- // For each tablet, send its input_rows from block to delta writer
- for (const auto& [tablet_id, rows] : rows_for_tablet) {
- Streams streams;
- RETURN_IF_ERROR(_select_streams(tablet_id, rows.partition_id,
rows.index_id, streams));
- RETURN_IF_ERROR(_write_memtable(block, tablet_id, rows, streams));
- }
-
- return Status::OK();
-}
-
-Status VOlapTableSinkV2::_write_memtable(std::shared_ptr<vectorized::Block>
block,
- int64_t tablet_id, const Rows& rows,
- const Streams& streams) {
- DeltaWriterV2* delta_writer =
_delta_writer_for_tablet->get_or_create(tablet_id, [&]() {
- WriteRequest req {
- .tablet_id = tablet_id,
- .txn_id = _txn_id,
- .index_id = rows.index_id,
- .partition_id = rows.partition_id,
- .load_id = _load_id,
- .tuple_desc = _output_tuple_desc,
- .table_schema_param = _schema.get(),
- .is_high_priority = _is_high_priority,
- .write_file_cache = _write_file_cache,
- };
- for (auto& index : _schema->indexes()) {
- if (index->index_id == rows.index_id) {
- req.slots = &index->slots;
- req.schema_hash = index->schema_hash;
- break;
- }
- }
- return DeltaWriterV2::open(&req, streams);
- });
- {
- SCOPED_TIMER(_wait_mem_limit_timer);
-
ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush();
- }
- SCOPED_TIMER(_write_memtable_timer);
- auto st = delta_writer->write(block.get(), rows.row_idxes, false);
- return st;
-}
-
-Status VOlapTableSinkV2::_cancel(Status status) {
- LOG(INFO) << "canceled olap table sink. load_id=" << print_id(_load_id)
- << ", txn_id=" << _txn_id << ", due to error: " << status;
- if (_delta_writer_for_tablet) {
- _delta_writer_for_tablet->cancel(status);
- _delta_writer_for_tablet.reset();
- }
- for (const auto& [_, streams] : _streams_for_node) {
- streams->release();
- }
+ RETURN_IF_ERROR(AsyncWriterSink::init(t_sink));
+ RETURN_IF_ERROR(_writer->init_properties(_pool, _group_commit));
return Status::OK();
}
@@ -457,97 +57,8 @@ Status VOlapTableSinkV2::close(RuntimeState* state, Status
exec_status) {
if (_closed) {
return _close_status;
}
- SCOPED_TIMER(_close_timer);
- Status status = exec_status;
- if (status.ok()) {
- // only if status is ok can we call this
_profile->total_time_counter().
- // if status is not ok, this sink may not be prepared, so that
_profile is null
- SCOPED_TIMER(_profile->total_time_counter());
-
- COUNTER_SET(_input_rows_counter, _number_input_rows);
- COUNTER_SET(_output_rows_counter, _number_output_rows);
- COUNTER_SET(_filtered_rows_counter,
- _block_convertor->num_filtered_rows() +
_tablet_finder->num_filtered_rows());
- COUNTER_SET(_send_data_timer, _send_data_ns);
- COUNTER_SET(_row_distribution_timer,
(int64_t)_row_distribution_watch.elapsed_time());
- COUNTER_SET(_validate_data_timer,
_block_convertor->validate_data_ns());
-
- // release streams from the pool first, to prevent memory leak
- for (const auto& [_, streams] : _streams_for_node) {
- streams->release();
- }
-
- {
- SCOPED_TIMER(_close_writer_timer);
- // close all delta writers if this is the last user
- RETURN_IF_ERROR(_delta_writer_for_tablet->close(_profile));
- _delta_writer_for_tablet.reset();
- }
-
- {
- // send CLOSE_LOAD to all streams, return ERROR if any
- for (const auto& [_, streams] : _streams_for_node) {
- RETURN_IF_ERROR(_close_load(streams->streams()));
- }
- }
-
- {
- SCOPED_TIMER(_close_load_timer);
- for (const auto& [_, streams] : _streams_for_node) {
- for (const auto& stream : streams->streams()) {
- RETURN_IF_ERROR(stream->close_wait());
- }
- }
- }
-
- std::vector<TTabletCommitInfo> tablet_commit_infos;
- for (const auto& [node_id, streams] : _streams_for_node) {
- for (const auto& stream : streams->streams()) {
- for (auto tablet_id : stream->success_tablets()) {
- TTabletCommitInfo commit_info;
- commit_info.tabletId = tablet_id;
- commit_info.backendId = node_id;
- tablet_commit_infos.emplace_back(std::move(commit_info));
- }
- }
- }
- state->tablet_commit_infos().insert(state->tablet_commit_infos().end(),
-
std::make_move_iterator(tablet_commit_infos.begin()),
-
std::make_move_iterator(tablet_commit_infos.end()));
- _streams_for_node.clear();
-
- // _number_input_rows don't contain num_rows_load_filtered and
num_rows_load_unselected in scan node
- int64_t num_rows_load_total = _number_input_rows +
state->num_rows_load_filtered() +
- state->num_rows_load_unselected();
- state->set_num_rows_load_total(num_rows_load_total);
-
state->update_num_rows_load_filtered(_block_convertor->num_filtered_rows() +
-
_tablet_finder->num_filtered_rows());
- state->update_num_rows_load_unselected(
- _tablet_finder->num_immutable_partition_filtered_rows());
-
- LOG(INFO) << "finished to close olap table sink. load_id=" <<
print_id(_load_id)
- << ", txn_id=" << _txn_id;
- } else {
- RETURN_IF_ERROR(_cancel(status));
- }
-
- _close_status = status;
- RETURN_IF_ERROR(DataSink::close(state, exec_status));
- return status;
-}
-
-Status VOlapTableSinkV2::_close_load(const Streams& streams) {
- auto node_id = streams[0]->dst_id();
- std::vector<PTabletID> tablets_to_commit;
- for (auto [tablet_id, tablet] : _tablets_for_node[node_id]) {
- if (_tablet_finder->partition_ids().contains(tablet.partition_id())) {
- tablets_to_commit.push_back(tablet);
- }
- }
- for (const auto& stream : streams) {
- RETURN_IF_ERROR(stream->close_load(tablets_to_commit));
- }
- return Status::OK();
+ _close_status = AsyncWriterSink::close(state, exec_status);
+ return _close_status;
}
} // namespace vectorized
diff --git a/be/src/vec/sink/vtablet_sink_v2.h
b/be/src/vec/sink/vtablet_sink_v2.h
index 1f317420de4..cef4659bddb 100644
--- a/be/src/vec/sink/vtablet_sink_v2.h
+++ b/be/src/vec/sink/vtablet_sink_v2.h
@@ -26,204 +26,46 @@
#include <gen_cpp/types.pb.h>
#include <glog/logging.h>
#include <google/protobuf/stubs/callback.h>
-#include <stddef.h>
-#include <stdint.h>
#include <atomic>
+#include <cstddef>
+#include <cstdint>
// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
-#include <functional>
-#include <initializer_list>
-#include <map>
-#include <memory>
-#include <mutex>
-#include <ostream>
-#include <queue>
-#include <set>
-#include <string>
-#include <unordered_map>
-#include <unordered_set>
-#include <utility>
#include <vector>
-#include "common/config.h"
#include "common/status.h"
-#include "exec/data_sink.h"
-#include "exec/tablet_info.h"
-#include "gutil/ref_counted.h"
#include "runtime/exec_env.h"
-#include "runtime/memory/mem_tracker.h"
-#include "runtime/thread_context.h"
#include "runtime/types.h"
-#include "util/countdown_latch.h"
-#include "util/runtime_profile.h"
-#include "util/stopwatch.hpp"
-#include "vec/columns/column.h"
-#include "vec/common/allocator.h"
-#include "vec/common/hash_table/phmap_fwd_decl.h"
-#include "vec/core/block.h"
#include "vec/data_types/data_type.h"
#include "vec/exprs/vexpr_fwd.h"
-#include "vec/sink/vrow_distribution.h"
+#include "vec/sink/async_writer_sink.h"
+#include "vec/sink/writer/vtablet_writer_v2.h"
namespace doris {
-class DeltaWriterV2;
-class LoadStreamStub;
-class ObjectPool;
-class RowDescriptor;
-class RuntimeState;
-class TDataSink;
-class TExpr;
-class TabletSchema;
-class TupleDescriptor;
-
-namespace stream_load {
-class LoadStreams;
-}
namespace vectorized {
-class OlapTableBlockConvertor;
-class OlapTabletFinder;
-class VOlapTableSinkV2;
-class DeltaWriterV2Map;
-
-using Streams = std::vector<std::shared_ptr<LoadStreamStub>>;
-
-struct Rows {
- int64_t partition_id;
- int64_t index_id;
- std::vector<int32_t> row_idxes;
-};
-
-using RowsForTablet = std::unordered_map<int64_t, Rows>;
+inline constexpr char VOLAP_TABLE_SINK_V2[] = "VOlapTableSinkV2";
-// Write block data to Olap Table.
-// When OlapTableSink::open() called, there will be a consumer thread running
in the background.
-// When you call VOlapTableSinkV2::send(), you will be the producer who
products pending batches.
-// Join the consumer thread in close().
-class VOlapTableSinkV2 final : public DataSink {
+class VOlapTableSinkV2 final : public AsyncWriterSink<VTabletWriterV2,
VOLAP_TABLE_SINK_V2> {
public:
// Construct from thrift struct which is generated by FE.
VOlapTableSinkV2(ObjectPool* pool, const RowDescriptor& row_desc,
- const std::vector<TExpr>& texprs, Status* status);
+ const std::vector<TExpr>& texprs, bool group_commit);
~VOlapTableSinkV2() override;
Status init(const TDataSink& sink) override;
- // TODO: unify the code of prepare/open/close with result sink
- Status prepare(RuntimeState* state) override;
- Status open(RuntimeState* state) override;
-
- Status close(RuntimeState* state, Status close_status) override;
-
- Status send(RuntimeState* state, vectorized::Block* block, bool eos =
false) override;
-
- Status on_partitions_created(TCreatePartitionResult* result);
+ Status close(RuntimeState* state, Status exec_status) override;
private:
- Status _init_row_distribution();
-
- Status _open_streams(int64_t src_id);
-
- Status _open_streams_to_backend(int64_t dst_id,
::doris::stream_load::LoadStreams& streams);
-
- Status _incremental_open_streams(const std::vector<TOlapTablePartition>&
partitions);
-
- void _build_tablet_node_mapping();
-
- void _generate_rows_for_tablet(std::vector<RowPartTabletIds>&
row_part_tablet_ids,
- RowsForTablet& rows_for_tablet);
-
- Status _write_memtable(std::shared_ptr<vectorized::Block> block, int64_t
tablet_id,
- const Rows& rows, const Streams& streams);
-
- Status _select_streams(int64_t tablet_id, int64_t partition_id, int64_t
index_id,
- Streams& streams);
-
- Status _close_load(const Streams& streams);
-
- Status _cancel(Status status);
-
- std::shared_ptr<MemTracker> _mem_tracker;
-
ObjectPool* _pool;
- // unique load id
- PUniqueId _load_id;
- int64_t _txn_id = -1;
- int _num_replicas = -1;
- int _tuple_desc_id = -1;
-
- // this is tuple descriptor of destination OLAP table
- TupleDescriptor* _output_tuple_desc = nullptr;
- RowDescriptor* _output_row_desc = nullptr;
-
- // number of senders used to insert into OlapTable, if we only support
single node insert,
- // all data from select should collectted and then send to OlapTable.
- // To support multiple senders, we maintain a channel for each sender.
- int _sender_id = -1;
- int _num_senders = -1;
- int64_t _backend_id = -1;
- int _stream_per_node = -1;
- int _total_streams = -1;
- int _num_local_sink = -1;
- bool _is_high_priority = false;
- bool _write_file_cache = false;
-
- // TODO(zc): think about cache this data
- std::shared_ptr<OlapTableSchemaParam> _schema;
- OlapTableLocationParam* _location = nullptr;
- DorisNodesInfo* _nodes_info = nullptr;
-
- std::unique_ptr<OlapTabletFinder> _tablet_finder;
-
- std::unique_ptr<OlapTableBlockConvertor> _block_convertor;
-
- // Stats for this
- int64_t _send_data_ns = 0;
- int64_t _number_input_rows = 0;
- int64_t _number_output_rows = 0;
-
- MonotonicStopWatch _row_distribution_watch;
-
- RuntimeProfile::Counter* _input_rows_counter = nullptr;
- RuntimeProfile::Counter* _output_rows_counter = nullptr;
- RuntimeProfile::Counter* _filtered_rows_counter = nullptr;
- RuntimeProfile::Counter* _send_data_timer = nullptr;
- RuntimeProfile::Counter* _row_distribution_timer = nullptr;
- RuntimeProfile::Counter* _write_memtable_timer = nullptr;
- RuntimeProfile::Counter* _wait_mem_limit_timer = nullptr;
- RuntimeProfile::Counter* _validate_data_timer = nullptr;
- RuntimeProfile::Counter* _open_timer = nullptr;
- RuntimeProfile::Counter* _close_timer = nullptr;
- RuntimeProfile::Counter* _close_writer_timer = nullptr;
- RuntimeProfile::Counter* _close_load_timer = nullptr;
- RuntimeProfile::Counter* _add_partition_request_timer = nullptr;
-
- // Save the status of close() method
- Status _close_status;
-
- VOlapTablePartitionParam* _vpartition = nullptr;
- vectorized::VExprContextSPtrs _output_vexpr_ctxs;
-
- RuntimeState* _state = nullptr;
-
- std::unordered_set<int64_t> _opened_partitions;
-
- std::unordered_map<int64_t, std::unordered_map<int64_t, PTabletID>>
_tablets_for_node;
- std::unordered_map<int64_t, std::vector<PTabletID>> _indexes_from_node;
-
- std::unordered_map<int64_t,
std::shared_ptr<::doris::stream_load::LoadStreams>>
- _streams_for_node;
-
- size_t _stream_index = 0;
- std::shared_ptr<DeltaWriterV2Map> _delta_writer_for_tablet;
+ bool _group_commit = false;
- VRowDistribution _row_distribution;
- // reuse to avoid frequent memory allocation and release.
- std::vector<RowPartTabletIds> _row_part_tablet_ids;
+ Status _close_status = Status::OK();
};
} // namespace vectorized
diff --git a/be/src/vec/sink/vtablet_sink_v2.cpp
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
similarity index 83%
copy from be/src/vec/sink/vtablet_sink_v2.cpp
copy to be/src/vec/sink/writer/vtablet_writer_v2.cpp
index ac9be0e7fb7..7cf553fddab 100644
--- a/be/src/vec/sink/vtablet_sink_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -15,18 +15,15 @@
// specific language governing permissions and limitations
// under the License.
-#include "vec/sink/vtablet_sink_v2.h"
+#include "vec/sink/writer/vtablet_writer_v2.h"
#include <brpc/uri.h>
-#include <bthread/bthread.h>
-#include <fmt/format.h>
#include <gen_cpp/DataSinks_types.h>
#include <gen_cpp/Descriptors_types.h>
#include <gen_cpp/Metrics_types.h>
#include <gen_cpp/Types_types.h>
#include <gen_cpp/internal_service.pb.h>
-#include <algorithm>
#include <execution>
#include <mutex>
#include <ranges>
@@ -34,7 +31,6 @@
#include <unordered_map>
#include "common/compiler_util.h" // IWYU pragma: keep
-#include "common/logging.h"
#include "common/object_pool.h"
#include "common/signal_handler.h"
#include "common/status.h"
@@ -47,12 +43,10 @@
#include "service/brpc.h"
#include "util/brpc_client_cache.h"
#include "util/doris_metrics.h"
-#include "util/network_util.h"
#include "util/threadpool.h"
#include "util/thrift_util.h"
#include "util/uid_util.h"
#include "vec/core/block.h"
-#include "vec/exprs/vexpr.h"
#include "vec/sink/delta_writer_v2_pool.h"
#include "vec/sink/load_stream_stub.h"
#include "vec/sink/load_stream_stub_pool.h"
@@ -60,21 +54,17 @@
#include "vec/sink/vtablet_finder.h"
namespace doris {
-class TExpr;
namespace vectorized {
-VOlapTableSinkV2::VOlapTableSinkV2(ObjectPool* pool, const RowDescriptor&
row_desc,
- const std::vector<TExpr>& texprs, Status*
status)
- : DataSink(row_desc), _pool(pool) {
- // From the thrift expressions create the real exprs.
- *status = vectorized::VExpr::create_expr_trees(texprs, _output_vexpr_ctxs);
- _name = "VOlapTableSinkV2";
+VTabletWriterV2::VTabletWriterV2(const TDataSink& t_sink, const
VExprContextSPtrs& output_exprs)
+ : AsyncResultWriter(output_exprs), _t_sink(t_sink) {
+ DCHECK(t_sink.__isset.olap_table_sink);
}
-VOlapTableSinkV2::~VOlapTableSinkV2() = default;
+VTabletWriterV2::~VTabletWriterV2() = default;
-Status VOlapTableSinkV2::on_partitions_created(TCreatePartitionResult* result)
{
+Status VTabletWriterV2::on_partitions_created(TCreatePartitionResult* result) {
// add new tablet locations. it will use by address. so add to pool
auto* new_locations = _pool->add(new
std::vector<TTabletLocation>(result->tablets));
_location->add_locations(*new_locations);
@@ -89,10 +79,10 @@ Status
VOlapTableSinkV2::on_partitions_created(TCreatePartitionResult* result) {
}
static Status on_partitions_created(void* writer, TCreatePartitionResult*
result) {
- return
static_cast<VOlapTableSinkV2*>(writer)->on_partitions_created(result);
+ return
static_cast<VTabletWriterV2*>(writer)->on_partitions_created(result);
}
-Status VOlapTableSinkV2::_incremental_open_streams(
+Status VTabletWriterV2::_incremental_open_streams(
const std::vector<TOlapTablePartition>& partitions) {
// do what we did in prepare() for partitions. indexes which don't change
when we create new partition is orthogonal to partitions.
std::unordered_set<int64_t> known_indexes;
@@ -130,7 +120,7 @@ Status VOlapTableSinkV2::_incremental_open_streams(
return Status::OK();
}
-Status VOlapTableSinkV2::_init_row_distribution() {
+Status VTabletWriterV2::_init_row_distribution() {
VRowDistributionContext ctx;
ctx.state = _state;
@@ -141,7 +131,7 @@ Status VOlapTableSinkV2::_init_row_distribution() {
ctx.txn_id = _txn_id;
ctx.pool = _pool;
ctx.location = _location;
- ctx.vec_output_expr_ctxs = &_output_vexpr_ctxs;
+ ctx.vec_output_expr_ctxs = &_vec_output_expr_ctxs;
ctx.on_partitions_created = &vectorized::on_partitions_created;
ctx.caller = (void*)this;
ctx.schema = _schema;
@@ -153,11 +143,17 @@ Status VOlapTableSinkV2::_init_row_distribution() {
return Status::OK();
}
-Status VOlapTableSinkV2::init(const TDataSink& t_sink) {
- DCHECK(t_sink.__isset.olap_table_sink);
- auto& table_sink = t_sink.olap_table_sink;
+Status VTabletWriterV2::init_properties(ObjectPool* pool, bool group_commit) {
+ _pool = pool;
+ _group_commit = group_commit;
+ return Status::OK();
+}
+
+Status VTabletWriterV2::_init(RuntimeState* state, RuntimeProfile* profile) {
+ auto& table_sink = _t_sink.olap_table_sink;
_load_id.set_hi(table_sink.load_id.hi);
_load_id.set_lo(table_sink.load_id.lo);
+ signal::set_signal_task_id(_load_id);
_txn_id = table_sink.txn_id;
_num_replicas = table_sink.num_replicas;
_tuple_desc_id = table_sink.tuple_id;
@@ -182,13 +178,8 @@ Status VOlapTableSinkV2::init(const TDataSink& t_sink) {
_tablet_finder = std::make_unique<OlapTabletFinder>(_vpartition,
find_tablet_mode);
RETURN_IF_ERROR(_vpartition->init());
- return Status::OK();
-}
-
-Status VOlapTableSinkV2::prepare(RuntimeState* state) {
- RETURN_IF_ERROR(DataSink::prepare(state));
-
_state = state;
+ _profile = profile;
_sender_id = state->per_fragment_instance_idx();
_num_senders = state->num_per_fragment_instances();
@@ -205,9 +196,9 @@ Status VOlapTableSinkV2::prepare(RuntimeState* state) {
(state->execution_timeout() <=
config::load_task_high_priority_threshold_second);
// profile must add to state's object pool
- _profile = state->obj_pool()->add(new RuntimeProfile("VOlapTableSinkV2"));
- _mem_tracker = std::make_shared<MemTracker>("VOlapTableSinkV2:" +
-
std::to_string(state->load_job_id()));
+ _profile = state->obj_pool()->add(new RuntimeProfile("VTabletWriterV2"));
+ _mem_tracker =
+ std::make_shared<MemTracker>("VTabletWriterV2:" +
std::to_string(state->load_job_id()));
SCOPED_TIMER(_profile->total_time_counter());
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
@@ -236,8 +227,6 @@ Status VOlapTableSinkV2::prepare(RuntimeState* state) {
_close_writer_timer = ADD_CHILD_TIMER(_profile, "CloseWriterTime",
"CloseWaitTime");
_close_load_timer = ADD_CHILD_TIMER(_profile, "CloseLoadTime",
"CloseWaitTime");
- // Prepare the exprs to run.
- RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state,
_row_desc));
if (config::share_delta_writers) {
_delta_writer_for_tablet =
ExecEnv::GetInstance()->delta_writer_v2_pool()->get_or_create(
_load_id, _num_local_sink);
@@ -247,13 +236,11 @@ Status VOlapTableSinkV2::prepare(RuntimeState* state) {
return Status::OK();
}
-Status VOlapTableSinkV2::open(RuntimeState* state) {
- // Prepare the exprs to run.
- RETURN_IF_ERROR(vectorized::VExpr::open(_output_vexpr_ctxs, state));
+Status VTabletWriterV2::open(RuntimeState* state, RuntimeProfile* profile) {
+ RETURN_IF_ERROR(_init(state, profile));
SCOPED_TIMER(_profile->total_time_counter());
SCOPED_TIMER(_open_timer);
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
- signal::set_signal_task_id(_load_id);
_build_tablet_node_mapping();
RETURN_IF_ERROR(_open_streams(_backend_id));
@@ -262,7 +249,7 @@ Status VOlapTableSinkV2::open(RuntimeState* state) {
return Status::OK();
}
-Status VOlapTableSinkV2::_open_streams(int64_t src_id) {
+Status VTabletWriterV2::_open_streams(int64_t src_id) {
for (auto& [dst_id, _] : _tablets_for_node) {
auto streams =
ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(
_load_id, src_id, dst_id, _stream_per_node, _num_local_sink);
@@ -272,9 +259,9 @@ Status VOlapTableSinkV2::_open_streams(int64_t src_id) {
return Status::OK();
}
-Status VOlapTableSinkV2::_open_streams_to_backend(int64_t dst_id,
-
::doris::stream_load::LoadStreams& streams) {
- auto node_info = _nodes_info->find_node(dst_id);
+Status VTabletWriterV2::_open_streams_to_backend(int64_t dst_id,
+
::doris::stream_load::LoadStreams& streams) {
+ const auto* node_info = _nodes_info->find_node(dst_id);
if (node_info == nullptr) {
return Status::InternalError("Unknown node {} in tablet location",
dst_id);
}
@@ -294,7 +281,7 @@ Status VOlapTableSinkV2::_open_streams_to_backend(int64_t
dst_id,
return Status::OK();
}
-void VOlapTableSinkV2::_build_tablet_node_mapping() {
+void VTabletWriterV2::_build_tablet_node_mapping() {
std::unordered_set<int64_t> known_indexes;
for (const auto& partition : _vpartition->get_partitions()) {
for (const auto& index : partition->indexes) {
@@ -317,8 +304,8 @@ void VOlapTableSinkV2::_build_tablet_node_mapping() {
}
}
-void
VOlapTableSinkV2::_generate_rows_for_tablet(std::vector<RowPartTabletIds>&
row_part_tablet_ids,
- RowsForTablet&
rows_for_tablet) {
+void VTabletWriterV2::_generate_rows_for_tablet(std::vector<RowPartTabletIds>&
row_part_tablet_ids,
+ RowsForTablet&
rows_for_tablet) {
for (int index_idx = 0; index_idx < row_part_tablet_ids.size();
index_idx++) {
auto& row_ids = row_part_tablet_ids[index_idx].row_ids;
auto& partition_ids = row_part_tablet_ids[index_idx].partition_ids;
@@ -341,8 +328,8 @@ void
VOlapTableSinkV2::_generate_rows_for_tablet(std::vector<RowPartTabletIds>&
}
}
-Status VOlapTableSinkV2::_select_streams(int64_t tablet_id, int64_t
partition_id, int64_t index_id,
- Streams& streams) {
+Status VTabletWriterV2::_select_streams(int64_t tablet_id, int64_t
partition_id, int64_t index_id,
+ Streams& streams) {
auto location = _location->find_tablet(tablet_id);
if (location == nullptr) {
return Status::InternalError("unknown tablet location, tablet id =
{}", tablet_id);
@@ -360,16 +347,16 @@ Status VOlapTableSinkV2::_select_streams(int64_t
tablet_id, int64_t partition_id
return Status::OK();
}
-Status VOlapTableSinkV2::send(RuntimeState* state, vectorized::Block*
input_block, bool eos) {
+Status VTabletWriterV2::append_block(Block& input_block) {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
Status status = Status::OK();
- if (state->query_options().dry_run_query) {
+ if (_state->query_options().dry_run_query) {
return status;
}
- auto input_rows = input_block->rows();
- auto input_bytes = input_block->bytes();
+ auto input_rows = input_block.rows();
+ auto input_bytes = input_block.bytes();
if (UNLIKELY(input_rows == 0)) {
return status;
}
@@ -377,8 +364,8 @@ Status VOlapTableSinkV2::send(RuntimeState* state,
vectorized::Block* input_bloc
_number_input_rows += input_rows;
// update incrementally so that FE can get the progress.
// the real 'num_rows_load_total' will be set when sink being closed.
- state->update_num_rows_load_total(input_rows);
- state->update_num_bytes_load_total(input_bytes);
+ _state->update_num_rows_load_total(input_rows);
+ _state->update_num_bytes_load_total(input_bytes);
DorisMetrics::instance()->load_rows->increment(input_rows);
DorisMetrics::instance()->load_bytes->increment(input_bytes);
@@ -391,7 +378,7 @@ Status VOlapTableSinkV2::send(RuntimeState* state,
vectorized::Block* input_bloc
std::shared_ptr<vectorized::Block> block;
RETURN_IF_ERROR(_row_distribution.generate_rows_distribution(
- *input_block, block, filtered_rows, has_filtered_rows,
_row_part_tablet_ids));
+ input_block, block, filtered_rows, has_filtered_rows,
_row_part_tablet_ids));
RowsForTablet rows_for_tablet;
_generate_rows_for_tablet(_row_part_tablet_ids, rows_for_tablet);
@@ -407,9 +394,8 @@ Status VOlapTableSinkV2::send(RuntimeState* state,
vectorized::Block* input_bloc
return Status::OK();
}
-Status VOlapTableSinkV2::_write_memtable(std::shared_ptr<vectorized::Block>
block,
- int64_t tablet_id, const Rows& rows,
- const Streams& streams) {
+Status VTabletWriterV2::_write_memtable(std::shared_ptr<vectorized::Block>
block, int64_t tablet_id,
+ const Rows& rows, const Streams&
streams) {
DeltaWriterV2* delta_writer =
_delta_writer_for_tablet->get_or_create(tablet_id, [&]() {
WriteRequest req {
.tablet_id = tablet_id,
@@ -440,7 +426,7 @@ Status
VOlapTableSinkV2::_write_memtable(std::shared_ptr<vectorized::Block> bloc
return st;
}
-Status VOlapTableSinkV2::_cancel(Status status) {
+Status VTabletWriterV2::_cancel(Status status) {
LOG(INFO) << "canceled olap table sink. load_id=" << print_id(_load_id)
<< ", txn_id=" << _txn_id << ", due to error: " << status;
if (_delta_writer_for_tablet) {
@@ -453,8 +439,9 @@ Status VOlapTableSinkV2::_cancel(Status status) {
return Status::OK();
}
-Status VOlapTableSinkV2::close(RuntimeState* state, Status exec_status) {
- if (_closed) {
+Status VTabletWriterV2::close(Status exec_status) {
+ std::lock_guard<std::mutex> close_lock(_close_mutex);
+ if (_is_closed) {
return _close_status;
}
SCOPED_TIMER(_close_timer);
@@ -511,18 +498,18 @@ Status VOlapTableSinkV2::close(RuntimeState* state,
Status exec_status) {
}
}
}
- state->tablet_commit_infos().insert(state->tablet_commit_infos().end(),
-
std::make_move_iterator(tablet_commit_infos.begin()),
-
std::make_move_iterator(tablet_commit_infos.end()));
+
_state->tablet_commit_infos().insert(_state->tablet_commit_infos().end(),
+
std::make_move_iterator(tablet_commit_infos.begin()),
+
std::make_move_iterator(tablet_commit_infos.end()));
_streams_for_node.clear();
// _number_input_rows don't contain num_rows_load_filtered and
num_rows_load_unselected in scan node
- int64_t num_rows_load_total = _number_input_rows +
state->num_rows_load_filtered() +
- state->num_rows_load_unselected();
- state->set_num_rows_load_total(num_rows_load_total);
-
state->update_num_rows_load_filtered(_block_convertor->num_filtered_rows() +
-
_tablet_finder->num_filtered_rows());
- state->update_num_rows_load_unselected(
+ int64_t num_rows_load_total = _number_input_rows +
_state->num_rows_load_filtered() +
+ _state->num_rows_load_unselected();
+ _state->set_num_rows_load_total(num_rows_load_total);
+
_state->update_num_rows_load_filtered(_block_convertor->num_filtered_rows() +
+
_tablet_finder->num_filtered_rows());
+ _state->update_num_rows_load_unselected(
_tablet_finder->num_immutable_partition_filtered_rows());
LOG(INFO) << "finished to close olap table sink. load_id=" <<
print_id(_load_id)
@@ -531,12 +518,12 @@ Status VOlapTableSinkV2::close(RuntimeState* state,
Status exec_status) {
RETURN_IF_ERROR(_cancel(status));
}
+ _is_closed = true;
_close_status = status;
- RETURN_IF_ERROR(DataSink::close(state, exec_status));
return status;
}
-Status VOlapTableSinkV2::_close_load(const Streams& streams) {
+Status VTabletWriterV2::_close_load(const Streams& streams) {
auto node_id = streams[0]->dst_id();
std::vector<PTabletID> tablets_to_commit;
for (auto [tablet_id, tablet] : _tablets_for_node[node_id]) {
diff --git a/be/src/vec/sink/vtablet_sink_v2.h
b/be/src/vec/sink/writer/vtablet_writer_v2.h
similarity index 87%
copy from be/src/vec/sink/vtablet_sink_v2.h
copy to be/src/vec/sink/writer/vtablet_writer_v2.h
index 1f317420de4..d4ccf7b6523 100644
--- a/be/src/vec/sink/vtablet_sink_v2.h
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.h
@@ -26,10 +26,10 @@
#include <gen_cpp/types.pb.h>
#include <glog/logging.h>
#include <google/protobuf/stubs/callback.h>
-#include <stddef.h>
-#include <stdint.h>
#include <atomic>
+#include <cstddef>
+#include <cstdint>
// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
#include <functional>
@@ -48,7 +48,6 @@
#include "common/config.h"
#include "common/status.h"
-#include "exec/data_sink.h"
#include "exec/tablet_info.h"
#include "gutil/ref_counted.h"
#include "runtime/exec_env.h"
@@ -65,6 +64,7 @@
#include "vec/data_types/data_type.h"
#include "vec/exprs/vexpr_fwd.h"
#include "vec/sink/vrow_distribution.h"
+#include "vec/sink/writer/async_result_writer.h"
namespace doris {
class DeltaWriterV2;
@@ -85,7 +85,7 @@ namespace vectorized {
class OlapTableBlockConvertor;
class OlapTabletFinder;
-class VOlapTableSinkV2;
+class VTabletWriterV2;
class DeltaWriterV2Map;
using Streams = std::vector<std::shared_ptr<LoadStreamStub>>;
@@ -100,31 +100,30 @@ using RowsForTablet = std::unordered_map<int64_t, Rows>;
// Write block data to Olap Table.
// When OlapTableSink::open() called, there will be a consumer thread running
in the background.
-// When you call VOlapTableSinkV2::send(), you will be the producer who
products pending batches.
+// When you call VTabletWriterV2::send(), you will be the producer who
products pending batches.
// Join the consumer thread in close().
-class VOlapTableSinkV2 final : public DataSink {
+class VTabletWriterV2 final : public AsyncResultWriter {
public:
// Construct from thrift struct which is generated by FE.
- VOlapTableSinkV2(ObjectPool* pool, const RowDescriptor& row_desc,
- const std::vector<TExpr>& texprs, Status* status);
+ VTabletWriterV2(const TDataSink& t_sink, const VExprContextSPtrs&
output_exprs);
- ~VOlapTableSinkV2() override;
+ ~VTabletWriterV2() override;
- Status init(const TDataSink& sink) override;
- // TODO: unify the code of prepare/open/close with result sink
- Status prepare(RuntimeState* state) override;
+ Status init_properties(ObjectPool* pool, bool group_commit);
- Status open(RuntimeState* state) override;
+ Status append_block(Block& block) override;
- Status close(RuntimeState* state, Status close_status) override;
+ Status open(RuntimeState* state, RuntimeProfile* profile) override;
- Status send(RuntimeState* state, vectorized::Block* block, bool eos =
false) override;
+ Status close(Status close_status) override;
Status on_partitions_created(TCreatePartitionResult* result);
private:
Status _init_row_distribution();
+ Status _init(RuntimeState* state, RuntimeProfile* profile);
+
Status _open_streams(int64_t src_id);
Status _open_streams_to_backend(int64_t dst_id,
::doris::stream_load::LoadStreams& streams);
@@ -148,6 +147,7 @@ private:
std::shared_ptr<MemTracker> _mem_tracker;
+ TDataSink _t_sink;
ObjectPool* _pool;
// unique load id
@@ -202,13 +202,16 @@ private:
RuntimeProfile::Counter* _close_load_timer = nullptr;
RuntimeProfile::Counter* _add_partition_request_timer = nullptr;
+ std::mutex _close_mutex;
+ bool _is_closed = false;
// Save the status of close() method
Status _close_status;
VOlapTablePartitionParam* _vpartition = nullptr;
- vectorized::VExprContextSPtrs _output_vexpr_ctxs;
- RuntimeState* _state = nullptr;
+ RuntimeState* _state = nullptr; // not owned, set when open
+ RuntimeProfile* _profile = nullptr; // not owned, set when open
+ bool _group_commit = false;
std::unordered_set<int64_t> _opened_partitions;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]