This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 70a44489e23 branch-3.0: [Fix](cloud-mow) Rollup task should retry when
encountering TXN_CONFLICT in cloud mode #50705 (#50836)
70a44489e23 is described below
commit 70a44489e236ba445267724c09f65f298089d50c
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue May 13 21:31:19 2025 +0800
branch-3.0: [Fix](cloud-mow) Rollup task should retry when encountering
TXN_CONFLICT in cloud mode #50705 (#50836)
Cherry-picked from #50705
Co-authored-by: bobhan1 <[email protected]>
---
.../java/org/apache/doris/alter/RollupJobV2.java | 13 ++-
.../org/apache/doris/alter/SchemaChangeJobV2.java | 5 +-
.../cloud/test_cloud_mow_sync_mv.out | Bin 0 -> 454 bytes
.../cloud/test_cloud_mow_sync_mv.groovy | 115 +++++++++++++++++++++
4 files changed, 128 insertions(+), 5 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
index 1d09224e6fa..54b62041889 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
@@ -64,6 +64,7 @@ import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.AlterReplicaTask;
import org.apache.doris.task.CreateReplicaTask;
+import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TStorageFormat;
import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.thrift.TStorageType;
@@ -526,10 +527,18 @@ public class RollupJobV2 extends AlterJobV2 implements
GsonPostProcessable {
List<AgentTask> tasks = rollupBatchTask.getUnfinishedTasks(2000);
ensureCloudClusterExist(tasks);
for (AgentTask task : tasks) {
- if (task.getFailedTimes() > 0) {
+ int maxFailedTimes = 0;
+ if (Config.isCloudMode() &&
Config.enable_schema_change_retry_in_cloud_mode) {
+ if (task.getErrorCode() != null && task.getErrorCode()
+ .equals(TStatusCode.DELETE_BITMAP_LOCK_ERROR)) {
+ maxFailedTimes = Config.schema_change_max_retry_time;
+ }
+ }
+ if (task.getFailedTimes() > maxFailedTimes) {
task.setFinished(true);
AgentTaskQueue.removeTask(task.getBackendId(),
TTaskType.ALTER, task.getSignature());
- LOG.warn("rollup task failed: " + task.getErrorMsg());
+ LOG.warn("rollup task failed, failedTimes: {},
maxFailedTimes: {}, err: {}",
+ task.getFailedTimes(), maxFailedTimes,
task.getErrorMsg());
List<Long> failedBackends =
failedTabletBackends.get(task.getTabletId());
if (failedBackends == null) {
failedBackends = Lists.newArrayList();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index 89e3c6249c8..8141936033c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -583,15 +583,14 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
if (task.getErrorCode() != null && task.getErrorCode()
.equals(TStatusCode.DELETE_BITMAP_LOCK_ERROR)) {
maxFailedTimes = Config.schema_change_max_retry_time;
- LOG.warn("schema change task failed: {}, set
maxFailedTimes {}", task.getErrorMsg(),
- maxFailedTimes);
}
}
if (task.getFailedTimes() > maxFailedTimes) {
task.setFinished(true);
if (!FeConstants.runningUnitTest) {
AgentTaskQueue.removeTask(task.getBackendId(),
TTaskType.ALTER, task.getSignature());
- LOG.warn("schema change task failed: {}",
task.getErrorMsg());
+ LOG.warn("schema change task failed, failedTimes: {},
maxFailedTimes: {}, err: {}",
+ task.getFailedTimes(), maxFailedTimes,
task.getErrorMsg());
List<Long> failedBackends =
failedTabletBackends.get(task.getTabletId());
if (failedBackends == null) {
failedBackends = Lists.newArrayList();
diff --git
a/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_sync_mv.out
b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_sync_mv.out
new file mode 100644
index 00000000000..47a82e61109
Binary files /dev/null and
b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_sync_mv.out
differ
diff --git
a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_sync_mv.groovy
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_sync_mv.groovy
new file mode 100644
index 00000000000..eec2a0adb6f
--- /dev/null
+++
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_sync_mv.groovy
@@ -0,0 +1,115 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_cloud_mow_sync_mv", "nonConcurrent") {
+ if (!isCloudMode()) {
+ return
+ }
+
+ try {
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ GetDebugPoint().clearDebugPointsForAllBEs()
+
+ String db = context.config.getDbNameByFile(context.file)
+ sql "use ${db}"
+
+ def table1 = "test_cloud_mow_sync_mv"
+ sql "DROP TABLE IF EXISTS ${table1} FORCE;"
+ sql """
+ CREATE TABLE ${table1} (
+ `l_orderkey` BIGINT NULL,
+ `l_linenumber` INT NULL,
+ `l_partkey` INT NULL,
+ `l_suppkey` INT NULL,
+ `l_shipdate` DATE not NULL,
+ `l_quantity` DECIMAL(15, 2) NULL,
+ `l_extendedprice` DECIMAL(15, 2) NULL,
+ `l_discount` DECIMAL(15, 2) NULL,
+ `l_tax` DECIMAL(15, 2) NULL,
+ `l_returnflag` VARCHAR(1) NULL,
+ `l_linestatus` VARCHAR(1) NULL,
+ `l_commitdate` DATE NULL,
+ `l_receiptdate` DATE NULL,
+ `l_shipinstruct` VARCHAR(25) NULL,
+ `l_shipmode` VARCHAR(10) NULL,
+ `l_comment` VARCHAR(44) NULL
+ ) ENGINE=OLAP
+ unique KEY(l_orderkey, l_linenumber, l_partkey, l_suppkey, l_shipdate )
+ DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 1
+ PROPERTIES (
+ "enable_unique_key_merge_on_write" = "true",
+ "disable_auto_compaction" = "true");
+ """
+
+ sql """
+ insert into ${table1} values
+ (null, 1, 2, 3, '2023-10-17', 5.5, 6.5, 7.5, 8.5, 'o', 'k',
'2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy'),
+ (null, 1, 2, 3, '2023-10-17', 5.5, 6.5, 7.5, 8.5, 'o', 'k',
'2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy'),
+ (1, null, 3, 1, '2023-10-17', 5.5, 6.5, 7.5, 8.5, 'o', 'k',
'2023-10-18', '2023-10-18', 'a', 'b', 'yyyyyyyyy'),
+ (1, null, 3, 1, '2023-10-17', 5.5, 6.5, 7.5, 8.5, 'o', 'k',
'2023-10-18', '2023-10-18', 'a', 'b', 'yyyyyyyyy'),
+ (3, 3, null, 2, '2023-10-19', 7.5, 8.5, 9.5, 10.5, 'k', 'o',
'2023-10-19', '2023-10-19', 'c', 'd', 'xxxxxxxxx'),
+ (3, 3, null, 2, '2023-10-19', 7.5, 8.5, 9.5, 10.5, 'k', 'o',
'2023-10-19', '2023-10-19', 'c', 'd', 'xxxxxxxxx'),
+ (1, 2, 3, null, '2023-10-17', 5.5, 6.5, 7.5, 8.5, 'o', 'k',
'2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy'),
+ (1, 2, 3, null, '2023-10-17', 5.5, 6.5, 7.5, 8.5, 'o', 'k',
'2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy'),
+ (2, 3, 2, 1, '2023-10-18', 5.5, 6.5, 7.5, 8.5, 'o', 'k', null,
'2023-10-18', 'a', 'b', 'yyyyyyyyy'),
+ (2, 3, 2, 1, '2023-10-18', 5.5, 6.5, 7.5, 8.5, 'o', 'k', null,
'2023-10-18', 'a', 'b', 'yyyyyyyyy'),
+ (3, 1, 1, 2, '2023-10-19', 7.5, 8.5, 9.5, 10.5, 'k', 'o',
'2023-10-19', null, 'c', 'd', 'xxxxxxxxx'),
+ (3, 1, 1, 2, '2023-10-19', 7.5, 8.5, 9.5, 10.5, 'k', 'o',
'2023-10-19', null, 'c', 'd', 'xxxxxxxxx'),
+ (1, 3, 2, 2, '2023-10-17', 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-17',
'2023-10-17', 'a', 'b', 'yyyyyyyyy'),
+ (1, 3, 2, 2, '2023-10-17', 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-17',
'2023-10-17', 'a', 'b', 'yyyyyyyyy');
+ """
+
+ def mv1 = """
+ select l_orderkey, l_linenumber, l_partkey, l_suppkey, l_shipdate,
+ substring(concat(l_returnflag, l_linestatus), 1)
+ from ${table1};
+ """
+
+ def query1 = """
+ select l_orderkey, l_linenumber, l_partkey, l_suppkey, l_shipdate,
+ substring(concat(l_returnflag, l_linestatus), 1)
+ from ${table1};
+ """
+ order_qt_query1_before "${query1}"
+
+
GetDebugPoint().enableDebugPointForAllBEs("CloudSchemaChangeJob::_convert_historical_rowsets.fail.before.commit_job")
+ Thread.sleep(1000)
+
+ def t1 = Thread.start {
+ Thread.sleep(5000)
+
GetDebugPoint().disableDebugPointForAllBEs("CloudSchemaChangeJob::_convert_historical_rowsets.fail.before.commit_job")
+ }
+
+ create_sync_mv(db, table1, "mv1", mv1)
+ t1.join()
+
+ explain {
+ sql("""${query1}""")
+ check {result ->
+ result.contains("(mv1)") &&
result.contains("__DORIS_DELETE_SIGN__")
+ }
+ }
+ order_qt_query1_after "${query1}"
+
+ } catch(Exception e) {
+ logger.info(e.getMessage())
+ throw e
+ } finally {
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]