This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a2419a8eb40 [enhancement](sink) refactor code of auto partition and 
where clause and enable them on sinkv2 (#26432)
a2419a8eb40 is described below

commit a2419a8eb408c0c890d48e0905ee199eb3deb9a5
Author: Yongqiang YANG <98214048+dataroar...@users.noreply.github.com>
AuthorDate: Wed Nov 8 11:51:40 2023 +0800

    [enhancement](sink) refactor code of auto partition and where clause and 
enable them on sinkv2 (#26432)
    
    For better performance and elasticity, the memtable was moved from the
    load channel to the sink, which introduced VTabletSinkV2. As a result,
    both VTabletWriter and VTabletSinkV2 distribute rows to tablets. Where
    clauses on materialized views (MVs) are executed in VTabletWriter, and
    VTabletSinkV2 needs them too, so the common code is moved into a shared
    row-distribution class (VRowDistribution).
    
    In fact, the code can be layered along the rows' data flow, which makes
    it much easier to understand and maintain:
    
    ScanNode -> Sink/Writer (RowDistribution -> IndexChannel / DeltaWriter)
---
 be/src/vec/sink/vrow_distribution.cpp     | 306 ++++++++++++++++++++++
 be/src/vec/sink/vrow_distribution.h       | 160 ++++++++++++
 be/src/vec/sink/vtablet_sink_v2.cpp       | 101 ++++----
 be/src/vec/sink/vtablet_sink_v2.h         |  22 +-
 be/src/vec/sink/writer/vtablet_writer.cpp | 408 +++++++-----------------------
 be/src/vec/sink/writer/vtablet_writer.h   |  37 ++-
 6 files changed, 655 insertions(+), 379 deletions(-)

diff --git a/be/src/vec/sink/vrow_distribution.cpp 
b/be/src/vec/sink/vrow_distribution.cpp
new file mode 100644
index 00000000000..78d3b062045
--- /dev/null
+++ b/be/src/vec/sink/vrow_distribution.cpp
@@ -0,0 +1,306 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/sink/vrow_distribution.h"
+
+#include <gen_cpp/FrontendService.h>
+#include <gen_cpp/FrontendService_types.h>
+
+#include "runtime/client_cache.h"
+#include "runtime/exec_env.h"
+#include "runtime/runtime_state.h"
+#include "util/thrift_rpc_helper.h"
+#include "vec/columns/column_const.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/columns/column_vector.h"
+#include "vec/sink/writer/vtablet_writer.h"
+
+namespace doris::vectorized {
+
+std::pair<vectorized::VExprContextSPtr, vectorized::VExprSPtr>
+VRowDistribution::_get_partition_function() {
+    return {_vpartition->get_part_func_ctx(), 
_vpartition->get_partition_function()};
+}
+
+void VRowDistribution::_save_missing_values(vectorized::ColumnPtr col,
+                                            vectorized::DataTypePtr 
value_type) {
+    _partitions_need_create.clear();
+    std::set<std::string> deduper;
+    // de-duplication
+    for (auto row : _missing_map) {
+        deduper.emplace(value_type->to_string(*col, row));
+    }
+    for (auto& value : deduper) {
+        TStringLiteral node;
+        node.value = value;
+        _partitions_need_create.emplace_back(std::vector {node}); // only 1 
partition column now
+    }
+}
+
+Status VRowDistribution::_automatic_create_partition() {
+    SCOPED_TIMER(_add_partition_request_timer);
+    TCreatePartitionRequest request;
+    TCreatePartitionResult result;
+    request.__set_txn_id(_txn_id);
+    request.__set_db_id(_vpartition->db_id());
+    request.__set_table_id(_vpartition->table_id());
+    request.__set_partitionValues(_partitions_need_create);
+
+    VLOG(1) << "automatic partition rpc begin request " << request;
+    TNetworkAddress master_addr = 
ExecEnv::GetInstance()->master_info()->network_address;
+    int time_out = _state->execution_timeout() * 1000;
+    RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
+            master_addr.hostname, master_addr.port,
+            [&request, &result](FrontendServiceConnection& client) {
+                client->createPartition(result, request);
+            },
+            time_out));
+
+    Status status(Status::create(result.status));
+    VLOG(1) << "automatic partition rpc end response " << result;
+    if (result.status.status_code == TStatusCode::OK) {
+        // add new created partitions
+        RETURN_IF_ERROR(_vpartition->add_partitions(result.partitions));
+        RETURN_IF_ERROR(_on_partitions_created(_caller, &result));
+    }
+
+    return status;
+}
+
+void VRowDistribution::_get_tablet_ids(vectorized::Block* block, int32_t 
index_idx,
+                                       std::vector<int64_t>& tablet_ids) {
+    tablet_ids.reserve(block->rows());
+    for (int row_idx = 0; row_idx < block->rows(); row_idx++) {
+        if (_skip[row_idx]) {
+            continue;
+        }
+        auto& partition = _partitions[row_idx];
+        auto& tablet_index = _tablet_indexes[row_idx];
+        auto& index = partition->indexes[index_idx];
+
+        auto tablet_id = index.tablets[tablet_index];
+        tablet_ids[row_idx] = tablet_id;
+    }
+}
+
+void VRowDistribution::_filter_block_by_skip(vectorized::Block* block,
+                                             RowPartTabletIds& 
row_part_tablet_id) {
+    auto& row_ids = row_part_tablet_id.row_ids;
+    auto& partition_ids = row_part_tablet_id.partition_ids;
+    auto& tablet_ids = row_part_tablet_id.tablet_ids;
+
+    for (size_t i = 0; i < block->rows(); i++) {
+        if (!_skip[i]) {
+            row_ids.emplace_back(i);
+            partition_ids.emplace_back(_partitions[i]->id);
+            tablet_ids.emplace_back(_tablet_ids[i]);
+        }
+    }
+}
+
+Status VRowDistribution::_filter_block_by_skip_and_where_clause(
+        vectorized::Block* block, const vectorized::VExprContextSPtr& 
where_clause,
+        RowPartTabletIds& row_part_tablet_id) {
+    // TODO
+    //SCOPED_RAW_TIMER(&_stat.where_clause_ns);
+    int result_index = -1;
+    size_t column_number = block->columns();
+    RETURN_IF_ERROR(where_clause->execute(block, &result_index));
+
+    auto filter_column = block->get_by_position(result_index).column;
+
+    auto& row_ids = row_part_tablet_id.row_ids;
+    auto& partition_ids = row_part_tablet_id.partition_ids;
+    auto& tablet_ids = row_part_tablet_id.tablet_ids;
+    if (auto* nullable_column =
+                
vectorized::check_and_get_column<vectorized::ColumnNullable>(*filter_column)) {
+        for (size_t i = 0; i < block->rows(); i++) {
+            if (nullable_column->get_bool_inline(i) && !_skip[i]) {
+                row_ids.emplace_back(i);
+                partition_ids.emplace_back(_partitions[i]->id);
+                tablet_ids.emplace_back(_tablet_ids[i]);
+            }
+        }
+    } else if (auto* const_column =
+                       
vectorized::check_and_get_column<vectorized::ColumnConst>(*filter_column)) {
+        bool ret = const_column->get_bool(0);
+        if (!ret) {
+            return Status::OK();
+        }
+        // should we optimize?
+        _filter_block_by_skip(block, row_part_tablet_id);
+    } else {
+        auto& filter = assert_cast<const 
vectorized::ColumnUInt8&>(*filter_column).get_data();
+        for (size_t i = 0; i < block->rows(); i++) {
+            if (filter[i] != 0 && !_skip[i]) {
+                row_ids.emplace_back(i);
+                partition_ids.emplace_back(_partitions[i]->id);
+                tablet_ids.emplace_back(_tablet_ids[i]);
+            }
+        }
+    }
+
+    for (size_t i = block->columns() - 1; i >= column_number; i--) {
+        block->erase(i);
+    }
+    return Status::OK();
+}
+
+Status VRowDistribution::_filter_block(vectorized::Block* block,
+                                       std::vector<RowPartTabletIds>& 
row_part_tablet_ids) {
+    for (int i = 0; i < _schema->indexes().size(); i++) {
+        _get_tablet_ids(block, i, _tablet_ids);
+        auto& where_clause = _schema->indexes()[i]->where_clause;
+        if (where_clause != nullptr) {
+            RETURN_IF_ERROR(_filter_block_by_skip_and_where_clause(block, 
where_clause,
+                                                                   
row_part_tablet_ids[i]));
+        } else {
+            _filter_block_by_skip(block, row_part_tablet_ids[i]);
+        }
+    }
+    return Status::OK();
+}
+
+Status VRowDistribution::_generate_rows_distribution_for_non_auto_parititon(
+        vectorized::Block* block, bool has_filtered_rows,
+        std::vector<RowPartTabletIds>& row_part_tablet_ids) {
+    auto num_rows = block->rows();
+
+    bool stop_processing = false;
+    RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, 
_partitions,
+                                                 _tablet_indexes, 
stop_processing, _skip));
+    if (has_filtered_rows) {
+        for (int i = 0; i < num_rows; i++) {
+            _skip[i] = _skip[i] || _block_convertor->filter_map()[i];
+        }
+    }
+    RETURN_IF_ERROR(_filter_block(block, row_part_tablet_ids));
+    return Status::OK();
+}
+
+Status VRowDistribution::_generate_rows_distribution_for_auto_parititon(
+        vectorized::Block* block, int partition_col_idx, bool 
has_filtered_rows,
+        std::vector<RowPartTabletIds>& row_part_tablet_ids) {
+    auto num_rows = block->rows();
+    std::vector<uint16_t> partition_keys = _vpartition->get_partition_keys();
+
+    //TODO: use loop to create missing_vals for multi column.
+    CHECK(partition_keys.size() == 1) << "now support only 1 partition column 
for auto partitions.";
+    auto partition_col = block->get_by_position(partition_keys[0]);
+    _missing_map.clear();
+    _missing_map.reserve(partition_col.column->size());
+    bool stop_processing = false;
+    //TODO: we could use the buffer to save tablets we found so that no need 
to find them again when we created partitions and try to append block next time.
+    RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, 
_partitions,
+                                                 _tablet_indexes, 
stop_processing, _skip,
+                                                 &_missing_map));
+    if (_missing_map.empty()) {
+        // we don't calculate it distribution when have missing values
+        if (has_filtered_rows) {
+            for (int i = 0; i < num_rows; i++) {
+                _skip[i] = _skip[i] || _block_convertor->filter_map()[i];
+            }
+        }
+        RETURN_IF_ERROR(_filter_block(block, row_part_tablet_ids));
+    } else { // for missing partition keys, calc the missing partition and 
save in _partitions_need_create
+        auto [part_ctx, part_func] = _get_partition_function();
+        auto return_type = part_func->data_type();
+
+        // expose the data column
+        vectorized::ColumnPtr range_left_col = 
block->get_by_position(partition_col_idx).column;
+        if (const auto* nullable =
+                    
check_and_get_column<vectorized::ColumnNullable>(*range_left_col)) {
+            range_left_col = nullable->get_nested_column_ptr();
+            return_type = assert_cast<const 
vectorized::DataTypeNullable*>(return_type.get())
+                                  ->get_nested_type();
+        }
+        // calc the end value and save them.
+        _save_missing_values(range_left_col, return_type);
+        // then call FE to create it. then FragmentExecutor will redo the load.
+        RETURN_IF_ERROR(_automatic_create_partition());
+        // In the next round, we will _generate_rows_distribution_payload 
again to get right payload of new tablet
+        LOG(INFO) << "Auto created partition. Send block again.";
+        return Status::NeedSendAgain("");
+    } // creating done
+
+    return Status::OK();
+}
+
+void VRowDistribution::_reset_row_part_tablet_ids(
+        std::vector<RowPartTabletIds>& row_part_tablet_ids, int64_t rows) {
+    row_part_tablet_ids.resize(_schema->indexes().size());
+    for (auto& row_part_tablet_id : row_part_tablet_ids) {
+        auto& row_ids = row_part_tablet_id.row_ids;
+        auto& partition_ids = row_part_tablet_id.partition_ids;
+        auto& tablet_ids = row_part_tablet_id.tablet_ids;
+
+        row_ids.clear();
+        partition_ids.clear();
+        tablet_ids.clear();
+        row_ids.reserve(rows);
+        partition_ids.reserve(rows);
+        tablet_ids.reserve(rows);
+    }
+}
+
+Status VRowDistribution::generate_rows_distribution(
+        vectorized::Block& input_block, std::shared_ptr<vectorized::Block>& 
block,
+        int64_t& filtered_rows, bool& has_filtered_rows,
+        std::vector<RowPartTabletIds>& row_part_tablet_ids) {
+    auto input_rows = input_block.rows();
+    _reset_row_part_tablet_ids(row_part_tablet_ids, input_rows);
+
+    int64_t prev_filtered_rows =
+            _block_convertor->num_filtered_rows() + 
_tablet_finder->num_filtered_rows();
+    RETURN_IF_ERROR(_block_convertor->validate_and_convert_block(
+            _state, &input_block, block, *_vec_output_expr_ctxs, input_rows, 
has_filtered_rows));
+
+    _tablet_finder->clear_for_new_batch();
+    _row_distribution_watch.start();
+    auto num_rows = block->rows();
+    _tablet_finder->filter_bitmap().Reset(num_rows);
+
+    //reuse vars for find_tablets
+    _partitions.assign(num_rows, nullptr);
+    _skip.assign(num_rows, false);
+    _tablet_indexes.assign(num_rows, 0);
+
+    // if there's projection of partition calc, we need to calc it first.
+    auto [part_ctx, part_func] = _get_partition_function();
+    int partition_col_idx = -1;
+    if (_vpartition->is_projection_partition()) {
+        // calc the start value of missing partition ranges.
+        RETURN_IF_ERROR(part_func->execute(part_ctx.get(), block.get(), 
&partition_col_idx));
+        VLOG_DEBUG << "Partition-calculated block:" << block->dump_data();
+        // change the column to compare to transformed.
+        _vpartition->set_transformed_slots({(uint16_t)partition_col_idx});
+    }
+
+    if (_vpartition->is_auto_partition()) {
+        RETURN_IF_ERROR(_generate_rows_distribution_for_auto_parititon(
+                block.get(), partition_col_idx, has_filtered_rows, 
row_part_tablet_ids));
+    } else { // not auto partition
+        RETURN_IF_ERROR(_generate_rows_distribution_for_non_auto_parititon(
+                block.get(), has_filtered_rows, row_part_tablet_ids));
+    }
+    _row_distribution_watch.stop();
+    filtered_rows = _block_convertor->num_filtered_rows() + 
_tablet_finder->num_filtered_rows() -
+                    prev_filtered_rows;
+    return Status::OK();
+}
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/sink/vrow_distribution.h 
b/be/src/vec/sink/vrow_distribution.h
new file mode 100644
index 00000000000..5da964d44fc
--- /dev/null
+++ b/be/src/vec/sink/vrow_distribution.h
@@ -0,0 +1,160 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+// IWYU pragma: no_include <bits/chrono.h>
+#include <gen_cpp/FrontendService.h>
+#include <gen_cpp/FrontendService_types.h>
+#include <gen_cpp/PaloInternalService_types.h>
+
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include "common/status.h"
+#include "exec/tablet_info.h"
+#include "runtime/types.h"
+#include "util/runtime_profile.h"
+#include "util/stopwatch.hpp"
+#include "vec/core/block.h"
+#include "vec/data_types/data_type.h"
+#include "vec/exprs/vexpr_fwd.h"
+#include "vec/sink/vtablet_block_convertor.h"
+#include "vec/sink/vtablet_finder.h"
+
+namespace doris::vectorized {
+
+class IndexChannel;
+class VNodeChannel;
+
+// <row_idx, partition_id, tablet_id>
+class RowPartTabletIds {
+public:
+    std::vector<int64_t> row_ids;
+    std::vector<int64_t> partition_ids;
+    std::vector<int64_t> tablet_ids;
+};
+
+typedef Status (*OnPartitionsCreated)(void*, TCreatePartitionResult*);
+
+class VRowDistributionContext {
+public:
+    RuntimeState* state = nullptr;
+    OlapTableBlockConvertor* block_convertor = nullptr;
+    OlapTabletFinder* tablet_finder = nullptr;
+    VOlapTablePartitionParam* vpartition = nullptr;
+    RuntimeProfile::Counter* add_partition_request_timer = nullptr;
+    int64_t txn_id = -1;
+    ObjectPool* pool;
+    OlapTableLocationParam* location;
+    const VExprContextSPtrs* vec_output_expr_ctxs;
+    OnPartitionsCreated on_partitions_created;
+    void* caller;
+    std::shared_ptr<OlapTableSchemaParam> schema;
+};
+
+class VRowDistribution {
+public:
+    VRowDistribution() {}
+    virtual ~VRowDistribution() {}
+
+    void init(VRowDistributionContext* ctx) {
+        _state = ctx->state;
+        _block_convertor = ctx->block_convertor;
+        _tablet_finder = ctx->tablet_finder;
+        _vpartition = ctx->vpartition;
+        _add_partition_request_timer = ctx->add_partition_request_timer;
+        _txn_id = ctx->txn_id;
+        _pool = ctx->pool;
+        _location = ctx->location;
+        _vec_output_expr_ctxs = ctx->vec_output_expr_ctxs;
+        _on_partitions_created = ctx->on_partitions_created;
+        _caller = ctx->caller;
+        _schema = ctx->schema;
+    }
+
+    // auto partition
+    // mv where clause
+    // v1 needs index->node->row_ids - tabletids
+    // v2 needs index,tablet->rowids
+    Status generate_rows_distribution(vectorized::Block& input_block,
+                                      std::shared_ptr<vectorized::Block>& 
block,
+                                      int64_t& filtered_rows, bool& 
has_filtered_rows,
+                                      std::vector<RowPartTabletIds>& 
row_part_tablet_ids);
+
+private:
+    std::pair<vectorized::VExprContextSPtr, vectorized::VExprSPtr> 
_get_partition_function();
+
+    void _save_missing_values(vectorized::ColumnPtr col, 
vectorized::DataTypePtr value_type);
+
+    // create partitions when need for auto-partition table using 
#_partitions_need_create.
+    Status _automatic_create_partition();
+
+    void _get_tablet_ids(vectorized::Block* block, int32_t index_idx,
+                         std::vector<int64_t>& tablet_ids);
+
+    void _filter_block_by_skip(vectorized::Block* block, RowPartTabletIds& 
row_part_tablet_id);
+
+    Status _filter_block_by_skip_and_where_clause(vectorized::Block* block,
+                                                  const 
vectorized::VExprContextSPtr& where_clause,
+                                                  RowPartTabletIds& 
row_part_tablet_id);
+
+    Status _filter_block(vectorized::Block* block,
+                         std::vector<RowPartTabletIds>& row_part_tablet_ids);
+
+    Status _generate_rows_distribution_for_auto_parititon(
+            vectorized::Block* block, int partition_col_idx, bool 
has_filtered_rows,
+            std::vector<RowPartTabletIds>& row_part_tablet_ids);
+
+    Status _generate_rows_distribution_for_non_auto_parititon(
+            vectorized::Block* block, bool has_filtered_rows,
+            std::vector<RowPartTabletIds>& row_part_tablet_ids);
+
+    void _reset_row_part_tablet_ids(std::vector<RowPartTabletIds>& 
row_part_tablet_ids,
+                                    int64_t rows);
+
+private:
+    RuntimeState* _state = nullptr;
+
+    // support only one partition column now
+    std::vector<std::vector<TStringLiteral>> _partitions_need_create;
+
+    MonotonicStopWatch _row_distribution_watch;
+    OlapTableBlockConvertor* _block_convertor = nullptr;
+    OlapTabletFinder* _tablet_finder = nullptr;
+    VOlapTablePartitionParam* _vpartition = nullptr;
+    RuntimeProfile::Counter* _add_partition_request_timer = nullptr;
+    int64_t _txn_id = -1;
+    ObjectPool* _pool;
+    OlapTableLocationParam* _location = nullptr;
+    // std::function _on_partition_created;
+    // int64_t _number_output_rows = 0;
+    const VExprContextSPtrs* _vec_output_expr_ctxs;
+    OnPartitionsCreated _on_partitions_created = nullptr;
+    void* _caller;
+    std::shared_ptr<OlapTableSchemaParam> _schema;
+
+    // reuse for find_tablet.
+    std::vector<VOlapTablePartition*> _partitions;
+    std::vector<bool> _skip;
+    std::vector<uint32_t> _tablet_indexes;
+    std::vector<int64_t> _tablet_ids;
+    std::vector<int64_t> _missing_map; // indice of missing values in 
partition_col
+};
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/sink/vtablet_sink_v2.cpp 
b/be/src/vec/sink/vtablet_sink_v2.cpp
index 696a66ac3fb..69a372e0e38 100644
--- a/be/src/vec/sink/vtablet_sink_v2.cpp
+++ b/be/src/vec/sink/vtablet_sink_v2.cpp
@@ -77,6 +77,42 @@ VOlapTableSinkV2::VOlapTableSinkV2(ObjectPool* pool, const 
RowDescriptor& row_de
 
 VOlapTableSinkV2::~VOlapTableSinkV2() = default;
 
+Status VOlapTableSinkV2::on_partitions_created(TCreatePartitionResult* result) 
{
+    // add new tablet locations. it will use by address. so add to pool
+    auto* new_locations = _pool->add(new 
std::vector<TTabletLocation>(result->tablets));
+    _location->add_locations(*new_locations);
+
+    // update new node info
+    _nodes_info->add_nodes(result->nodes);
+
+    // incremental open stream
+
+    return Status::OK();
+}
+
+static Status on_partitions_created(void* writer, TCreatePartitionResult* 
result) {
+    return 
static_cast<VOlapTableSinkV2*>(writer)->on_partitions_created(result);
+}
+
+void VOlapTableSinkV2::_init_row_distribution() {
+    VRowDistributionContext ctx;
+
+    ctx.state = _state;
+    ctx.block_convertor = _block_convertor.get();
+    ctx.tablet_finder = _tablet_finder.get();
+    ctx.vpartition = _vpartition;
+    ctx.add_partition_request_timer = _add_partition_request_timer;
+    ctx.txn_id = _txn_id;
+    ctx.pool = _pool;
+    ctx.location = _location;
+    ctx.vec_output_expr_ctxs = &_output_vexpr_ctxs;
+    ctx.on_partitions_created = &vectorized::on_partitions_created;
+    ctx.caller = (void*)this;
+    ctx.schema = _schema;
+
+    _row_distribution.init(&ctx);
+}
+
 Status VOlapTableSinkV2::init(const TDataSink& t_sink) {
     DCHECK(t_sink.__isset.olap_table_sink);
     auto& table_sink = t_sink.olap_table_sink;
@@ -104,7 +140,9 @@ Status VOlapTableSinkV2::init(const TDataSink& t_sink) {
     }
     _vpartition = _pool->add(new doris::VOlapTablePartitionParam(_schema, 
table_sink.partition));
     _tablet_finder = std::make_unique<OlapTabletFinder>(_vpartition, 
find_tablet_mode);
-    return _vpartition->init();
+    RETURN_IF_ERROR(_vpartition->init());
+
+    return Status::OK();
 }
 
 Status VOlapTableSinkV2::prepare(RuntimeState* state) {
@@ -168,6 +206,7 @@ Status VOlapTableSinkV2::open(RuntimeState* state) {
 
     _build_tablet_node_mapping();
     RETURN_IF_ERROR(_open_streams(state->backend_id()));
+    _init_row_distribution();
 
     return Status::OK();
 }
@@ -222,30 +261,25 @@ void VOlapTableSinkV2::_build_tablet_node_mapping() {
     }
 }
 
-void VOlapTableSinkV2::_generate_rows_for_tablet(
-        RowsForTablet& rows_for_tablet, const 
std::vector<VOlapTablePartition*>& partitions,
-        const std::vector<uint32_t>& tablet_indexes, const std::vector<bool>& 
skip,
-        size_t row_cnt) {
-    for (int row_idx = 0; row_idx < row_cnt; row_idx++) {
-        if (skip[row_idx]) {
-            continue;
-        }
-
-        auto& partition = partitions[row_idx];
-        auto& tablet_index = tablet_indexes[row_idx];
+void 
VOlapTableSinkV2::_generate_rows_for_tablet(std::vector<RowPartTabletIds>& 
row_part_tablet_ids,
+                                                 RowsForTablet& 
rows_for_tablet) {
+    for (int index_idx = 0; index_idx < row_part_tablet_ids.size(); 
index_idx++) {
+        auto& row_ids = row_part_tablet_ids[index_idx].row_ids;
+        auto& partition_ids = row_part_tablet_ids[index_idx].partition_ids;
+        auto& tablet_ids = row_part_tablet_ids[index_idx].tablet_ids;
 
-        for (const auto& index : partition->indexes) {
-            auto tablet_id = index.tablets[tablet_index];
+        for (int i = 0; i < row_ids.size(); i++) {
+            auto& tablet_id = tablet_ids[i];
             auto it = rows_for_tablet.find(tablet_id);
             if (it == rows_for_tablet.end()) {
                 Rows rows;
-                rows.partition_id = partition->id;
-                rows.index_id = index.index_id;
-                rows.row_idxes.reserve(row_cnt);
+                rows.partition_id = partition_ids[i];
+                rows.index_id = _schema->indexes()[index_idx]->index_id;
+                rows.row_idxes.reserve(row_ids.size());
                 auto [tmp_it, _] = rows_for_tablet.insert({tablet_id, rows});
                 it = tmp_it;
             }
-            it->second.row_idxes.push_back(row_idx);
+            it->second.row_idxes.push_back(row_ids[i]);
             _number_output_rows++;
         }
     }
@@ -285,37 +319,18 @@ Status VOlapTableSinkV2::send(RuntimeState* state, 
vectorized::Block* input_bloc
     DorisMetrics::instance()->load_rows->increment(input_rows);
     DorisMetrics::instance()->load_bytes->increment(input_bytes);
 
-    std::shared_ptr<vectorized::Block> block;
     bool has_filtered_rows = false;
-    RETURN_IF_ERROR(_block_convertor->validate_and_convert_block(
-            state, input_block, block, _output_vexpr_ctxs, input_rows, 
has_filtered_rows));
-
-    // clear and release the references of columns
-    input_block->clear();
+    int64_t filtered_rows = 0;
 
     SCOPED_RAW_TIMER(&_send_data_ns);
     // This is just for passing compilation.
-    bool stop_processing = false;
-    RowsForTablet rows_for_tablet;
-    _tablet_finder->clear_for_new_batch();
     _row_distribution_watch.start();
-    const auto num_rows = input_rows;
-    const auto* __restrict filter_map = _block_convertor->filter_map();
 
-    //reuse vars
-    _partitions.assign(num_rows, nullptr);
-    _skip.assign(num_rows, false);
-    _tablet_indexes.assign(num_rows, 0);
-
-    RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block.get(), 
num_rows, _partitions,
-                                                 _tablet_indexes, 
stop_processing, _skip));
-
-    if (has_filtered_rows) {
-        for (int i = 0; i < num_rows; i++) {
-            _skip[i] = _skip[i] || filter_map[i];
-        }
-    }
-    _generate_rows_for_tablet(rows_for_tablet, _partitions, _tablet_indexes, 
_skip, num_rows);
+    std::shared_ptr<vectorized::Block> block;
+    RETURN_IF_ERROR(_row_distribution.generate_rows_distribution(
+            *input_block, block, filtered_rows, has_filtered_rows, 
_row_part_tablet_ids));
+    RowsForTablet rows_for_tablet;
+    _generate_rows_for_tablet(_row_part_tablet_ids, rows_for_tablet);
 
     _row_distribution_watch.stop();
 
diff --git a/be/src/vec/sink/vtablet_sink_v2.h 
b/be/src/vec/sink/vtablet_sink_v2.h
index 63f0985eb09..a67c4e65cd2 100644
--- a/be/src/vec/sink/vtablet_sink_v2.h
+++ b/be/src/vec/sink/vtablet_sink_v2.h
@@ -64,6 +64,7 @@
 #include "vec/core/block.h"
 #include "vec/data_types/data_type.h"
 #include "vec/exprs/vexpr_fwd.h"
+#include "vec/sink/vrow_distribution.h"
 
 namespace doris {
 class DeltaWriterV2;
@@ -112,17 +113,20 @@ public:
     Status open(RuntimeState* state) override;
 
     Status close(RuntimeState* state, Status close_status) override;
+
     Status send(RuntimeState* state, vectorized::Block* block, bool eos = 
false) override;
 
+    Status on_partitions_created(TCreatePartitionResult* result);
+
 private:
+    void _init_row_distribution();
+
     Status _open_streams(int64_t src_id);
 
     void _build_tablet_node_mapping();
 
-    void _generate_rows_for_tablet(RowsForTablet& rows_for_tablet,
-                                   const std::vector<VOlapTablePartition*>& 
partitions,
-                                   const std::vector<uint32_t>& tablet_indexes,
-                                   const std::vector<bool>& skip, size_t 
row_cnt);
+    void _generate_rows_for_tablet(std::vector<RowPartTabletIds>& 
row_part_tablet_ids,
+                                   RowsForTablet& rows_for_tablet);
 
     Status _write_memtable(std::shared_ptr<vectorized::Block> block, int64_t 
tablet_id,
                            const Rows& rows, const Streams& streams);
@@ -168,11 +172,6 @@ private:
     int64_t _number_input_rows = 0;
     int64_t _number_output_rows = 0;
 
-    // reuse for find_tablet
-    std::vector<VOlapTablePartition*> _partitions;
-    std::vector<bool> _skip;
-    std::vector<uint32_t> _tablet_indexes;
-
     MonotonicStopWatch _row_distribution_watch;
 
     RuntimeProfile::Counter* _input_rows_counter = nullptr;
@@ -187,6 +186,7 @@ private:
     RuntimeProfile::Counter* _close_timer = nullptr;
     RuntimeProfile::Counter* _close_writer_timer = nullptr;
     RuntimeProfile::Counter* _close_load_timer = nullptr;
+    RuntimeProfile::Counter* _add_partition_request_timer = nullptr;
 
     // Save the status of close() method
     Status _close_status;
@@ -204,6 +204,10 @@ private:
     std::unordered_map<int64_t, std::shared_ptr<Streams>> _streams_for_node;
     size_t _stream_index = 0;
     std::shared_ptr<DeltaWriterV2Map> _delta_writer_for_tablet;
+
+    VRowDistribution _row_distribution;
+    // reuse to avoid frequent memory allocation and release.
+    std::vector<RowPartTabletIds> _row_part_tablet_ids;
 };
 
 } // namespace vectorized
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp 
b/be/src/vec/sink/writer/vtablet_writer.cpp
index d0720255695..786c0e21105 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -152,6 +152,7 @@ Status IndexChannel::init(RuntimeState* state, const 
std::vector<TTabletWithPart
         RETURN_IF_ERROR(_where_clause->prepare(state, 
*_parent->_output_row_desc));
         RETURN_IF_ERROR(_where_clause->open(state));
     }
+
     return Status::OK();
 }
 
@@ -488,52 +489,6 @@ Status VNodeChannel::add_block(vectorized::Block* block, 
const Payload* payload,
         _cur_mutable_block = 
vectorized::MutableBlock::create_unique(block->clone_empty());
     }
 
-    std::unique_ptr<Payload> temp_payload = nullptr;
-    if (_index_channel != nullptr && _index_channel->get_where_clause() != 
nullptr) {
-        SCOPED_RAW_TIMER(&_stat.where_clause_ns);
-        temp_payload.reset(new Payload(
-                std::unique_ptr<vectorized::IColumn::Selector>(new 
vectorized::IColumn::Selector()),
-                std::vector<int64_t>()));
-        int result_index = -1;
-        size_t column_number = block->columns();
-        RETURN_IF_ERROR(_index_channel->get_where_clause()->execute(block, 
&result_index));
-
-        auto& row_ids = *payload->first;
-        auto& tablets_ids = payload->second;
-
-        auto filter_column = block->get_by_position(result_index).column;
-
-        if (auto* nullable_column =
-                    
vectorized::check_and_get_column<vectorized::ColumnNullable>(*filter_column)) {
-            for (size_t i = 0; i < payload->second.size(); i++) {
-                if (nullable_column->get_bool_inline(row_ids[i])) {
-                    temp_payload->first->emplace_back(row_ids[i]);
-                    temp_payload->second.emplace_back(tablets_ids[i]);
-                }
-            }
-            payload = temp_payload.get();
-        } else if (auto* const_column = 
vectorized::check_and_get_column<vectorized::ColumnConst>(
-                           *filter_column)) {
-            bool ret = const_column->get_bool(0);
-            if (!ret) {
-                return Status::OK();
-            }
-        } else {
-            auto& filter = assert_cast<const 
vectorized::ColumnUInt8&>(*filter_column).get_data();
-            for (size_t i = 0; i < payload->second.size(); i++) {
-                if (filter[row_ids[i]] != 0) {
-                    temp_payload->first->emplace_back(row_ids[i]);
-                    temp_payload->second.emplace_back(tablets_ids[i]);
-                }
-            }
-            payload = temp_payload.get();
-        }
-
-        for (size_t i = block->columns() - 1; i >= column_number; i--) {
-            block->erase(i);
-        }
-    }
-
     SCOPED_RAW_TIMER(&_stat.append_node_channel_ns);
     if (is_append) {
         // Do not split the data of the block by tablets but append it to a 
single delta writer.
@@ -1095,6 +1050,43 @@ Status VTabletWriter::open(doris::RuntimeState* state, 
doris::RuntimeProfile* pr
     return Status::OK();
 }
 
+// Applies a partition-creation result to this writer: registers the tablet
+// locations and nodes carried in `result`, then incrementally opens node
+// channels for the partitions listed in it.
+// Returns the error from _incremental_open_node_channel() if it fails,
+// otherwise Status::OK().
+Status VTabletWriter::on_partitions_created(TCreatePartitionResult* result) {
+    // Add the new tablet locations. They are used by address afterwards, so
+    // the copied vector is added to _pool before being handed to _location.
+    auto* new_locations = _pool->add(new 
std::vector<TTabletLocation>(result->tablets));
+    _location->add_locations(*new_locations);
+
+    // Register the nodes reported together with the new partitions.
+    _nodes_info->add_nodes(result->nodes);
+
+    // Incrementally open node channels for the new partitions.
+    RETURN_IF_ERROR(_incremental_open_node_channel(result->partitions));
+
+    return Status::OK();
+}
+
+// File-local trampoline with a plain-function signature: casts the opaque
+// `writer` pointer back to VTabletWriter and forwards `result` to its
+// on_partitions_created() member.
+// The static_cast is unchecked, so `writer` must really point to a
+// VTabletWriter.
+static Status on_partitions_created(void* writer, TCreatePartitionResult* 
result) {
+    return static_cast<VTabletWriter*>(writer)->on_partitions_created(result);
+}
+
+// Fills a VRowDistributionContext from this writer's members and hands it to
+// _row_distribution.init().
+// NOTE(review): `ctx` is a stack local and only its address is passed; this
+// presumably relies on init() copying the fields rather than retaining the
+// pointer -- confirm against VRowDistribution::init().
+void VTabletWriter::_init_row_distribution() {
+    VRowDistributionContext ctx;
+
+    ctx.state = _state;
+    ctx.block_convertor = _block_convertor.get();
+    ctx.tablet_finder = _tablet_finder.get();
+    ctx.vpartition = _vpartition;
+    ctx.add_partition_request_timer = _add_partition_request_timer;
+    ctx.txn_id = _txn_id;
+    ctx.pool = _pool;
+    ctx.location = _location;
+    ctx.vec_output_expr_ctxs = &_vec_output_expr_ctxs;
+    // Callback plus opaque caller pointer: the free function
+    // vectorized::on_partitions_created receives `caller` as its first argument.
+    ctx.on_partitions_created = &vectorized::on_partitions_created;
+    ctx.caller = (void*)this;
+    ctx.schema = _schema;
+
+    _row_distribution.init(&ctx);
+}
+
 Status VTabletWriter::_init(RuntimeState* state, RuntimeProfile* profile) {
     DCHECK(_t_sink.__isset.olap_table_sink);
     auto& table_sink = _t_sink.olap_table_sink;
@@ -1243,49 +1235,12 @@ Status VTabletWriter::_init(RuntimeState* state, 
RuntimeProfile* profile) {
         
RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->create_wal_writer(_wal_id, 
_wal_writer));
     }
 
+    _init_row_distribution();
+
     _inited = true;
     return Status::OK();
 }
 
-Status VTabletWriter::_automatic_create_partition() {
-    SCOPED_TIMER(_add_partition_request_timer);
-    TCreatePartitionRequest request;
-    TCreatePartitionResult result;
-    request.__set_txn_id(_txn_id);
-    request.__set_db_id(_vpartition->db_id());
-    request.__set_table_id(_vpartition->table_id());
-    request.__set_partitionValues(_partitions_need_create);
-
-    VLOG(1) << "automatic partition rpc begin request " << request;
-    TNetworkAddress master_addr = 
ExecEnv::GetInstance()->master_info()->network_address;
-    int time_out = _state->execution_timeout() * 1000;
-    RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
-            master_addr.hostname, master_addr.port,
-            [&request, &result](FrontendServiceConnection& client) {
-                client->createPartition(result, request);
-            },
-            time_out));
-
-    Status status(Status::create(result.status));
-    VLOG(1) << "automatic partition rpc end response " << result;
-    if (result.status.status_code == TStatusCode::OK) {
-        // add new created partitions
-        RETURN_IF_ERROR(_vpartition->add_partitions(result.partitions));
-
-        // add new tablet locations. it will use by address. so add to pool
-        auto* new_locations = _pool->add(new 
std::vector<TTabletLocation>(result.tablets));
-        _location->add_locations(*new_locations);
-
-        // update new node info
-        _nodes_info->add_nodes(result.nodes);
-
-        // incremental open node channel
-        RETURN_IF_ERROR(_incremental_open_node_channel(result.partitions));
-    }
-
-    return status;
-}
-
 Status VTabletWriter::_incremental_open_node_channel(
         const std::vector<TOlapTablePartition>& partitions) {
     // do what we did in prepare() for partitions. indexes which don't change 
when we create new partition is orthogonal to partitions.
@@ -1337,129 +1292,11 @@ Status VTabletWriter::_incremental_open_node_channel(
     return Status::OK();
 }
 
-// Generate channel payload for sinking data to differenct node channel
-// Payload = std::pair<std::unique_ptr<vectorized::IColumn::Selector>, 
std::vector<int64_t>>;
-//   first = row_id, second = vector<tablet_id>
-void VTabletWriter::_generate_row_distribution_payload(
-        ChannelDistributionPayload& channel_to_payload,
-        const std::vector<VOlapTablePartition*>& partitions,
-        const std::vector<uint32_t>& tablet_indexes, const std::vector<bool>& 
skip,
-        size_t row_cnt) {
-    for (int row_idx = 0; row_idx < row_cnt; row_idx++) {
-        if (skip[row_idx]) {
-            continue;
-        }
-        const auto& partition = partitions[row_idx];
-        const auto& tablet_index = tablet_indexes[row_idx];
-
-        for (int index_num = 0; index_num < partition->indexes.size();
-             ++index_num) { // partition->indexes = [index, tablets...]
-
-            auto tablet_id = 
partition->indexes[index_num].tablets[tablet_index];
-            auto it = _channels[index_num]->_channels_by_tablet.find(
-                    tablet_id); // (tablet_id, VNodeChannel) where this tablet 
locate
-
-            DCHECK(it != _channels[index_num]->_channels_by_tablet.end())
-                    << "unknown tablet, tablet_id=" << tablet_index;
-
-            std::vector<std::shared_ptr<VNodeChannel>>& tablet_locations = 
it->second;
-            std::unordered_map<VNodeChannel*, Payload>& payloads_this_index =
-                    channel_to_payload[index_num]; // payloads of this index 
in every node
-
-            for (const auto& locate_node : tablet_locations) {
-                auto payload_it =
-                        payloads_this_index.find(locate_node.get()); // 
<VNodeChannel*, Payload>
-                if (payload_it == payloads_this_index.end()) {
-                    auto [tmp_it, _] = payloads_this_index.emplace(
-                            locate_node.get(),
-                            Payload 
{std::make_unique<vectorized::IColumn::Selector>(),
-                                     std::vector<int64_t>()});
-                    payload_it = tmp_it;
-                    payload_it->second.first->reserve(row_cnt);
-                    payload_it->second.second.reserve(row_cnt);
-                }
-                payload_it->second.first->push_back(row_idx);
-                payload_it->second.second.push_back(tablet_id);
-            }
-            _number_output_rows++;
-        }
-    }
-}
-
-Status VTabletWriter::_single_partition_generate(RuntimeState* state, 
vectorized::Block* block,
-                                                 ChannelDistributionPayload& 
channel_to_payload,
-                                                 size_t num_rows, bool 
has_filtered_rows) {
-    // only need to calculate one value for single partition.
-    std::vector<VOlapTablePartition*> partitions(1, nullptr);
-    std::vector<bool> skip(1, false);
-    std::vector<uint32_t> tablet_indexes(1, 0);
-    bool stop_processing = false;
-
-    RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, 1, partitions, 
tablet_indexes,
-                                                 stop_processing, skip));
-
-    const VOlapTablePartition* partition = nullptr;
-    uint32_t tablet_index = 0;
-    for (size_t i = 0; i < num_rows; i++) {
-        if (!skip[i]) {
-            partition = partitions[i];
-            tablet_index = tablet_indexes[i];
-            break;
-        }
-    }
-    if (partition == nullptr) {
-        return Status::OK();
-    }
-
-    for (int j = 0; j < partition->indexes.size(); ++j) {
-        auto tid = partition->indexes[j].tablets[tablet_index];
-        auto it = _channels[j]->_channels_by_tablet.find(tid);
-        DCHECK(it != _channels[j]->_channels_by_tablet.end())
-                << "unknown tablet, tablet_id=" << tablet_index;
-        int64_t row_cnt = 0;
-        for (const auto& channel : it->second) {
-            if (!channel_to_payload[j].contains(channel.get())) {
-                channel_to_payload[j].insert(
-                        {channel.get(), Payload 
{std::make_unique<vectorized::IColumn::Selector>(),
-                                                 std::vector<int64_t>()}});
-            }
-            auto& selector = channel_to_payload[j][channel.get()].first;
-            auto& tablet_ids = channel_to_payload[j][channel.get()].second;
-            for (int32_t i = 0; i < num_rows; ++i) {
-                if (UNLIKELY(has_filtered_rows) && 
_block_convertor->filter_map()[i]) {
-                    continue;
-                }
-                selector->push_back(i);
-            }
-            tablet_ids.resize(selector->size(), tid);
-            row_cnt = selector->size();
-        }
-        _number_output_rows += row_cnt;
-    }
-    return Status::OK();
-}
-
 std::pair<vectorized::VExprContextSPtr, vectorized::VExprSPtr>
 VTabletWriter::_get_partition_function() {
     return {_vpartition->get_part_func_ctx(), 
_vpartition->get_partition_function()};
 }
 
-void VTabletWriter::_save_missing_values(vectorized::ColumnPtr col,
-                                         vectorized::DataTypePtr value_type,
-                                         std::vector<int64_t> filter) {
-    _partitions_need_create.clear();
-    std::set<std::string> deduper;
-    // de-duplication
-    for (auto row : filter) {
-        deduper.emplace(value_type->to_string(*col, row));
-    }
-    for (auto& value : deduper) {
-        TStringLiteral node;
-        node.value = value;
-        _partitions_need_create.emplace_back(std::vector {node}); // only 1 
partition column now
-    }
-}
-
 Status VTabletWriter::_cancel_channel_and_check_intolerable_failure(
         Status status, const std::string& err_msg, const 
std::shared_ptr<IndexChannel> ich,
         const std::shared_ptr<VNodeChannel> nch) {
@@ -1705,6 +1542,46 @@ Status VTabletWriter::close(Status exec_status) {
     return _close_status;
 }
 
+// Builds the per-node-channel payloads for a single index.
+//
+// row_part_tablet_id: parallel `row_ids` / `tablet_ids` vectors; entry i pairs
+//                     a row id with the tablet id it is sent to.
+// index_idx:          position of the index in _channels.
+// channel_payload:    output map VNodeChannel* -> Payload; for every node
+//                     channel that hosts the tablet, the row id is appended to
+//                     Payload.first and the tablet id to Payload.second.
+void VTabletWriter::_generate_one_index_channel_payload(
+        RowPartTabletIds& row_part_tablet_id, int32_t index_idx,
+        ChannelDistributionPayload& channel_payload) {
+    auto& row_ids = row_part_tablet_id.row_ids;
+    auto& tablet_ids = row_part_tablet_id.tablet_ids;
+
+    // Used as the reserve() size for every newly created payload.
+    size_t row_cnt = row_ids.size();
+
+    // NOTE(review): `i` is a signed int compared against row_ids.size().
+    for (int i = 0; i < row_ids.size(); i++) {
+        // Look up the node channels on which this tablet is located.
+        auto it = 
_channels[index_idx]->_channels_by_tablet.find(tablet_ids[i]);
+        // NOTE(review): a lookup miss is only guarded by DCHECK; `it->second`
+        // below is dereferenced unconditionally.
+        DCHECK(it != _channels[index_idx]->_channels_by_tablet.end())
+                << "unknown tablet, tablet_id=" << tablet_ids[i];
+
+        std::vector<std::shared_ptr<VNodeChannel>>& tablet_locations = 
it->second;
+        // The row is appended once for every node channel hosting the tablet.
+        for (const auto& locate_node : tablet_locations) {
+            auto payload_it = channel_payload.find(locate_node.get()); // 
<VNodeChannel*, Payload>
+            if (payload_it == channel_payload.end()) {
+                // First row for this node channel: create an empty payload and
+                // reserve room for row_cnt entries in both vectors.
+                auto [tmp_it, _] = channel_payload.emplace(
+                        locate_node.get(),
+                        Payload 
{std::make_unique<vectorized::IColumn::Selector>(),
+                                 std::vector<int64_t>()});
+                payload_it = tmp_it;
+                payload_it->second.first->reserve(row_cnt);
+                payload_it->second.second.reserve(row_cnt);
+            }
+            payload_it->second.first->push_back(row_ids[i]);
+            payload_it->second.second.push_back(tablet_ids[i]);
+        }
+    }
+}
+
+// Builds the node-channel payloads for every index: entry i of
+// `row_part_tablet_ids` is turned into entry i of `payload` via
+// _generate_one_index_channel_payload().
+// NOTE(review): the loop bound is _schema->indexes().size(), while both
+// vectors are indexed with it; this assumes each holds at least that many
+// entries -- confirm against the caller and generate_rows_distribution().
+void VTabletWriter::_generate_index_channels_payloads(
+        std::vector<RowPartTabletIds>& row_part_tablet_ids,
+        ChannelDistributionPayloadVec& payload) {
+    for (int i = 0; i < _schema->indexes().size(); i++) {
+        _generate_one_index_channel_payload(row_part_tablet_ids[i], i, 
payload[i]);
+    }
+}
+
 Status VTabletWriter::append_block(doris::vectorized::Block& input_block) {
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
     Status status = Status::OK();
@@ -1719,6 +1596,19 @@ Status 
VTabletWriter::append_block(doris::vectorized::Block& input_block) {
         return status;
     }
     SCOPED_TIMER(_profile->total_time_counter());
+
+    std::shared_ptr<vectorized::Block> block;
+    bool has_filtered_rows = false;
+    int64_t filtered_rows = 0;
+
+    RETURN_IF_ERROR(_row_distribution.generate_rows_distribution(
+            input_block, block, filtered_rows, has_filtered_rows, 
_row_part_tablet_ids));
+
+    ChannelDistributionPayloadVec channel_to_payload;
+
+    channel_to_payload.resize(_channels.size());
+    _generate_index_channels_payloads(_row_part_tablet_ids, 
channel_to_payload);
+
     _number_input_rows += rows;
     // update incrementally so that FE can get the progress.
     // the real 'num_rows_load_total' will be set when sink being closed.
@@ -1727,112 +1617,6 @@ Status 
VTabletWriter::append_block(doris::vectorized::Block& input_block) {
     DorisMetrics::instance()->load_rows->increment(rows);
     DorisMetrics::instance()->load_bytes->increment(bytes);
 
-    std::shared_ptr<vectorized::Block> block;
-    bool has_filtered_rows = false;
-    int64_t filtered_rows =
-            _block_convertor->num_filtered_rows() + 
_tablet_finder->num_filtered_rows();
-    RETURN_IF_ERROR(_block_convertor->validate_and_convert_block(
-            _state, &input_block, block, _vec_output_expr_ctxs, rows, 
has_filtered_rows));
-
-    SCOPED_RAW_TIMER(&_send_data_ns);
-    // This is just for passing compilation.
-    bool stop_processing = false;
-    ChannelDistributionPayload channel_to_payload;
-    channel_to_payload.resize(_channels.size());
-    _tablet_finder->clear_for_new_batch();
-    _row_distribution_watch.start();
-    auto num_rows = block->rows();
-    _tablet_finder->filter_bitmap().Reset(num_rows);
-    size_t partition_num = _vpartition->get_partitions().size();
-    if (!_vpartition->is_auto_partition() && partition_num == 1 &&
-        _tablet_finder->is_find_tablet_every_sink()) {
-        RETURN_IF_ERROR(_single_partition_generate(_state, block.get(), 
channel_to_payload,
-                                                   num_rows, 
has_filtered_rows));
-    } else {
-        // if there's projection of partition calc, we need to calc it first.
-        auto [part_ctx, part_func] = _get_partition_function();
-        int result_idx = -1;
-        if (_vpartition->is_projection_partition()) {
-            // calc the start value of missing partition ranges.
-            RETURN_IF_ERROR(part_func->execute(part_ctx.get(), block.get(), 
&result_idx));
-            VLOG_DEBUG << "Partition-calculated block:" << block->dump_data();
-            // change the column to compare to transformed.
-            _vpartition->set_transformed_slots({(uint16_t)result_idx});
-        }
-
-        if (_vpartition->is_auto_partition()) {
-            std::vector<uint16_t> partition_keys = 
_vpartition->get_partition_keys();
-            //TODO: use loop to create missing_vals for multi column.
-            CHECK(partition_keys.size() == 1)
-                    << "now support only 1 partition column for auto 
partitions.";
-            auto partition_col = block->get_by_position(partition_keys[0]);
-
-            std::vector<int64_t> missing_map; // indice of missing values in 
partition_col
-            missing_map.reserve(partition_col.column->size());
-
-            // try to find tablet and save missing value
-            std::vector<VOlapTablePartition*> partitions(num_rows, nullptr);
-            std::vector<bool> skip(num_rows, false);
-            std::vector<uint32_t> tablet_indexes(num_rows, 0);
-
-            //TODO: we could use the buffer to save tablets we found so that 
no need to find them again when we created partitions and try to append block 
next time.
-            RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block.get(), 
num_rows, partitions,
-                                                         tablet_indexes, 
stop_processing, skip,
-                                                         &missing_map));
-
-            if (missing_map.empty()) {
-                // we don't calculate it distribution when have missing values
-                if (has_filtered_rows) {
-                    for (int i = 0; i < num_rows; i++) {
-                        skip[i] = skip[i] || _block_convertor->filter_map()[i];
-                    }
-                }
-                _generate_row_distribution_payload(channel_to_payload, 
partitions, tablet_indexes,
-                                                   skip, num_rows);
-            } else { // for missing partition keys, calc the missing partition 
and save in _partitions_need_create
-                auto return_type = part_func->data_type();
-
-                // expose the data column
-                vectorized::ColumnPtr range_left_col = 
block->get_by_position(result_idx).column;
-                if (const auto* nullable =
-                            
check_and_get_column<vectorized::ColumnNullable>(*range_left_col)) {
-                    range_left_col = nullable->get_nested_column_ptr();
-                    return_type =
-                            assert_cast<const 
vectorized::DataTypeNullable*>(return_type.get())
-                                    ->get_nested_type();
-                }
-                // calc the end value and save them.
-                _save_missing_values(range_left_col, return_type, missing_map);
-                // then call FE to create it. then FragmentExecutor will redo 
the load.
-                RETURN_IF_ERROR(_automatic_create_partition());
-                // now we need to rollback the metrics
-                _number_input_rows -= rows;
-                _state->update_num_rows_load_total(-rows);
-                _state->update_num_bytes_load_total(-bytes);
-                DorisMetrics::instance()->load_rows->increment(-rows);
-                DorisMetrics::instance()->load_bytes->increment(-bytes);
-                // In the next round, we will 
_generate_row_distribution_payload again to get right payload of new tablet
-                LOG(INFO) << "Auto created partition. Send block again.";
-                return Status::NeedSendAgain("");
-            }    // creating done
-        } else { // not auto partition
-            std::vector<VOlapTablePartition*> partitions(num_rows, nullptr);
-            std::vector<bool> skip(num_rows, false);
-            std::vector<uint32_t> tablet_indexes(num_rows, 0);
-
-            RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block.get(), 
num_rows, partitions,
-                                                         tablet_indexes, 
stop_processing, skip));
-
-            if (has_filtered_rows) {
-                for (int i = 0; i < num_rows; i++) {
-                    skip[i] = skip[i] || _block_convertor->filter_map()[i];
-                }
-            }
-            _generate_row_distribution_payload(channel_to_payload, partitions, 
tablet_indexes, skip,
-                                               num_rows);
-        }
-    }
-    _row_distribution_watch.stop();
     // Random distribution and the block belongs to a single tablet, we could 
optimize to append the whole
     // block into node channel.
     bool load_block_to_single_tablet =
@@ -1855,10 +1639,8 @@ Status 
VTabletWriter::append_block(doris::vectorized::Block& input_block) {
     }
 
     if (_group_commit) {
-        _group_commit_block(&input_block, num_rows,
-                            _block_convertor->num_filtered_rows() +
-                                    _tablet_finder->num_filtered_rows() - 
filtered_rows,
-                            _state, block.get(), _block_convertor.get(), 
_tablet_finder.get());
+        _group_commit_block(&input_block, block->rows(), filtered_rows, 
_state, block.get(),
+                            _block_convertor.get(), _tablet_finder.get());
     }
     // TODO: Before load, we need to projection unuseful column
     // auto slots = _schema->tuple_desc()->slots();
diff --git a/be/src/vec/sink/writer/vtablet_writer.h 
b/be/src/vec/sink/writer/vtablet_writer.h
index c8d5d1c2ce9..7fee45be371 100644
--- a/be/src/vec/sink/writer/vtablet_writer.h
+++ b/be/src/vec/sink/writer/vtablet_writer.h
@@ -74,6 +74,7 @@
 #include "vec/data_types/data_type.h"
 #include "vec/exprs/vexpr_fwd.h"
 #include "vec/runtime/vfile_format_transformer.h"
+#include "vec/sink/vrow_distribution.h"
 #include "vec/sink/vtablet_block_convertor.h"
 #include "vec/sink/vtablet_finder.h"
 #include "vec/sink/writer/async_result_writer.h"
@@ -204,9 +205,6 @@ private:
 class IndexChannel;
 class VTabletWriter;
 
-// pair<row_id,tablet_id>
-using Payload = std::pair<std::unique_ptr<vectorized::IColumn::Selector>, 
std::vector<int64_t>>;
-
 class VNodeChannelStat {
 public:
     VNodeChannelStat& operator+=(const VNodeChannelStat& stat) {
@@ -221,6 +219,9 @@ public:
     int64_t append_node_channel_ns = 0;
 };
 
+// pair<row_id,tablet_id>
+using Payload = std::pair<std::unique_ptr<vectorized::IColumn::Selector>, 
std::vector<int64_t>>;
+
 // every NodeChannel keeps a data transmission channel with one BE. for 
multiple times open, it has a dozen of requests and corresponding closures.
 class VNodeChannel {
 public:
@@ -485,6 +486,7 @@ public:
 private:
     friend class VNodeChannel;
     friend class VTabletWriter;
+    friend class VRowDistribution;
 
     VTabletWriter* _parent;
     int64_t _index_id;
@@ -546,32 +548,34 @@ public:
 
     bool is_close_done();
 
+    Status on_partitions_created(TCreatePartitionResult* result);
+
 private:
     friend class VNodeChannel;
     friend class IndexChannel;
 
-    using ChannelDistributionPayload = 
std::vector<std::unordered_map<VNodeChannel*, Payload>>;
+    using ChannelDistributionPayload = std::unordered_map<VNodeChannel*, 
Payload>;
+    using ChannelDistributionPayloadVec = 
std::vector<std::unordered_map<VNodeChannel*, Payload>>;
+
+    void _init_row_distribution();
 
     Status _init(RuntimeState* state, RuntimeProfile* profile);
 
-    // payload for every row
-    void _generate_row_distribution_payload(ChannelDistributionPayload& 
channel_to_payload,
-                                            const 
std::vector<VOlapTablePartition*>& partitions,
-                                            const std::vector<uint32_t>& 
tablet_indexes,
-                                            const std::vector<bool>& skip, 
size_t row_cnt);
+    void _generate_one_index_channel_payload(RowPartTabletIds& 
row_part_tablet_tuple,
+                                             int32_t index_idx,
+                                             ChannelDistributionPayload& 
channel_payload);
 
-    Status _single_partition_generate(RuntimeState* state, vectorized::Block* 
block,
-                                      ChannelDistributionPayload& 
channel_to_payload,
-                                      size_t num_rows, bool has_filtered_rows);
+    void _generate_index_channels_payloads(std::vector<RowPartTabletIds>& 
row_part_tablet_ids,
+                                           ChannelDistributionPayloadVec& 
payload);
 
     Status _cancel_channel_and_check_intolerable_failure(Status status, const 
std::string& err_msg,
                                                          const 
std::shared_ptr<IndexChannel> ich,
                                                          const 
std::shared_ptr<VNodeChannel> nch);
 
-    void _cancel_all_channel(Status status);
-
     std::pair<vectorized::VExprContextSPtr, vectorized::VExprSPtr> 
_get_partition_function();
 
+    void _cancel_all_channel(Status status);
+
     void _save_missing_values(vectorized::ColumnPtr col, 
vectorized::DataTypePtr value_type,
                               std::vector<int64_t> filter);
 
@@ -689,6 +693,11 @@ private:
     RuntimeProfile* _profile = nullptr; // not owned, set when open
     bool _group_commit = false;
     std::shared_ptr<WalWriter> _wal_writer = nullptr;
+
+    VRowDistribution _row_distribution;
+    // reuse to avoid frequent memory allocation and release.
+    std::vector<RowPartTabletIds> _row_part_tablet_ids;
+
     int64_t _tb_id;
     int64_t _db_id;
     int64_t _wal_id;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to