This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 70a44489e23 branch-3.0: [Fix](cloud-mow) Rollup task should retry when encountering TXN_CONFLICT in cloud mode #50705 (#50836)
70a44489e23 is described below

commit 70a44489e236ba445267724c09f65f298089d50c
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue May 13 21:31:19 2025 +0800

    branch-3.0: [Fix](cloud-mow) Rollup task should retry when encountering TXN_CONFLICT in cloud mode #50705 (#50836)
    
    Cherry-picked from #50705
    
    Co-authored-by: bobhan1 <[email protected]>
---
 .../java/org/apache/doris/alter/RollupJobV2.java   |  13 ++-
 .../org/apache/doris/alter/SchemaChangeJobV2.java  |   5 +-
 .../cloud/test_cloud_mow_sync_mv.out               | Bin 0 -> 454 bytes
 .../cloud/test_cloud_mow_sync_mv.groovy            | 115 +++++++++++++++++++++
 4 files changed, 128 insertions(+), 5 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
index 1d09224e6fa..54b62041889 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
@@ -64,6 +64,7 @@ import org.apache.doris.task.AgentTaskExecutor;
 import org.apache.doris.task.AgentTaskQueue;
 import org.apache.doris.task.AlterReplicaTask;
 import org.apache.doris.task.CreateReplicaTask;
+import org.apache.doris.thrift.TStatusCode;
 import org.apache.doris.thrift.TStorageFormat;
 import org.apache.doris.thrift.TStorageMedium;
 import org.apache.doris.thrift.TStorageType;
@@ -526,10 +527,18 @@ public class RollupJobV2 extends AlterJobV2 implements 
GsonPostProcessable {
             List<AgentTask> tasks = rollupBatchTask.getUnfinishedTasks(2000);
             ensureCloudClusterExist(tasks);
             for (AgentTask task : tasks) {
-                if (task.getFailedTimes() > 0) {
+                int maxFailedTimes = 0;
+                if (Config.isCloudMode() && 
Config.enable_schema_change_retry_in_cloud_mode) {
+                    if (task.getErrorCode() != null && task.getErrorCode()
+                            .equals(TStatusCode.DELETE_BITMAP_LOCK_ERROR)) {
+                        maxFailedTimes = Config.schema_change_max_retry_time;
+                    }
+                }
+                if (task.getFailedTimes() > maxFailedTimes) {
                     task.setFinished(true);
                     AgentTaskQueue.removeTask(task.getBackendId(), 
TTaskType.ALTER, task.getSignature());
-                    LOG.warn("rollup task failed: " + task.getErrorMsg());
+                    LOG.warn("rollup task failed, failedTimes: {}, 
maxFailedTimes: {}, err: {}",
+                            task.getFailedTimes(), maxFailedTimes, 
task.getErrorMsg());
                     List<Long> failedBackends = 
failedTabletBackends.get(task.getTabletId());
                     if (failedBackends == null) {
                         failedBackends = Lists.newArrayList();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index 89e3c6249c8..8141936033c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -583,15 +583,14 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
                     if (task.getErrorCode() != null && task.getErrorCode()
                             .equals(TStatusCode.DELETE_BITMAP_LOCK_ERROR)) {
                         maxFailedTimes = Config.schema_change_max_retry_time;
-                        LOG.warn("schema change task failed: {}, set 
maxFailedTimes {}", task.getErrorMsg(),
-                                maxFailedTimes);
                     }
                 }
                 if (task.getFailedTimes() > maxFailedTimes) {
                     task.setFinished(true);
                     if (!FeConstants.runningUnitTest) {
                         AgentTaskQueue.removeTask(task.getBackendId(), 
TTaskType.ALTER, task.getSignature());
-                        LOG.warn("schema change task failed: {}", 
task.getErrorMsg());
+                        LOG.warn("schema change task failed, failedTimes: {}, 
maxFailedTimes: {}, err: {}",
+                                task.getFailedTimes(), maxFailedTimes, 
task.getErrorMsg());
                         List<Long> failedBackends = 
failedTabletBackends.get(task.getTabletId());
                         if (failedBackends == null) {
                             failedBackends = Lists.newArrayList();
diff --git 
a/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_sync_mv.out 
b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_sync_mv.out
new file mode 100644
index 00000000000..47a82e61109
Binary files /dev/null and 
b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_sync_mv.out 
differ
diff --git 
a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_sync_mv.groovy 
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_sync_mv.groovy
new file mode 100644
index 00000000000..eec2a0adb6f
--- /dev/null
+++ 
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_sync_mv.groovy
@@ -0,0 +1,115 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_cloud_mow_sync_mv", "nonConcurrent") {
+    if (!isCloudMode()) {
+        return
+    }
+
+    try {
+        GetDebugPoint().clearDebugPointsForAllFEs()
+        GetDebugPoint().clearDebugPointsForAllBEs()
+
+        String db = context.config.getDbNameByFile(context.file)
+        sql "use ${db}"
+
+        def table1 = "test_cloud_mow_sync_mv"
+        sql "DROP TABLE IF EXISTS ${table1} FORCE;"
+        sql """
+        CREATE TABLE ${table1} (
+        `l_orderkey` BIGINT NULL,
+        `l_linenumber` INT NULL,
+        `l_partkey` INT NULL,
+        `l_suppkey` INT NULL,
+        `l_shipdate` DATE not NULL,
+        `l_quantity` DECIMAL(15, 2) NULL,
+        `l_extendedprice` DECIMAL(15, 2) NULL,
+        `l_discount` DECIMAL(15, 2) NULL,
+        `l_tax` DECIMAL(15, 2) NULL,
+        `l_returnflag` VARCHAR(1) NULL,
+        `l_linestatus` VARCHAR(1) NULL,
+        `l_commitdate` DATE NULL,
+        `l_receiptdate` DATE NULL,
+        `l_shipinstruct` VARCHAR(25) NULL,
+        `l_shipmode` VARCHAR(10) NULL,
+        `l_comment` VARCHAR(44) NULL
+        ) ENGINE=OLAP
+        unique KEY(l_orderkey, l_linenumber, l_partkey, l_suppkey, l_shipdate )
+        DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 1
+        PROPERTIES (
+        "enable_unique_key_merge_on_write" = "true",
+        "disable_auto_compaction" = "true");
+        """
+
+        sql """
+        insert into ${table1} values 
+        (null, 1, 2, 3, '2023-10-17', 5.5, 6.5, 7.5, 8.5, 'o', 'k', 
'2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy'),
+        (null, 1, 2, 3, '2023-10-17', 5.5, 6.5, 7.5, 8.5, 'o', 'k', 
'2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy'),
+        (1, null, 3, 1, '2023-10-17', 5.5, 6.5, 7.5, 8.5, 'o', 'k', 
'2023-10-18', '2023-10-18', 'a', 'b', 'yyyyyyyyy'),
+        (1, null, 3, 1, '2023-10-17', 5.5, 6.5, 7.5, 8.5, 'o', 'k', 
'2023-10-18', '2023-10-18', 'a', 'b', 'yyyyyyyyy'),
+        (3, 3, null, 2, '2023-10-19', 7.5, 8.5, 9.5, 10.5, 'k', 'o', 
'2023-10-19', '2023-10-19', 'c', 'd', 'xxxxxxxxx'),
+        (3, 3, null, 2, '2023-10-19', 7.5, 8.5, 9.5, 10.5, 'k', 'o', 
'2023-10-19', '2023-10-19', 'c', 'd', 'xxxxxxxxx'),
+        (1, 2, 3, null, '2023-10-17', 5.5, 6.5, 7.5, 8.5, 'o', 'k', 
'2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy'),
+        (1, 2, 3, null, '2023-10-17', 5.5, 6.5, 7.5, 8.5, 'o', 'k', 
'2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy'),
+        (2, 3, 2, 1, '2023-10-18', 5.5, 6.5, 7.5, 8.5, 'o', 'k', null, 
'2023-10-18', 'a', 'b', 'yyyyyyyyy'),
+        (2, 3, 2, 1, '2023-10-18', 5.5, 6.5, 7.5, 8.5, 'o', 'k', null, 
'2023-10-18', 'a', 'b', 'yyyyyyyyy'),
+        (3, 1, 1, 2, '2023-10-19', 7.5, 8.5, 9.5, 10.5, 'k', 'o', 
'2023-10-19', null, 'c', 'd', 'xxxxxxxxx'),
+        (3, 1, 1, 2, '2023-10-19', 7.5, 8.5, 9.5, 10.5, 'k', 'o', 
'2023-10-19', null, 'c', 'd', 'xxxxxxxxx'),
+        (1, 3, 2, 2, '2023-10-17', 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-17', 
'2023-10-17', 'a', 'b', 'yyyyyyyyy'),
+        (1, 3, 2, 2, '2023-10-17', 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-17', 
'2023-10-17', 'a', 'b', 'yyyyyyyyy');
+        """
+
+        def mv1 = """
+            select l_orderkey, l_linenumber, l_partkey, l_suppkey, l_shipdate,
+            substring(concat(l_returnflag, l_linestatus), 1)
+            from ${table1};
+        """
+
+        def query1 = """
+            select l_orderkey, l_linenumber, l_partkey, l_suppkey, l_shipdate,
+            substring(concat(l_returnflag, l_linestatus), 1)
+            from ${table1};
+        """
+        order_qt_query1_before "${query1}"
+
+        
GetDebugPoint().enableDebugPointForAllBEs("CloudSchemaChangeJob::_convert_historical_rowsets.fail.before.commit_job")
+        Thread.sleep(1000)
+
+        def t1 = Thread.start {
+            Thread.sleep(5000)
+            
GetDebugPoint().disableDebugPointForAllBEs("CloudSchemaChangeJob::_convert_historical_rowsets.fail.before.commit_job")
+        }
+        
+        create_sync_mv(db, table1, "mv1", mv1)
+        t1.join()
+
+        explain {
+            sql("""${query1}""")
+            check {result ->
+                result.contains("(mv1)") && 
result.contains("__DORIS_DELETE_SIGN__")
+            }
+        }
+        order_qt_query1_after "${query1}"
+
+    } catch(Exception e) {
+        logger.info(e.getMessage())
+        throw e
+    } finally {
+        GetDebugPoint().clearDebugPointsForAllBEs()
+        GetDebugPoint().clearDebugPointsForAllFEs()
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to