This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 7e8c81d1983 branch-3.0: [Opt](cloud-sc) Clear stop token when `commit_tablet_job` fails #49275 (#49494)
7e8c81d1983 is described below
commit 7e8c81d1983616426cca96d2c4eef825e298ce12
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Mar 27 10:22:53 2025 +0800
branch-3.0: [Opt](cloud-sc) Clear stop token when `commit_tablet_job` fails #49275 (#49494)
Cherry-picked from #49275
Co-authored-by: bobhan1 <[email protected]>
---
be/src/cloud/cloud_schema_change_job.cpp | 20 ++++--
be/src/cloud/cloud_storage_engine.cpp | 16 +++--
be/src/cloud/cloud_storage_engine.h | 2 +-
.../test_cloud_sc_self_retry_with_stop_token.out | Bin 0 -> 167 bytes
...test_cloud_sc_self_retry_with_stop_token.groovy | 76 +++++++++++++++++++++
5 files changed, 102 insertions(+), 12 deletions(-)
diff --git a/be/src/cloud/cloud_schema_change_job.cpp
b/be/src/cloud/cloud_schema_change_job.cpp
index debdd587037..db2dacd7663 100644
--- a/be/src/cloud/cloud_schema_change_job.cpp
+++ b/be/src/cloud/cloud_schema_change_job.cpp
@@ -364,7 +364,16 @@ Status
CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
DBUG_EXECUTE_IF("CloudSchemaChangeJob.process_alter_tablet.sleep",
DBUG_BLOCK);
// process delete bitmap if the table is MOW
+ bool has_stop_token {false};
+ bool should_clear_stop_token {true};
+ Defer defer {[&]() {
+ if (has_stop_token) {
+
static_cast<void>(_cloud_storage_engine.unregister_compaction_stop_token(
+ _new_tablet, should_clear_stop_token));
+ }
+ }};
if (_new_tablet->enable_unique_key_merge_on_write()) {
+ has_stop_token = true;
int64_t initiator =
boost::uuids::hash_value(UUIDGenerator::instance()->next_uuid()) &
std::numeric_limits<int64_t>::max();
// If there are historical versions of rowsets, we need to recalculate
their delete
@@ -385,6 +394,11 @@ Status
CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR>("test
txn conflict");
}
});
+
DBUG_EXECUTE_IF("CloudSchemaChangeJob::_convert_historical_rowsets.fail.before.commit_job",
{
+ LOG_INFO("inject retryable error before commit sc job, tablet={}",
+ _new_tablet->tablet_id());
+ return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR>("injected
retryable error");
+ });
auto st = _cloud_storage_engine.meta_mgr().commit_tablet_job(job,
&finish_resp);
if (!st.ok()) {
if (finish_resp.status().code() == cloud::JOB_ALREADY_SUCCESS) {
@@ -397,6 +411,8 @@ Status
CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
return Status::OK();
}
return st;
+ } else {
+ should_clear_stop_token = false;
}
const auto& stats = finish_resp.stats();
{
@@ -419,10 +435,6 @@ Status
CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
.tag("start_calc_delete_bitmap_version",
start_calc_delete_bitmap_version)
.tag("alter_version", alter_version);
RETURN_IF_ERROR(_cloud_storage_engine.register_compaction_stop_token(_new_tablet,
initiator));
- Defer defer {[&]() {
-
static_cast<void>(_cloud_storage_engine.unregister_compaction_stop_token(_new_tablet));
- }};
-
TabletMetaSharedPtr tmp_meta =
std::make_shared<TabletMeta>(*(_new_tablet->tablet_meta()));
tmp_meta->delete_bitmap().delete_bitmap.clear();
std::shared_ptr<CloudTablet> tmp_tablet =
diff --git a/be/src/cloud/cloud_storage_engine.cpp
b/be/src/cloud/cloud_storage_engine.cpp
index 2dcc01695d0..45d53642140 100644
--- a/be/src/cloud/cloud_storage_engine.cpp
+++ b/be/src/cloud/cloud_storage_engine.cpp
@@ -906,7 +906,7 @@ Status
CloudStorageEngine::register_compaction_stop_token(CloudTabletSPtr tablet
return st;
}
-Status CloudStorageEngine::unregister_compaction_stop_token(CloudTabletSPtr
tablet) {
+Status CloudStorageEngine::unregister_compaction_stop_token(CloudTabletSPtr
tablet, bool clear_ms) {
std::shared_ptr<CloudCompactionStopToken> stop_token;
{
std::lock_guard lock(_compaction_mtx);
@@ -918,12 +918,14 @@ Status
CloudStorageEngine::unregister_compaction_stop_token(CloudTabletSPtr tabl
}
_active_compaction_stop_tokens.erase(tablet->tablet_id());
}
- // stop token will be removed when SC commit or abort
- // RETURN_IF_ERROR(stop_token->do_unregister());
- LOG_INFO(
- "successfully unregister compaction stop token for tablet_id={}, "
- "delete_bitmap_lock_initiator={}",
- tablet->tablet_id(), stop_token->initiator());
+ LOG_INFO("successfully unregister compaction stop token for tablet_id={}",
tablet->tablet_id());
+ if (stop_token && clear_ms) {
+ RETURN_IF_ERROR(stop_token->do_unregister());
+ LOG_INFO(
+ "successfully remove compaction stop token from MS for
tablet_id={}, "
+ "delete_bitmap_lock_initiator={}",
+ tablet->tablet_id(), stop_token->initiator());
+ }
return Status::OK();
}
diff --git a/be/src/cloud/cloud_storage_engine.h
b/be/src/cloud/cloud_storage_engine.h
index 35be442195e..046116e3a24 100644
--- a/be/src/cloud/cloud_storage_engine.h
+++ b/be/src/cloud/cloud_storage_engine.h
@@ -150,7 +150,7 @@ public:
Status register_compaction_stop_token(CloudTabletSPtr tablet, int64_t
initiator);
- Status unregister_compaction_stop_token(CloudTabletSPtr tablet);
+ Status unregister_compaction_stop_token(CloudTabletSPtr tablet, bool
clear_ms);
private:
void _refresh_storage_vault_info_thread_callback();
diff --git
a/regression-test/data/fault_injection_p0/cloud/test_cloud_sc_self_retry_with_stop_token.out
b/regression-test/data/fault_injection_p0/cloud/test_cloud_sc_self_retry_with_stop_token.out
new file mode 100644
index 00000000000..69c340fa35a
Binary files /dev/null and
b/regression-test/data/fault_injection_p0/cloud/test_cloud_sc_self_retry_with_stop_token.out
differ
diff --git
a/regression-test/suites/fault_injection_p0/cloud/test_cloud_sc_self_retry_with_stop_token.groovy
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_sc_self_retry_with_stop_token.groovy
new file mode 100644
index 00000000000..5978320aa19
--- /dev/null
+++
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_sc_self_retry_with_stop_token.groovy
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.concurrent.TimeUnit
+import org.awaitility.Awaitility
+
+suite("test_cloud_sc_self_retry_with_stop_token", "nonConcurrent") {
+ if (!isCloudMode()) {
+ return
+ }
+
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ GetDebugPoint().clearDebugPointsForAllBEs()
+
+
+ def table1 = "test_cloud_sc_self_retry_with_stop_token"
+ sql "DROP TABLE IF EXISTS ${table1} FORCE;"
+ sql """ CREATE TABLE IF NOT EXISTS ${table1} (
+ `k1` int NOT NULL,
+ `c1` int,
+ `c2` int,
+ `c3` int
+ )UNIQUE KEY(k1)
+ DISTRIBUTED BY HASH(k1) BUCKETS 1
+ PROPERTIES (
+ "enable_unique_key_merge_on_write" = "true",
+ "disable_auto_compaction" = "true",
+ "replication_num" = "1"); """
+
+ sql "insert into ${table1} values(1,1,1,1);"
+ sql "insert into ${table1} values(2,2,2,2);"
+ sql "insert into ${table1} values(3,3,3,2);"
+ sql "sync;"
+ qt_sql "select * from ${table1} order by k1;"
+
+ try {
+
GetDebugPoint().enableDebugPointForAllBEs("CloudSchemaChangeJob::_convert_historical_rowsets.fail.before.commit_job")
+
+ sql "alter table ${table1} modify column c2 varchar(100);"
+
+ def res
+ Awaitility.await().atMost(40, TimeUnit.SECONDS).pollDelay(1,
TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(() -> {
+ res = sql_return_maparray """ SHOW ALTER TABLE COLUMN WHERE
TableName='${table1}' ORDER BY createtime DESC LIMIT 1 """
+ logger.info("res: ${res}")
+ if (res[0].State == "FINISHED" || res[0].State == "CANCELLED") {
+ return true;
+ }
+ return false;
+ });
+
+ assert res[0].State == "CANCELLED"
+ assert !res[0].Msg.contains("compactions are not allowed on
tablet_id") && !res[0].Msg.contains("stop token already exists")
+ assert res[0].Msg.contains("DELETE_BITMAP_LOCK_ERROR")
+
+ qt_sql "select * from ${table1} order by k1;"
+ } catch(Exception e) {
+ logger.info(e.getMessage())
+ throw e
+ } finally {
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]