This is an automated email from the ASF dual-hosted git repository.

luwei16 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new da82d6a3c14 [fix](cloud-compaction) prevent EMPTY_CUMULATIVE / 
BASE-CUMU races on the same tablet (#64619)
da82d6a3c14 is described below

commit da82d6a3c14b094b753d8e296a3d59712e9268e5
Author: Lijia Liu <[email protected]>
AuthorDate: Mon Jun 22 20:40:01 2026 +0800

    [fix](cloud-compaction) prevent EMPTY_CUMULATIVE / BASE-CUMU races on the 
same tablet (#64619)
    
    Bug
    ---
    The BE config enable_parallel_cumu_compaction is set to true.
    
    ---
    On the meta-service, start_compaction_job only rejected a new job when
    its type strictly equalled an in-flight job's type. This left two races:
    
    1. EMPTY_CUMULATIVE was treated as a different type from CUMULATIVE.
    While a real CUMULATIVE [v_lo, v_hi] was still running, an
    EMPTY_CUMULATIVE could be accepted and committed, advancing
    cumulative_point past v_hi. A subsequent BASE compaction could then pull
    rowsets in [v_lo, v_hi] as input and race with the in-flight CUMULATIVE
    on the same rowsets.
    2. With check_input_versions_range=true, BASE and CUMULATIVE were never
    cross-checked against each other, so overlapping input ranges across the
    two types could be accepted concurrently.
    
    Fix
    ---
    * Normalize EMPTY_CUMULATIVE to CUMULATIVE for conflict detection so
    they belong to the same conflict family.
    * Extend the version-range conflict check to the whole rowset compaction
    family (BASE / CUMULATIVE / EMPTY_CUMULATIVE / FULL) instead of
    same-type only. Non-overlapping ranges across types are still allowed.
    * Keep version_in_compaction notification scoped to the same family so
    BE retry semantics are unchanged.
    
    Behaviour matrix (new -> active, OK = accept, BUSY = JOB_TABLET_BUSY)
    ---------------------------------------------------------------------
                             before            after
    EMPTY_CUMU vs CUMU       OK   (race)       BUSY
    CUMU       vs EMPTY_CUMU OK                BUSY
    BASE  vs CUMU  overlap   OK   (race)       BUSY
    CUMU  vs BASE  overlap   OK   (race)       BUSY
    BASE  vs CUMU  disjoint  OK                OK   (unchanged)
    same-type / FULL / STOP_TOKEN / idempotent same-id : unchanged
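    
    The matrix above can be reproduced with a small model of the conflict
    check. This is a minimal sketch, not the code from this commit: `Type`,
    `Job`, `normalize`, `disjoint` and `accept` are hypothetical stand-ins for
    the protobuf types and for start_compaction_job. It keys the branch only on
    whether the incoming job carries a version range (the real condition also
    consults check_input_versions_range), and it leaves out STOP_TOKEN and the
    same-id idempotency path.
    
    ```cpp
    #include <cassert>
    #include <cstdint>
    #include <optional>
    #include <utility>
    #include <vector>
    
    // Simplified stand-ins for the protobuf job type; not the real Doris definitions.
    enum class Type { BASE, CUMULATIVE, EMPTY_CUMULATIVE, FULL };
    
    struct Job {
        Type type;
        // Closed input version range [lo, hi]; empty for EMPTY_CUMULATIVE.
        std::optional<std::pair<int64_t, int64_t>> versions;
    };
    
    // EMPTY_CUMULATIVE is folded into CUMULATIVE for conflict detection.
    inline Type normalize(Type t) {
        return t == Type::EMPTY_CUMULATIVE ? Type::CUMULATIVE : t;
    }
    
    // Two closed ranges are disjoint iff one ends before the other starts.
    inline bool disjoint(const Job& a, const Job& b) {
        assert(a.versions && b.versions);
        return a.versions->second < b.versions->first ||
               b.versions->second < a.versions->first;
    }
    
    // Returns true for accept (OK) and false for reject (JOB_TABLET_BUSY).
    // `fixed` selects the rule after the patch; false models the rule before it.
    inline bool accept(const Job& incoming, const std::vector<Job>& active, bool fixed) {
        const bool has_range = incoming.versions.has_value();
        for (const Job& c : active) {
            bool candidate = false;
            if (!fixed) {
                // Before: only the same raw type (or an active FULL) could conflict.
                candidate = c.type == incoming.type || c.type == Type::FULL;
            } else if (!has_range) {
                // After, unknown range: compare normalized types.
                candidate = normalize(c.type) == normalize(incoming.type) ||
                            c.type == Type::FULL;
            } else {
                // After, known range: every type modelled here is in the rowset family.
                candidate = true;
            }
            if (!candidate) continue;
            // Known, non-overlapping ranges may still run in parallel.
            if (has_range && c.versions.has_value() && disjoint(c, incoming)) continue;
            return false;
        }
        return true;
    }
    ```
    
    Calling `accept` with an EMPTY_CUMULATIVE job against an active CUMULATIVE
    [42326, 42474] returns true with `fixed = false` and false with
    `fixed = true`, which corresponds to the first row of the matrix.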
    
    Tests
    -----
    * EmptyCumulativeBlockedByCumulativeTest
    * BaseCumulativeCrossTypeConflictTest
    
    The cluster log:
    ```
    1. start cc
    -------------- start cc(42326-42474)  ------------
    RuntimeLogger I20260616 06:05:58.687474  1715 
cloud_cumulative_compaction.cpp:111] start CloudCumulativeCompaction, 
tablet_id=1763693245218, 
range=[42326-42474]|job_id=7c02be46-86a3-43b7-9687-9c93b1f3affe|input_rowsets=5|input_rows=427170|input_segments=5|input_rowsets_data_size=52916937|input_rowsets_index_size=0|input_rowsets_total_size=52916937|tablet_max_version=42475|cumulative_point=42326|num_rowsets=27|cumu_num_rowsets=6
    
    --------------  meta service record this cc --------------
    I20260616 06:05:58.687247 3747094 meta_service_helper.h:174] begin 
start_tablet_job remote_caller=10.2.18.57:52036 
original_client_ip=10.2.18.57:9050 request=cloud_unique_id: 
"1:1753070360:jYdIZgSo" job { idx { table_id: 1753072815281 index_id: 
1753072815282 partition_id: 1763693245203 tablet_id: 1763693245218 } compaction 
{ initiator: "10.2.18.57:9050" type: CUMULATIVE input_versions: 42326 
input_versions: 42474 base_compaction_cnt: 91 cumulative_compaction_cnt: 3590 
id: "7c02be46-86 [...]
    meta_service.10.2.16.38.INFO:I20260616 06:05:58.688797 3747093 
meta_service_job.cpp:272] (1753070360)compaction job to save 
job={"initiator":"10.2.18.57:9050","type":"CUMULATIVE","input_versions":["42326","42474"],"base_compaction_cnt":"91","cumulative_compaction_cnt":"3590","id":"7c02be46-86a3-43b7-9687-9c93b1f3affe","expiration":"1781647558","lease":"1781561238","check_input_versions_range":true}
    
    2. DELETE triggers the advance of the cumulative point.
    --------- meta service records the EMPTY_CUMULATIVE job. The cumulative 
point has become 42476----------
    I20260616 06:05:58.792258 1621135 meta_service_helper.h:174] begin 
start_tablet_job remote_caller=10.2.18.57:33586 
original_client_ip=10.2.18.57:9050 request=cloud_unique_id: 
"1:1753070360:jYdIZgSo" job { idx { table_id: 1753072815281 index_id: 
1753072815282 partition_id: 1763693245203 tablet_id: 1763693245218 } compaction 
{ initiator: "10.2.18.57:9050" type: EMPTY_CUMULATIVE base_compaction_cnt: 91 
cumulative_compaction_cnt: 3590 id: "92277d99-b14f-42df-bfd8-c26e75ff8052" 
lease: 1781 [...]
    I20260616 06:05:58.793903 1621138 meta_service_job.cpp:272] 
(1753070360)compaction job to save 
job={"initiator":"10.2.18.57:9050","type":"EMPTY_CUMULATIVE","base_compaction_cnt":"91","cumulative_compaction_cnt":"3590","id":"92277d99-b14f-42df-bfd8-c26e75ff8052","lease":"1781561178"}
    I20260616 06:05:58.796713 1621137 meta_service_helper.h:174] begin 
finish_tablet_job remote_caller=10.2.18.57:60878 
original_client_ip=10.2.18.57:9050 request=cloud_unique_id: 
"1:1753070360:jYdIZgSo" action: COMMIT job { idx { table_id: 1753072815281 
index_id: 1753072815282 partition_id: 1763693245203 tablet_id: 1763693245218 } 
compaction { initiator: "10.2.18.57:9050" type: EMPTY_CUMULATIVE 
input_cumulative_point: 42326 output_cumulative_point: 42476 
base_compaction_cnt: 91 cumulativ [...]
    --------- BE log the EMPTY_CUMULATIVE --------------
    RuntimeLogger I20260616 06:05:58.801268  1715 
cloud_cumulative_compaction.cpp:533] do empty cumulative compaction to update 
cumulative 
point|job_id=92277d99-b14f-42df-bfd8-c26e75ff8052|tablet_id=1763693245218|input_cumulative_point=42326|output_cumulative_point=42476
    RuntimeLogger I20260616 06:05:58.801329  1715 
cloud_cumulative_compaction.cpp:539] tablet stats=idx { table_id: 1753072815281 
index_id: 1753072815282 partition_id: 1763693245203 tablet_id: 1763693245218 } 
data_size: 25302189990 num_rows: 303319387 num_rowsets: 27 num_segments: 43 
base_compaction_cnt: 91 cumulative_compaction_cnt: 3591 cumulative_point: 42476 
last_base_compaction_time_ms: 1781539519000 last_cumu_compaction_time_ms: 
1781561158000 index_size: 0 segment_size: 25302189990
    RuntimeLogger W20260616 06:05:58.801383  1715 cloud_storage_engine.cpp:529] 
failed to submit compaction task for tablet: 1763693245218, err: 
[E-2010]cumulative compaction meet delete version
    
    
    3. start bc
     -------------- be record the base compaction (2  ~ 42431)
    RuntimeLogger I20260616 06:06:01.042435  1715 cloud_base_compaction.cpp:84] 
start CloudBaseCompaction, tablet_id=1763693245218, 
range=[2-42431]|job_id=a8e92687-2211-4d74-82aa-d6e99c3fc360|input_rowsets=21|input_rows=303315005|input_segments=39|input_rowsets_data_size=25294029764|input_rowsets_index_size=0|input_rowsets_total_size=25294029764
    -------------- meta service record the bc --------------
    I20260616 06:06:01.043434 3747099 meta_service_helper.h:174] begin 
start_tablet_job remote_caller=10.2.18.57:52174 
original_client_ip=10.2.18.57:9050 request=cloud_unique_id: 
"1:1753070360:jYdIZgSo" job { idx { table_id: 1753072815281 index_id: 
1753072815282 partition_id: 1763693245203 tablet_id: 1763693245218 } compaction 
{ initiator: "10.2.18.57:9050" type: BASE input_versions: 2 input_versions: 
42431 base_compaction_cnt: 91 cumulative_compaction_cnt: 3591 id: 
"a8e92687-2211-4d74-82 [...]
    
    4. cc complete and
    -------------- meta service records the cc and drops rowsets whose versions are 
between 42326 and 42474
    RuntimeLogger I20260616 06:06:13.279381  1708 
cloud_cumulative_compaction.cpp:208] finish CloudCumulativeCompaction, 
tablet_id=1763693245218, cost=14587ms, 
range=[42326-42474]|job_id=7c02be46-86a3-43b7-9687-9c93b1f3affe|input_rowsets=5|input_rows=427170|input_segments=5|input_rowsets_data_size=52916937|input_rowsets_index_size=0|input_rowsets_total_size=52916937|output_rows=427170|output_segments=1|output_rowset_data_size=45147495|output_rowset_index_size=0|output_rowset_total_size=45
 [...]
    
    I20260616 06:06:13.269352 3747098 meta_service_helper.h:174] begin 
finish_tablet_job remote_caller=10.2.18.57:52070 
original_client_ip=10.2.18.57:9050 request=cloud_unique_id: 
"1:1753070360:jYdIZgSo" action: COMMIT job { idx { table_id: 1753072815281 
index_id: 1753072815282 partition_id: 1763693245203 tablet_id: 1763693245218 } 
compaction { initiator: "10.2.18.57:9050" type: CUMULATIVE 
input_cumulative_point: 42476 output_cumulative_point: 42475 num_input_rowsets: 
5 num_input_segments [...]
    
    
    5. bc complete
    -------------- bc complete and generate new rs (2-42431)--------------
    RuntimeLogger I20260616 06:54:02.973222  1699 
cloud_base_compaction.cpp:293] finish CloudBaseCompaction, 
tablet_id=1763693245218, cost=2881925ms 
range=[2-42431]|job_id=a8e92687-2211-4d74-82aa-d6e99c3fc360|input_rowsets=21|input_rows=303315005|input_segments=39|input_rowsets_data_size=25294029764|input_rowsets_index_size=0|input_rowsets_total=25294029764|output_rows=303315005|output_segments=24|output_rowset_data_size=25176822135|output_rowset_index_size=0|output_rowset_total_size=2517
 [...]
    ```
    The version (2-42431) generated by bc conflicts with the version
    (42326-42474) generated by cc.
    The compaction info:
    ```json
    {
    "rowsets": [
            "[0-1] 0 DATA NONOVERLAPPING 
0200000000000000ffffffffffea4868ffffffffffffffe8 0",
            "[2-42431] 24 DATA NONOVERLAPPING 
020000000008d8262c4a4e54ca9c6c96f347dc1417ad5db8 23.45 GB",
            "[42326-42474] 1 DATA NONOVERLAPPING 
020000000008d8252c4a4e54ca9c6c96f347dc1417ad5db8 43.06 MB",
            "[42475-42475] 0 DELETE OVERLAP_UNKNOWN 
020000000008d8242c4a4e54ca9c6c96f347dc1417ad5db8 0"
    ],
    "missing_rowsets": [
            "[42432-42325]"
        ]
    }
    ```
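    
    The overlap in the rowset list above can be checked mechanically. The
    following is a minimal sketch under stated assumptions: `Range`,
    `non_contiguous` and `is_overlap` are hypothetical helpers, not Doris
    functions. It walks rowsets sorted by start version and reports every
    adjacent pair that is not contiguous as [prev_end + 1, next_start - 1].
    The inverted "missing_rowsets" entry [42432-42325] is consistent with this
    arithmetic (42431 + 1 and 42326 - 1), although how Doris actually computes
    that field is not shown in this commit.
    
    ```cpp
    #include <cassert>
    #include <cstddef>
    #include <cstdint>
    #include <utility>
    #include <vector>
    
    using Range = std::pair<int64_t, int64_t>; // closed version range [start, end]
    
    // For rowsets sorted by start version, report each adjacent pair whose
    // ranges are not contiguous, expressed as [prev_end + 1, next_start - 1].
    // start <= end is a real gap; start > end means the two rowsets overlap.
    inline std::vector<Range> non_contiguous(const std::vector<Range>& sorted) {
        std::vector<Range> out;
        for (std::size_t i = 1; i < sorted.size(); ++i) {
            assert(sorted[i - 1].first <= sorted[i].first); // input must be sorted
            const int64_t expected_start = sorted[i - 1].second + 1;
            if (sorted[i].first != expected_start) {
                out.emplace_back(expected_start, sorted[i].first - 1);
            }
        }
        return out;
    }
    
    // An inverted range signals overlapping neighbours rather than a hole.
    inline bool is_overlap(const Range& r) {
        return r.first > r.second;
    }
    ```
    
    Fed the four ranges from the compaction info, [0-1], [2-42431],
    [42326-42474] and [42475-42475], `non_contiguous` yields a single entry
    that `is_overlap` flags, matching the conflict between the bc and cc
    outputs.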
    
    ### What problem does this PR solve?
    
    Issue Number: close #xxx
    
    Related PR: #xxx
    
    Problem Summary:
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [x] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
    - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [ ] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [ ] No.
    - [ ] Yes. <!-- Add document PR link here. eg:
    https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR
    should merge into -->
    
    ---------
    
    Co-authored-by: liutang123 <[email protected]>
---
 cloud/src/meta-service/meta_service_job.cpp |  90 ++++++++++--
 cloud/test/meta_service_job_test.cpp        | 215 ++++++++++++++++++++++++++++
 2 files changed, 294 insertions(+), 11 deletions(-)

diff --git a/cloud/src/meta-service/meta_service_job.cpp 
b/cloud/src/meta-service/meta_service_job.cpp
index dad84f6a36a..35aedd81852 100644
--- a/cloud/src/meta-service/meta_service_job.cpp
+++ b/cloud/src/meta-service/meta_service_job.cpp
@@ -97,6 +97,55 @@ bool check_compaction_input_verions(const 
TabletCompactionJobPB& compaction,
     return false;
 }
 
+// Normalize compaction type for conflict detection.
+// EMPTY_CUMULATIVE only updates cumulative_point and 
cumulative_compaction_cnt without producing
+// any output rowset. It MUST be considered the same family as CUMULATIVE so 
that an EMPTY_CUMULATIVE
+// cannot be accepted while a real CUMULATIVE is still running on the same 
tablet (which would
+// otherwise advance cumulative_point past the input range of the in-flight 
cumu and let base
+// compaction race with it).
+static inline TabletCompactionJobPB::CompactionType 
normalize_compaction_type_for_conflict(
+        TabletCompactionJobPB::CompactionType t) {
+    return t == TabletCompactionJobPB::EMPTY_CUMULATIVE ? 
TabletCompactionJobPB::CUMULATIVE : t;
+}
+
+// Two compaction jobs are considered to be in the same conflict family iff 
their normalized
+// compaction types are equal, OR either side is FULL (full compaction 
conflicts with anything),
+// OR they are different in (BASE, CUMULATIVE) - in this case the conflict 
still depends on whether
+// their input version ranges overlap, callers should additionally consult 
version range checks.
+static inline bool 
is_same_conflict_family(TabletCompactionJobPB::CompactionType a,
+                                           
TabletCompactionJobPB::CompactionType b) {
+    return normalize_compaction_type_for_conflict(a) == 
normalize_compaction_type_for_conflict(b);
+}
+
+// Whether a compaction type belongs to the "rowset compaction family", i.e. 
any compaction
+// kind that operates on rowsets bounded by `cumulative_point`. Only members 
of this family
+// are subject to the cross-type version-range conflict check below. Note that 
STOP_TOKEN is
+// already filtered out by the caller (see the early-return on STOP_TOKEN in 
start_compaction_job),
+// so it is intentionally excluded here.
+static inline bool 
is_rowset_compaction_family(TabletCompactionJobPB::CompactionType t) {
+    return t == TabletCompactionJobPB::BASE || t == 
TabletCompactionJobPB::CUMULATIVE ||
+           t == TabletCompactionJobPB::EMPTY_CUMULATIVE || t == 
TabletCompactionJobPB::FULL;
+}
+
+// Whether two compaction jobs MAY conflict on the rowset range (regardless of 
the actual
+// input version range). The caller still needs to compare `input_versions` to 
make the
+// final decision.
+//
+// Conflict matrix (Plan D extends this from same-type-only to cross-type 
within the family):
+//   FULL     vs anything-in-family : true (full compaction touches the whole 
rowset range)
+//   BASE     vs BASE / CUMULATIVE  : true (their rowset ranges may overlap 
around cumu_point)
+//   CUMU     vs CUMU / BASE        : true (symmetric of the above)
+//   EMPTY_CU vs anything-in-family : true (it advances cumu_point and would 
race with the others)
+//
+// Any compaction type outside this family (or that we don't yet model) is 
conservatively NOT
+// considered conflicting here - if a new type is added later, the author MUST 
revisit this
+// function and decide its conflict semantics explicitly, instead of silently 
inheriting the
+// "everything conflicts" behaviour.
+static inline bool may_conflict_by_type(TabletCompactionJobPB::CompactionType 
a,
+                                        TabletCompactionJobPB::CompactionType 
b) {
+    return is_rowset_compaction_family(a) && is_rowset_compaction_family(b);
+}
+
 void start_compaction_job(MetaServiceCode& code, std::string& msg, 
std::stringstream& ss,
                           std::unique_ptr<Transaction>& txn, const 
StartTabletJobRequest* request,
                           StartTabletJobResponse* response, std::string& 
instance_id,
@@ -250,10 +299,16 @@ void start_compaction_job(MetaServiceCode& code, 
std::string& msg, std::stringst
                     compaction.input_versions().empty()) ||
                    (compaction.has_check_input_versions_range() &&
                     !compaction.check_input_versions_range())) {
-            // Unknown input version range, doesn't support parallel 
compaction of same type
+            // Unknown input version range, doesn't support parallel 
compaction of same family.
+            // EMPTY_CUMULATIVE is normalized to CUMULATIVE here so it 
conflicts with an in-flight
+            // CUMULATIVE on the same tablet (otherwise EMPTY_CUMULATIVE could 
advance
+            // cumulative_point past the in-flight cumu's input range and let 
base compaction race
+            // with it).
             for (auto& c : compactions) {
-                if (c.type() != compaction.type() && c.type() != 
TabletCompactionJobPB::FULL)
+                if (!is_same_conflict_family(c.type(), compaction.type()) &&
+                    c.type() != TabletCompactionJobPB::FULL) {
                     continue;
+                }
                 if (c.id() == compaction.id()) return; // Same job, return OK 
to keep idempotency
                 msg = fmt::format("compaction has already started, 
tablet_id={} job={}", tablet_id,
                                   proto_to_json(c));
@@ -270,8 +325,14 @@ void start_compaction_job(MetaServiceCode& code, 
std::string& msg, std::stringst
                        a.input_versions(1) < b.input_versions(0);
             };
             for (auto& c : compactions) {
-                if (c.type() != compaction.type() && c.type() != 
TabletCompactionJobPB::FULL)
-                    continue;
+                // Plan D: BASE and CUMULATIVE on the same tablet may also 
conflict when their
+                // input version ranges overlap. Previously we only checked 
same-type conflicts,
+                // which left a window where BASE could be accepted with 
versions that overlap an
+                // in-flight CUMULATIVE (and vice versa) after 
cumulative_point was unsafely
+                // advanced. Now we treat any pair within (BASE, CUMULATIVE, 
EMPTY_CUMULATIVE,
+                // FULL) as potentially conflicting and rely on the 
input-version-range check
+                // below to make the final decision.
+                if (!may_conflict_by_type(c.type(), compaction.type())) 
continue;
                 if (c.input_versions_size() > 0 && version_not_conflict(c, 
compaction)) continue;
                 if (c.id() == compaction.id()) return; // Same job, return OK 
to keep idempotency
                 msg = fmt::format("compaction has already started, 
tablet_id={} job={}", tablet_id,
@@ -279,14 +340,21 @@ void start_compaction_job(MetaServiceCode& code, 
std::string& msg, std::stringst
                 code = MetaServiceCode::JOB_TABLET_BUSY;
                 // Unknown version range of started compaction, BE should not 
retry other version range
                 if (c.input_versions_size() == 0) return;
-                // Notify version ranges in started compaction to BE, so BE 
can retry other version range
+                // Notify version ranges of all in-flight compactions that may 
conflict with the
+                // incoming one, so BE can retry on a non-overlapping range. 
The notification
+                // predicate is intentionally kept consistent with the 
conflict predicate above
+                // (`may_conflict_by_type`); previously only same-family 
ranges were surfaced,
+                // which left BE blind to cross-family (BASE vs CUMULATIVE) 
conflicts.
+                //
+                // An in-flight EMPTY_CUMULATIVE (or any other family member 
without a concrete
+                // [v_lo, v_hi]) carries no usable range; surfacing fabricated 
zeros would
+                // mislead BE retry. Skip such entries defensively here - the 
real conflict is
+                // already enforced by the version-range check above.
                 for (auto& c : compactions) {
-                    if (c.type() == compaction.type() || c.type() == 
TabletCompactionJobPB::FULL) {
-                        // If there are multiple started compaction of same 
type, they all must has input version range
-                        DCHECK_EQ(c.input_versions_size(), 2) << 
proto_to_json(c);
-                        
response->add_version_in_compaction(c.input_versions(0));
-                        
response->add_version_in_compaction(c.input_versions(1));
-                    }
+                    if (!may_conflict_by_type(c.type(), compaction.type())) 
continue;
+                    if (c.input_versions_size() != 2) continue;
+                    response->add_version_in_compaction(c.input_versions(0));
+                    response->add_version_in_compaction(c.input_versions(1));
                 }
                 return;
             }
diff --git a/cloud/test/meta_service_job_test.cpp 
b/cloud/test/meta_service_job_test.cpp
index 1926f6c600a..089418745b3 100644
--- a/cloud/test/meta_service_job_test.cpp
+++ b/cloud/test/meta_service_job_test.cpp
@@ -4610,6 +4610,221 @@ TEST(MetaServiceJobTest, ParallelCumuCompactionTest) {
     ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
 }
 
+// Plan A regression test: EMPTY_CUMULATIVE must be considered the same 
conflict family as
+// CUMULATIVE so that an EMPTY_CUMULATIVE submitted while a real CUMULATIVE is 
still active on the
+// same tablet is rejected with JOB_TABLET_BUSY. Otherwise EMPTY_CUMULATIVE 
could advance
+// cumulative_point past the in-flight cumu's input range and let base 
compaction race with it.
+TEST(MetaServiceJobTest, EmptyCumulativeBlockedByCumulativeTest) {
+    auto meta_service = get_meta_service();
+
+    auto sp = SyncPoint::get_instance();
+    DORIS_CLOUD_DEFER {
+        SyncPoint::get_instance()->clear_all_call_backs();
+    };
+    sp->set_call_back("get_instance_id", [&](auto&& args) {
+        auto* ret = try_any_cast_ret<std::string>(args);
+        ret->first = instance_id;
+        ret->second = true;
+    });
+    sp->enable_processing();
+
+    constexpr int64_t table_id = 1;
+    constexpr int64_t index_id = 2;
+    constexpr int64_t partition_id = 3;
+    constexpr int64_t tablet_id = 4;
+    ASSERT_NO_FATAL_FAILURE(
+            create_tablet(meta_service.get(), table_id, index_id, 
partition_id, tablet_id, false));
+
+    // Helper to start an EMPTY_CUMULATIVE job. EMPTY_CUMULATIVE has no 
input_versions and no
+    // expiration (only cumulative_point/cumulative_compaction_cnt are 
bumped), which lets it
+    // bypass `STALE_TABLET_CACHE` when both sides carry the same 
cumulative_compaction_cnt.
+    auto start_empty_cumu = [&](const std::string& job_id, const std::string& 
initiator,
+                                int base_cnt, int cumu_cnt, 
StartTabletJobResponse& res) {
+        brpc::Controller cntl;
+        StartTabletJobRequest req;
+        req.mutable_job()->mutable_idx()->set_tablet_id(tablet_id);
+        auto* compaction = req.mutable_job()->add_compaction();
+        compaction->set_id(job_id);
+        compaction->set_initiator(initiator);
+        compaction->set_base_compaction_cnt(base_cnt);
+        compaction->set_cumulative_compaction_cnt(cumu_cnt);
+        compaction->set_type(TabletCompactionJobPB::EMPTY_CUMULATIVE);
+        long now = ::time(nullptr);
+        compaction->set_lease(now + 3);
+        meta_service->start_tablet_job(&cntl, &req, &res, nullptr);
+    };
+
+    // Step 1: An in-flight CUMULATIVE job [42326-42474] is registered first 
(mimics the
+    // scenario from the production log).
+    StartTabletJobResponse res;
+    start_compaction_job(meta_service.get(), tablet_id, "cumu1", "BE1", 0, 0,
+                         TabletCompactionJobPB::CUMULATIVE, res, {42326, 
42474});
+    ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+
+    // Step 2: An EMPTY_CUMULATIVE arrives carrying the same 
cumulative_compaction_cnt as
+    // cumu1. Before the fix this was wrongly accepted because MS only 
compared raw enum types.
+    // After the fix, EMPTY_CUMULATIVE must be normalized to CUMULATIVE for 
conflict detection
+    // and rejected as JOB_TABLET_BUSY.
+    res.Clear();
+    start_empty_cumu("empty1", "BE1", 0, 0, res);
+    ASSERT_EQ(res.status().code(), MetaServiceCode::JOB_TABLET_BUSY) << 
res.status().msg();
+    // EMPTY_CUMULATIVE has no input_versions, so BE must NOT receive any 
version range hint
+    // (the BE retry on `version_in_compaction` is meaningless for 
EMPTY_CUMULATIVE).
+    EXPECT_EQ(res.version_in_compaction_size(), 0);
+
+    // Step 3: Idempotency check - the same job_id submitted twice should 
still return OK.
+    res.Clear();
+    start_compaction_job(meta_service.get(), tablet_id, "cumu1", "BE1", 0, 0,
+                         TabletCompactionJobPB::CUMULATIVE, res, {42326, 
42474});
+    ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+
+    // Step 4: A BASE compaction arrives via the same code path used by 
EMPTY_CUMULATIVE -
+    // i.e. without `input_versions`. Because is_same_conflict_family(BASE, 
CUMULATIVE) is
+    // false, BASE should still be accepted on this branch (the cross-family 
conflict is
+    // enforced only on the version-range branch validated by Plan D test 
below).
+    res.Clear();
+    start_compaction_job(meta_service.get(), tablet_id, "base1", "BE1", 0, 0,
+                         TabletCompactionJobPB::BASE, res);
+    ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+
+    // Step 5: A second EMPTY_CUMULATIVE should also be rejected by the 
now-active CUMULATIVE.
+    // (Even though job_pb already contains an EMPTY_CUMULATIVE-equivalent, 
the same-family
+    // check primarily catches the CUMULATIVE side here.)
+    res.Clear();
+    start_empty_cumu("empty2", "BE2", 0, 0, res);
+    ASSERT_EQ(res.status().code(), MetaServiceCode::JOB_TABLET_BUSY) << 
res.status().msg();
+}
+
+// Plan D regression test: when a CUMULATIVE compaction is already running 
with `input_versions`
+// and `check_input_versions_range = true`, a BASE compaction whose version 
range overlaps with
+// the in-flight CUMULATIVE must be rejected with JOB_TABLET_BUSY. 
Non-overlapping BASE jobs are
+// still allowed, which is the typical safe case (BASE handles [0, cumu_point 
- 1] while
+// CUMULATIVE handles versions above cumu_point).
+TEST(MetaServiceJobTest, BaseCumulativeCrossTypeConflictTest) {
+    auto meta_service = get_meta_service();
+
+    auto sp = SyncPoint::get_instance();
+    DORIS_CLOUD_DEFER {
+        SyncPoint::get_instance()->clear_all_call_backs();
+    };
+    sp->set_call_back("get_instance_id", [&](auto&& args) {
+        auto* ret = try_any_cast_ret<std::string>(args);
+        ret->first = instance_id;
+        ret->second = true;
+    });
+    sp->enable_processing();
+
+    constexpr int64_t table_id = 1;
+    constexpr int64_t index_id = 2;
+    constexpr int64_t partition_id = 3;
+    constexpr int64_t tablet_id = 4;
+    ASSERT_NO_FATAL_FAILURE(
+            create_tablet(meta_service.get(), table_id, index_id, 
partition_id, tablet_id, false));
+
+    // Local helper: start a BASE compaction request that carries 
`input_versions` (matching
+    // production BE behaviour: cloud_base_compaction.cpp always calls 
add_input_versions).
+    // Note: BASE does NOT call set_check_input_versions_range, so it's left 
as default false
+    // BUT input_versions is non-empty - this routes the request into the "has 
input_versions"
+    // branch on MS, which is the branch Plan D guards.
+    auto start_base = [&](const std::string& job_id, const std::string& 
initiator, int base_cnt,
+                          int cumu_cnt, std::pair<int64_t, int64_t> versions,
+                          StartTabletJobResponse& res) {
+        brpc::Controller cntl;
+        StartTabletJobRequest req;
+        req.mutable_job()->mutable_idx()->set_tablet_id(tablet_id);
+        auto* compaction = req.mutable_job()->add_compaction();
+        compaction->set_id(job_id);
+        compaction->set_initiator(initiator);
+        compaction->set_base_compaction_cnt(base_cnt);
+        compaction->set_cumulative_compaction_cnt(cumu_cnt);
+        compaction->set_type(TabletCompactionJobPB::BASE);
+        long now = ::time(nullptr);
+        compaction->set_expiration(now + 12);
+        compaction->set_lease(now + 3);
+        compaction->add_input_versions(versions.first);
+        compaction->add_input_versions(versions.second);
+        // Intentionally NOT calling set_check_input_versions_range - BASE 
relies on the
+        // default false to mimic real BE behaviour.
+        meta_service->start_tablet_job(&cntl, &req, &res, nullptr);
+    };
+
+    // Step 1: A CUMULATIVE compaction with versions [10, 20] is started with 
parallel-cumu
+    // mode enabled (check_input_versions_range = true). This routes into the
+    // version-range-aware branch on MS.
+    StartTabletJobResponse res;
+    start_compaction_job(meta_service.get(), tablet_id, "cumu1", "BE1", 0, 0,
+                         TabletCompactionJobPB::CUMULATIVE, res, {10, 20});
+    ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+
+    // Step 2: BASE [5, 15] overlaps with CUMULATIVE [10, 20]. Plan D requires 
this to be
+    // rejected. Before the fix it would succeed (because the old `c.type() != 
compaction.type()`
+    // check skipped the active CUMULATIVE for a BASE submission).
+    res.Clear();
+    start_base("base_overlap_left", "BE1", 0, 0, {5, 15}, res);
+    ASSERT_EQ(res.status().code(), MetaServiceCode::JOB_TABLET_BUSY) << 
res.status().msg();
+
+    // Step 3: BASE [15, 25] also overlaps. Should be rejected.
+    res.Clear();
+    start_base("base_overlap_right", "BE1", 0, 0, {15, 25}, res);
+    ASSERT_EQ(res.status().code(), MetaServiceCode::JOB_TABLET_BUSY) << 
res.status().msg();
+
+    // Step 4: BASE [12, 18] is fully contained inside CUMULATIVE's range. 
Should be rejected.
+    res.Clear();
+    start_base("base_overlap_inside", "BE1", 0, 0, {12, 18}, res);
+    ASSERT_EQ(res.status().code(), MetaServiceCode::JOB_TABLET_BUSY) << 
res.status().msg();
+
+    // Step 5: BASE [5, 25] fully covers the CUMULATIVE range. Should be 
rejected.
+    res.Clear();
+    start_base("base_overlap_cover", "BE1", 0, 0, {5, 25}, res);
+    ASSERT_EQ(res.status().code(), MetaServiceCode::JOB_TABLET_BUSY) << 
res.status().msg();
+
+    // Step 6: BASE [0, 9] is BELOW the CUMULATIVE range. This is the typical 
safe case
+    // (base handles [0, cumu_point - 1]) and must still be accepted after 
Plan D.
+    res.Clear();
+    start_base("base_safe_below", "BE1", 0, 0, {0, 9}, res);
+    ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg();
+
+    // Step 7: A second BASE [21, 30] is ABOVE the CUMULATIVE range AND 
non-overlapping with the
+    // already-accepted base_safe_below [0, 9]. This is also a safe 
non-overlap case - although
+    // unusual in production (BASE rarely operates above cumu_point), MS 
should accept it.
+    res.Clear();
+    start_base("base_safe_above", "BE2", 0, 0, {21, 30}, res);
+    ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg();
+
+    // Step 8: A new CUMULATIVE [22, 28] overlaps with the just-accepted 
base_safe_above and
+    // must be rejected. Verifies the conflict is symmetric - CUMULATIVE 
submissions also
+    // see BASE jobs as conflicting.
+    res.Clear();
+    start_compaction_job(meta_service.get(), tablet_id, "cumu_overlap_base", 
"BE1", 0, 0,
+                         TabletCompactionJobPB::CUMULATIVE, res, {22, 28});
+    ASSERT_EQ(res.status().code(), MetaServiceCode::JOB_TABLET_BUSY) << 
res.status().msg();
+    // The version_in_compaction notification predicate is kept consistent 
with the conflict
+    // predicate (`may_conflict_by_type`): every in-flight job in the rowset 
compaction family
+    // (BASE / CUMULATIVE) is surfaced so BE can pick a non-overlapping range 
to retry.
+    // Active jobs at this point: cumu1[10,20], base_safe_below[0,9], 
base_safe_above[21,30].
+    // All three carry concrete input_versions so all three must be reported.
+    ASSERT_EQ(res.version_in_compaction_size(), 6);
+    EXPECT_EQ(res.version_in_compaction(0), 10);
+    EXPECT_EQ(res.version_in_compaction(1), 20);
+    EXPECT_EQ(res.version_in_compaction(2), 0);
+    EXPECT_EQ(res.version_in_compaction(3), 9);
+    EXPECT_EQ(res.version_in_compaction(4), 21);
+    EXPECT_EQ(res.version_in_compaction(5), 30);
+
+    // Step 9: A new CUMULATIVE [30, 35] does not overlap with cumu1 [10, 20] 
but DOES overlap
+    // with base_safe_above [21, 30] (sharing version 30). Must be rejected.
+    res.Clear();
+    start_compaction_job(meta_service.get(), tablet_id, "cumu_overlap_base2", 
"BE1", 0, 0,
+                         TabletCompactionJobPB::CUMULATIVE, res, {30, 35});
+    ASSERT_EQ(res.status().code(), MetaServiceCode::JOB_TABLET_BUSY) << 
res.status().msg();
+
+    // Step 10: A new CUMULATIVE [31, 40] is fully above all active jobs and 
must be accepted.
+    res.Clear();
+    start_compaction_job(meta_service.get(), tablet_id, "cumu_safe_above", 
"BE1", 0, 0,
+                         TabletCompactionJobPB::CUMULATIVE, res, {31, 40});
+    ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg();
+}
+
 TEST(MetaServiceJobTest, SchemaChangeJobPersistTest) {
     auto meta_service = get_meta_service();
 

