This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 12572b01bc7 branch-3.0: [fix](cloud-mow)Schema change fail should release delete bitmap lock #49889 (#50335)
12572b01bc7 is described below

commit 12572b01bc7b74da754798ded9770b7c1859e030
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Apr 24 09:51:58 2025 +0800

    branch-3.0: [fix](cloud-mow)Schema change fail should release delete bitmap lock #49889 (#50335)
    
    Cherry-picked from #49889
    
    Co-authored-by: huanghaibin <[email protected]>
---
 be/src/cloud/cloud_schema_change_job.cpp           |  17 ++-
 be/src/cloud/cloud_schema_change_job.h             |   1 +
 be/src/olap/compaction.cpp                         |   3 +-
 cloud/src/meta-service/meta_service_job.cpp        |   2 +-
 .../test_sc_fail_release_delete_bitmap_lock.groovy | 136 +++++++++++++++++++++
 5 files changed, 152 insertions(+), 7 deletions(-)

diff --git a/be/src/cloud/cloud_schema_change_job.cpp b/be/src/cloud/cloud_schema_change_job.cpp
index 60d2fb42a31..bd043a0d919 100644
--- a/be/src/cloud/cloud_schema_change_job.cpp
+++ b/be/src/cloud/cloud_schema_change_job.cpp
@@ -58,7 +58,10 @@ 
CloudSchemaChangeJob::CloudSchemaChangeJob(CloudStorageEngine& cloud_storage_eng
                                            std::string job_id, int64_t 
expiration)
         : _cloud_storage_engine(cloud_storage_engine),
           _job_id(std::move(job_id)),
-          _expiration(expiration) {}
+          _expiration(expiration) {
+    _initiator = 
boost::uuids::hash_value(UUIDGenerator::instance()->next_uuid()) &
+                 std::numeric_limits<int64_t>::max();
+}
 
 CloudSchemaChangeJob::~CloudSchemaChangeJob() = default;
 
@@ -375,16 +378,14 @@ Status 
CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
     }};
     if (_new_tablet->enable_unique_key_merge_on_write()) {
         has_stop_token = true;
-        int64_t initiator = 
boost::uuids::hash_value(UUIDGenerator::instance()->next_uuid()) &
-                            std::numeric_limits<int64_t>::max();
         // If there are historical versions of rowsets, we need to recalculate 
their delete
         // bitmaps, otherwise we will miss the delete bitmaps of incremental 
rowsets
         int64_t start_calc_delete_bitmap_version =
                 // [0-1] is a placeholder rowset, start from 2.
                 already_exist_any_version ? 2 : sc_job->alter_version() + 1;
         RETURN_IF_ERROR(_process_delete_bitmap(sc_job->alter_version(),
-                                               
start_calc_delete_bitmap_version, initiator));
-        sc_job->set_delete_bitmap_lock_initiator(initiator);
+                                               
start_calc_delete_bitmap_version, _initiator));
+        sc_job->set_delete_bitmap_lock_initiator(_initiator);
     }
 
     cloud::FinishTabletJobResponse finish_resp;
@@ -518,6 +519,12 @@ Status 
CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
 }
 
 void CloudSchemaChangeJob::clean_up_on_failed() {
+    if (_new_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
+        _new_tablet->enable_unique_key_merge_on_write()) {
+        _cloud_storage_engine.meta_mgr().remove_delete_bitmap_update_lock(
+                _new_tablet->table_id(), SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID, 
_initiator,
+                _new_tablet->tablet_id());
+    }
     for (const auto& output_rs : _output_rowsets) {
         if (output_rs.use_count() > 2) {
             LOG(WARNING) << "Rowset " << output_rs->rowset_id().to_string() << 
" has "
diff --git a/be/src/cloud/cloud_schema_change_job.h b/be/src/cloud/cloud_schema_change_job.h
index dee71cd3104..2a33443137e 100644
--- a/be/src/cloud/cloud_schema_change_job.h
+++ b/be/src/cloud/cloud_schema_change_job.h
@@ -56,6 +56,7 @@ private:
     int64_t _output_cumulative_point = 0;
     // absolute expiration time in second
     int64_t _expiration;
+    int64_t _initiator;
 };
 
 } // namespace doris
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 5331c0d6f02..e5012a36eb4 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -1335,7 +1335,8 @@ Status CloudCompactionMixin::execute_compact() {
     HANDLE_EXCEPTION_IF_CATCH_EXCEPTION(
             execute_compact_impl(permits), [&](const doris::Exception& ex) {
                 auto st = garbage_collection();
-                if (!st.ok() && initiator() != 
INVALID_COMPACTION_INITIATOR_ID) {
+                if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
+                    _tablet->enable_unique_key_merge_on_write() && !st.ok()) {
                     // if compaction fail, be will try to abort compaction, 
and delete bitmap lock
                     // will release if abort job successfully, but if abort 
failed, delete bitmap
                     // lock will not release, in this situation, be need to 
send this rpc to ms
diff --git a/cloud/src/meta-service/meta_service_job.cpp b/cloud/src/meta-service/meta_service_job.cpp
index 186c80dad4a..1fcfa24deca 100644
--- a/cloud/src/meta-service/meta_service_job.cpp
+++ b/cloud/src/meta-service/meta_service_job.cpp
@@ -549,7 +549,7 @@ static void 
remove_delete_bitmap_update_lock(std::unique_ptr<Transaction>& txn,
     std::string lock_val;
     TxnErrorCode err = txn->get(lock_key, &lock_val);
     LOG(INFO) << "get remove delete bitmap update lock info, table_id=" << 
table_id
-              << " key=" << hex(lock_key) << " err=" << err;
+              << " key=" << hex(lock_key) << " err=" << err << " initiator=" 
<< lock_initiator;
     if (err != TxnErrorCode::TXN_OK) {
         LOG(WARNING) << "failed to get delete bitmap update lock key, 
instance_id=" << instance_id
                      << " table_id=" << table_id << " key=" << hex(lock_key) 
<< " err=" << err;
diff --git a/regression-test/suites/fault_injection_p0/cloud/test_sc_fail_release_delete_bitmap_lock.groovy b/regression-test/suites/fault_injection_p0/cloud/test_sc_fail_release_delete_bitmap_lock.groovy
new file mode 100644
index 00000000000..60d995b3238
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/cloud/test_sc_fail_release_delete_bitmap_lock.groovy
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.concurrent.TimeUnit
+import org.awaitility.Awaitility
+
+suite("test_sc_fail_release_delete_bitmap_lock", "nonConcurrent") {
+    if (!isCloudMode()) {
+        return
+    }
+    GetDebugPoint().clearDebugPointsForAllFEs()
+
+    def backendId_to_backendIP = [:]
+    def backendId_to_backendHttpPort = [:]
+    def backendId_to_params = [:]
+    getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+    def set_be_param = { paramName, paramValue ->
+        // for each be node, set paramName=paramValue
+        for (String id in backendId_to_backendIP.keySet()) {
+            def beIp = backendId_to_backendIP.get(id)
+            def bePort = backendId_to_backendHttpPort.get(id)
+            def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, paramValue))
+            assertTrue(out.contains("OK"))
+        }
+    }
+
+    def reset_be_param = { paramName ->
+        // for each be node whose original value was saved by get_be_param, restore it (skip nodes never read, e.g. early failure)
+        for (String id in backendId_to_backendIP.keySet().findAll { backendId_to_params.get(it)?.containsKey(paramName) }) {
+            def beIp = backendId_to_backendIP.get(id)
+            def bePort = backendId_to_backendHttpPort.get(id)
+            def original_value = backendId_to_params.get(id).get(paramName)
+            def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, original_value))
+            assertTrue(out.contains("OK"))
+        }
+    }
+
+    def get_be_param = { paramName ->
+        // for each be node, read the current value of paramName and save it for reset_be_param
+        def paramValue = ""
+        for (String id in backendId_to_backendIP.keySet()) {
+            def beIp = backendId_to_backendIP.get(id)
+            def bePort = backendId_to_backendHttpPort.get(id)
+            // get the config value from be
+            def (code, out, err) = curl("GET", String.format("http://%s:%s/api/show_config?conf_item=%s", beIp, bePort, paramName))
+            assertTrue(code == 0)
+            assertTrue(out.contains(paramName))
+            // parsing
+            def resultList = parseJson(out)[0]
+            assertTrue(resultList.size() == 4)
+            // get original value
+            paramValue = resultList[2]
+            backendId_to_params.get(id, [:]).put(paramName, paramValue)
+        }
+    }
+    def customFeConfig = [schema_change_max_retry_time: 0]
+    def tableName = "tbl_basic"
+
+    def do_insert_into = {
+        sql """ INSERT INTO ${tableName} (id, name, score) VALUES (1, "Emily", 25),(2, "Benjamin", 35);"""
+    }
+
+    def waitForSC = {
+        Awaitility.await().atMost(120, TimeUnit.SECONDS).pollDelay(100, TimeUnit.MILLISECONDS).pollInterval(100, TimeUnit.MILLISECONDS).until(() -> {
+            def res = sql_return_maparray "SHOW ALTER TABLE COLUMN WHERE TableName='${tableName}' ORDER BY createtime DESC LIMIT 1"
+            assert res.size() == 1
+            log.info("res[0].State:" + res[0].State)
+            if (res[0].State == "FINISHED" || res[0].State == "CANCELLED") {
+                return true;
+            }
+            return false;
+        });
+    }
+
+    try {
+        // create table
+        sql """ drop table if exists ${tableName}; """
+
+        sql """
+        CREATE TABLE `${tableName}` (
+            `id` int(11) NOT NULL,
+            `name` varchar(10) NULL,
+            `score` int(11) NULL
+        ) ENGINE=OLAP
+        UNIQUE KEY(`id`)
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES (
+            "disable_auto_compaction" = "true",
+            "enable_unique_key_merge_on_write" = "true",
+            "replication_num" = "1"
+        );
+        """
+        setFeConfigTemporary(customFeConfig) {
+            get_be_param("delete_bitmap_lock_expiration_seconds")
+            set_be_param("delete_bitmap_lock_expiration_seconds", "60")
+            sql """ INSERT INTO ${tableName} (id, name, score) VALUES (1, "AAA", 15);"""
+            sql """ INSERT INTO ${tableName} (id, name, score) VALUES (2, "BBB", 25);"""
+            sql """ INSERT INTO ${tableName} (id, name, score) VALUES (3, "CCC", 35);"""
+            sql """ INSERT INTO ${tableName} (id, name, score) VALUES (4, "DDD", 45);"""
+            sql """ INSERT INTO ${tableName} (id, name, score) VALUES (5, "EEE", 55);"""
+            def tablets = sql_return_maparray """ show tablets from ${tableName}; """
+            logger.info("tablets: " + tablets)
+            GetDebugPoint().enableDebugPointForAllBEs("CloudMetaMgr::test_update_delete_bitmap_fail")
+            sql "alter table ${tableName} modify column score varchar(100);"
+            waitForSC()
+            def res = sql_return_maparray "SHOW ALTER TABLE COLUMN WHERE TableName='${tableName}' ORDER BY createtime DESC LIMIT 1"
+            assert res[0].State == "CANCELLED"
+            assert res[0].Msg.contains("[DELETE_BITMAP_LOCK_ERROR]test update delete bitmap failed")
+            GetDebugPoint().disableDebugPointForAllBEs("CloudMetaMgr::test_update_delete_bitmap_fail")
+            def now = System.currentTimeMillis()
+            do_insert_into()
+            def time_cost = System.currentTimeMillis() - now
+            log.info("time_cost(ms): ${time_cost}")
+            assertTrue(time_cost < 10000, "wait time should less than 10s")
+        }
+    } finally {
+        reset_be_param("delete_bitmap_lock_expiration_seconds")
+        GetDebugPoint().clearDebugPointsForAllBEs()
+    }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to