This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 935953d92bf branch-3.1: [fix](sink) fix sink operator tolerate failed 
replica number incorrect #52560 (#53099)
935953d92bf is described below

commit 935953d92bf69229c8b100218f3d6bc3ef3cd5df
Author: hui lai <[email protected]>
AuthorDate: Fri Jul 11 19:16:46 2025 +0800

    branch-3.1: [fix](sink) fix sink operator tolerate failed replica number 
incorrect #52560 (#53099)
    
    pick #52560
---
 be/src/exec/tablet_info.cpp                        |   6 +
 be/src/exec/tablet_info.h                          |   2 +
 be/src/runtime/load_channel.cpp                    |   3 +
 be/src/vec/sink/writer/vtablet_writer.cpp          |  31 +++++-
 be/src/vec/sink/writer/vtablet_writer.h            |   7 ++
 be/src/vec/sink/writer/vtablet_writer_v2.cpp       |  29 ++++-
 be/src/vec/sink/writer/vtablet_writer_v2.h         |  10 +-
 be/test/vec/sink/vtablet_writer_v2_test.cpp        |  96 +++++++++-------
 .../java/org/apache/doris/catalog/OlapTable.java   |   6 +-
 .../org/apache/doris/planner/OlapTableSink.java    |   4 +
 gensrc/thrift/Descriptors.thrift                   |   2 +
 .../apache/doris/regression/util/DebugPoint.groovy |  18 +++
 .../load_p0/stream_load/test_sink_tolerate.groovy  | 121 +++++++++++++++++++++
 13 files changed, 284 insertions(+), 51 deletions(-)

diff --git a/be/src/exec/tablet_info.cpp b/be/src/exec/tablet_info.cpp
index 5e75f8bac71..205b7e440d3 100644
--- a/be/src/exec/tablet_info.cpp
+++ b/be/src/exec/tablet_info.cpp
@@ -713,6 +713,12 @@ Status 
VOlapTablePartitionParam::generate_partition_from(const TOlapTablePartiti
                     part_result->indexes[j].index_id, 
_schema->indexes()[j]->index_id);
         }
     }
+    if (t_part.__isset.total_replica_num) {
+        part_result->total_replica_num = t_part.total_replica_num;
+    }
+    if (t_part.__isset.load_required_replica_num) {
+        part_result->load_required_replica_num = 
t_part.load_required_replica_num;
+    }
     return Status::OK();
 }
 
diff --git a/be/src/exec/tablet_info.h b/be/src/exec/tablet_info.h
index b5a139c036d..0b75e8706b3 100644
--- a/be/src/exec/tablet_info.h
+++ b/be/src/exec/tablet_info.h
@@ -163,6 +163,8 @@ struct VOlapTablePartition {
     bool is_mutable;
     // -1 indicates partition with hash distribution
     int64_t load_tablet_idx = -1;
+    int total_replica_num = 0;
+    int load_required_replica_num = 0;
 
     VOlapTablePartition(vectorized::Block* partition_block)
             // the default value of partition bound is -1.
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index 54665d8184f..cf10d3d66e8 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -29,6 +29,7 @@
 #include "runtime/tablets_channel.h"
 #include "runtime/thread_context.h"
 #include "runtime/workload_group/workload_group_manager.h"
+#include "util/debug_points.h"
 
 namespace doris {
 
@@ -172,6 +173,8 @@ Status 
LoadChannel::_get_tablets_channel(std::shared_ptr<BaseTabletsChannel>& ch
 
 Status LoadChannel::add_batch(const PTabletWriterAddBlockRequest& request,
                               PTabletWriterAddBlockResult* response) {
+    DBUG_EXECUTE_IF("LoadChannel.add_batch.failed",
+                    { return Status::InternalError("fault injection"); });
     SCOPED_TIMER(_add_batch_timer);
     COUNTER_UPDATE(_add_batch_times, 1);
     SCOPED_ATTACH_TASK(_query_thread_context);
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp 
b/be/src/vec/sink/writer/vtablet_writer.cpp
index 581fc1704e9..64903c5b212 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -172,7 +172,7 @@ void IndexChannel::mark_as_failed(const VNodeChannel* 
node_channel, const std::s
                 _failed_channels[the_tablet_id].insert(node_id);
                 _failed_channels_msgs.emplace(the_tablet_id,
                                               err + ", host: " + 
node_channel->host());
-                if (_failed_channels[the_tablet_id].size() >= 
((_parent->_num_replicas + 1) / 2)) {
+                if (_failed_channels[the_tablet_id].size() > 
_max_failed_replicas(the_tablet_id)) {
                     _intolerable_failure_status = 
Status::Error<ErrorCode::INTERNAL_ERROR, false>(
                             _failed_channels_msgs[the_tablet_id]);
                 }
@@ -180,7 +180,7 @@ void IndexChannel::mark_as_failed(const VNodeChannel* 
node_channel, const std::s
         } else {
             _failed_channels[tablet_id].insert(node_id);
             _failed_channels_msgs.emplace(tablet_id, err + ", host: " + 
node_channel->host());
-            if (_failed_channels[tablet_id].size() >= ((_parent->_num_replicas 
+ 1) / 2)) {
+            if (_failed_channels[tablet_id].size() > 
_max_failed_replicas(tablet_id)) {
                 _intolerable_failure_status = 
Status::Error<ErrorCode::INTERNAL_ERROR, false>(
                         _failed_channels_msgs[tablet_id]);
             }
@@ -188,6 +188,15 @@ void IndexChannel::mark_as_failed(const VNodeChannel* 
node_channel, const std::s
     }
 }
 
+int IndexChannel::_max_failed_replicas(int64_t tablet_id) {
+    auto [total_replicas_num, load_required_replicas_num] =
+            _parent->_tablet_replica_info[tablet_id];
+    int max_failed_replicas = total_replicas_num == 0
+                                      ? (_parent->_num_replicas - 1) / 2
+                                      : total_replicas_num - 
load_required_replicas_num;
+    return max_failed_replicas;
+}
+
 Status IndexChannel::check_intolerable_failure() {
     std::lock_guard<std::mutex> l(_fail_lock);
     return _intolerable_failure_status;
@@ -1404,6 +1413,7 @@ Status VTabletWriter::_init(RuntimeState* state, 
RuntimeProfile* profile) {
                 tablet_with_partition.partition_id = part->id;
                 tablet_with_partition.tablet_id = tablet;
                 tablets.emplace_back(std::move(tablet_with_partition));
+                _build_tablet_replica_info(tablet, part);
             }
         }
         if (tablets.empty() && !_vpartition->is_auto_partition()) {
@@ -1436,6 +1446,7 @@ Status VTabletWriter::_incremental_open_node_channel(
                 tablet_with_partition.partition_id = part->id;
                 tablet_with_partition.tablet_id = tablet;
                 tablets.emplace_back(std::move(tablet_with_partition));
+                _build_tablet_replica_info(tablet, part);
             }
             DCHECK(!tablets.empty()) << "incremental open got nothing!";
         }
@@ -1472,6 +1483,22 @@ Status VTabletWriter::_incremental_open_node_channel(
     return Status::OK();
 }
 
+void VTabletWriter::_build_tablet_replica_info(const int64_t tablet_id,
+                                               VOlapTablePartition* partition) 
{
+    if (partition != nullptr) {
+        int total_replicas_num =
+                partition->total_replica_num == 0 ? _num_replicas : 
partition->total_replica_num;
+        int load_required_replicas_num = partition->load_required_replica_num 
== 0
+                                                 ? (_num_replicas + 1) / 2
+                                                 : 
partition->load_required_replica_num;
+        _tablet_replica_info.emplace(
+                tablet_id, std::make_pair(total_replicas_num, 
load_required_replicas_num));
+    } else {
+        _tablet_replica_info.emplace(tablet_id,
+                                     std::make_pair(_num_replicas, 
(_num_replicas + 1) / 2));
+    }
+}
+
 void VTabletWriter::_cancel_all_channel(Status status) {
     for (const auto& index_channel : _channels) {
         index_channel->for_each_node_channel([&status](const 
std::shared_ptr<VNodeChannel>& ch) {
diff --git a/be/src/vec/sink/writer/vtablet_writer.h 
b/be/src/vec/sink/writer/vtablet_writer.h
index 116aa20a98c..8a15df9a252 100644
--- a/be/src/vec/sink/writer/vtablet_writer.h
+++ b/be/src/vec/sink/writer/vtablet_writer.h
@@ -551,6 +551,8 @@ private:
     friend class VTabletWriter;
     friend class VRowDistribution;
 
+    int _max_failed_replicas(int64_t tablet_id);
+
     VTabletWriter* _parent = nullptr;
     int64_t _index_id;
     vectorized::VExprContextSPtr _where_clause;
@@ -636,6 +638,8 @@ private:
 
     void _do_try_close(RuntimeState* state, const Status& exec_status);
 
+    void _build_tablet_replica_info(const int64_t tablet_id, 
VOlapTablePartition* partition);
+
     TDataSink _t_sink;
 
     std::shared_ptr<MemTracker> _mem_tracker;
@@ -738,5 +742,8 @@ private:
     VRowDistribution _row_distribution;
     // reuse to avoid frequent memory allocation and release.
     std::vector<RowPartTabletIds> _row_part_tablet_ids;
+
+    // tablet_id -> <total replicas num, load required replicas num>
+    std::unordered_map<int64_t, std::pair<int, int>> _tablet_replica_info;
 };
 } // namespace doris::vectorized
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp 
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index d44db473b28..fa63a352e88 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -111,6 +111,7 @@ Status VTabletWriterV2::_incremental_open_streams(
                     VLOG_DEBUG << "incremental open stream (" << partition->id 
<< ", " << tablet_id
                                << ")";
                 }
+                _build_tablet_replica_info(tablet_id, partition);
             }
         }
     }
@@ -346,12 +347,28 @@ Status VTabletWriterV2::_build_tablet_node_mapping() {
                     _indexes_from_node[node].emplace_back(tablet);
                     known_indexes.insert(index.index_id);
                 }
+                _build_tablet_replica_info(tablet_id, partition);
             }
         }
     }
     return Status::OK();
 }
 
+void VTabletWriterV2::_build_tablet_replica_info(const int64_t tablet_id,
+                                                 VOlapTablePartition* 
partition) {
+    if (partition != nullptr) {
+        int total_replicas_num =
+                partition->total_replica_num == 0 ? _num_replicas : 
partition->total_replica_num;
+        int load_required_replicas_num = partition->load_required_replica_num 
== 0
+                                                 ? (_num_replicas + 1) / 2
+                                                 : 
partition->load_required_replica_num;
+        _tablet_replica_info[tablet_id] =
+                std::make_pair(total_replicas_num, load_required_replicas_num);
+    } else {
+        _tablet_replica_info[tablet_id] = std::make_pair(_num_replicas, 
(_num_replicas + 1) / 2);
+    }
+}
+
 void VTabletWriterV2::_generate_rows_for_tablet(std::vector<RowPartTabletIds>& 
row_part_tablet_ids,
                                                 RowsForTablet& 
rows_for_tablet) {
     for (int index_idx = 0; index_idx < row_part_tablet_ids.size(); 
index_idx++) {
@@ -670,8 +687,7 @@ Status VTabletWriterV2::close(Status exec_status) {
             });
 
             std::vector<TTabletCommitInfo> tablet_commit_infos;
-            RETURN_IF_ERROR(
-                    _create_commit_info(tablet_commit_infos, _load_stream_map, 
_num_replicas));
+            RETURN_IF_ERROR(_create_commit_info(tablet_commit_infos, 
_load_stream_map));
             _state->add_tablet_commit_infos(tablet_commit_infos);
         }
 
@@ -806,8 +822,7 @@ void VTabletWriterV2::_calc_tablets_to_commit() {
 }
 
 Status VTabletWriterV2::_create_commit_info(std::vector<TTabletCommitInfo>& 
tablet_commit_infos,
-                                            std::shared_ptr<LoadStreamMap> 
load_stream_map,
-                                            int num_replicas) {
+                                            std::shared_ptr<LoadStreamMap> 
load_stream_map) {
     std::unordered_map<int64_t, int> failed_tablets;
     std::unordered_map<int64_t, Status> failed_reason;
     load_stream_map->for_each([&](int64_t dst_id, LoadStreamStubs& streams) {
@@ -830,7 +845,11 @@ Status 
VTabletWriterV2::_create_commit_info(std::vector<TTabletCommitInfo>& tabl
     });
 
     for (auto [tablet_id, replicas] : failed_tablets) {
-        if (replicas > (num_replicas - 1) / 2) {
+        auto [total_replicas_num, load_required_replicas_num] = 
_tablet_replica_info[tablet_id];
+        int max_failed_replicas = total_replicas_num == 0
+                                          ? (_num_replicas - 1) / 2
+                                          : total_replicas_num - 
load_required_replicas_num;
+        if (replicas > max_failed_replicas) {
             LOG(INFO) << "tablet " << tablet_id
                       << " failed on majority backends: " << 
failed_reason[tablet_id];
             return Status::InternalError("tablet {} failed on majority 
backends: {}", tablet_id,
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h 
b/be/src/vec/sink/writer/vtablet_writer_v2.h
index cc87002a097..ab83bd782c3 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.h
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.h
@@ -118,9 +118,8 @@ public:
 #ifndef BE_TEST
 private:
 #endif
-    static Status _create_commit_info(std::vector<TTabletCommitInfo>& 
tablet_commit_infos,
-                                      std::shared_ptr<LoadStreamMap> 
load_stream_map,
-                                      int num_replicas);
+    Status _create_commit_info(std::vector<TTabletCommitInfo>& 
tablet_commit_infos,
+                               std::shared_ptr<LoadStreamMap> load_stream_map);
 
 private:
     Status _init_row_distribution();
@@ -133,6 +132,8 @@ private:
 
     Status _incremental_open_streams(const std::vector<TOlapTablePartition>& 
partitions);
 
+    void _build_tablet_replica_info(const int64_t tablet_id, 
VOlapTablePartition* partition);
+
     Status _send_new_partition_batch();
 
     Status _build_tablet_node_mapping();
@@ -242,6 +243,9 @@ private:
     VRowDistribution _row_distribution;
     // reuse to avoid frequent memory allocation and release.
     std::vector<RowPartTabletIds> _row_part_tablet_ids;
+
+    // tablet_id -> <total replicas num, load required replicas num>
+    std::unordered_map<int64_t, std::pair<int, int>> _tablet_replica_info;
 };
 
 } // namespace vectorized
diff --git a/be/test/vec/sink/vtablet_writer_v2_test.cpp 
b/be/test/vec/sink/vtablet_writer_v2_test.cpp
index 67dc9d089ab..ce467fb1d45 100644
--- a/be/test/vec/sink/vtablet_writer_v2_test.cpp
+++ b/be/test/vec/sink/vtablet_writer_v2_test.cpp
@@ -27,7 +27,7 @@ namespace doris {
 class TestVTabletWriterV2 : public ::testing::Test {
 public:
     TestVTabletWriterV2() = default;
-    ~TestVTabletWriterV2() = default;
+    ~TestVTabletWriterV2() override = default;
     static void SetUpTestSuite() {}
     static void TearDownTestSuite() {}
 };
@@ -47,15 +47,31 @@ static void add_stream(std::shared_ptr<LoadStreamMap> 
load_stream_map, int64_t n
     }
 }
 
+static std::unique_ptr<vectorized::VTabletWriterV2> create_vtablet_writer(int 
num_replicas = 3) {
+    TDataSink t_sink;
+    t_sink.__isset.olap_table_sink = true;
+    t_sink.olap_table_sink.num_replicas = num_replicas;
+    vectorized::VExprContextSPtrs output_exprs;
+    std::shared_ptr<pipeline::Dependency> dep = nullptr;
+    std::shared_ptr<pipeline::Dependency> fin_dep = nullptr;
+    auto writer = std::make_unique<vectorized::VTabletWriterV2>(t_sink, 
output_exprs, dep, fin_dep);
+
+    int required_replicas = num_replicas / 2 + 1;
+    writer->_tablet_replica_info[1] = std::make_pair(num_replicas, 
required_replicas);
+    writer->_tablet_replica_info[2] = std::make_pair(num_replicas, 
required_replicas);
+
+    return writer;
+}
+
 TEST_F(TestVTabletWriterV2, one_replica) {
     UniqueId load_id;
     std::vector<TTabletCommitInfo> tablet_commit_infos;
     std::shared_ptr<LoadStreamMap> load_stream_map =
             std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
-    const int num_replicas = 1;
     add_stream(load_stream_map, 1001, {1, 2}, {});
-    auto st = 
vectorized::VTabletWriterV2::_create_commit_info(tablet_commit_infos, 
load_stream_map,
-                                                               num_replicas);
+
+    auto writer = create_vtablet_writer(1);
+    auto st = writer->_create_commit_info(tablet_commit_infos, 
load_stream_map);
     ASSERT_TRUE(st.ok());
     ASSERT_EQ(tablet_commit_infos.size(), 2);
 }
@@ -65,10 +81,10 @@ TEST_F(TestVTabletWriterV2, one_replica_fail) {
     std::vector<TTabletCommitInfo> tablet_commit_infos;
     std::shared_ptr<LoadStreamMap> load_stream_map =
             std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
-    const int num_replicas = 1;
     add_stream(load_stream_map, 1001, {1}, {{2, 
Status::InternalError("test")}});
-    auto st = 
vectorized::VTabletWriterV2::_create_commit_info(tablet_commit_infos, 
load_stream_map,
-                                                               num_replicas);
+
+    auto writer = create_vtablet_writer(1);
+    auto st = writer->_create_commit_info(tablet_commit_infos, 
load_stream_map);
     ASSERT_EQ(st, Status::InternalError("test"));
 }
 
@@ -77,11 +93,11 @@ TEST_F(TestVTabletWriterV2, two_replica) {
     std::vector<TTabletCommitInfo> tablet_commit_infos;
     std::shared_ptr<LoadStreamMap> load_stream_map =
             std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
-    const int num_replicas = 2;
     add_stream(load_stream_map, 1001, {1, 2}, {});
     add_stream(load_stream_map, 1002, {1, 2}, {});
-    auto st = 
vectorized::VTabletWriterV2::_create_commit_info(tablet_commit_infos, 
load_stream_map,
-                                                               num_replicas);
+
+    auto writer = create_vtablet_writer(2);
+    auto st = writer->_create_commit_info(tablet_commit_infos, 
load_stream_map);
     ASSERT_TRUE(st.ok());
     ASSERT_EQ(tablet_commit_infos.size(), 4);
 }
@@ -91,11 +107,11 @@ TEST_F(TestVTabletWriterV2, two_replica_fail) {
     std::vector<TTabletCommitInfo> tablet_commit_infos;
     std::shared_ptr<LoadStreamMap> load_stream_map =
             std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
-    const int num_replicas = 2;
     add_stream(load_stream_map, 1001, {1}, {{2, 
Status::InternalError("test")}});
     add_stream(load_stream_map, 1002, {1, 2}, {});
-    auto st = 
vectorized::VTabletWriterV2::_create_commit_info(tablet_commit_infos, 
load_stream_map,
-                                                               num_replicas);
+
+    auto writer = create_vtablet_writer(2);
+    auto st = writer->_create_commit_info(tablet_commit_infos, 
load_stream_map);
     ASSERT_EQ(st, Status::InternalError("test"));
 }
 
@@ -104,12 +120,12 @@ TEST_F(TestVTabletWriterV2, normal) {
     std::vector<TTabletCommitInfo> tablet_commit_infos;
     std::shared_ptr<LoadStreamMap> load_stream_map =
             std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
-    const int num_replicas = 3;
     add_stream(load_stream_map, 1001, {1, 2}, {});
     add_stream(load_stream_map, 1002, {1, 2}, {});
     add_stream(load_stream_map, 1003, {1, 2}, {});
-    auto st = 
vectorized::VTabletWriterV2::_create_commit_info(tablet_commit_infos, 
load_stream_map,
-                                                               num_replicas);
+
+    auto writer = create_vtablet_writer();
+    auto st = writer->_create_commit_info(tablet_commit_infos, 
load_stream_map);
     ASSERT_TRUE(st.ok());
     ASSERT_EQ(tablet_commit_infos.size(), 6);
 }
@@ -119,12 +135,12 @@ TEST_F(TestVTabletWriterV2, miss_one) {
     std::vector<TTabletCommitInfo> tablet_commit_infos;
     std::shared_ptr<LoadStreamMap> load_stream_map =
             std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
-    const int num_replicas = 3;
     add_stream(load_stream_map, 1001, {1, 2}, {});
     add_stream(load_stream_map, 1002, {1}, {});
     add_stream(load_stream_map, 1003, {1, 2}, {});
-    auto st = 
vectorized::VTabletWriterV2::_create_commit_info(tablet_commit_infos, 
load_stream_map,
-                                                               num_replicas);
+
+    auto writer = create_vtablet_writer();
+    auto st = writer->_create_commit_info(tablet_commit_infos, 
load_stream_map);
     ASSERT_TRUE(st.ok());
     ASSERT_EQ(tablet_commit_infos.size(), 5);
 }
@@ -134,12 +150,12 @@ TEST_F(TestVTabletWriterV2, miss_two) {
     std::vector<TTabletCommitInfo> tablet_commit_infos;
     std::shared_ptr<LoadStreamMap> load_stream_map =
             std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
-    const int num_replicas = 3;
     add_stream(load_stream_map, 1001, {1, 2}, {});
     add_stream(load_stream_map, 1002, {1}, {});
     add_stream(load_stream_map, 1003, {1}, {});
-    auto st = 
vectorized::VTabletWriterV2::_create_commit_info(tablet_commit_infos, 
load_stream_map,
-                                                               num_replicas);
+
+    auto writer = create_vtablet_writer();
+    auto st = writer->_create_commit_info(tablet_commit_infos, 
load_stream_map);
     ASSERT_TRUE(st.ok());
     ASSERT_EQ(tablet_commit_infos.size(), 4);
 }
@@ -149,12 +165,12 @@ TEST_F(TestVTabletWriterV2, fail_one) {
     std::vector<TTabletCommitInfo> tablet_commit_infos;
     std::shared_ptr<LoadStreamMap> load_stream_map =
             std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
-    const int num_replicas = 3;
     add_stream(load_stream_map, 1001, {1, 2}, {});
     add_stream(load_stream_map, 1002, {1}, {{2, 
Status::InternalError("test")}});
     add_stream(load_stream_map, 1003, {1, 2}, {});
-    auto st = 
vectorized::VTabletWriterV2::_create_commit_info(tablet_commit_infos, 
load_stream_map,
-                                                               num_replicas);
+
+    auto writer = create_vtablet_writer();
+    auto st = writer->_create_commit_info(tablet_commit_infos, 
load_stream_map);
     ASSERT_TRUE(st.ok());
     ASSERT_EQ(tablet_commit_infos.size(), 5);
 }
@@ -164,13 +180,13 @@ TEST_F(TestVTabletWriterV2, fail_one_duplicate) {
     std::vector<TTabletCommitInfo> tablet_commit_infos;
     std::shared_ptr<LoadStreamMap> load_stream_map =
             std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
-    const int num_replicas = 3;
     add_stream(load_stream_map, 1001, {1, 2}, {});
     add_stream(load_stream_map, 1002, {1}, {{2, 
Status::InternalError("test")}});
     add_stream(load_stream_map, 1002, {1}, {{2, 
Status::InternalError("test")}});
     add_stream(load_stream_map, 1003, {1, 2}, {});
-    auto st = 
vectorized::VTabletWriterV2::_create_commit_info(tablet_commit_infos, 
load_stream_map,
-                                                               num_replicas);
+
+    auto writer = create_vtablet_writer();
+    auto st = writer->_create_commit_info(tablet_commit_infos, 
load_stream_map);
     // Duplicate tablets from same node should be ignored
     ASSERT_TRUE(st.ok());
     ASSERT_EQ(tablet_commit_infos.size(), 5);
@@ -181,13 +197,13 @@ TEST_F(TestVTabletWriterV2, 
fail_two_diff_tablet_same_node) {
     std::vector<TTabletCommitInfo> tablet_commit_infos;
     std::shared_ptr<LoadStreamMap> load_stream_map =
             std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
-    const int num_replicas = 3;
     add_stream(load_stream_map, 1001, {1, 2}, {});
     add_stream(load_stream_map, 1002, {},
                {{1, Status::InternalError("test")}, {2, 
Status::InternalError("test")}});
     add_stream(load_stream_map, 1003, {1, 2}, {});
-    auto st = 
vectorized::VTabletWriterV2::_create_commit_info(tablet_commit_infos, 
load_stream_map,
-                                                               num_replicas);
+
+    auto writer = create_vtablet_writer();
+    auto st = writer->_create_commit_info(tablet_commit_infos, 
load_stream_map);
     ASSERT_TRUE(st.ok());
     ASSERT_EQ(tablet_commit_infos.size(), 4);
 }
@@ -197,12 +213,12 @@ TEST_F(TestVTabletWriterV2, 
fail_two_diff_tablet_diff_node) {
     std::vector<TTabletCommitInfo> tablet_commit_infos;
     std::shared_ptr<LoadStreamMap> load_stream_map =
             std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
-    const int num_replicas = 3;
     add_stream(load_stream_map, 1001, {1, 2}, {});
     add_stream(load_stream_map, 1002, {1}, {{2, 
Status::InternalError("test")}});
     add_stream(load_stream_map, 1003, {2}, {{1, 
Status::InternalError("test")}});
-    auto st = 
vectorized::VTabletWriterV2::_create_commit_info(tablet_commit_infos, 
load_stream_map,
-                                                               num_replicas);
+
+    auto writer = create_vtablet_writer();
+    auto st = writer->_create_commit_info(tablet_commit_infos, 
load_stream_map);
     ASSERT_TRUE(st.ok());
     ASSERT_EQ(tablet_commit_infos.size(), 4);
 }
@@ -212,12 +228,12 @@ TEST_F(TestVTabletWriterV2, fail_two_same_tablet) {
     std::vector<TTabletCommitInfo> tablet_commit_infos;
     std::shared_ptr<LoadStreamMap> load_stream_map =
             std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
-    const int num_replicas = 3;
     add_stream(load_stream_map, 1001, {1, 2}, {});
     add_stream(load_stream_map, 1002, {1}, {{2, 
Status::InternalError("test")}});
     add_stream(load_stream_map, 1003, {1}, {{2, 
Status::InternalError("test")}});
-    auto st = 
vectorized::VTabletWriterV2::_create_commit_info(tablet_commit_infos, 
load_stream_map,
-                                                               num_replicas);
+
+    auto writer = create_vtablet_writer();
+    auto st = writer->_create_commit_info(tablet_commit_infos, 
load_stream_map);
     // BE should detect and abort commit if majority of replicas failed
     ASSERT_EQ(st, Status::InternalError("test"));
 }
@@ -227,12 +243,12 @@ TEST_F(TestVTabletWriterV2, 
fail_two_miss_one_same_tablet) {
     std::vector<TTabletCommitInfo> tablet_commit_infos;
     std::shared_ptr<LoadStreamMap> load_stream_map =
             std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
-    const int num_replicas = 3;
     add_stream(load_stream_map, 1001, {1}, {});
     add_stream(load_stream_map, 1002, {1}, {{2, 
Status::InternalError("test")}});
     add_stream(load_stream_map, 1003, {1}, {{2, 
Status::InternalError("test")}});
-    auto st = 
vectorized::VTabletWriterV2::_create_commit_info(tablet_commit_infos, 
load_stream_map,
-                                                               num_replicas);
+
+    auto writer = create_vtablet_writer();
+    auto st = writer->_create_commit_info(tablet_commit_infos, 
load_stream_map);
     // BE should detect and abort commit if majority of replicas failed
     ASSERT_EQ(st, Status::InternalError("test"));
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index ded68fafc5a..b94ccbb42d2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -2364,8 +2364,12 @@ public class OlapTable extends Table implements 
MTMVRelatedTableIf, GsonPostProc
         tableProperty.buildMinLoadReplicaNum();
     }
 
+    public int getPartitionTotalReplicasNum(long partitionId) {
+        return 
partitionInfo.getReplicaAllocation(partitionId).getTotalReplicaNum();
+    }
+
     public int getLoadRequiredReplicaNum(long partitionId) {
-        int totalReplicaNum = 
partitionInfo.getReplicaAllocation(partitionId).getTotalReplicaNum();
+        int totalReplicaNum = getPartitionTotalReplicasNum(partitionId);
         int minLoadReplicaNum = getMinLoadReplicaNum();
         if (minLoadReplicaNum > 0) {
             return Math.min(minLoadReplicaNum, totalReplicaNum);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
index d201c85eb35..329da4d5eb8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
@@ -256,6 +256,10 @@ public class OlapTableSink extends DataSink {
 
         tOlapTableSchemaParam = createSchema(tSink.getDbId(), dstTable, 
analyzer);
         tOlapTablePartitionParam = createPartition(tSink.getDbId(), dstTable, 
analyzer);
+        for (TOlapTablePartition partition : 
tOlapTablePartitionParam.getPartitions()) {
+            
partition.setTotalReplicaNum(dstTable.getPartitionTotalReplicasNum(partition.getId()));
+            
partition.setLoadRequiredReplicaNum(dstTable.getLoadRequiredReplicaNum(partition.getId()));
+        }
         tOlapTableLocationParams = createLocation(tSink.getDbId(), dstTable);
 
         tSink.setSchema(tOlapTableSchemaParam);
diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift
index 12701800139..b758acd28b7 100644
--- a/gensrc/thrift/Descriptors.thrift
+++ b/gensrc/thrift/Descriptors.thrift
@@ -210,6 +210,8 @@ struct TOlapTablePartition {
     10: optional bool is_default_partition;
     // only used in random distribution scenario to make data distributed even 
     11: optional i64 load_tablet_idx
+    12: optional i32 total_replica_num
+    13: optional i32 load_required_replica_num
 }
 
 struct TOlapTablePartitionParam {
diff --git 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/DebugPoint.groovy
 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/DebugPoint.groovy
index 7386d896ac3..cbf4cda6f8d 100644
--- 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/DebugPoint.groovy
+++ 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/DebugPoint.groovy
@@ -111,6 +111,24 @@ class DebugPoint {
         })
     }
 
+    /* Enable specific debug point for major BE node in cluster */
+    def enableDebugPointForMajorBEs(String name, Map<String, String> params = 
null) {
+        def skip_flag = false
+        operateDebugPointForAllBEs({ host, port ->
+            logger.info("enable debug point ${name} with params ${params} for 
BE $host:$port")
+            if (port == -1) {
+                logger.info("skip for BE $host:$port")
+                return
+            }
+            if (!skip_flag) {
+                skip_flag = true
+                logger.info("skip for BE $host:$port")
+                return
+            }
+            enableDebugPoint(host, port, NodeType.BE, name, params)
+        })
+    }
+
     /* Disable specific debug point for all BE node in cluster */
     def disableDebugPointForAllBEs(String name) {
         operateDebugPointForAllBEs { host, port ->
diff --git 
a/regression-test/suites/load_p0/stream_load/test_sink_tolerate.groovy 
b/regression-test/suites/load_p0/stream_load/test_sink_tolerate.groovy
new file mode 100644
index 00000000000..c55d80a70cd
--- /dev/null
+++ b/regression-test/suites/load_p0/stream_load/test_sink_tolerate.groovy
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+suite("test_sink_tolerate", "docker") {
+    def options = new ClusterOptions()
+    options.beConfigs += [
+        'enable_debug_points=true'
+    ]
+
+    docker(options) {
+        def tableName = "test_sink_tolerate"
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                `k1` bigint(20) NULL,
+                `k2` bigint(20) NULL,
+                `v1` tinyint(4) SUM NULL,
+                `v2` tinyint(4) REPLACE NULL,
+                `v3` tinyint(4) REPLACE_IF_NOT_NULL NULL,
+                `v4` smallint(6) REPLACE_IF_NOT_NULL NULL,
+                `v5` int(11) REPLACE_IF_NOT_NULL NULL,
+                `v6` bigint(20) REPLACE_IF_NOT_NULL NULL,
+                `v7` largeint(40) REPLACE_IF_NOT_NULL NULL,
+                `v8` datetime REPLACE_IF_NOT_NULL NULL,
+                `v9` date REPLACE_IF_NOT_NULL NULL,
+                `v10` char(10) REPLACE_IF_NOT_NULL NULL,
+                `v11` varchar(6) REPLACE_IF_NOT_NULL NULL,
+                `v12` decimal(27, 9) REPLACE_IF_NOT_NULL NULL
+            ) ENGINE=OLAP
+            AGGREGATE KEY(`k1`, `k2`)
+            COMMENT 'OLAP'
+            PARTITION BY RANGE(`k1`)
+            (PARTITION partition_a VALUES [("-9223372036854775808"), ("100000")),
+            PARTITION partition_b VALUES [("100000"), ("1000000000")),
+            PARTITION partition_c VALUES [("1000000000"), ("10000000000")),
+            PARTITION partition_d VALUES [("10000000000"), (MAXVALUE)))
+            DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 3
+            PROPERTIES (
+                "replication_num" = "3",
+                "min_load_replica_num" = "1"
+            );
+        """
+
+        try {
+            GetDebugPoint().enableDebugPointForMajorBEs("LoadChannel.add_batch.failed")
+            streamLoad {
+                table "${tableName}"
+                set 'column_separator', '\t'
+                set 'columns', 'k1, k2, v2, v10, v11'
+                set 'partitions', 'partition_a, partition_b, partition_c, partition_d'
+                set 'strict_mode', 'true'
+                file 'test_strict_mode.csv'
+                time 10000 // limit inflight 10s
+                check { result, exception, startTime, endTime ->
+                    if (exception != null) {
+                        throw exception
+                    }
+                    log.info("Stream load result: ${result}".toString())
+                    def json = parseJson(result)
+                    assertEquals("success", json.Status.toLowerCase())
+                    assertEquals(2, json.NumberTotalRows)
+                    assertEquals(0, json.NumberFilteredRows)
+                    assertEquals(0, json.NumberUnselectedRows)
+                }
+            }
+            sql "sync"
+            def res = sql "select * from ${tableName}"
+            log.info("select result: ${res}".toString())
+            assertEquals(2, res.size())
+        } finally {
+            GetDebugPoint().disableDebugPointForAllBEs("LoadChannel.add_batch.failed")
+        }
+
+        try {
+            GetDebugPoint().enableDebugPointForMajorBEs("TabletStream.add_segment.add_segment_failed")
+            streamLoad {
+                table "${tableName}"
+                set 'column_separator', '\t'
+                set 'columns', 'k1, k2, v2, v10, v11'
+                set 'partitions', 'partition_a, partition_b, partition_c, partition_d'
+                set 'strict_mode', 'true'
+                set 'memtable_on_sink_node', 'true'
+                file 'test_strict_mode.csv'
+                time 10000 // limit inflight 10s
+                check { result, exception, startTime, endTime ->
+                    if (exception != null) {
+                        throw exception
+                    }
+                    log.info("Stream load result: ${result}".toString())
+                    def json = parseJson(result)
+                    assertEquals("success", json.Status.toLowerCase())
+                    assertEquals(2, json.NumberTotalRows)
+                    assertEquals(0, json.NumberFilteredRows)
+                    assertEquals(0, json.NumberUnselectedRows)
+                }
+            }
+            sql "sync"
+            def res = sql "select * from ${tableName}"
+            log.info("select result: ${res}".toString())
+            assertEquals(2, res.size())
+        } finally {
+            GetDebugPoint().disableDebugPointForAllBEs("TabletStream.add_segment.add_segment_failed")
+        }
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to