This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c0b37982e9 [enhance](load) exclude version-gap replicas from success counting in quorum success (#60953)
8c0b37982e9 is described below

commit 8c0b37982e991fc7eb49a18e41079769df9b11f5
Author: hui lai <[email protected]>
AuthorDate: Mon Mar 16 11:23:53 2026 +0800

    [enhance](load) exclude version-gap replicas from success counting in quorum success (#60953)
    
    ## Summary
    
    When using majority write (quorum success), BE does not distinguish
    between replicas with continuous versions and replicas with version
    gaps (`lastFailedVersion >= 0`). This makes BE inconsistent with FE's
    commit check, which correctly excludes version-gap replicas from the
    success count.
    
    ## Bad Case
    
    Consider 3 replicas on nodes 1, 2, 3 with `load_required_replica_num = 2`:
    
    1. **First write**: nodes 1,2 succeed, node 3 fails → overall success.
       Node 3 now has a version gap (`lastFailedVersion >= 0`).
    2. **Second write**: nodes 1,3 succeed, node 2 fails →
       - **BE** counts 2 successes (nodes 1,3) and considers it a quorum success.
       - **FE** commit only counts node 1 as a success (node 3 has a version
         gap), so `successReplicaNum = 1 < 2` and the commit fails.
       - This wastes resources, since BE has already returned success to the
         client but FE rejects the transaction.
    
    The correct behavior for the second write:
    - nodes 1,3 succeed → should **FAIL** (node 3 has a version gap, so only
      node 1 counts)
    - nodes 1,2 succeed → should **SUCCEED** (both have continuous versions)
    
    ## Solution
    
    Pass per-tablet version-gap backend information from FE to BE via a new
    thrift field `map<tablet_id, list<backend_id>> tablet_version_gap_backends`
    in `TOlapTablePartition`.
    
    On the BE side, when counting successful replicas for majority write in
    both `VTabletWriter` (V1) and `VTabletWriterV2`, exclude version-gap
    backends from the `finished_tablets_replica` counter. V2 applies the same
    exclusion to the per-tablet success count in `_create_commit_info`. This
    makes BE's quorum check consistent with FE's commit check.
    
    ### Changes
    
    - **Descriptors.thrift**: Add `tablet_version_gap_backends` field to
      `TOlapTablePartition`
    - **OlapTable.java**: Add `getPartitionVersionGapBackends()` to compute
      gap backends per tablet
    - **OlapTableSink.java**: Populate the new field when building partition
      info
    - **tablet_info.h/cpp**: Parse and store gap backends from thrift
    - **vtablet_writer.cpp**: Exclude gap backends in `_quorum_success`
    - **vtablet_writer_v2.cpp**: Exclude gap backends in `_quorum_success`
      and `_create_commit_info`
---
 be/src/exec/sink/writer/vtablet_writer.cpp         | 13 +++++++-
 be/src/exec/sink/writer/vtablet_writer.h           |  4 +++
 be/src/exec/sink/writer/vtablet_writer_v2.cpp      | 38 ++++++++++++++++------
 be/src/exec/sink/writer/vtablet_writer_v2.h        |  3 ++
 be/src/storage/tablet_info.cpp                     |  8 +++++
 be/src/storage/tablet_info.h                       |  2 ++
 .../java/org/apache/doris/catalog/OlapTable.java   | 22 +++++++++++++
 .../org/apache/doris/planner/OlapTableSink.java    |  8 +++++
 gensrc/thrift/Descriptors.thrift                   |  3 ++
 9 files changed, 90 insertions(+), 11 deletions(-)

diff --git a/be/src/exec/sink/writer/vtablet_writer.cpp 
b/be/src/exec/sink/writer/vtablet_writer.cpp
index 452416189bc..52097d90357 100644
--- a/be/src/exec/sink/writer/vtablet_writer.cpp
+++ b/be/src/exec/sink/writer/vtablet_writer.cpp
@@ -424,7 +424,13 @@ bool IndexChannel::_quorum_success(const 
std::unordered_set<int64_t>& unfinished
             continue;
         }
         for (const auto& tablet_id : _tablets_by_channel[node_id]) {
-            finished_tablets_replica[tablet_id]++;
+            // Only count non-gap backends for quorum success.
+            // Gap backends' success doesn't count toward majority write.
+            auto gap_it = 
_parent->_tablet_version_gap_backends.find(tablet_id);
+            if (gap_it == _parent->_tablet_version_gap_backends.end() ||
+                gap_it->second.find(node_id) == gap_it->second.end()) {
+                finished_tablets_replica[tablet_id]++;
+            }
         }
     }
 
@@ -1725,6 +1731,11 @@ void VTabletWriter::_build_tablet_replica_info(const 
int64_t tablet_id,
                                                  : 
partition->load_required_replica_num;
         _tablet_replica_info.emplace(
                 tablet_id, std::make_pair(total_replicas_num, 
load_required_replicas_num));
+        // Copy version gap backends info for this tablet
+        if (auto it = partition->tablet_version_gap_backends.find(tablet_id);
+            it != partition->tablet_version_gap_backends.end()) {
+            _tablet_version_gap_backends[tablet_id] = it->second;
+        }
     } else {
         _tablet_replica_info.emplace(tablet_id,
                                      std::make_pair(_num_replicas, 
(_num_replicas + 1) / 2));
diff --git a/be/src/exec/sink/writer/vtablet_writer.h 
b/be/src/exec/sink/writer/vtablet_writer.h
index 19f5377124a..d9e3869e68e 100644
--- a/be/src/exec/sink/writer/vtablet_writer.h
+++ b/be/src/exec/sink/writer/vtablet_writer.h
@@ -771,5 +771,9 @@ private:
 
     // tablet_id -> <total replicas num, load required replicas num>
     std::unordered_map<int64_t, std::pair<int, int>> _tablet_replica_info;
+
+    // tablet_id -> set of backend_ids that have version gaps
+    // these backends' success should not be counted for majority write
+    std::unordered_map<int64_t, std::unordered_set<int64_t>> 
_tablet_version_gap_backends;
 };
 } // namespace doris
diff --git a/be/src/exec/sink/writer/vtablet_writer_v2.cpp 
b/be/src/exec/sink/writer/vtablet_writer_v2.cpp
index c89c5e4747d..d978b95fb0b 100644
--- a/be/src/exec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/exec/sink/writer/vtablet_writer_v2.cpp
@@ -371,6 +371,11 @@ void VTabletWriterV2::_build_tablet_replica_info(const 
int64_t tablet_id,
                                                  : 
partition->load_required_replica_num;
         _tablet_replica_info[tablet_id] =
                 std::make_pair(total_replicas_num, load_required_replicas_num);
+        // Copy version gap backends info for this tablet
+        if (auto it = partition->tablet_version_gap_backends.find(tablet_id);
+            it != partition->tablet_version_gap_backends.end()) {
+            _tablet_version_gap_backends[tablet_id] = it->second;
+        }
     } else {
         _tablet_replica_info[tablet_id] = std::make_pair(_num_replicas, 
(_num_replicas + 1) / 2);
     }
@@ -876,7 +881,13 @@ bool VTabletWriterV2::_quorum_success(
             continue;
         }
         for (const auto& tablet_id : _tablets_by_node[dst_id]) {
-            finished_tablets_replica[tablet_id]++;
+            // Only count non-gap backends for quorum success.
+            // Gap backends' success doesn't count toward majority write.
+            auto gap_it = _tablet_version_gap_backends.find(tablet_id);
+            if (gap_it == _tablet_version_gap_backends.end() ||
+                gap_it->second.find(dst_id) == gap_it->second.end()) {
+                finished_tablets_replica[tablet_id]++;
+            }
         }
     }
 
@@ -1015,13 +1026,15 @@ void VTabletWriterV2::_calc_tablets_to_commit() {
 
 Status VTabletWriterV2::_create_commit_info(std::vector<TTabletCommitInfo>& 
tablet_commit_infos,
                                             std::shared_ptr<LoadStreamMap> 
load_stream_map) {
-    std::unordered_map<int64_t, int> failed_tablets;
+    // Track per-tablet non-gap success count and failure reasons
+    std::unordered_map<int64_t, int> success_tablets_replica;
+    std::unordered_set<int64_t> failed_tablets;
     std::unordered_map<int64_t, Status> failed_reason;
     load_stream_map->for_each([&](int64_t dst_id, LoadStreamStubs& streams) {
         size_t num_success_tablets = 0;
         size_t num_failed_tablets = 0;
         for (auto [tablet_id, reason] : streams.failed_tablets()) {
-            failed_tablets[tablet_id]++;
+            failed_tablets.insert(tablet_id);
             failed_reason[tablet_id] = reason;
             num_failed_tablets++;
         }
@@ -1030,20 +1043,25 @@ Status 
VTabletWriterV2::_create_commit_info(std::vector<TTabletCommitInfo>& tabl
             commit_info.tabletId = tablet_id;
             commit_info.backendId = dst_id;
             tablet_commit_infos.emplace_back(std::move(commit_info));
+            // Only count non-gap backends toward success
+            auto gap_it = _tablet_version_gap_backends.find(tablet_id);
+            if (gap_it == _tablet_version_gap_backends.end() ||
+                gap_it->second.find(dst_id) == gap_it->second.end()) {
+                success_tablets_replica[tablet_id]++;
+            }
             num_success_tablets++;
         }
         LOG(INFO) << "streams to dst_id: " << dst_id << ", success tablets: " 
<< num_success_tablets
                   << ", failed tablets: " << num_failed_tablets;
     });
 
-    for (auto [tablet_id, replicas] : failed_tablets) {
-        auto [total_replicas_num, load_required_replicas_num] = 
_tablet_replica_info[tablet_id];
-        int max_failed_replicas = total_replicas_num == 0
-                                          ? (_num_replicas - 1) / 2
-                                          : total_replicas_num - 
load_required_replicas_num;
-        if (replicas > max_failed_replicas) {
+    for (auto tablet_id : failed_tablets) {
+        int succ_count = success_tablets_replica[tablet_id];
+        int required = _load_required_replicas_num(tablet_id);
+        if (succ_count < required) {
             LOG(INFO) << "tablet " << tablet_id
-                      << " failed on majority backends: " << 
failed_reason[tablet_id];
+                      << " failed on majority backends (success=" << succ_count
+                      << ", required=" << required << "): " << 
failed_reason[tablet_id];
             return Status::InternalError("tablet {} failed on majority 
backends: {}", tablet_id,
                                          failed_reason[tablet_id]);
         }
diff --git a/be/src/exec/sink/writer/vtablet_writer_v2.h 
b/be/src/exec/sink/writer/vtablet_writer_v2.h
index ec801bc0db4..914c17d1cdc 100644
--- a/be/src/exec/sink/writer/vtablet_writer_v2.h
+++ b/be/src/exec/sink/writer/vtablet_writer_v2.h
@@ -253,6 +253,9 @@ private:
     // tablet_id -> <total replicas num, load required replicas num>
     std::unordered_map<int64_t, std::pair<int, int>> _tablet_replica_info;
 
+    // tablet_id -> set of backend_ids that have version gaps
+    std::unordered_map<int64_t, std::unordered_set<int64_t>> 
_tablet_version_gap_backends;
+
     std::atomic<int64_t> _load_back_pressure_version_block_ms {0};
 };
 
diff --git a/be/src/storage/tablet_info.cpp b/be/src/storage/tablet_info.cpp
index 8c91a8d1f53..0761396423e 100644
--- a/be/src/storage/tablet_info.cpp
+++ b/be/src/storage/tablet_info.cpp
@@ -747,6 +747,14 @@ Status 
VOlapTablePartitionParam::generate_partition_from(const TOlapTablePartiti
     if (t_part.__isset.load_required_replica_num) {
         part_result->load_required_replica_num = 
t_part.load_required_replica_num;
     }
+    if (t_part.__isset.tablet_version_gap_backends) {
+        for (const auto& [tablet_id, backend_ids] : 
t_part.tablet_version_gap_backends) {
+            auto& gap_set = 
part_result->tablet_version_gap_backends[tablet_id];
+            for (auto backend_id : backend_ids) {
+                gap_set.insert(backend_id);
+            }
+        }
+    }
     return Status::OK();
 }
 
diff --git a/be/src/storage/tablet_info.h b/be/src/storage/tablet_info.h
index 0d0fa88e719..4bb710c3d5e 100644
--- a/be/src/storage/tablet_info.h
+++ b/be/src/storage/tablet_info.h
@@ -167,6 +167,8 @@ struct VOlapTablePartition {
     int64_t load_tablet_idx = -1;
     int total_replica_num = 0;
     int load_required_replica_num = 0;
+    // tablet_id -> set of backend_ids that have version gaps
+    std::unordered_map<int64_t, std::unordered_set<int64_t>> 
tablet_version_gap_backends;
 
     VOlapTablePartition(Block* partition_block)
             // the default value of partition bound is -1.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 79ff3d91618..5489c0669c0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -2527,6 +2527,28 @@ public class OlapTable extends Table implements 
MTMVRelatedTableIf, GsonPostProc
         return 
partitionInfo.getReplicaAllocation(partitionId).getTotalReplicaNum();
     }
 
+    public Map<Long, List<Long>> getPartitionVersionGapBackends(long 
partitionId) {
+        Map<Long, List<Long>> result = new HashMap<>();
+        Partition partition = getPartition(partitionId);
+        if (partition == null) {
+            return result;
+        }
+        for (MaterializedIndex index : 
partition.getMaterializedIndices(IndexExtState.ALL)) {
+            for (Tablet tablet : index.getTablets()) {
+                List<Long> gapBackends = new ArrayList<>();
+                for (Replica replica : tablet.getReplicas()) {
+                    if (replica.getLastFailedVersion() >= 0) {
+                        
gapBackends.add(replica.getBackendIdWithoutException());
+                    }
+                }
+                if (!gapBackends.isEmpty()) {
+                    result.put(tablet.getId(), gapBackends);
+                }
+            }
+        }
+        return result;
+    }
+
     public int getLoadRequiredReplicaNum(long partitionId) {
         int totalReplicaNum = getPartitionTotalReplicasNum(partitionId);
         int minLoadReplicaNum = getMinLoadReplicaNum();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
index 6e1b00bf2bf..699a4c31d49 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
@@ -228,6 +228,10 @@ public class OlapTableSink extends DataSink {
         for (TOlapTablePartition partition : 
tOlapTablePartitionParam.getPartitions()) {
             
partition.setTotalReplicaNum(dstTable.getPartitionTotalReplicasNum(partition.getId()));
             
partition.setLoadRequiredReplicaNum(dstTable.getLoadRequiredReplicaNum(partition.getId()));
+            Map<Long, List<Long>> gapBackends = 
dstTable.getPartitionVersionGapBackends(partition.getId());
+            if (!gapBackends.isEmpty()) {
+                partition.setTabletVersionGapBackends(gapBackends);
+            }
         }
         tOlapTableLocationParams = createLocation(tSink.getDbId(), dstTable);
 
@@ -282,6 +286,10 @@ public class OlapTableSink extends DataSink {
         for (TOlapTablePartition partition : 
tOlapTablePartitionParam.getPartitions()) {
             
partition.setTotalReplicaNum(dstTable.getPartitionTotalReplicasNum(partition.getId()));
             
partition.setLoadRequiredReplicaNum(dstTable.getLoadRequiredReplicaNum(partition.getId()));
+            Map<Long, List<Long>> gapBackends = 
dstTable.getPartitionVersionGapBackends(partition.getId());
+            if (!gapBackends.isEmpty()) {
+                partition.setTabletVersionGapBackends(gapBackends);
+            }
         }
         tOlapTableLocationParams = createLocation(tSink.getDbId(), dstTable);
 
diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift
index eb12834fa18..730a74ae6f3 100644
--- a/gensrc/thrift/Descriptors.thrift
+++ b/gensrc/thrift/Descriptors.thrift
@@ -277,6 +277,9 @@ struct TOlapTablePartition {
     11: optional i64 load_tablet_idx
     12: optional i32 total_replica_num
     13: optional i32 load_required_replica_num
+    // tablet_id -> list of backend_ids that have version gaps 
(lastFailedVersion >= 0)
+    // used by BE to exclude these backends from success counting in majority 
write
+    14: optional map<i64, list<i64>> tablet_version_gap_backends
 }
 
 struct TOlapTablePartitionParam {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to