This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 84f61c986bc branch-3.0-pick: [Fix](cloud) Fix dup key problem when 
`enable_new_tablet_do_compaction=true` (#48399) (#49019)
84f61c986bc is described below

commit 84f61c986bc6596e5b83e24d9e59a6df214fb553
Author: bobhan1 <bao...@selectdb.com>
AuthorDate: Fri Mar 14 09:59:59 2025 +0800

    branch-3.0-pick: [Fix](cloud) Fix dup key problem when 
`enable_new_tablet_do_compaction=true` (#48399) (#49019)
    
    pick https://github.com/apache/doris/pull/48399
---
 be/src/cloud/cloud_compaction_stop_token.cpp       | 125 ++++++++++++++++++
 be/src/cloud/cloud_compaction_stop_token.h         |  45 +++++++
 .../cloud/cloud_cumulative_compaction_policy.cpp   |   7 +-
 be/src/cloud/cloud_schema_change_job.cpp           |   7 +
 be/src/cloud/cloud_storage_engine.cpp              |  65 ++++++++++
 be/src/cloud/cloud_storage_engine.h                |   8 ++
 cloud/src/meta-service/meta_service_job.cpp        |  79 +++++++++++-
 gensrc/proto/cloud.proto                           |   1 +
 .../cloud/test_cloud_mow_new_tablet_compaction.out | Bin 0 -> 206 bytes
 .../test_cloud_mow_new_tablet_compaction.groovy    | 143 +++++++++++++++++++++
 10 files changed, 477 insertions(+), 3 deletions(-)

diff --git a/be/src/cloud/cloud_compaction_stop_token.cpp 
b/be/src/cloud/cloud_compaction_stop_token.cpp
new file mode 100644
index 00000000000..9d6f1b614a6
--- /dev/null
+++ b/be/src/cloud/cloud_compaction_stop_token.cpp
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "cloud/cloud_compaction_stop_token.h"
+
+#include "cloud/cloud_meta_mgr.h"
+#include "cloud/config.h"
+#include "common/logging.h"
+#include "gen_cpp/cloud.pb.h"
+
+namespace doris {
+
+CloudCompactionStopToken::CloudCompactionStopToken(CloudStorageEngine& engine,
+                                                   CloudTabletSPtr tablet, 
int64_t initiator)
+        : _engine {engine}, _tablet {std::move(tablet)}, _initiator(initiator) 
{
+    auto uuid = UUIDGenerator::instance()->next_uuid();
+    std::stringstream ss;
+    ss << uuid;
+    _uuid = ss.str();
+}
+
+// Renews this token's STOP_TOKEN job in the meta service by setting its lease to
+// now + 4 * lease_compaction_interval_seconds. A failed renewal is logged, not returned.
+void CloudCompactionStopToken::do_lease() {
+    cloud::TabletJobInfoPB job;
+    auto* tablet_idx = job.mutable_idx();
+    tablet_idx->set_tablet_id(_tablet->tablet_id());
+    tablet_idx->set_table_id(_tablet->table_id());
+    tablet_idx->set_index_id(_tablet->index_id());
+    tablet_idx->set_partition_id(_tablet->partition_id());
+
+    // Only the job id and the new lease are needed to renew an existing job.
+    auto* stop_token_job = job.add_compaction();
+    stop_token_job->set_id(_uuid);
+    const int64_t lease_deadline =
+            std::chrono::duration_cast<std::chrono::seconds>(
+                    std::chrono::system_clock::now().time_since_epoch())
+                    .count() +
+            config::lease_compaction_interval_seconds * 4;
+    stop_token_job->set_lease(lease_deadline);
+
+    if (auto st = _engine.meta_mgr().lease_tablet_job(job); !st.ok()) {
+        LOG_WARNING("failed to lease compaction stop token")
+                .tag("job_id", _uuid)
+                .tag("delete_bitmap_lock_initiator", _initiator)
+                .tag("tablet_id", _tablet->tablet_id())
+                .error(st);
+    }
+}
+
+// Creates this token's STOP_TOKEN compaction job in the meta service via prepare_tablet_job.
+// The job carries the delete bitmap lock initiator, an expiration of
+// now + compaction_timeout_seconds and an initial lease of now + 4 * lease interval.
+// Returns the RPC status; a failure is additionally logged.
+Status CloudCompactionStopToken::do_register() {
+    // The meta service uses the compaction counters to check whether this is a valid
+    // compaction job, so snapshot them under the tablet header lock first.
+    int64_t base_cnt = 0;
+    int64_t cumu_cnt = 0;
+    {
+        std::lock_guard header_guard {_tablet->get_header_lock()};
+        base_cnt = _tablet->base_compaction_cnt();
+        cumu_cnt = _tablet->cumulative_compaction_cnt();
+    }
+
+    cloud::TabletJobInfoPB job;
+    auto* tablet_idx = job.mutable_idx();
+    tablet_idx->set_tablet_id(_tablet->tablet_id());
+    tablet_idx->set_table_id(_tablet->table_id());
+    tablet_idx->set_index_id(_tablet->index_id());
+    tablet_idx->set_partition_id(_tablet->partition_id());
+
+    auto* stop_token_job = job.add_compaction();
+    stop_token_job->set_id(_uuid);
+    stop_token_job->set_delete_bitmap_lock_initiator(_initiator);
+    stop_token_job->set_initiator(BackendOptions::get_localhost() + ':' +
+                                  std::to_string(config::heartbeat_service_port));
+    stop_token_job->set_type(cloud::TabletCompactionJobPB::STOP_TOKEN);
+    stop_token_job->set_base_compaction_cnt(base_cnt);
+    stop_token_job->set_cumulative_compaction_cnt(cumu_cnt);
+
+    const int64_t now = std::chrono::duration_cast<std::chrono::seconds>(
+                                std::chrono::system_clock::now().time_since_epoch())
+                                .count();
+    stop_token_job->set_expiration(now + config::compaction_timeout_seconds);
+    stop_token_job->set_lease(now + config::lease_compaction_interval_seconds * 4);
+
+    cloud::StartTabletJobResponse resp;
+    Status st = _engine.meta_mgr().prepare_tablet_job(job, &resp);
+    if (!st.ok()) {
+        LOG_WARNING("failed to register compaction stop token")
+                .tag("job_id", _uuid)
+                .tag("delete_bitmap_lock_initiator", _initiator)
+                .tag("tablet_id", _tablet->tablet_id())
+                .error(st);
+    }
+    return st;
+}
+
+// Aborts this token's STOP_TOKEN compaction job in the meta service via abort_tablet_job.
+// Returns the RPC status; a failure is additionally logged.
+Status CloudCompactionStopToken::do_unregister() {
+    cloud::TabletJobInfoPB job;
+    auto* tablet_idx = job.mutable_idx();
+    tablet_idx->set_tablet_id(_tablet->tablet_id());
+    tablet_idx->set_table_id(_tablet->table_id());
+    tablet_idx->set_index_id(_tablet->index_id());
+    tablet_idx->set_partition_id(_tablet->partition_id());
+
+    // Identify the job by the same id, initiator and type that do_register() used.
+    auto* stop_token_job = job.add_compaction();
+    stop_token_job->set_id(_uuid);
+    stop_token_job->set_delete_bitmap_lock_initiator(_initiator);
+    stop_token_job->set_initiator(BackendOptions::get_localhost() + ':' +
+                                  std::to_string(config::heartbeat_service_port));
+    stop_token_job->set_type(cloud::TabletCompactionJobPB::STOP_TOKEN);
+
+    Status st = _engine.meta_mgr().abort_tablet_job(job);
+    if (!st.ok()) {
+        LOG_WARNING("failed to unregister compaction stop token")
+                .tag("job_id", _uuid)
+                .tag("delete_bitmap_lock_initiator", _initiator)
+                .tag("tablet_id", _tablet->tablet_id())
+                .error(st);
+    }
+    return st;
+}
+
+// Delete bitmap lock initiator supplied at construction.
+int64_t CloudCompactionStopToken::initiator() const { return _initiator; }
+} // namespace doris
diff --git a/be/src/cloud/cloud_compaction_stop_token.h 
b/be/src/cloud/cloud_compaction_stop_token.h
new file mode 100644
index 00000000000..ce61ebc3747
--- /dev/null
+++ b/be/src/cloud/cloud_compaction_stop_token.h
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <string>
+
+#include "cloud/cloud_storage_engine.h"
+#include "cloud/cloud_tablet.h"
+
+namespace doris {
+
+// Client-side handle for a STOP_TOKEN compaction job recorded in the meta service for one
+// tablet. While such a job exists, the meta service refuses to start other compactions on the
+// tablet (see the STOP_TOKEN check in start_compaction_job).
+class CloudCompactionStopToken {
+public:
+    // `initiator` is the delete bitmap lock initiator attached to the registered job.
+    CloudCompactionStopToken(CloudStorageEngine& engine, CloudTabletSPtr 
tablet, int64_t initiator);
+    ~CloudCompactionStopToken() = default;
+
+    // Pushes the job's lease forward; failures are logged, not reported to the caller.
+    void do_lease();
+    // Creates the STOP_TOKEN job in the meta service (prepare_tablet_job).
+    Status do_register();
+    // Aborts the STOP_TOKEN job in the meta service (abort_tablet_job).
+    // NOTE: CloudStorageEngine::unregister_compaction_stop_token currently does not call this;
+    // the meta service drops the job when the schema change commits or aborts.
+    Status do_unregister();
+
+    int64_t initiator() const;
+
+private:
+    CloudStorageEngine& _engine;
+    CloudTabletSPtr _tablet;
+    // Random UUID used as the compaction job id in every meta-service request.
+    std::string _uuid;
+    int64_t _initiator;
+};
+
+} // namespace doris
diff --git a/be/src/cloud/cloud_cumulative_compaction_policy.cpp 
b/be/src/cloud/cloud_cumulative_compaction_policy.cpp
index 9e3ca3eb3db..c49448e1998 100644
--- a/be/src/cloud/cloud_cumulative_compaction_policy.cpp
+++ b/be/src/cloud/cloud_cumulative_compaction_policy.cpp
@@ -66,8 +66,13 @@ int64_t 
CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
                             input_rowsets->push_back(rowset);
                         }
                     }
+                    LOG_INFO(
+                            
"[CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_"
+                            "input_rowsets] tablet_id={}, start={}, end={}, "
+                            "input_rowsets->size()={}",
+                            target_tablet_id, start_version, end_version, 
input_rowsets->size());
+                    return input_rowsets->size();
                 }
-                return input_rowsets->size();
             })
 
     size_t promotion_size = cloud_promotion_size(tablet);
diff --git a/be/src/cloud/cloud_schema_change_job.cpp 
b/be/src/cloud/cloud_schema_change_job.cpp
index a109640e467..79a502587f6 100644
--- a/be/src/cloud/cloud_schema_change_job.cpp
+++ b/be/src/cloud/cloud_schema_change_job.cpp
@@ -418,6 +418,11 @@ Status 
CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
             .tag("out_rowset_size", _output_rowsets.size())
             .tag("start_calc_delete_bitmap_version", 
start_calc_delete_bitmap_version)
             .tag("alter_version", alter_version);
+    
RETURN_IF_ERROR(_cloud_storage_engine.register_compaction_stop_token(_new_tablet,
 initiator));
+    Defer defer {[&]() {
+        
static_cast<void>(_cloud_storage_engine.unregister_compaction_stop_token(_new_tablet));
+    }};
+
     TabletMetaSharedPtr tmp_meta = 
std::make_shared<TabletMeta>(*(_new_tablet->tablet_meta()));
     tmp_meta->delete_bitmap().delete_bitmap.clear();
     std::shared_ptr<CloudTablet> tmp_tablet =
@@ -437,6 +442,8 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t 
alter_version,
     if (max_version >= start_calc_delete_bitmap_version) {
         RETURN_IF_ERROR(tmp_tablet->capture_consistent_rowsets_unlocked(
                 {start_calc_delete_bitmap_version, max_version}, 
&incremental_rowsets));
+        
DBUG_EXECUTE_IF("CloudSchemaChangeJob::_process_delete_bitmap.after.capture_without_lock",
+                        DBUG_BLOCK);
         for (auto rowset : incremental_rowsets) {
             
RETURN_IF_ERROR(CloudTablet::update_delete_bitmap_without_lock(tmp_tablet, 
rowset));
         }
diff --git a/be/src/cloud/cloud_storage_engine.cpp 
b/be/src/cloud/cloud_storage_engine.cpp
index 9c403ac8e3b..52ab28b52c3 100644
--- a/be/src/cloud/cloud_storage_engine.cpp
+++ b/be/src/cloud/cloud_storage_engine.cpp
@@ -25,9 +25,11 @@
 #include <rapidjson/stringbuffer.h>
 
 #include <algorithm>
+#include <mutex>
 #include <variant>
 
 #include "cloud/cloud_base_compaction.h"
+#include "cloud/cloud_compaction_stop_token.h"
 #include "cloud/cloud_cumulative_compaction.h"
 #include "cloud/cloud_cumulative_compaction_policy.h"
 #include "cloud/cloud_full_compaction.h"
@@ -37,6 +39,8 @@
 #include "cloud/cloud_txn_delete_bitmap_cache.h"
 #include "cloud/cloud_warm_up_manager.h"
 #include "cloud/config.h"
+#include "common/config.h"
+#include "common/status.h"
 #include "io/cache/block_file_cache_downloader.h"
 #include "io/cache/block_file_cache_factory.h"
 #include "io/cache/file_cache_common.h"
@@ -758,6 +762,7 @@ void 
CloudStorageEngine::_lease_compaction_thread_callback() {
         std::vector<std::shared_ptr<CloudFullCompaction>> full_compactions;
         std::vector<std::shared_ptr<CloudBaseCompaction>> base_compactions;
         std::vector<std::shared_ptr<CloudCumulativeCompaction>> 
cumu_compactions;
+        std::vector<std::shared_ptr<CloudCompactionStopToken>> 
compation_stop_tokens;
         {
             std::lock_guard lock(_compaction_mtx);
             for (auto& [_, base] : _submitted_base_compactions) {
@@ -775,8 +780,16 @@ void 
CloudStorageEngine::_lease_compaction_thread_callback() {
                     full_compactions.push_back(full);
                 }
             }
+            for (auto& [_, stop_token] : _active_compaction_stop_tokens) {
+                if (stop_token) {
+                    compation_stop_tokens.push_back(stop_token);
+                }
+            }
         }
         // TODO(plat1ko): Support batch lease rpc
+        for (auto& stop_token : compation_stop_tokens) {
+            stop_token->do_lease();
+        }
         for (auto& comp : full_compactions) {
             comp->do_lease();
         }
@@ -854,5 +867,57 @@ std::shared_ptr<CloudCumulativeCompactionPolicy> 
CloudStorageEngine::cumu_compac
     return _cumulative_compaction_policies.at(compaction_policy);
 }
 
+// Registers a STOP_TOKEN compaction job for `tablet` in the meta service and tracks it locally
+// so the lease thread keeps renewing it. Returns AlreadyExist if this engine already tracks a
+// token for the tablet, or the registration RPC's error status.
+Status CloudStorageEngine::register_compaction_stop_token(CloudTabletSPtr tablet,
+                                                          int64_t initiator) {
+    const int64_t tablet_id = tablet->tablet_id();
+
+    // Reserve the slot with an empty (null) token before the RPC so a concurrent registration
+    // for the same tablet is rejected; the lease thread skips null entries.
+    {
+        std::lock_guard lock(_compaction_mtx);
+        if (!_active_compaction_stop_tokens.try_emplace(tablet_id).second) {
+            return Status::AlreadyExist("stop token already exists for tablet_id={}", tablet_id);
+        }
+    }
+
+    auto stop_token = std::make_shared<CloudCompactionStopToken>(*this, tablet, initiator);
+    auto st = stop_token->do_register();
+
+    // Publish the registered token, or release the reserved slot if registration failed.
+    {
+        std::lock_guard lock(_compaction_mtx);
+        if (st.ok()) {
+            _active_compaction_stop_tokens[tablet_id] = stop_token;
+        } else {
+            _active_compaction_stop_tokens.erase(tablet_id);
+        }
+    }
+    if (!st.ok()) {
+        return st;
+    }
+
+    LOG_INFO(
+            "successfully register compaction stop token for tablet_id={}, "
+            "delete_bitmap_lock_initiator={}",
+            tablet_id, initiator);
+    return st;
+}
+
+// Stops tracking the compaction stop token of `tablet` on this engine, so the lease thread no
+// longer renews it. Returns NotFound if no token (or placeholder) is tracked for the tablet.
+Status CloudStorageEngine::unregister_compaction_stop_token(CloudTabletSPtr tablet) {
+    const int64_t tablet_id = tablet->tablet_id();
+    std::shared_ptr<CloudCompactionStopToken> stop_token;
+    {
+        std::lock_guard lock(_compaction_mtx);
+        auto it = _active_compaction_stop_tokens.find(tablet_id);
+        if (it == _active_compaction_stop_tokens.end()) {
+            return Status::NotFound("stop token not found for tablet_id={}", tablet_id);
+        }
+        stop_token = std::move(it->second);
+        // Erase through the iterator we already hold rather than hashing the key again.
+        _active_compaction_stop_tokens.erase(it);
+    }
+    // stop token will be removed when SC commit or abort
+    // RETURN_IF_ERROR(stop_token->do_unregister());
+
+    // The entry can still be the null placeholder that register_compaction_stop_token inserts
+    // while its registration RPC is in flight; never dereference it (log -1 instead).
+    const int64_t initiator = stop_token ? stop_token->initiator() : -1;
+    LOG_INFO(
+            "successfully unregister compaction stop token for tablet_id={}, "
+            "delete_bitmap_lock_initiator={}",
+            tablet_id, initiator);
+    return Status::OK();
+}
+
 #include "common/compile_check_end.h"
 } // namespace doris
diff --git a/be/src/cloud/cloud_storage_engine.h 
b/be/src/cloud/cloud_storage_engine.h
index 5e51285d93a..6381fbe6001 100644
--- a/be/src/cloud/cloud_storage_engine.h
+++ b/be/src/cloud/cloud_storage_engine.h
@@ -44,6 +44,7 @@ class CloudBaseCompaction;
 class CloudFullCompaction;
 class TabletHotspot;
 class CloudWarmUpManager;
+class CloudCompactionStopToken;
 
 class CloudStorageEngine final : public BaseStorageEngine {
 public:
@@ -143,6 +144,10 @@ public:
         return *_sync_load_for_tablets_thread_pool;
     }
 
+    Status register_compaction_stop_token(CloudTabletSPtr tablet, int64_t 
initiator);
+
+    Status unregister_compaction_stop_token(CloudTabletSPtr tablet);
+
 private:
     void _refresh_storage_vault_info_thread_callback();
     void _vacuum_stale_rowsets_thread_callback();
@@ -188,6 +193,9 @@ private:
     // tablet_id -> submitted cumu compactions, guarded by `_compaction_mtx`
     std::unordered_map<int64_t, 
std::vector<std::shared_ptr<CloudCumulativeCompaction>>>
             _submitted_cumu_compactions;
+    // tablet_id -> active compaction stop tokens
+    std::unordered_map<int64_t, std::shared_ptr<CloudCompactionStopToken>>
+            _active_compaction_stop_tokens;
 
     std::unique_ptr<ThreadPool> _base_compaction_thread_pool;
     std::unique_ptr<ThreadPool> _cumu_compaction_thread_pool;
diff --git a/cloud/src/meta-service/meta_service_job.cpp 
b/cloud/src/meta-service/meta_service_job.cpp
index 5299b85f41d..99b61e7ebf0 100644
--- a/cloud/src/meta-service/meta_service_job.cpp
+++ b/cloud/src/meta-service/meta_service_job.cpp
@@ -20,6 +20,7 @@
 #include <gen_cpp/cloud.pb.h>
 #include <glog/logging.h>
 
+#include <algorithm>
 #include <chrono>
 #include <cstddef>
 #include <sstream>
@@ -61,7 +62,8 @@ bool check_compaction_input_verions(const 
TabletCompactionJobPB& compaction,
     if (!job_pb.has_schema_change() || 
!job_pb.schema_change().has_alter_version()) {
         return true;
     }
-    if (compaction.type() == TabletCompactionJobPB::EMPTY_CUMULATIVE) {
+    if (compaction.type() == TabletCompactionJobPB::EMPTY_CUMULATIVE ||
+        compaction.type() == TabletCompactionJobPB::STOP_TOKEN) {
         return true;
     }
     if (compaction.input_versions_size() != 2 ||
@@ -192,11 +194,27 @@ void start_compaction_job(MetaServiceCode& code, 
std::string& msg, std::stringst
         }), compactions.end());
         // clang-format on
         // Check conflict job
+        if (std::ranges::any_of(compactions, [](const auto& c) {
+                return c.type() == TabletCompactionJobPB::STOP_TOKEN;
+            })) {
+            auto it = std::ranges::find_if(compactions, [](const auto& c) {
+                return c.type() == TabletCompactionJobPB::STOP_TOKEN;
+            });
+            msg = fmt::format(
+                    "compactions are not allowed on tablet_id={} currently, 
blocked by schema "
+                    "change job delete_bitmap_initiator={}",
+                    tablet_id, it->delete_bitmap_lock_initiator());
+            code = MetaServiceCode::JOB_TABLET_BUSY;
+            return;
+        }
         if (compaction.type() == TabletCompactionJobPB::FULL) {
             // Full compaction is generally used for data correctness repair
             // for MOW table, so priority should be given to performing full
             // compaction operations and canceling other types of compaction.
             compactions.Clear();
+        } else if (compaction.type() == TabletCompactionJobPB::STOP_TOKEN) {
+            // fail all existing compactions
+            compactions.Clear();
         } else if ((!compaction.has_check_input_versions_range() &&
                     compaction.input_versions().empty()) ||
                    (compaction.has_check_input_versions_range() &&
@@ -1111,6 +1129,25 @@ void process_schema_change_job(MetaServiceCode& code, 
std::string& msg, std::str
             auto job_val = recorded_job.SerializeAsString();
             txn->put(job_key, job_val);
             if (!new_tablet_job_val.empty()) {
+                auto& compactions = *new_recorded_job.mutable_compaction();
+                auto origin_size = compactions.size();
+                compactions.erase(
+                        std::remove_if(
+                                compactions.begin(), compactions.end(),
+                                [&](auto& c) {
+                                    return 
c.has_delete_bitmap_lock_initiator() &&
+                                           c.delete_bitmap_lock_initiator() ==
+                                                   
schema_change.delete_bitmap_lock_initiator();
+                                }),
+                        compactions.end());
+                if (compactions.size() < origin_size) {
+                    INSTANCE_LOG(INFO)
+                            << "remove " << (origin_size - compactions.size())
+                            << " STOP_TOKEN for schema_change job tablet_id=" 
<< tablet_id
+                            << " delete_bitmap_lock_initiator="
+                            << schema_change.delete_bitmap_lock_initiator()
+                            << " key=" << hex(job_key);
+                }
                 new_recorded_job.clear_schema_change();
                 new_tablet_job_val = new_recorded_job.SerializeAsString();
                 txn->put(new_tablet_job_key, new_tablet_job_val);
@@ -1150,7 +1187,28 @@ void process_schema_change_job(MetaServiceCode& code, 
std::string& msg, std::str
         return;
     }
     if (schema_change.alter_version() < 2) { // no need to update stats
-        // TODO(cyx): clear schema_change job?
+                                             // TODO(cyx): clear schema_change 
job?
+        if (!new_tablet_job_val.empty()) {
+            auto& compactions = *new_recorded_job.mutable_compaction();
+            auto origin_size = compactions.size();
+            compactions.erase(
+                    std::remove_if(compactions.begin(), compactions.end(),
+                                   [&](auto& c) {
+                                       return 
c.has_delete_bitmap_lock_initiator() &&
+                                              c.delete_bitmap_lock_initiator() 
==
+                                                      
schema_change.delete_bitmap_lock_initiator();
+                                   }),
+                    compactions.end());
+            if (compactions.size() < origin_size) {
+                INSTANCE_LOG(INFO)
+                        << "remove " << (origin_size - compactions.size())
+                        << " STOP_TOKEN for schema_change job tablet_id=" << 
tablet_id
+                        << " delete_bitmap_lock_initiator="
+                        << schema_change.delete_bitmap_lock_initiator() << " 
key=" << hex(job_key);
+            }
+            new_tablet_job_val = new_recorded_job.SerializeAsString();
+            txn->put(new_tablet_job_key, new_tablet_job_val);
+        }
         need_commit = true;
         return;
     }
@@ -1287,6 +1345,23 @@ void process_schema_change_job(MetaServiceCode& code, 
std::string& msg, std::str
     auto job_val = recorded_job.SerializeAsString();
     txn->put(job_key, job_val);
     if (!new_tablet_job_val.empty()) {
+        auto& compactions = *new_recorded_job.mutable_compaction();
+        auto origin_size = compactions.size();
+        compactions.erase(
+                std::remove_if(compactions.begin(), compactions.end(),
+                               [&](auto& c) {
+                                   return c.has_delete_bitmap_lock_initiator() 
&&
+                                          c.delete_bitmap_lock_initiator() ==
+                                                  
schema_change.delete_bitmap_lock_initiator();
+                               }),
+                compactions.end());
+        if (compactions.size() < origin_size) {
+            INSTANCE_LOG(INFO) << "remove " << (origin_size - 
compactions.size())
+                               << " STOP_TOKEN for schema_change job 
tablet_id=" << tablet_id
+                               << " delete_bitmap_lock_initiator="
+                               << schema_change.delete_bitmap_lock_initiator()
+                               << " key=" << hex(job_key);
+        }
         new_recorded_job.clear_schema_change();
         new_tablet_job_val = new_recorded_job.SerializeAsString();
         txn->put(new_tablet_job_key, new_tablet_job_val);
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index ff0279990ee..28ec3ba67d8 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -514,6 +514,7 @@ message TabletCompactionJobPB {
         CUMULATIVE = 2;
         EMPTY_CUMULATIVE = 3; // just update cumulative point
         FULL = 4;
+        STOP_TOKEN = 5; // fail existing compactions and deny newly incomming 
compactions
     }
     // IP and port of the node which initiates this job
     optional string initiator = 1; // prepare
diff --git 
a/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_new_tablet_compaction.out
 
b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_new_tablet_compaction.out
new file mode 100644
index 00000000000..4f02f6683a2
Binary files /dev/null and 
b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_new_tablet_compaction.out
 differ
diff --git 
a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_new_tablet_compaction.groovy
 
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_new_tablet_compaction.groovy
new file mode 100644
index 00000000000..467e9fddb43
--- /dev/null
+++ 
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_new_tablet_compaction.groovy
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.concurrent.TimeUnit
+import org.awaitility.Awaitility
+
+suite("test_cloud_mow_new_tablet_compaction", "nonConcurrent") {
+    // Regression test: a cumulative compaction on the new tablet of a running schema change
+    // (possible when enable_new_tablet_do_compaction=true) must not leave duplicate keys in a
+    // merge-on-write table. Skipped outside cloud mode.
+    if (!isCloudMode()) {
+        return
+    }
+
+    GetDebugPoint().clearDebugPointsForAllFEs()
+    GetDebugPoint().clearDebugPointsForAllBEs()
+
+    // allow compaction on the NOT_READY tablet created by the schema change
+    def customBeConfig = [
+        enable_new_tablet_do_compaction : true
+    ]
+
+    setBeConfigTemporary(customBeConfig) {
+        def table1 = "test_cloud_mow_new_tablet_compaction"
+        sql "DROP TABLE IF EXISTS ${table1} FORCE;"
+        sql """ CREATE TABLE IF NOT EXISTS ${table1} (
+                `k1` int NOT NULL,
+                `c1` int,
+                `c2` int,
+                `c3` int
+                )UNIQUE KEY(k1)
+            DISTRIBUTED BY HASH(k1) BUCKETS 1
+            PROPERTIES (
+                "enable_unique_key_merge_on_write" = "true",
+                "disable_auto_compaction" = "true",
+                "replication_num" = "1"); """
+
+        sql "insert into ${table1} values(1,1,1,1);"
+        sql "insert into ${table1} values(2,2,2,2);"
+        sql "insert into ${table1} values(3,3,3,2);"
+        sql "sync;"
+        qt_sql "select * from ${table1} order by k1;"
+
+        // locate the single tablet and the backend hosting it (needed for the compaction call)
+        def backends = sql_return_maparray('show backends')
+        def tabletStats = sql_return_maparray("show tablets from ${table1};")
+        assert tabletStats.size() == 1
+        def tabletId = tabletStats[0].TabletId
+        def tabletBackendId = tabletStats[0].BackendId
+        def tabletBackend
+        for (def be : backends) {
+            if (be.BackendId == tabletBackendId) {
+                tabletBackend = be
+                break;
+            }
+        }
+        logger.info("tablet ${tabletId} on backend ${tabletBackend.Host} with 
backendId=${tabletBackend.BackendId}");
+
+        try {
+            // Hold the schema change job at two debug points: inside _convert_historical_rowsets,
+            // and in _process_delete_bitmap right after it captures incremental rowsets without
+            // the lock.
+            
GetDebugPoint().enableDebugPointForAllBEs("CloudSchemaChangeJob::_convert_historical_rowsets.block")
+            
GetDebugPoint().enableDebugPointForAllBEs("CloudSchemaChangeJob::_process_delete_bitmap.after.capture_without_lock")
+            sql "alter table ${table1} modify column c1 varchar(100);"
+
+            Thread.sleep(3000)
+
+            // the new tablet is whichever tablet id differs from the original one
+            tabletStats = sql_return_maparray("show tablets from ${table1};")
+            def newTabletId = "-1"
+            for (def stat : tabletStats) {
+                if (stat.TabletId != tabletId) {
+                    newTabletId = stat.TabletId
+                    break
+                }
+            }
+
+            logger.info("new_tablet_id=${newTabletId}")
+
+            // [start_ver, end_ver] becomes the version range of the three loads below;
+            // end_ver is bumped once per load
+            int start_ver = 5
+            int end_ver = 4
+
+            // these loads skip calculating bitmaps in the publish phase on the new tablet
+            // because it's in NOT_READY state
+            
GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait")
+            
GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block")
+            def threads = []
+            threads << Thread.start { sql "insert into ${table1} 
values(1,99,99,99),(3,99,99,99);"}
+            ++end_ver
+            Thread.sleep(200)
+            threads << Thread.start { sql "insert into ${table1} 
values(5,88,88,88),(1,88,88,88);" }
+            ++end_ver
+            Thread.sleep(200)
+            threads << Thread.start { sql "insert into ${table1} 
values(3,77,77,77),(5,77,77,77);" }
+            ++end_ver
+            Thread.sleep(2000)
+            
GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait")
+            
GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block")
+            threads.each { it.join() }
+
+
+            // let sc capture these rowsets when calculating incremental rowsets without lock
+            
GetDebugPoint().disableDebugPointForAllBEs("CloudSchemaChangeJob::_convert_historical_rowsets.block")
+            Thread.sleep(1000)
+
+            // do cumu compaction on these rowsets on new tablet
+            // this can happen when enable_new_tablet_do_compaction=true
+            
GetDebugPoint().enableDebugPointForAllBEs("CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets",
+                        [tablet_id:"${newTabletId}", start_version: start_ver, 
end_version: end_ver]);
+            {
+                // trigger cumu compaction, should fail: the schema change registered a
+                // STOP_TOKEN compaction job for the new tablet before capturing rowsets, and
+                // the meta service rejects compactions while that job exists
+                logger.info("trigger cumu compaction on tablet=${newTabletId} 
BE.Host=${tabletBackend.Host} with backendId=${tabletBackend.BackendId}")
+                def (code, out, err) = 
be_run_cumulative_compaction(tabletBackend.Host, tabletBackend.HttpPort, 
newTabletId)
+                logger.info("Run compaction: code=" + code + ", out=" + out + 
", err=" + err)
+                assert code == 0
+                def compactJson = parseJson(out.trim())
+                assert "success" != compactJson.status.toLowerCase()
+            }
+
+            
GetDebugPoint().disableDebugPointForAllBEs("CloudSchemaChangeJob::_process_delete_bitmap.after.capture_without_lock")
+            // wait for sc to finish
+            waitForSchemaChangeDone {
+                sql """ SHOW ALTER TABLE COLUMN WHERE TableName='${table1}' 
ORDER BY createtime DESC LIMIT 1 """
+                time 1000
+            }
+
+            // any row returned here is a key that appears more than once
+            qt_dup_key_count "select k1,count() as cnt from ${table1} group by 
k1 having cnt>1;"
+            order_qt_sql "select * from ${table1};"
+            
+        } catch(Exception e) {
+            logger.info(e.getMessage())
+            throw e
+        } finally {
+            GetDebugPoint().clearDebugPointsForAllBEs()
+            GetDebugPoint().clearDebugPointsForAllFEs()
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to