This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 935953d92bf branch-3.1: [fix](sink) fix sink operator tolerate failed
replica number incorrect #52560 (#53099)
935953d92bf is described below
commit 935953d92bf69229c8b100218f3d6bc3ef3cd5df
Author: hui lai <[email protected]>
AuthorDate: Fri Jul 11 19:16:46 2025 +0800
branch-3.1: [fix](sink) fix sink operator tolerate failed replica number
incorrect #52560 (#53099)
pick #52560
---
be/src/exec/tablet_info.cpp | 6 +
be/src/exec/tablet_info.h | 2 +
be/src/runtime/load_channel.cpp | 3 +
be/src/vec/sink/writer/vtablet_writer.cpp | 31 +++++-
be/src/vec/sink/writer/vtablet_writer.h | 7 ++
be/src/vec/sink/writer/vtablet_writer_v2.cpp | 29 ++++-
be/src/vec/sink/writer/vtablet_writer_v2.h | 10 +-
be/test/vec/sink/vtablet_writer_v2_test.cpp | 96 +++++++++-------
.../java/org/apache/doris/catalog/OlapTable.java | 6 +-
.../org/apache/doris/planner/OlapTableSink.java | 4 +
gensrc/thrift/Descriptors.thrift | 2 +
.../apache/doris/regression/util/DebugPoint.groovy | 18 +++
.../load_p0/stream_load/test_sink_tolerate.groovy | 121 +++++++++++++++++++++
13 files changed, 284 insertions(+), 51 deletions(-)
diff --git a/be/src/exec/tablet_info.cpp b/be/src/exec/tablet_info.cpp
index 5e75f8bac71..205b7e440d3 100644
--- a/be/src/exec/tablet_info.cpp
+++ b/be/src/exec/tablet_info.cpp
@@ -713,6 +713,12 @@ Status
VOlapTablePartitionParam::generate_partition_from(const TOlapTablePartiti
part_result->indexes[j].index_id,
_schema->indexes()[j]->index_id);
}
}
+ if (t_part.__isset.total_replica_num) {
+ part_result->total_replica_num = t_part.total_replica_num;
+ }
+ if (t_part.__isset.load_required_replica_num) {
+ part_result->load_required_replica_num =
t_part.load_required_replica_num;
+ }
return Status::OK();
}
diff --git a/be/src/exec/tablet_info.h b/be/src/exec/tablet_info.h
index b5a139c036d..0b75e8706b3 100644
--- a/be/src/exec/tablet_info.h
+++ b/be/src/exec/tablet_info.h
@@ -163,6 +163,8 @@ struct VOlapTablePartition {
bool is_mutable;
// -1 indicates partition with hash distribution
int64_t load_tablet_idx = -1;
+ int total_replica_num = 0;
+ int load_required_replica_num = 0;
VOlapTablePartition(vectorized::Block* partition_block)
// the default value of partition bound is -1.
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index 54665d8184f..cf10d3d66e8 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -29,6 +29,7 @@
#include "runtime/tablets_channel.h"
#include "runtime/thread_context.h"
#include "runtime/workload_group/workload_group_manager.h"
+#include "util/debug_points.h"
namespace doris {
@@ -172,6 +173,8 @@ Status
LoadChannel::_get_tablets_channel(std::shared_ptr<BaseTabletsChannel>& ch
Status LoadChannel::add_batch(const PTabletWriterAddBlockRequest& request,
PTabletWriterAddBlockResult* response) {
+ DBUG_EXECUTE_IF("LoadChannel.add_batch.failed",
+ { return Status::InternalError("fault injection"); });
SCOPED_TIMER(_add_batch_timer);
COUNTER_UPDATE(_add_batch_times, 1);
SCOPED_ATTACH_TASK(_query_thread_context);
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp
index 581fc1704e9..64903c5b212 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -172,7 +172,7 @@ void IndexChannel::mark_as_failed(const VNodeChannel*
node_channel, const std::s
_failed_channels[the_tablet_id].insert(node_id);
_failed_channels_msgs.emplace(the_tablet_id,
err + ", host: " +
node_channel->host());
- if (_failed_channels[the_tablet_id].size() >=
((_parent->_num_replicas + 1) / 2)) {
+ if (_failed_channels[the_tablet_id].size() >
_max_failed_replicas(the_tablet_id)) {
_intolerable_failure_status =
Status::Error<ErrorCode::INTERNAL_ERROR, false>(
_failed_channels_msgs[the_tablet_id]);
}
@@ -180,7 +180,7 @@ void IndexChannel::mark_as_failed(const VNodeChannel*
node_channel, const std::s
} else {
_failed_channels[tablet_id].insert(node_id);
_failed_channels_msgs.emplace(tablet_id, err + ", host: " +
node_channel->host());
- if (_failed_channels[tablet_id].size() >= ((_parent->_num_replicas
+ 1) / 2)) {
+ if (_failed_channels[tablet_id].size() >
_max_failed_replicas(tablet_id)) {
_intolerable_failure_status =
Status::Error<ErrorCode::INTERNAL_ERROR, false>(
_failed_channels_msgs[tablet_id]);
}
@@ -188,6 +188,15 @@ void IndexChannel::mark_as_failed(const VNodeChannel*
node_channel, const std::s
}
}
+int IndexChannel::_max_failed_replicas(int64_t tablet_id) {
+ auto [total_replicas_num, load_required_replicas_num] =
+ _parent->_tablet_replica_info[tablet_id];
+ int max_failed_replicas = total_replicas_num == 0
+ ? (_parent->_num_replicas - 1) / 2
+ : total_replicas_num -
load_required_replicas_num;
+ return max_failed_replicas;
+}
+
Status IndexChannel::check_intolerable_failure() {
std::lock_guard<std::mutex> l(_fail_lock);
return _intolerable_failure_status;
@@ -1404,6 +1413,7 @@ Status VTabletWriter::_init(RuntimeState* state,
RuntimeProfile* profile) {
tablet_with_partition.partition_id = part->id;
tablet_with_partition.tablet_id = tablet;
tablets.emplace_back(std::move(tablet_with_partition));
+ _build_tablet_replica_info(tablet, part);
}
}
if (tablets.empty() && !_vpartition->is_auto_partition()) {
@@ -1436,6 +1446,7 @@ Status VTabletWriter::_incremental_open_node_channel(
tablet_with_partition.partition_id = part->id;
tablet_with_partition.tablet_id = tablet;
tablets.emplace_back(std::move(tablet_with_partition));
+ _build_tablet_replica_info(tablet, part);
}
DCHECK(!tablets.empty()) << "incremental open got nothing!";
}
@@ -1472,6 +1483,22 @@ Status VTabletWriter::_incremental_open_node_channel(
return Status::OK();
}
+void VTabletWriter::_build_tablet_replica_info(const int64_t tablet_id,
+ VOlapTablePartition* partition)
{
+ if (partition != nullptr) {
+ int total_replicas_num =
+ partition->total_replica_num == 0 ? _num_replicas :
partition->total_replica_num;
+ int load_required_replicas_num = partition->load_required_replica_num
== 0
+ ? (_num_replicas + 1) / 2
+ :
partition->load_required_replica_num;
+ _tablet_replica_info.emplace(
+ tablet_id, std::make_pair(total_replicas_num,
load_required_replicas_num));
+ } else {
+ _tablet_replica_info.emplace(tablet_id,
+ std::make_pair(_num_replicas,
(_num_replicas + 1) / 2));
+ }
+}
+
void VTabletWriter::_cancel_all_channel(Status status) {
for (const auto& index_channel : _channels) {
index_channel->for_each_node_channel([&status](const
std::shared_ptr<VNodeChannel>& ch) {
diff --git a/be/src/vec/sink/writer/vtablet_writer.h b/be/src/vec/sink/writer/vtablet_writer.h
index 116aa20a98c..8a15df9a252 100644
--- a/be/src/vec/sink/writer/vtablet_writer.h
+++ b/be/src/vec/sink/writer/vtablet_writer.h
@@ -551,6 +551,8 @@ private:
friend class VTabletWriter;
friend class VRowDistribution;
+ int _max_failed_replicas(int64_t tablet_id);
+
VTabletWriter* _parent = nullptr;
int64_t _index_id;
vectorized::VExprContextSPtr _where_clause;
@@ -636,6 +638,8 @@ private:
void _do_try_close(RuntimeState* state, const Status& exec_status);
+ void _build_tablet_replica_info(const int64_t tablet_id,
VOlapTablePartition* partition);
+
TDataSink _t_sink;
std::shared_ptr<MemTracker> _mem_tracker;
@@ -738,5 +742,8 @@ private:
VRowDistribution _row_distribution;
// reuse to avoid frequent memory allocation and release.
std::vector<RowPartTabletIds> _row_part_tablet_ids;
+
+ // tablet_id -> <total replicas num, load required replicas num>
+ std::unordered_map<int64_t, std::pair<int, int>> _tablet_replica_info;
};
} // namespace doris::vectorized
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index d44db473b28..fa63a352e88 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -111,6 +111,7 @@ Status VTabletWriterV2::_incremental_open_streams(
VLOG_DEBUG << "incremental open stream (" << partition->id
<< ", " << tablet_id
<< ")";
}
+ _build_tablet_replica_info(tablet_id, partition);
}
}
}
@@ -346,12 +347,28 @@ Status VTabletWriterV2::_build_tablet_node_mapping() {
_indexes_from_node[node].emplace_back(tablet);
known_indexes.insert(index.index_id);
}
+ _build_tablet_replica_info(tablet_id, partition);
}
}
}
return Status::OK();
}
+void VTabletWriterV2::_build_tablet_replica_info(const int64_t tablet_id,
+ VOlapTablePartition*
partition) {
+ if (partition != nullptr) {
+ int total_replicas_num =
+ partition->total_replica_num == 0 ? _num_replicas :
partition->total_replica_num;
+ int load_required_replicas_num = partition->load_required_replica_num
== 0
+ ? (_num_replicas + 1) / 2
+ :
partition->load_required_replica_num;
+ _tablet_replica_info[tablet_id] =
+ std::make_pair(total_replicas_num, load_required_replicas_num);
+ } else {
+ _tablet_replica_info[tablet_id] = std::make_pair(_num_replicas,
(_num_replicas + 1) / 2);
+ }
+}
+
void VTabletWriterV2::_generate_rows_for_tablet(std::vector<RowPartTabletIds>&
row_part_tablet_ids,
RowsForTablet&
rows_for_tablet) {
for (int index_idx = 0; index_idx < row_part_tablet_ids.size();
index_idx++) {
@@ -670,8 +687,7 @@ Status VTabletWriterV2::close(Status exec_status) {
});
std::vector<TTabletCommitInfo> tablet_commit_infos;
- RETURN_IF_ERROR(
- _create_commit_info(tablet_commit_infos, _load_stream_map,
_num_replicas));
+ RETURN_IF_ERROR(_create_commit_info(tablet_commit_infos,
_load_stream_map));
_state->add_tablet_commit_infos(tablet_commit_infos);
}
@@ -806,8 +822,7 @@ void VTabletWriterV2::_calc_tablets_to_commit() {
}
Status VTabletWriterV2::_create_commit_info(std::vector<TTabletCommitInfo>&
tablet_commit_infos,
- std::shared_ptr<LoadStreamMap>
load_stream_map,
- int num_replicas) {
+ std::shared_ptr<LoadStreamMap>
load_stream_map) {
std::unordered_map<int64_t, int> failed_tablets;
std::unordered_map<int64_t, Status> failed_reason;
load_stream_map->for_each([&](int64_t dst_id, LoadStreamStubs& streams) {
@@ -830,7 +845,11 @@ Status
VTabletWriterV2::_create_commit_info(std::vector<TTabletCommitInfo>& tabl
});
for (auto [tablet_id, replicas] : failed_tablets) {
- if (replicas > (num_replicas - 1) / 2) {
+ auto [total_replicas_num, load_required_replicas_num] =
_tablet_replica_info[tablet_id];
+ int max_failed_replicas = total_replicas_num == 0
+ ? (_num_replicas - 1) / 2
+ : total_replicas_num -
load_required_replicas_num;
+ if (replicas > max_failed_replicas) {
LOG(INFO) << "tablet " << tablet_id
<< " failed on majority backends: " <<
failed_reason[tablet_id];
return Status::InternalError("tablet {} failed on majority
backends: {}", tablet_id,
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h b/be/src/vec/sink/writer/vtablet_writer_v2.h
index cc87002a097..ab83bd782c3 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.h
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.h
@@ -118,9 +118,8 @@ public:
#ifndef BE_TEST
private:
#endif
- static Status _create_commit_info(std::vector<TTabletCommitInfo>&
tablet_commit_infos,
- std::shared_ptr<LoadStreamMap>
load_stream_map,
- int num_replicas);
+ Status _create_commit_info(std::vector<TTabletCommitInfo>&
tablet_commit_infos,
+ std::shared_ptr<LoadStreamMap> load_stream_map);
private:
Status _init_row_distribution();
@@ -133,6 +132,8 @@ private:
Status _incremental_open_streams(const std::vector<TOlapTablePartition>&
partitions);
+ void _build_tablet_replica_info(const int64_t tablet_id,
VOlapTablePartition* partition);
+
Status _send_new_partition_batch();
Status _build_tablet_node_mapping();
@@ -242,6 +243,9 @@ private:
VRowDistribution _row_distribution;
// reuse to avoid frequent memory allocation and release.
std::vector<RowPartTabletIds> _row_part_tablet_ids;
+
+ // tablet_id -> <total replicas num, load required replicas num>
+ std::unordered_map<int64_t, std::pair<int, int>> _tablet_replica_info;
};
} // namespace vectorized
diff --git a/be/test/vec/sink/vtablet_writer_v2_test.cpp b/be/test/vec/sink/vtablet_writer_v2_test.cpp
index 67dc9d089ab..ce467fb1d45 100644
--- a/be/test/vec/sink/vtablet_writer_v2_test.cpp
+++ b/be/test/vec/sink/vtablet_writer_v2_test.cpp
@@ -27,7 +27,7 @@ namespace doris {
class TestVTabletWriterV2 : public ::testing::Test {
public:
TestVTabletWriterV2() = default;
- ~TestVTabletWriterV2() = default;
+ ~TestVTabletWriterV2() override = default;
static void SetUpTestSuite() {}
static void TearDownTestSuite() {}
};
@@ -47,15 +47,31 @@ static void add_stream(std::shared_ptr<LoadStreamMap>
load_stream_map, int64_t n
}
}
+static std::unique_ptr<vectorized::VTabletWriterV2> create_vtablet_writer(int
num_replicas = 3) {
+ TDataSink t_sink;
+ t_sink.__isset.olap_table_sink = true;
+ t_sink.olap_table_sink.num_replicas = num_replicas;
+ vectorized::VExprContextSPtrs output_exprs;
+ std::shared_ptr<pipeline::Dependency> dep = nullptr;
+ std::shared_ptr<pipeline::Dependency> fin_dep = nullptr;
+ auto writer = std::make_unique<vectorized::VTabletWriterV2>(t_sink,
output_exprs, dep, fin_dep);
+
+ int required_replicas = num_replicas / 2 + 1;
+ writer->_tablet_replica_info[1] = std::make_pair(num_replicas,
required_replicas);
+ writer->_tablet_replica_info[2] = std::make_pair(num_replicas,
required_replicas);
+
+ return writer;
+}
+
TEST_F(TestVTabletWriterV2, one_replica) {
UniqueId load_id;
std::vector<TTabletCommitInfo> tablet_commit_infos;
std::shared_ptr<LoadStreamMap> load_stream_map =
std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
- const int num_replicas = 1;
add_stream(load_stream_map, 1001, {1, 2}, {});
- auto st =
vectorized::VTabletWriterV2::_create_commit_info(tablet_commit_infos,
load_stream_map,
- num_replicas);
+
+ auto writer = create_vtablet_writer(1);
+ auto st = writer->_create_commit_info(tablet_commit_infos,
load_stream_map);
ASSERT_TRUE(st.ok());
ASSERT_EQ(tablet_commit_infos.size(), 2);
}
@@ -65,10 +81,10 @@ TEST_F(TestVTabletWriterV2, one_replica_fail) {
std::vector<TTabletCommitInfo> tablet_commit_infos;
std::shared_ptr<LoadStreamMap> load_stream_map =
std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
- const int num_replicas = 1;
add_stream(load_stream_map, 1001, {1}, {{2,
Status::InternalError("test")}});
- auto st =
vectorized::VTabletWriterV2::_create_commit_info(tablet_commit_infos,
load_stream_map,
- num_replicas);
+
+ auto writer = create_vtablet_writer(1);
+ auto st = writer->_create_commit_info(tablet_commit_infos,
load_stream_map);
ASSERT_EQ(st, Status::InternalError("test"));
}
@@ -77,11 +93,11 @@ TEST_F(TestVTabletWriterV2, two_replica) {
std::vector<TTabletCommitInfo> tablet_commit_infos;
std::shared_ptr<LoadStreamMap> load_stream_map =
std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
- const int num_replicas = 2;
add_stream(load_stream_map, 1001, {1, 2}, {});
add_stream(load_stream_map, 1002, {1, 2}, {});
- auto st =
vectorized::VTabletWriterV2::_create_commit_info(tablet_commit_infos,
load_stream_map,
- num_replicas);
+
+ auto writer = create_vtablet_writer(2);
+ auto st = writer->_create_commit_info(tablet_commit_infos,
load_stream_map);
ASSERT_TRUE(st.ok());
ASSERT_EQ(tablet_commit_infos.size(), 4);
}
@@ -91,11 +107,11 @@ TEST_F(TestVTabletWriterV2, two_replica_fail) {
std::vector<TTabletCommitInfo> tablet_commit_infos;
std::shared_ptr<LoadStreamMap> load_stream_map =
std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
- const int num_replicas = 2;
add_stream(load_stream_map, 1001, {1}, {{2,
Status::InternalError("test")}});
add_stream(load_stream_map, 1002, {1, 2}, {});
- auto st =
vectorized::VTabletWriterV2::_create_commit_info(tablet_commit_infos,
load_stream_map,
- num_replicas);
+
+ auto writer = create_vtablet_writer(2);
+ auto st = writer->_create_commit_info(tablet_commit_infos,
load_stream_map);
ASSERT_EQ(st, Status::InternalError("test"));
}
@@ -104,12 +120,12 @@ TEST_F(TestVTabletWriterV2, normal) {
std::vector<TTabletCommitInfo> tablet_commit_infos;
std::shared_ptr<LoadStreamMap> load_stream_map =
std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
- const int num_replicas = 3;
add_stream(load_stream_map, 1001, {1, 2}, {});
add_stream(load_stream_map, 1002, {1, 2}, {});
add_stream(load_stream_map, 1003, {1, 2}, {});
- auto st =
vectorized::VTabletWriterV2::_create_commit_info(tablet_commit_infos,
load_stream_map,
- num_replicas);
+
+ auto writer = create_vtablet_writer();
+ auto st = writer->_create_commit_info(tablet_commit_infos,
load_stream_map);
ASSERT_TRUE(st.ok());
ASSERT_EQ(tablet_commit_infos.size(), 6);
}
@@ -119,12 +135,12 @@ TEST_F(TestVTabletWriterV2, miss_one) {
std::vector<TTabletCommitInfo> tablet_commit_infos;
std::shared_ptr<LoadStreamMap> load_stream_map =
std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
- const int num_replicas = 3;
add_stream(load_stream_map, 1001, {1, 2}, {});
add_stream(load_stream_map, 1002, {1}, {});
add_stream(load_stream_map, 1003, {1, 2}, {});
- auto st =
vectorized::VTabletWriterV2::_create_commit_info(tablet_commit_infos,
load_stream_map,
- num_replicas);
+
+ auto writer = create_vtablet_writer();
+ auto st = writer->_create_commit_info(tablet_commit_infos,
load_stream_map);
ASSERT_TRUE(st.ok());
ASSERT_EQ(tablet_commit_infos.size(), 5);
}
@@ -134,12 +150,12 @@ TEST_F(TestVTabletWriterV2, miss_two) {
std::vector<TTabletCommitInfo> tablet_commit_infos;
std::shared_ptr<LoadStreamMap> load_stream_map =
std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
- const int num_replicas = 3;
add_stream(load_stream_map, 1001, {1, 2}, {});
add_stream(load_stream_map, 1002, {1}, {});
add_stream(load_stream_map, 1003, {1}, {});
- auto st =
vectorized::VTabletWriterV2::_create_commit_info(tablet_commit_infos,
load_stream_map,
- num_replicas);
+
+ auto writer = create_vtablet_writer();
+ auto st = writer->_create_commit_info(tablet_commit_infos,
load_stream_map);
ASSERT_TRUE(st.ok());
ASSERT_EQ(tablet_commit_infos.size(), 4);
}
@@ -149,12 +165,12 @@ TEST_F(TestVTabletWriterV2, fail_one) {
std::vector<TTabletCommitInfo> tablet_commit_infos;
std::shared_ptr<LoadStreamMap> load_stream_map =
std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
- const int num_replicas = 3;
add_stream(load_stream_map, 1001, {1, 2}, {});
add_stream(load_stream_map, 1002, {1}, {{2,
Status::InternalError("test")}});
add_stream(load_stream_map, 1003, {1, 2}, {});
- auto st =
vectorized::VTabletWriterV2::_create_commit_info(tablet_commit_infos,
load_stream_map,
- num_replicas);
+
+ auto writer = create_vtablet_writer();
+ auto st = writer->_create_commit_info(tablet_commit_infos,
load_stream_map);
ASSERT_TRUE(st.ok());
ASSERT_EQ(tablet_commit_infos.size(), 5);
}
@@ -164,13 +180,13 @@ TEST_F(TestVTabletWriterV2, fail_one_duplicate) {
std::vector<TTabletCommitInfo> tablet_commit_infos;
std::shared_ptr<LoadStreamMap> load_stream_map =
std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
- const int num_replicas = 3;
add_stream(load_stream_map, 1001, {1, 2}, {});
add_stream(load_stream_map, 1002, {1}, {{2,
Status::InternalError("test")}});
add_stream(load_stream_map, 1002, {1}, {{2,
Status::InternalError("test")}});
add_stream(load_stream_map, 1003, {1, 2}, {});
- auto st =
vectorized::VTabletWriterV2::_create_commit_info(tablet_commit_infos,
load_stream_map,
- num_replicas);
+
+ auto writer = create_vtablet_writer();
+ auto st = writer->_create_commit_info(tablet_commit_infos,
load_stream_map);
// Duplicate tablets from same node should be ignored
ASSERT_TRUE(st.ok());
ASSERT_EQ(tablet_commit_infos.size(), 5);
@@ -181,13 +197,13 @@ TEST_F(TestVTabletWriterV2,
fail_two_diff_tablet_same_node) {
std::vector<TTabletCommitInfo> tablet_commit_infos;
std::shared_ptr<LoadStreamMap> load_stream_map =
std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
- const int num_replicas = 3;
add_stream(load_stream_map, 1001, {1, 2}, {});
add_stream(load_stream_map, 1002, {},
{{1, Status::InternalError("test")}, {2,
Status::InternalError("test")}});
add_stream(load_stream_map, 1003, {1, 2}, {});
- auto st =
vectorized::VTabletWriterV2::_create_commit_info(tablet_commit_infos,
load_stream_map,
- num_replicas);
+
+ auto writer = create_vtablet_writer();
+ auto st = writer->_create_commit_info(tablet_commit_infos,
load_stream_map);
ASSERT_TRUE(st.ok());
ASSERT_EQ(tablet_commit_infos.size(), 4);
}
@@ -197,12 +213,12 @@ TEST_F(TestVTabletWriterV2,
fail_two_diff_tablet_diff_node) {
std::vector<TTabletCommitInfo> tablet_commit_infos;
std::shared_ptr<LoadStreamMap> load_stream_map =
std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
- const int num_replicas = 3;
add_stream(load_stream_map, 1001, {1, 2}, {});
add_stream(load_stream_map, 1002, {1}, {{2,
Status::InternalError("test")}});
add_stream(load_stream_map, 1003, {2}, {{1,
Status::InternalError("test")}});
- auto st =
vectorized::VTabletWriterV2::_create_commit_info(tablet_commit_infos,
load_stream_map,
- num_replicas);
+
+ auto writer = create_vtablet_writer();
+ auto st = writer->_create_commit_info(tablet_commit_infos,
load_stream_map);
ASSERT_TRUE(st.ok());
ASSERT_EQ(tablet_commit_infos.size(), 4);
}
@@ -212,12 +228,12 @@ TEST_F(TestVTabletWriterV2, fail_two_same_tablet) {
std::vector<TTabletCommitInfo> tablet_commit_infos;
std::shared_ptr<LoadStreamMap> load_stream_map =
std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
- const int num_replicas = 3;
add_stream(load_stream_map, 1001, {1, 2}, {});
add_stream(load_stream_map, 1002, {1}, {{2,
Status::InternalError("test")}});
add_stream(load_stream_map, 1003, {1}, {{2,
Status::InternalError("test")}});
- auto st =
vectorized::VTabletWriterV2::_create_commit_info(tablet_commit_infos,
load_stream_map,
- num_replicas);
+
+ auto writer = create_vtablet_writer();
+ auto st = writer->_create_commit_info(tablet_commit_infos,
load_stream_map);
// BE should detect and abort commit if majority of replicas failed
ASSERT_EQ(st, Status::InternalError("test"));
}
@@ -227,12 +243,12 @@ TEST_F(TestVTabletWriterV2,
fail_two_miss_one_same_tablet) {
std::vector<TTabletCommitInfo> tablet_commit_infos;
std::shared_ptr<LoadStreamMap> load_stream_map =
std::make_shared<LoadStreamMap>(load_id, src_id, 1, 1, nullptr);
- const int num_replicas = 3;
add_stream(load_stream_map, 1001, {1}, {});
add_stream(load_stream_map, 1002, {1}, {{2,
Status::InternalError("test")}});
add_stream(load_stream_map, 1003, {1}, {{2,
Status::InternalError("test")}});
- auto st =
vectorized::VTabletWriterV2::_create_commit_info(tablet_commit_infos,
load_stream_map,
- num_replicas);
+
+ auto writer = create_vtablet_writer();
+ auto st = writer->_create_commit_info(tablet_commit_infos,
load_stream_map);
// BE should detect and abort commit if majority of replicas failed
ASSERT_EQ(st, Status::InternalError("test"));
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index ded68fafc5a..b94ccbb42d2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -2364,8 +2364,12 @@ public class OlapTable extends Table implements
MTMVRelatedTableIf, GsonPostProc
tableProperty.buildMinLoadReplicaNum();
}
+ public int getPartitionTotalReplicasNum(long partitionId) {
+ return
partitionInfo.getReplicaAllocation(partitionId).getTotalReplicaNum();
+ }
+
public int getLoadRequiredReplicaNum(long partitionId) {
- int totalReplicaNum =
partitionInfo.getReplicaAllocation(partitionId).getTotalReplicaNum();
+ int totalReplicaNum = getPartitionTotalReplicasNum(partitionId);
int minLoadReplicaNum = getMinLoadReplicaNum();
if (minLoadReplicaNum > 0) {
return Math.min(minLoadReplicaNum, totalReplicaNum);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
index d201c85eb35..329da4d5eb8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
@@ -256,6 +256,10 @@ public class OlapTableSink extends DataSink {
tOlapTableSchemaParam = createSchema(tSink.getDbId(), dstTable,
analyzer);
tOlapTablePartitionParam = createPartition(tSink.getDbId(), dstTable,
analyzer);
+ for (TOlapTablePartition partition :
tOlapTablePartitionParam.getPartitions()) {
+
partition.setTotalReplicaNum(dstTable.getPartitionTotalReplicasNum(partition.getId()));
+
partition.setLoadRequiredReplicaNum(dstTable.getLoadRequiredReplicaNum(partition.getId()));
+ }
tOlapTableLocationParams = createLocation(tSink.getDbId(), dstTable);
tSink.setSchema(tOlapTableSchemaParam);
diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift
index 12701800139..b758acd28b7 100644
--- a/gensrc/thrift/Descriptors.thrift
+++ b/gensrc/thrift/Descriptors.thrift
@@ -210,6 +210,8 @@ struct TOlapTablePartition {
10: optional bool is_default_partition;
// only used in random distribution scenario to make data distributed even
11: optional i64 load_tablet_idx
+ 12: optional i32 total_replica_num
+ 13: optional i32 load_required_replica_num
}
struct TOlapTablePartitionParam {
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/DebugPoint.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/DebugPoint.groovy
index 7386d896ac3..cbf4cda6f8d 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/DebugPoint.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/DebugPoint.groovy
@@ -111,6 +111,24 @@ class DebugPoint {
})
}
+ /* Enable specific debug point for major BE node in cluster */
+ def enableDebugPointForMajorBEs(String name, Map<String, String> params =
null) {
+ def skip_flag = false
+ operateDebugPointForAllBEs({ host, port ->
+ logger.info("enable debug point ${name} with params ${params} for
BE $host:$port")
+ if (port == -1) {
+ logger.info("skip for BE $host:$port")
+ return
+ }
+ if (!skip_flag) {
+ skip_flag = true
+ logger.info("skip for BE $host:$port")
+ return
+ }
+ enableDebugPoint(host, port, NodeType.BE, name, params)
+ })
+ }
+
/* Disable specific debug point for all BE node in cluster */
def disableDebugPointForAllBEs(String name) {
operateDebugPointForAllBEs { host, port ->
diff --git a/regression-test/suites/load_p0/stream_load/test_sink_tolerate.groovy b/regression-test/suites/load_p0/stream_load/test_sink_tolerate.groovy
new file mode 100644
index 00000000000..c55d80a70cd
--- /dev/null
+++ b/regression-test/suites/load_p0/stream_load/test_sink_tolerate.groovy
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+suite("test_sink_tolerate", "docker") {
+ def options = new ClusterOptions()
+ options.beConfigs += [
+ 'enable_debug_points=true'
+ ]
+
+ docker(options) {
+ def tableName = "test_sink_tolerate"
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `k1` bigint(20) NULL,
+ `k2` bigint(20) NULL,
+ `v1` tinyint(4) SUM NULL,
+ `v2` tinyint(4) REPLACE NULL,
+ `v3` tinyint(4) REPLACE_IF_NOT_NULL NULL,
+ `v4` smallint(6) REPLACE_IF_NOT_NULL NULL,
+ `v5` int(11) REPLACE_IF_NOT_NULL NULL,
+ `v6` bigint(20) REPLACE_IF_NOT_NULL NULL,
+ `v7` largeint(40) REPLACE_IF_NOT_NULL NULL,
+ `v8` datetime REPLACE_IF_NOT_NULL NULL,
+ `v9` date REPLACE_IF_NOT_NULL NULL,
+ `v10` char(10) REPLACE_IF_NOT_NULL NULL,
+ `v11` varchar(6) REPLACE_IF_NOT_NULL NULL,
+ `v12` decimal(27, 9) REPLACE_IF_NOT_NULL NULL
+ ) ENGINE=OLAP
+ AGGREGATE KEY(`k1`, `k2`)
+ COMMENT 'OLAP'
+ PARTITION BY RANGE(`k1`)
+ (PARTITION partition_a VALUES [("-9223372036854775808"),
("100000")),
+ PARTITION partition_b VALUES [("100000"), ("1000000000")),
+ PARTITION partition_c VALUES [("1000000000"), ("10000000000")),
+ PARTITION partition_d VALUES [("10000000000"), (MAXVALUE)))
+ DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 3
+ PROPERTIES (
+ "replication_num" = "3",
+ "min_load_replica_num" = "1"
+ );
+ """
+
+ try {
+
GetDebugPoint().enableDebugPointForMajorBEs("LoadChannel.add_batch.failed")
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', '\t'
+ set 'columns', 'k1, k2, v2, v10, v11'
+ set 'partitions', 'partition_a, partition_b, partition_c,
partition_d'
+ set 'strict_mode', 'true'
+ file 'test_strict_mode.csv'
+ time 10000 // limit inflight 10s
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(2, json.NumberTotalRows)
+ assertEquals(0, json.NumberFilteredRows)
+ assertEquals(0, json.NumberUnselectedRows)
+ }
+ }
+ sql "sync"
+ def res = sql "select * from ${tableName}"
+ log.info("select result: ${res}".toString())
+ assertEquals(2, res.size())
+ } finally {
+
GetDebugPoint().disableDebugPointForAllBEs("LoadChannel.add_batch.failed")
+ }
+
+ try {
+
GetDebugPoint().enableDebugPointForMajorBEs("TabletStream.add_segment.add_segment_failed")
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', '\t'
+ set 'columns', 'k1, k2, v2, v10, v11'
+ set 'partitions', 'partition_a, partition_b, partition_c,
partition_d'
+ set 'strict_mode', 'true'
+ set 'memtable_on_sink_node', 'true'
+ file 'test_strict_mode.csv'
+ time 10000 // limit inflight 10s
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(2, json.NumberTotalRows)
+ assertEquals(0, json.NumberFilteredRows)
+ assertEquals(0, json.NumberUnselectedRows)
+ }
+ }
+ sql "sync"
+ def res = sql "select * from ${tableName}"
+ log.info("select result: ${res}".toString())
+ assertEquals(2, res.size())
+ } finally {
+
GetDebugPoint().disableDebugPointForAllBEs("TabletStream.add_segment.add_segment_failed")
+ }
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]