This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 12572b01bc7 branch-3.0: [fix](cloud-mow)Schema change fail should
release delete bitmap lock #49889 (#50335)
12572b01bc7 is described below
commit 12572b01bc7b74da754798ded9770b7c1859e030
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Apr 24 09:51:58 2025 +0800
branch-3.0: [fix](cloud-mow)Schema change fail should release delete bitmap
lock #49889 (#50335)
Cherry-picked from #49889
Co-authored-by: huanghaibin <[email protected]>
---
be/src/cloud/cloud_schema_change_job.cpp | 17 ++-
be/src/cloud/cloud_schema_change_job.h | 1 +
be/src/olap/compaction.cpp | 3 +-
cloud/src/meta-service/meta_service_job.cpp | 2 +-
.../test_sc_fail_release_delete_bitmap_lock.groovy | 136 +++++++++++++++++++++
5 files changed, 152 insertions(+), 7 deletions(-)
diff --git a/be/src/cloud/cloud_schema_change_job.cpp
b/be/src/cloud/cloud_schema_change_job.cpp
index 60d2fb42a31..bd043a0d919 100644
--- a/be/src/cloud/cloud_schema_change_job.cpp
+++ b/be/src/cloud/cloud_schema_change_job.cpp
@@ -58,7 +58,10 @@
CloudSchemaChangeJob::CloudSchemaChangeJob(CloudStorageEngine& cloud_storage_eng
std::string job_id, int64_t
expiration)
: _cloud_storage_engine(cloud_storage_engine),
_job_id(std::move(job_id)),
- _expiration(expiration) {}
+ _expiration(expiration) {
+ _initiator =
boost::uuids::hash_value(UUIDGenerator::instance()->next_uuid()) &
+ std::numeric_limits<int64_t>::max();
+}
CloudSchemaChangeJob::~CloudSchemaChangeJob() = default;
@@ -375,16 +378,14 @@ Status
CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
}};
if (_new_tablet->enable_unique_key_merge_on_write()) {
has_stop_token = true;
- int64_t initiator =
boost::uuids::hash_value(UUIDGenerator::instance()->next_uuid()) &
- std::numeric_limits<int64_t>::max();
// If there are historical versions of rowsets, we need to recalculate
their delete
// bitmaps, otherwise we will miss the delete bitmaps of incremental
rowsets
int64_t start_calc_delete_bitmap_version =
// [0-1] is a placeholder rowset, start from 2.
already_exist_any_version ? 2 : sc_job->alter_version() + 1;
RETURN_IF_ERROR(_process_delete_bitmap(sc_job->alter_version(),
-
start_calc_delete_bitmap_version, initiator));
- sc_job->set_delete_bitmap_lock_initiator(initiator);
+
start_calc_delete_bitmap_version, _initiator));
+ sc_job->set_delete_bitmap_lock_initiator(_initiator);
}
cloud::FinishTabletJobResponse finish_resp;
@@ -518,6 +519,12 @@ Status
CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
}
void CloudSchemaChangeJob::clean_up_on_failed() {
+ if (_new_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
+ _new_tablet->enable_unique_key_merge_on_write()) {
+ _cloud_storage_engine.meta_mgr().remove_delete_bitmap_update_lock(
+ _new_tablet->table_id(), SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID,
_initiator,
+ _new_tablet->tablet_id());
+ }
for (const auto& output_rs : _output_rowsets) {
if (output_rs.use_count() > 2) {
LOG(WARNING) << "Rowset " << output_rs->rowset_id().to_string() <<
" has "
diff --git a/be/src/cloud/cloud_schema_change_job.h
b/be/src/cloud/cloud_schema_change_job.h
index dee71cd3104..2a33443137e 100644
--- a/be/src/cloud/cloud_schema_change_job.h
+++ b/be/src/cloud/cloud_schema_change_job.h
@@ -56,6 +56,7 @@ private:
int64_t _output_cumulative_point = 0;
// absolute expiration time in second
int64_t _expiration;
+ int64_t _initiator;
};
} // namespace doris
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 5331c0d6f02..e5012a36eb4 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -1335,7 +1335,8 @@ Status CloudCompactionMixin::execute_compact() {
HANDLE_EXCEPTION_IF_CATCH_EXCEPTION(
execute_compact_impl(permits), [&](const doris::Exception& ex) {
auto st = garbage_collection();
- if (!st.ok() && initiator() !=
INVALID_COMPACTION_INITIATOR_ID) {
+ if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
+ _tablet->enable_unique_key_merge_on_write() && !st.ok()) {
// if compaction fail, be will try to abort compaction,
and delete bitmap lock
// will release if abort job successfully, but if abort
failed, delete bitmap
// lock will not release, in this situation, be need to
send this rpc to ms
diff --git a/cloud/src/meta-service/meta_service_job.cpp
b/cloud/src/meta-service/meta_service_job.cpp
index 186c80dad4a..1fcfa24deca 100644
--- a/cloud/src/meta-service/meta_service_job.cpp
+++ b/cloud/src/meta-service/meta_service_job.cpp
@@ -549,7 +549,7 @@ static void
remove_delete_bitmap_update_lock(std::unique_ptr<Transaction>& txn,
std::string lock_val;
TxnErrorCode err = txn->get(lock_key, &lock_val);
LOG(INFO) << "get remove delete bitmap update lock info, table_id=" <<
table_id
- << " key=" << hex(lock_key) << " err=" << err;
+ << " key=" << hex(lock_key) << " err=" << err << " initiator="
<< lock_initiator;
if (err != TxnErrorCode::TXN_OK) {
LOG(WARNING) << "failed to get delete bitmap update lock key,
instance_id=" << instance_id
<< " table_id=" << table_id << " key=" << hex(lock_key)
<< " err=" << err;
diff --git
a/regression-test/suites/fault_injection_p0/cloud/test_sc_fail_release_delete_bitmap_lock.groovy
b/regression-test/suites/fault_injection_p0/cloud/test_sc_fail_release_delete_bitmap_lock.groovy
new file mode 100644
index 00000000000..60d995b3238
--- /dev/null
+++
b/regression-test/suites/fault_injection_p0/cloud/test_sc_fail_release_delete_bitmap_lock.groovy
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.concurrent.TimeUnit
+import org.awaitility.Awaitility
+
+suite("test_sc_fail_release_delete_bitmap_lock", "nonConcurrent") {
+ if (!isCloudMode()) {
+ return
+ }
+ GetDebugPoint().clearDebugPointsForAllFEs()
+
+ def backendId_to_backendIP = [:]
+ def backendId_to_backendHttpPort = [:]
+ def backendId_to_params = [string: [:]]
+ getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+ def set_be_param = { paramName, paramValue ->
+ // for each be node, set paramName=paramValue
+ for (String id in backendId_to_backendIP.keySet()) {
+ def beIp = backendId_to_backendIP.get(id)
+ def bePort = backendId_to_backendHttpPort.get(id)
+ def (code, out, err) = curl("POST",
String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName,
paramValue))
+ assertTrue(out.contains("OK"))
+ }
+ }
+
+ def reset_be_param = { paramName ->
+ // for each be node, reset paramName to the original value recorded by get_be_param
+ for (String id in backendId_to_backendIP.keySet()) {
+ def beIp = backendId_to_backendIP.get(id)
+ def bePort = backendId_to_backendHttpPort.get(id)
+ def original_value = backendId_to_params.get(id).get(paramName)
+ def (code, out, err) = curl("POST",
String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName,
original_value))
+ assertTrue(out.contains("OK"))
+ }
+ }
+
+ def get_be_param = { paramName ->
+ // for each be node, fetch the current param value and record it as the original value
+ def paramValue = ""
+ for (String id in backendId_to_backendIP.keySet()) {
+ def beIp = backendId_to_backendIP.get(id)
+ def bePort = backendId_to_backendHttpPort.get(id)
+ // get the config value from be
+ def (code, out, err) = curl("GET",
String.format("http://%s:%s/api/show_config?conf_item=%s", beIp, bePort,
paramName))
+ assertTrue(code == 0)
+ assertTrue(out.contains(paramName))
+ // parsing
+ def resultList = parseJson(out)[0]
+ assertTrue(resultList.size() == 4)
+ // get original value
+ paramValue = resultList[2]
+ backendId_to_params.get(id, [:]).put(paramName, paramValue)
+ }
+ }
+ def customFeConfig = [schema_change_max_retry_time: 0]
+ def tableName = "tbl_basic"
+
+ def do_insert_into = {
+ sql """ INSERT INTO ${tableName} (id, name, score) VALUES (1, "Emily",
25),(2, "Benjamin", 35);"""
+ }
+
+ def waitForSC = {
+ Awaitility.await().atMost(120, TimeUnit.SECONDS).pollDelay(100,
TimeUnit.MILLISECONDS).pollInterval(100, TimeUnit.MILLISECONDS).until(() -> {
+ def res = sql_return_maparray "SHOW ALTER TABLE COLUMN WHERE
TableName='${tableName}' ORDER BY createtime DESC LIMIT 1"
+ assert res.size() == 1
+ log.info("res[0].State:" + res[0].State)
+ if (res[0].State == "FINISHED" || res[0].State == "CANCELLED") {
+ return true;
+ }
+ return false;
+ });
+ }
+
+ try {
+ // create table
+ sql """ drop table if exists ${tableName}; """
+
+ sql """
+ CREATE TABLE `${tableName}` (
+ `id` int(11) NOT NULL,
+ `name` varchar(10) NULL,
+ `score` int(11) NULL
+ ) ENGINE=OLAP
+ UNIQUE KEY(`id`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES (
+ "disable_auto_compaction" = "true",
+ "enable_unique_key_merge_on_write" = "true",
+ "replication_num" = "1"
+ );
+ """
+ setFeConfigTemporary(customFeConfig) {
+ get_be_param("delete_bitmap_lock_expiration_seconds")
+ set_be_param("delete_bitmap_lock_expiration_seconds", "60")
+ sql """ INSERT INTO ${tableName} (id, name, score) VALUES (1,
"AAA", 15);"""
+ sql """ INSERT INTO ${tableName} (id, name, score) VALUES (2,
"BBB", 25);"""
+ sql """ INSERT INTO ${tableName} (id, name, score) VALUES (3,
"CCC", 35);"""
+ sql """ INSERT INTO ${tableName} (id, name, score) VALUES (4,
"DDD", 45);"""
+ sql """ INSERT INTO ${tableName} (id, name, score) VALUES (5,
"EEE", 55);"""
+ def tablets = sql_return_maparray """ show tablets from
${tableName}; """
+ logger.info("tablets: " + tablets)
+
GetDebugPoint().enableDebugPointForAllBEs("CloudMetaMgr::test_update_delete_bitmap_fail")
+ sql "alter table ${tableName} modify column score varchar(100);"
+ waitForSC()
+ def res = sql_return_maparray "SHOW ALTER TABLE COLUMN WHERE
TableName='${tableName}' ORDER BY createtime DESC LIMIT 1"
+ assert res[0].State == "CANCELLED"
+ assert res[0].Msg.contains("[DELETE_BITMAP_LOCK_ERROR]test update
delete bitmap failed")
+
GetDebugPoint().disableDebugPointForAllBEs("CloudMetaMgr::test_update_delete_bitmap_fail")
+ now = System.currentTimeMillis()
+ do_insert_into()
+ time_cost = System.currentTimeMillis() - now
+ log.info("time_cost(ms): ${time_cost}")
+ assertTrue(time_cost < 10000, "wait time should less than 10s")
+ }
+ } finally {
+ reset_be_param("delete_bitmap_lock_expiration_seconds")
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org