This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new eb3ee763699 branch-4.0: [fix](test) deflake test_iot_auto_detect_fail by accepting equivalent error messages (#64580)
eb3ee763699 is described below

commit eb3ee763699a62aaddbd18de1778e85894b95c60
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Sun Jun 21 16:10:09 2026 +0800

    branch-4.0: [fix](test) deflake test_iot_auto_detect_fail by accepting equivalent error messages (#64580)
---
 .../compaction/test_compaction_agg_keys.groovy     | 42 +++++++------
 .../test_compaction_agg_keys_with_array_map.groovy | 43 +++++++------
 .../test_compaction_agg_keys_with_delete.groovy    | 43 +++++++------
 .../compaction/test_compaction_uniq_keys.groovy    | 36 ++++++-----
 .../compaction/test_compaction_uniq_keys_ck.groovy | 40 +++++++-----
 .../test_compaction_uniq_keys_row_store_ck.groovy  | 39 +++++++-----
 .../test_compaction_uniq_keys_with_delete.groovy   | 38 +++++++-----
 ...test_compaction_uniq_keys_with_delete_ck.groovy | 38 +++++++-----
 .../test_compaction_with_empty_rowset.groovy       | 71 +++++++++++++---------
 .../test_vertical_compaction_agg_keys.groovy       | 39 +++++++-----
 .../test_vertical_compaction_agg_state.groovy      | 39 +++++++-----
 .../test_vertical_compaction_uniq_keys.groovy      | 38 +++++++-----
 .../test_vertical_compaction_uniq_keys_ck.groovy   | 38 +++++++-----
 .../test_colocate_join_of_column_order.groovy      | 18 ++----
 ...iceberg_runtime_filter_partition_pruning.groovy | 29 +++++++++
 ...ntime_filter_partition_pruning_transform.groovy | 30 +++++++++
 ..._paimon_runtime_filter_partition_pruning.groovy | 31 +++++++++-
 .../test_iot_auto_detect_fail.groovy               | 21 ++++++-
 .../query_profile/s3_load_profile_test.groovy      | 17 +++++-
 19 files changed, 458 insertions(+), 232 deletions(-)

diff --git a/regression-test/suites/compaction/test_compaction_agg_keys.groovy 
b/regression-test/suites/compaction/test_compaction_agg_keys.groovy
index 480f4696e5f..7ed481827d9 100644
--- a/regression-test/suites/compaction/test_compaction_agg_keys.groovy
+++ b/regression-test/suites/compaction/test_compaction_agg_keys.groovy
@@ -17,6 +17,7 @@
 
 import org.codehaus.groovy.runtime.IOGroovyMethods
 import org.awaitility.Awaitility
+import java.util.concurrent.TimeUnit
 
 suite("test_compaction_agg_keys") {
     def tableName = "compaction_agg_keys_regression_test"
@@ -104,26 +105,33 @@ suite("test_compaction_agg_keys") {
         
//TabletId,ReplicaId,BackendId,SchemaHash,Version,LstSuccessVersion,LstFailedVersion,LstFailedTime,LocalDataSize,RemoteDataSize,RowCount,State,LstConsistencyCheckTime,CheckVersion,QueryHits,VersionCount,PathHash,MetaUrl,CompactionStatus
         def tablets = sql_return_maparray """ show tablets from ${tableName}; 
"""
 
-        // trigger compactions for all tablets in ${tableName}
-        trigger_and_wait_compaction(tableName, "cumulative")
-
         def replicaNum = get_table_replica_num(tableName)
         logger.info("get table replica num: " + replicaNum)
-        int rowCount = 0
-        for (def tablet in tablets) {
-            String tablet_id = tablet.TabletId
-
-            (code, out, err) = curl("GET", tablet.CompactionStatus)
-            logger.info("Show tablets status: code=" + code + ", out=" + out + 
", err=" + err)
-
-            assertEquals(code, 0)
-            def tabletJson = parseJson(out.trim())
-            assert tabletJson.rowsets instanceof List
-            for (String rowset in (List<String>) tabletJson.rowsets) {
-                rowCount += Integer.parseInt(rowset.split(" ")[1])
+
+        // BE only picks rowsets whose version is already visible as cumulative
+        // compaction candidates, and the visible version is pushed from FE
+        // asynchronously. Right after a burst of loads it may lag, so a single
+        // cumulative round can merge only the visible prefix of rowsets. Retry
+        // trigger + recount until the rows are fully merged.
+        Awaitility.await().atMost(300, TimeUnit.SECONDS).pollInterval(2, 
TimeUnit.SECONDS).until(() -> {
+            // trigger compactions for all tablets in ${tableName}
+            trigger_and_wait_compaction(tableName, "cumulative")
+            int rowCount = 0
+            for (def tablet in tablets) {
+                String tablet_id = tablet.TabletId
+
+                (code, out, err) = curl("GET", tablet.CompactionStatus)
+                logger.info("Show tablets status: code=" + code + ", out=" + 
out + ", err=" + err)
+
+                assertEquals(code, 0)
+                def tabletJson = parseJson(out.trim())
+                assert tabletJson.rowsets instanceof List
+                for (String rowset in (List<String>) tabletJson.rowsets) {
+                    rowCount += Integer.parseInt(rowset.split(" ")[1])
+                }
             }
-        }
-        assert (rowCount < 8 * replicaNum)
+            return rowCount < 8 * replicaNum
+        })
         qt_select_default2 """ SELECT * FROM ${tableName} t ORDER BY user_id; 
"""
     } finally {
         try_sql("DROP TABLE IF EXISTS ${tableName}")
diff --git 
a/regression-test/suites/compaction/test_compaction_agg_keys_with_array_map.groovy
 
b/regression-test/suites/compaction/test_compaction_agg_keys_with_array_map.groovy
index bad84c83295..604025ed54d 100644
--- 
a/regression-test/suites/compaction/test_compaction_agg_keys_with_array_map.groovy
+++ 
b/regression-test/suites/compaction/test_compaction_agg_keys_with_array_map.groovy
@@ -16,6 +16,8 @@
 // under the License.
 
 import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.awaitility.Awaitility
+import java.util.concurrent.TimeUnit
 
 suite("test_compaction_agg_keys_with_array_map") {
     def tableName = "compaction_agg_keys_regression_test_complex"
@@ -95,26 +97,33 @@ suite("test_compaction_agg_keys_with_array_map") {
         
//TabletId,ReplicaId,BackendId,SchemaHash,Version,LstSuccessVersion,LstFailedVersion,LstFailedTime,LocalDataSize,RemoteDataSize,RowCount,State,LstConsistencyCheckTime,CheckVersion,QueryHits,VersionCount,PathHash,MetaUrl,CompactionStatus
         def tablets = sql_return_maparray """ show tablets from ${tableName}; 
"""
 
-        // trigger compactions for all tablets in ${tableName}
-        trigger_and_wait_compaction(tableName, "cumulative")
-
         def replicaNum = get_table_replica_num(tableName)
         logger.info("get table replica num: " + replicaNum)
-        int rowCount = 0
-        for (def tablet in tablets) {
-            String tablet_id = tablet.TabletId
-
-            (code, out, err) = curl("GET", tablet.CompactionStatus)
-            logger.info("Show tablets status: code=" + code + ", out=" + out + 
", err=" + err)
-
-            assertEquals(code, 0)
-            def tabletJson = parseJson(out.trim())
-            assert tabletJson.rowsets instanceof List
-            for (String rowset in (List<String>) tabletJson.rowsets) {
-                rowCount += Integer.parseInt(rowset.split(" ")[1])
+
+        // BE only picks rowsets whose version is already visible as cumulative
+        // compaction candidates, and the visible version is pushed from FE
+        // asynchronously. Right after a burst of loads it may lag, so a single
+        // cumulative round can merge only the visible prefix of rowsets. Retry
+        // trigger + recount until the rows are fully merged.
+        Awaitility.await().atMost(300, TimeUnit.SECONDS).pollInterval(2, 
TimeUnit.SECONDS).until(() -> {
+            // trigger compactions for all tablets in ${tableName}
+            trigger_and_wait_compaction(tableName, "cumulative")
+            int rowCount = 0
+            for (def tablet in tablets) {
+                String tablet_id = tablet.TabletId
+
+                (code, out, err) = curl("GET", tablet.CompactionStatus)
+                logger.info("Show tablets status: code=" + code + ", out=" + 
out + ", err=" + err)
+
+                assertEquals(code, 0)
+                def tabletJson = parseJson(out.trim())
+                assert tabletJson.rowsets instanceof List
+                for (String rowset in (List<String>) tabletJson.rowsets) {
+                    rowCount += Integer.parseInt(rowset.split(" ")[1])
+                }
             }
-        }
-        assert (rowCount < 8 * replicaNum)
+            return rowCount < 8 * replicaNum
+        })
         qt_select_default2 """ SELECT * FROM ${tableName} t ORDER BY user_id; 
"""
     } finally {
         try_sql("DROP TABLE IF EXISTS ${tableName}")
diff --git 
a/regression-test/suites/compaction/test_compaction_agg_keys_with_delete.groovy 
b/regression-test/suites/compaction/test_compaction_agg_keys_with_delete.groovy
index 99ea6077e46..645c0468078 100644
--- 
a/regression-test/suites/compaction/test_compaction_agg_keys_with_delete.groovy
+++ 
b/regression-test/suites/compaction/test_compaction_agg_keys_with_delete.groovy
@@ -16,6 +16,8 @@
 // under the License.
 
 import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.awaitility.Awaitility
+import java.util.concurrent.TimeUnit
 
 suite("test_compaction_agg_keys_with_delete") {
     def tableName = "test_compaction_agg_keys_with_delete_regression_test"
@@ -114,26 +116,33 @@ suite("test_compaction_agg_keys_with_delete") {
         
//TabletId,ReplicaId,BackendId,SchemaHash,Version,LstSuccessVersion,LstFailedVersion,LstFailedTime,LocalDataSize,RemoteDataSize,RowCount,State,LstConsistencyCheckTime,CheckVersion,VersionCount,QueryHits,PathHash,MetaUrl,CompactionStatus
         def tablets = sql_return_maparray """ show tablets from ${tableName}; 
"""
 
-        // trigger compactions for all tablets in ${tableName}
-        trigger_and_wait_compaction(tableName, "cumulative")
-
         def replicaNum = get_table_replica_num(tableName)
         logger.info("get table replica num: " + replicaNum)
-        int rowCount = 0
-        for (def tablet in tablets) {
-            String tablet_id = tablet.TabletId
-
-            (code, out, err) = curl("GET", tablet.CompactionStatus)
-            logger.info("Show tablets status: code=" + code + ", out=" + out + 
", err=" + err)
-            assertEquals(code, 0)
-
-            def tabletJson = parseJson(out.trim())
-            assert tabletJson.rowsets instanceof List
-            for (String rowset in (List<String>) tabletJson.rowsets) {
-                rowCount += Integer.parseInt(rowset.split(" ")[1])
+
+        // BE only picks rowsets whose version is already visible as cumulative
+        // compaction candidates, and the visible version is pushed from FE
+        // asynchronously. Right after a burst of loads it may lag, so a single
+        // cumulative round can merge only the visible prefix of rowsets. Retry
+        // trigger + recount until the rows are fully merged.
+        Awaitility.await().atMost(300, TimeUnit.SECONDS).pollInterval(2, 
TimeUnit.SECONDS).until(() -> {
+            // trigger compactions for all tablets in ${tableName}
+            trigger_and_wait_compaction(tableName, "cumulative")
+            int rowCount = 0
+            for (def tablet in tablets) {
+                String tablet_id = tablet.TabletId
+
+                (code, out, err) = curl("GET", tablet.CompactionStatus)
+                logger.info("Show tablets status: code=" + code + ", out=" + 
out + ", err=" + err)
+                assertEquals(code, 0)
+
+                def tabletJson = parseJson(out.trim())
+                assert tabletJson.rowsets instanceof List
+                for (String rowset in (List<String>) tabletJson.rowsets) {
+                    rowCount += Integer.parseInt(rowset.split(" ")[1])
+                }
             }
-        }
-        assert (rowCount < 8 * replicaNum)
+            return rowCount < 8 * replicaNum
+        })
         qt_select_default2 """ SELECT * FROM ${tableName} t ORDER BY user_id; 
"""
     } finally {
         try_sql("DROP TABLE IF EXISTS ${tableName}")
diff --git a/regression-test/suites/compaction/test_compaction_uniq_keys.groovy 
b/regression-test/suites/compaction/test_compaction_uniq_keys.groovy
index fac2cc4ac80..0da404d2245 100644
--- a/regression-test/suites/compaction/test_compaction_uniq_keys.groovy
+++ b/regression-test/suites/compaction/test_compaction_uniq_keys.groovy
@@ -17,6 +17,7 @@
 
 import org.codehaus.groovy.runtime.IOGroovyMethods
 import org.awaitility.Awaitility
+import java.util.concurrent.TimeUnit
 
 suite("test_compaction_uniq_keys") {
     def tableName = "compaction_uniq_keys_regression_test"
@@ -103,25 +104,30 @@ suite("test_compaction_uniq_keys") {
         
//TabletId,ReplicaId,BackendId,SchemaHash,Version,LstSuccessVersion,LstFailedVersion,LstFailedTime,LocalDataSize,RemoteDataSize,RowCount,State,LstConsistencyCheckTime,CheckVersion,VersionCount,QueryHits,PathHash,MetaUrl,CompactionStatus
         def tablets = sql_return_maparray """ show tablets from ${tableName}; 
"""
 
-        // trigger compactions for all tablets in ${tableName}
-        trigger_and_wait_compaction(tableName, "cumulative")
-
         def replicaNum = get_table_replica_num(tableName)
         logger.info("get table replica num: " + replicaNum)
 
-        int rowCount = 0
-        for (def tablet in tablets) {
-            String tablet_id = tablet.TabletId
-            (code, out, err) = curl("GET", tablet.CompactionStatus)
-            logger.info("Show tablets status: code=" + code + ", out=" + out + 
", err=" + err)
-            assertEquals(code, 0)
-            def tabletJson = parseJson(out.trim())
-            assert tabletJson.rowsets instanceof List
-            for (String rowset in (List<String>) tabletJson.rowsets) {
-                rowCount += Integer.parseInt(rowset.split(" ")[1])
+        // BE only picks rowsets whose version is already visible as cumulative
+        // compaction candidates, and the visible version is pushed from FE
+        // asynchronously. Right after a burst of loads it may lag, so a single
+        // cumulative round can merge only the visible prefix of rowsets. Retry
+        // trigger + recount until the rows are fully merged.
+        Awaitility.await().atMost(300, TimeUnit.SECONDS).pollInterval(2, 
TimeUnit.SECONDS).until(() -> {
+            // trigger compactions for all tablets in ${tableName}
+            trigger_and_wait_compaction(tableName, "cumulative")
+            int rowCount = 0
+            for (def tablet in tablets) {
+                (code, out, err) = curl("GET", tablet.CompactionStatus)
+                logger.info("Show tablets status: code=" + code + ", out=" + 
out + ", err=" + err)
+                assertEquals(code, 0)
+                def tabletJson = parseJson(out.trim())
+                assert tabletJson.rowsets instanceof List
+                for (String rowset in (List<String>) tabletJson.rowsets) {
+                    rowCount += Integer.parseInt(rowset.split(" ")[1])
+                }
             }
-        }
-        assert (rowCount < 8 * replicaNum)
+            return rowCount < 8 * replicaNum
+        })
         qt_select_default2 """ SELECT * FROM ${tableName} t ORDER BY user_id; 
"""
     } finally {
         try_sql("DROP TABLE IF EXISTS ${tableName}")
diff --git 
a/regression-test/suites/compaction/test_compaction_uniq_keys_ck.groovy 
b/regression-test/suites/compaction/test_compaction_uniq_keys_ck.groovy
index 1b401b3c789..0b5aed737ff 100644
--- a/regression-test/suites/compaction/test_compaction_uniq_keys_ck.groovy
+++ b/regression-test/suites/compaction/test_compaction_uniq_keys_ck.groovy
@@ -16,6 +16,8 @@
 // under the License.
 
 import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.awaitility.Awaitility
+import java.util.concurrent.TimeUnit
 
 suite("test_compaction_uniq_keys_ck") {
     def tableName = "compaction_uniq_keys_ck"
@@ -107,24 +109,32 @@ suite("test_compaction_uniq_keys_ck") {
         
//TabletId,ReplicaId,BackendId,SchemaHash,Version,LstSuccessVersion,LstFailedVersion,LstFailedTime,LocalDataSize,RemoteDataSize,RowCount,State,LstConsistencyCheckTime,CheckVersion,VersionCount,QueryHits,PathHash,MetaUrl,CompactionStatus
         def tablets = sql_return_maparray """ show tablets from ${tableName}; 
"""
 
-        // trigger compactions for all tablets in ${tableName}
-        trigger_and_wait_compaction(tableName, "cumulative")
-
         def replicaNum = get_table_replica_num(tableName)
         logger.info("get table replica num: " + replicaNum)
-        int rowCount = 0
-        for (def tablet in tablets) {
-            String tablet_id = tablet.TabletId
-            (code, out, err) = curl("GET", tablet.CompactionStatus)
-            logger.info("Show tablets status: code=" + code + ", out=" + out + 
", err=" + err)
-            assertEquals(code, 0)
-            def tabletJson = parseJson(out.trim())
-            assert tabletJson.rowsets instanceof List
-            for (String rowset in (List<String>) tabletJson.rowsets) {
-                rowCount += Integer.parseInt(rowset.split(" ")[1])
+
+        // BE only treats a rowset as a cumulative compaction candidate once 
the BE has
+        // the FE-pushed visible version covering it 
(Tablet::_pick_visible_rowsets_to_compaction).
+        // The fast incremental push from FE (PublishVersionDaemon) can be 
missed; the
+        // periodic fallback then syncs it, bounded by 
partition_info_update_interval_secs
+        // (60s) + report_tablet_interval_seconds (60s), i.e. up to ~120s. So 
keep triggering
+        // + recounting -- the merge happens the instant the visible version 
lands -- and give
+        // the loop budget well above that ~120s ceiling so it stays 
deterministic.
+        Awaitility.await().atMost(300, TimeUnit.SECONDS).pollInterval(2, 
TimeUnit.SECONDS).until(() -> {
+            // trigger compactions for all tablets in ${tableName}
+            trigger_and_wait_compaction(tableName, "cumulative")
+            int rowCount = 0
+            for (def tablet in tablets) {
+                (code, out, err) = curl("GET", tablet.CompactionStatus)
+                logger.info("Show tablets status: code=" + code + ", out=" + 
out + ", err=" + err)
+                assertEquals(code, 0)
+                def tabletJson = parseJson(out.trim())
+                assert tabletJson.rowsets instanceof List
+                for (String rowset in (List<String>) tabletJson.rowsets) {
+                    rowCount += Integer.parseInt(rowset.split(" ")[1])
+                }
             }
-        }
-        assert (rowCount < 8 * replicaNum)
+            return rowCount < 8 * replicaNum
+        })
         qt_select_default2 """ SELECT * FROM ${tableName} t ORDER BY user_id; 
"""
     } finally {
         // try_sql("DROP TABLE IF EXISTS ${tableName}")
diff --git 
a/regression-test/suites/compaction/test_compaction_uniq_keys_row_store_ck.groovy
 
b/regression-test/suites/compaction/test_compaction_uniq_keys_row_store_ck.groovy
index 19a6d467d84..25fa852a112 100644
--- 
a/regression-test/suites/compaction/test_compaction_uniq_keys_row_store_ck.groovy
+++ 
b/regression-test/suites/compaction/test_compaction_uniq_keys_row_store_ck.groovy
@@ -16,6 +16,8 @@
 // under the License.
 
 import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.awaitility.Awaitility
+import java.util.concurrent.TimeUnit
 
 
 suite("test_compaction_uniq_keys_row_store_ck", "p0") {
@@ -186,24 +188,31 @@ suite("test_compaction_uniq_keys_row_store_ck", "p0") {
 
         checkValue()
 
-        // trigger compactions for all tablets in ${tableName}
-        trigger_and_wait_compaction(tableName, "cumulative")
-
         def replicaNum = get_table_replica_num(tableName)
         logger.info("get table replica num: " + replicaNum)
-        int rowCount = 0
-        for (def tablet in tablets) {
-            String tablet_id = tablet.TabletId
-            (code, out, err) = curl("GET", tablet.CompactionStatus)
-            logger.info("Show tablets status: code=" + code + ", out=" + out + 
", err=" + err)
-            assertEquals(code, 0)
-            def tabletJson = parseJson(out.trim())
-            assert tabletJson.rowsets instanceof List
-            for (String rowset in (List<String>) tabletJson.rowsets) {
-                rowCount += Integer.parseInt(rowset.split(" ")[1])
+
+        // BE only picks rowsets whose version is already visible as cumulative
+        // compaction candidates, and the visible version is pushed from FE
+        // asynchronously. Right after a burst of loads it may lag, so a single
+        // cumulative round can merge only the visible prefix of rowsets. Retry
+        // trigger + recount until the rows are fully merged.
+        Awaitility.await().atMost(300, TimeUnit.SECONDS).pollInterval(2, 
TimeUnit.SECONDS).until(() -> {
+            // trigger compactions for all tablets in ${tableName}
+            trigger_and_wait_compaction(tableName, "cumulative")
+            int rowCount = 0
+            for (def tablet in tablets) {
+                String tablet_id = tablet.TabletId
+                (code, out, err) = curl("GET", tablet.CompactionStatus)
+                logger.info("Show tablets status: code=" + code + ", out=" + 
out + ", err=" + err)
+                assertEquals(code, 0)
+                def tabletJson = parseJson(out.trim())
+                assert tabletJson.rowsets instanceof List
+                for (String rowset in (List<String>) tabletJson.rowsets) {
+                    rowCount += Integer.parseInt(rowset.split(" ")[1])
+                }
             }
-        }
-        assert (rowCount < 8 * replicaNum)
+            return rowCount < 8 * replicaNum
+        })
         checkValue()
         qt_sql_row_size "select sum(length(__DORIS_ROW_STORE_COL__)) from 
${tableName}"
     } finally {
diff --git 
a/regression-test/suites/compaction/test_compaction_uniq_keys_with_delete.groovy
 
b/regression-test/suites/compaction/test_compaction_uniq_keys_with_delete.groovy
index 463597124c7..2317b930907 100644
--- 
a/regression-test/suites/compaction/test_compaction_uniq_keys_with_delete.groovy
+++ 
b/regression-test/suites/compaction/test_compaction_uniq_keys_with_delete.groovy
@@ -16,6 +16,8 @@
 // under the License.
 
 import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.awaitility.Awaitility
+import java.util.concurrent.TimeUnit
 
 suite("test_compaction_uniq_keys_with_delete") {
     def tableName = "test_compaction_uniq_keys_with_delete_regression_test"
@@ -118,24 +120,30 @@ suite("test_compaction_uniq_keys_with_delete") {
         
//TabletId,ReplicaId,BackendId,SchemaHash,Version,LstSuccessVersion,LstFailedVersion,LstFailedTime,LocalDataSize,RemoteDataSize,RowCount,State,LstConsistencyCheckTime,CheckVersion,VersionCount,PathHash,MetaUrl,CompactionStatus
         def tablets = sql_return_maparray """ show tablets from ${tableName}; 
"""
 
-        // trigger compactions for all tablets in ${tableName}
-        trigger_and_wait_compaction(tableName, "cumulative")
-
         def replicaNum = get_table_replica_num(tableName)
         logger.info("get table replica num: " + replicaNum)
-        int rowCount = 0
-        for (def tablet in tablets) {
-            String tablet_id = tablet.TabletId
-            (code, out, err) = curl("GET", tablet.CompactionStatus)
-            logger.info("Show tablets status: code=" + code + ", out=" + out + 
", err=" + err)
-            assertEquals(code, 0)
-            def tabletJson = parseJson(out.trim())
-            assert tabletJson.rowsets instanceof List
-            for (String rowset in (List<String>) tabletJson.rowsets) {
-                rowCount += Integer.parseInt(rowset.split(" ")[1])
+
+        // BE only picks rowsets whose version is already visible as cumulative
+        // compaction candidates, and the visible version is pushed from FE
+        // asynchronously. Right after a burst of loads it may lag, so a single
+        // cumulative round can merge only the visible prefix of rowsets. Retry
+        // trigger + recount until the rows are fully merged.
+        Awaitility.await().atMost(300, TimeUnit.SECONDS).pollInterval(2, 
TimeUnit.SECONDS).until(() -> {
+            // trigger compactions for all tablets in ${tableName}
+            trigger_and_wait_compaction(tableName, "cumulative")
+            int rowCount = 0
+            for (def tablet in tablets) {
+                (code, out, err) = curl("GET", tablet.CompactionStatus)
+                logger.info("Show tablets status: code=" + code + ", out=" + 
out + ", err=" + err)
+                assertEquals(code, 0)
+                def tabletJson = parseJson(out.trim())
+                assert tabletJson.rowsets instanceof List
+                for (String rowset in (List<String>) tabletJson.rowsets) {
+                    rowCount += Integer.parseInt(rowset.split(" ")[1])
+                }
             }
-        }
-        assert (rowCount < 8 * replicaNum)
+            return rowCount < 8 * replicaNum
+        })
         qt_select_default3 """ SELECT * FROM ${tableName} t ORDER BY user_id; 
"""
     } finally {
         try_sql("DROP TABLE IF EXISTS ${tableName}")
diff --git 
a/regression-test/suites/compaction/test_compaction_uniq_keys_with_delete_ck.groovy
 
b/regression-test/suites/compaction/test_compaction_uniq_keys_with_delete_ck.groovy
index 6a61c84a1f6..fa4443e9370 100644
--- 
a/regression-test/suites/compaction/test_compaction_uniq_keys_with_delete_ck.groovy
+++ 
b/regression-test/suites/compaction/test_compaction_uniq_keys_with_delete_ck.groovy
@@ -16,6 +16,8 @@
 // under the License.
 
 import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.awaitility.Awaitility
+import java.util.concurrent.TimeUnit
 
 suite("test_compaction_uniq_keys_with_delete_ck") {
     def tableName = "test_compaction_uniq_keys_with_delete_ck"
@@ -135,24 +137,30 @@ suite("test_compaction_uniq_keys_with_delete_ck") {
         
//TabletId,ReplicaId,BackendId,SchemaHash,Version,LstSuccessVersion,LstFailedVersion,LstFailedTime,LocalDataSize,RemoteDataSize,RowCount,State,LstConsistencyCheckTime,CheckVersion,VersionCount,PathHash,MetaUrl,CompactionStatus
         def tablets = sql_return_maparray """ show tablets from ${tableName}; 
"""
 
-        // trigger compactions for all tablets in ${tableName}
-        trigger_and_wait_compaction(tableName, "cumulative")
-
         def replicaNum = get_table_replica_num(tableName)
         logger.info("get table replica num: " + replicaNum)
-        int rowCount = 0
-        for (def tablet in tablets) {
-            String tablet_id = tablet.TabletId
-            (code, out, err) = curl("GET", tablet.CompactionStatus)
-            logger.info("Show tablets status: code=" + code + ", out=" + out + 
", err=" + err)
-            assertEquals(code, 0)
-            def tabletJson = parseJson(out.trim())
-            assert tabletJson.rowsets instanceof List
-            for (String rowset in (List<String>) tabletJson.rowsets) {
-                rowCount += Integer.parseInt(rowset.split(" ")[1])
+
+        // BE only picks rowsets whose version is already visible as cumulative
+        // compaction candidates, and the visible version is pushed from FE
+        // asynchronously. Right after a burst of loads it may lag, so a single
+        // cumulative round can merge only the visible prefix of rowsets. Retry
+        // trigger + recount until the rows are fully merged.
+        Awaitility.await().atMost(300, TimeUnit.SECONDS).pollInterval(2, 
TimeUnit.SECONDS).until(() -> {
+            // trigger compactions for all tablets in ${tableName}
+            trigger_and_wait_compaction(tableName, "cumulative")
+            int rowCount = 0
+            for (def tablet in tablets) {
+                (code, out, err) = curl("GET", tablet.CompactionStatus)
+                logger.info("Show tablets status: code=" + code + ", out=" + 
out + ", err=" + err)
+                assertEquals(code, 0)
+                def tabletJson = parseJson(out.trim())
+                assert tabletJson.rowsets instanceof List
+                for (String rowset in (List<String>) tabletJson.rowsets) {
+                    rowCount += Integer.parseInt(rowset.split(" ")[1])
+                }
             }
-        }
-        assert (rowCount < 8 * replicaNum)
+            return rowCount < 8 * replicaNum
+        })
         qt_select_default3 """ SELECT * FROM ${tableName} t ORDER BY user_id; 
"""
     } finally {
         // try_sql("DROP TABLE IF EXISTS ${tableName}")
diff --git 
a/regression-test/suites/compaction/test_compaction_with_empty_rowset.groovy 
b/regression-test/suites/compaction/test_compaction_with_empty_rowset.groovy
index 6f6f869917d..adc4e75ad92 100644
--- a/regression-test/suites/compaction/test_compaction_with_empty_rowset.groovy
+++ b/regression-test/suites/compaction/test_compaction_with_empty_rowset.groovy
@@ -17,6 +17,7 @@
 
 import org.codehaus.groovy.runtime.IOGroovyMethods
 import org.awaitility.Awaitility
+import java.util.concurrent.TimeUnit
 
 suite("test_compaction_mow_with_empty_rowset", "p0") {
     def tableName = "test_compaction_with_empty_rowset"
@@ -60,21 +61,28 @@ suite("test_compaction_mow_with_empty_rowset", "p0") {
     def replicaNum = get_table_replica_num(tableName)
     logger.info("get table replica num: " + replicaNum)
 
-    // trigger compactions for all tablets in ${tableName}
-    trigger_and_wait_compaction(tableName, "cumulative")
-    int rowCount = 0
-    for (def tablet in tablets) {
-        String tablet_id = tablet.TabletId
-        def (code, out, err) = curl("GET", tablet.CompactionStatus)
-        logger.info("Show tablets status: code=" + code + ", out=" + out + ", 
err=" + err)
-        assertEquals(code, 0)
-        def tabletJson = parseJson(out.trim())
-        assert tabletJson.rowsets instanceof List
-        for (String rowset in (List<String>) tabletJson.rowsets) {
-            rowCount += Integer.parseInt(rowset.split(" ")[1])
+    // BE only picks rowsets whose version is already visible as cumulative
+    // compaction candidates, and the visible version is pushed from FE
+    // asynchronously. Right after a burst of loads it may lag, so a single
+    // cumulative round can merge only the visible prefix of rowsets. Retry
+    // trigger + recount until the rows are fully merged.
+    Awaitility.await().atMost(300, TimeUnit.SECONDS).pollInterval(2, 
TimeUnit.SECONDS).until(() -> {
+        // trigger compactions for all tablets in ${tableName}
+        trigger_and_wait_compaction(tableName, "cumulative")
+        int rowCount = 0
+        for (def tablet in tablets) {
+            String tablet_id = tablet.TabletId
+            def (code, out, err) = curl("GET", tablet.CompactionStatus)
+            logger.info("Show tablets status: code=" + code + ", out=" + out + 
", err=" + err)
+            assertEquals(code, 0)
+            def tabletJson = parseJson(out.trim())
+            assert tabletJson.rowsets instanceof List
+            for (String rowset in (List<String>) tabletJson.rowsets) {
+                rowCount += Integer.parseInt(rowset.split(" ")[1])
+            }
         }
-    }
-    assert (rowCount < 10 * replicaNum)
+        return rowCount < 10 * replicaNum
+    })
     qt_sql2 """ select * from ${tableName} order by k1, k2, k3 """
 
     for (int i = 0; i < 10; i++) {
@@ -82,20 +90,27 @@ suite("test_compaction_mow_with_empty_rowset", "p0") {
                 'a', 'b', 'c', '2021-10-30', '2021-10-30 00:00:00') """
     }   
 
-    // trigger compactions for all tablets in ${tableName}
-    trigger_and_wait_compaction(tableName, "cumulative")
-    rowCount = 0
-    for (def tablet in tablets) {
-        String tablet_id = tablet.TabletId
-        def (code, out, err) = curl("GET", tablet.CompactionStatus)
-        logger.info("Show tablets status: code=" + code + ", out=" + out + ", 
err=" + err)
-        assertEquals(code, 0)
-        def tabletJson = parseJson(out.trim())
-        assert tabletJson.rowsets instanceof List
-        for (String rowset in (List<String>) tabletJson.rowsets) {
-            rowCount += Integer.parseInt(rowset.split(" ")[1])
+    // BE only picks rowsets whose version is already visible as cumulative
+    // compaction candidates, and the visible version is pushed from FE
+    // asynchronously. Right after a burst of loads it may lag, so a single
+    // cumulative round can merge only the visible prefix of rowsets. Retry
+    // trigger + recount until the rows are fully merged.
+    Awaitility.await().atMost(300, TimeUnit.SECONDS).pollInterval(2, 
TimeUnit.SECONDS).until(() -> {
+        // trigger compactions for all tablets in ${tableName}
+        trigger_and_wait_compaction(tableName, "cumulative")
+        int rowCount = 0
+        for (def tablet in tablets) {
+            String tablet_id = tablet.TabletId
+            def (code, out, err) = curl("GET", tablet.CompactionStatus)
+            logger.info("Show tablets status: code=" + code + ", out=" + out + 
", err=" + err)
+            assertEquals(code, 0)
+            def tabletJson = parseJson(out.trim())
+            assert tabletJson.rowsets instanceof List
+            for (String rowset in (List<String>) tabletJson.rowsets) {
+                rowCount += Integer.parseInt(rowset.split(" ")[1])
+            }
         }
-    }
-    assert (rowCount < 20 * replicaNum)
+        return rowCount < 20 * replicaNum
+    })
     qt_sql3 """ select * from ${tableName} order by k1, k2, k3 """
 }
diff --git 
a/regression-test/suites/compaction/test_vertical_compaction_agg_keys.groovy 
b/regression-test/suites/compaction/test_vertical_compaction_agg_keys.groovy
index 89493e15343..6f9407324c1 100644
--- a/regression-test/suites/compaction/test_vertical_compaction_agg_keys.groovy
+++ b/regression-test/suites/compaction/test_vertical_compaction_agg_keys.groovy
@@ -16,6 +16,8 @@
 // under the License.
 
 import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.awaitility.Awaitility
+import java.util.concurrent.TimeUnit
 
 suite("test_vertical_compaction_agg_keys") {
     def tableName = "vertical_compaction_agg_keys_regression_test"
@@ -115,24 +117,31 @@ suite("test_vertical_compaction_agg_keys") {
         
//TabletId,ReplicaId,BackendId,SchemaHash,Version,LstSuccessVersion,LstFailedVersion,LstFailedTime,LocalDataSize,RemoteDataSize,RowCount,State,LstConsistencyCheckTime,CheckVersion,VersionCount,QueryHits,PathHash,MetaUrl,CompactionStatus
         def tablets = sql_return_maparray """ show tablets from ${tableName}; 
"""
 
-        // trigger compactions for all tablets in ${tableName}
-        trigger_and_wait_compaction(tableName, "cumulative")
-
         def replicaNum = get_table_replica_num(tableName)
         logger.info("get table replica num: " + replicaNum)
-        int rowCount = 0
-        for (def tablet in tablets) {
-            String tablet_id = tablet.TabletId
-            (code, out, err) = curl("GET", tablet.CompactionStatus)
-            logger.info("Show tablets status: code=" + code + ", out=" + out + 
", err=" + err)
-            assertEquals(code, 0)
-            def tabletJson = parseJson(out.trim())
-            assert tabletJson.rowsets instanceof List
-            for (String rowset in (List<String>) tabletJson.rowsets) {
-                rowCount += Integer.parseInt(rowset.split(" ")[1])
+
+        // BE only picks rowsets whose version is already visible as cumulative
+        // compaction candidates, and the visible version is pushed from FE
+        // asynchronously. Right after a burst of loads it may lag, so a single
+        // cumulative round can merge only the visible prefix of rowsets. Retry
+        // trigger + recount until the rows are fully merged.
+        Awaitility.await().atMost(300, TimeUnit.SECONDS).pollInterval(2, 
TimeUnit.SECONDS).until(() -> {
+            // trigger compactions for all tablets in ${tableName}
+            trigger_and_wait_compaction(tableName, "cumulative")
+            int rowCount = 0
+            for (def tablet in tablets) {
+                String tablet_id = tablet.TabletId
+                (code, out, err) = curl("GET", tablet.CompactionStatus)
+                logger.info("Show tablets status: code=" + code + ", out=" + 
out + ", err=" + err)
+                assertEquals(code, 0)
+                def tabletJson = parseJson(out.trim())
+                assert tabletJson.rowsets instanceof List
+                for (String rowset in (List<String>) tabletJson.rowsets) {
+                    rowCount += Integer.parseInt(rowset.split(" ")[1])
+                }
             }
-        }
-        assert (rowCount < 8 * replicaNum)
+            return rowCount < 8 * replicaNum
+        })
         qt_select_default3 """ SELECT * FROM ${tableName} t ORDER BY user_id; 
"""
     } finally {
         try_sql("DROP TABLE IF EXISTS ${tableName}")
diff --git 
a/regression-test/suites/compaction/test_vertical_compaction_agg_state.groovy 
b/regression-test/suites/compaction/test_vertical_compaction_agg_state.groovy
index 22a8f653b74..6216a386bfb 100644
--- 
a/regression-test/suites/compaction/test_vertical_compaction_agg_state.groovy
+++ 
b/regression-test/suites/compaction/test_vertical_compaction_agg_state.groovy
@@ -16,6 +16,8 @@
 // under the License.
 
 import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.awaitility.Awaitility
+import java.util.concurrent.TimeUnit
 
 suite("test_vertical_compaction_agg_state") {
     def tableName = "vertical_compaction_agg_state_regression_test"
@@ -76,24 +78,31 @@ suite("test_vertical_compaction_agg_state") {
 
         def tablets = sql_return_maparray """ show tablets from ${tableName}; 
"""
 
-        // trigger compactions for all tablets in ${tableName}
-        trigger_and_wait_compaction(tableName, "cumulative")
-
         def replicaNum = get_table_replica_num(tableName)
         logger.info("get table replica num: " + replicaNum)
-        int rowCount = 0
-        for (def tablet in tablets) {
-            String tablet_id = tablet.TabletId
-            (code, out, err) = curl("GET", tablet.CompactionStatus)
-            logger.info("Show tablets status: code=" + code + ", out=" + out + 
", err=" + err)
-            assertEquals(code, 0)
-            def tabletJson = parseJson(out.trim())
-            assert tabletJson.rowsets instanceof List
-            for (String rowset in (List<String>) tabletJson.rowsets) {
-                rowCount += Integer.parseInt(rowset.split(" ")[1])
+
+        // BE only picks rowsets whose version is already visible as cumulative
+        // compaction candidates, and the visible version is pushed from FE
+        // asynchronously. Right after a burst of loads it may lag, so a single
+        // cumulative round can merge only the visible prefix of rowsets. Retry
+        // trigger + recount until the rows are fully merged.
+        Awaitility.await().atMost(300, TimeUnit.SECONDS).pollInterval(2, 
TimeUnit.SECONDS).until(() -> {
+            // trigger compactions for all tablets in ${tableName}
+            trigger_and_wait_compaction(tableName, "cumulative")
+            int rowCount = 0
+            for (def tablet in tablets) {
+                String tablet_id = tablet.TabletId
+                (code, out, err) = curl("GET", tablet.CompactionStatus)
+                logger.info("Show tablets status: code=" + code + ", out=" + 
out + ", err=" + err)
+                assertEquals(code, 0)
+                def tabletJson = parseJson(out.trim())
+                assert tabletJson.rowsets instanceof List
+                for (String rowset in (List<String>) tabletJson.rowsets) {
+                    rowCount += Integer.parseInt(rowset.split(" ")[1])
+                }
             }
-        }
-        assert (rowCount < 8 * replicaNum)
+            return rowCount < 8 * replicaNum
+        })
         qt_select_default """ SELECT 
user_id,array_sort(collect_set_merge(agg_user_id)) FROM ${tableName} t group by 
user_id ORDER BY user_id;"""
     } finally {
         try_sql("DROP TABLE IF EXISTS ${tableName}")
diff --git 
a/regression-test/suites/compaction/test_vertical_compaction_uniq_keys.groovy 
b/regression-test/suites/compaction/test_vertical_compaction_uniq_keys.groovy
index 6bff003a028..b7300ea8617 100644
--- 
a/regression-test/suites/compaction/test_vertical_compaction_uniq_keys.groovy
+++ 
b/regression-test/suites/compaction/test_vertical_compaction_uniq_keys.groovy
@@ -16,6 +16,8 @@
 // under the License.
 
 import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.awaitility.Awaitility
+import java.util.concurrent.TimeUnit
 
 suite("test_vertical_compaction_uniq_keys") {
     def tableName = "vertical_compaction_uniq_keys_regression_test"
@@ -112,24 +114,30 @@ suite("test_vertical_compaction_uniq_keys") {
         
//TabletId,ReplicaId,BackendId,SchemaHash,Version,LstSuccessVersion,LstFailedVersion,LstFailedTime,LocalDataSize,RemoteDataSize,RowCount,State,LstConsistencyCheckTime,CheckVersion,VersionCount,QueryHits,PathHash,MetaUrl,CompactionStatus
         def tablets = sql_return_maparray """ show tablets from ${tableName}; 
"""
 
-        // trigger compactions for all tablets in ${tableName}
-        trigger_and_wait_compaction(tableName, "cumulative")
-
         def replicaNum = get_table_replica_num(tableName)
         logger.info("get table replica num: " + replicaNum)
-        int rowCount = 0
-        for (def tablet in tablets) {
-            String tablet_id = tablet.TabletId
-            (code, out, err) = curl("GET", tablet.CompactionStatus)
-            logger.info("Show tablets status: code=" + code + ", out=" + out + 
", err=" + err)
-            assertEquals(code, 0)
-            def tabletJson = parseJson(out.trim())
-            assert tabletJson.rowsets instanceof List
-            for (String rowset in (List<String>) tabletJson.rowsets) {
-                rowCount += Integer.parseInt(rowset.split(" ")[1])
+
+        // BE only picks rowsets whose version is already visible as cumulative
+        // compaction candidates, and the visible version is pushed from FE
+        // asynchronously. Right after a burst of loads it may lag, so a single
+        // cumulative round can merge only the visible prefix of rowsets. Retry
+        // trigger + recount until the rows are fully merged.
+        Awaitility.await().atMost(300, TimeUnit.SECONDS).pollInterval(2, 
TimeUnit.SECONDS).until(() -> {
+            // trigger compactions for all tablets in ${tableName}
+            trigger_and_wait_compaction(tableName, "cumulative")
+            int rowCount = 0
+            for (def tablet in tablets) {
+                (code, out, err) = curl("GET", tablet.CompactionStatus)
+                logger.info("Show tablets status: code=" + code + ", out=" + 
out + ", err=" + err)
+                assertEquals(code, 0)
+                def tabletJson = parseJson(out.trim())
+                assert tabletJson.rowsets instanceof List
+                for (String rowset in (List<String>) tabletJson.rowsets) {
+                    rowCount += Integer.parseInt(rowset.split(" ")[1])
+                }
             }
-        }
-        assert (rowCount < 8 * replicaNum)
+            return rowCount < 8 * replicaNum
+        })
         qt_select_default3 """ SELECT * FROM ${tableName} t ORDER BY user_id; 
"""
     } finally {
         try_sql("DROP TABLE IF EXISTS ${tableName}")
diff --git 
a/regression-test/suites/compaction/test_vertical_compaction_uniq_keys_ck.groovy
 
b/regression-test/suites/compaction/test_vertical_compaction_uniq_keys_ck.groovy
index e7fad814e15..a78dcc5c3cf 100644
--- 
a/regression-test/suites/compaction/test_vertical_compaction_uniq_keys_ck.groovy
+++ 
b/regression-test/suites/compaction/test_vertical_compaction_uniq_keys_ck.groovy
@@ -16,6 +16,8 @@
 // under the License.
 
 import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.awaitility.Awaitility
+import java.util.concurrent.TimeUnit
 
 suite("test_vertical_compaction_uniq_keys_ck") {
     def tableName = "test_vertical_compaction_uniq_keys_ck"
@@ -114,24 +116,30 @@ suite("test_vertical_compaction_uniq_keys_ck") {
         
//TabletId,ReplicaId,BackendId,SchemaHash,Version,LstSuccessVersion,LstFailedVersion,LstFailedTime,LocalDataSize,RemoteDataSize,RowCount,State,LstConsistencyCheckTime,CheckVersion,VersionCount,QueryHits,PathHash,MetaUrl,CompactionStatus
         def tablets = sql_return_maparray """ show tablets from ${tableName}; 
"""
 
-        // trigger compactions for all tablets in ${tableName}
-        trigger_and_wait_compaction(tableName, "cumulative")
-
         def replicaNum = get_table_replica_num(tableName)
         logger.info("get table replica num: " + replicaNum)
-        int rowCount = 0
-        for (def tablet in tablets) {
-            String tablet_id = tablet.TabletId
-            (code, out, err) = curl("GET", tablet.CompactionStatus)
-            logger.info("Show tablets status: code=" + code + ", out=" + out + 
", err=" + err)
-            assertEquals(code, 0)
-            def tabletJson = parseJson(out.trim())
-            assert tabletJson.rowsets instanceof List
-            for (String rowset in (List<String>) tabletJson.rowsets) {
-                rowCount += Integer.parseInt(rowset.split(" ")[1])
+
+        // BE only picks rowsets whose version is already visible as cumulative
+        // compaction candidates, and the visible version is pushed from FE
+        // asynchronously. Right after a burst of loads it may lag, so a single
+        // cumulative round can merge only the visible prefix of rowsets. Retry
+        // trigger + recount until the rows are fully merged.
+        Awaitility.await().atMost(300, TimeUnit.SECONDS).pollInterval(2, 
TimeUnit.SECONDS).until(() -> {
+            // trigger compactions for all tablets in ${tableName}
+            trigger_and_wait_compaction(tableName, "cumulative")
+            int rowCount = 0
+            for (def tablet in tablets) {
+                (code, out, err) = curl("GET", tablet.CompactionStatus)
+                logger.info("Show tablets status: code=" + code + ", out=" + 
out + ", err=" + err)
+                assertEquals(code, 0)
+                def tabletJson = parseJson(out.trim())
+                assert tabletJson.rowsets instanceof List
+                for (String rowset in (List<String>) tabletJson.rowsets) {
+                    rowCount += Integer.parseInt(rowset.split(" ")[1])
+                }
             }
-        }
-        assert (rowCount < 8 * replicaNum)
+            return rowCount < 8 * replicaNum
+        })
         qt_select_default3 """ SELECT * FROM ${tableName} t ORDER BY user_id; 
"""
     } finally {
         try_sql("DROP TABLE IF EXISTS ${tableName}")
diff --git 
a/regression-test/suites/correctness_p0/test_colocate_join_of_column_order.groovy
 
b/regression-test/suites/correctness_p0/test_colocate_join_of_column_order.groovy
index d6d27e03f76..21e0a6a1873 100644
--- 
a/regression-test/suites/correctness_p0/test_colocate_join_of_column_order.groovy
+++ 
b/regression-test/suites/correctness_p0/test_colocate_join_of_column_order.groovy
@@ -100,20 +100,12 @@ suite("test_colocate_join_of_column_order") {
     sql """insert into test_colocate_join_of_column_order_tb values(1,1);"""
     sql """insert into test_colocate_join_of_column_order_tc values(1,1);"""
 
-    // Pin column statistics so the cost-based COLOCATE-vs-PARTITIONED choice 
is deterministic.
-    // Freshly-created tables have rowCountReported=false (only the async 
CloudTabletStatMgr sets it,
-    // up to a full tick after INSERT); with unreliable stats the optimizer 
can fall back to a
-    // PARTITIONED shuffle join, which makes the COLOCATE assertion below 
flaky. Injecting stats
-    // (userInjected) bypasses the async-report dependency. See 
nereids_p0/join/initial_join_order.
-    sql """alter table test_colocate_join_of_column_order_ta modify column c1 
set stats ('row_count'='1', 'ndv'='1', 'num_nulls'='0', 'min_value'='1', 
'max_value'='1')"""
-    sql """alter table test_colocate_join_of_column_order_ta modify column c2 
set stats ('row_count'='1', 'ndv'='1', 'num_nulls'='0', 'min_value'='1', 
'max_value'='1')"""
-    sql """alter table test_colocate_join_of_column_order_tb modify column c1 
set stats ('row_count'='1', 'ndv'='1', 'num_nulls'='0', 'min_value'='1', 
'max_value'='1')"""
-    sql """alter table test_colocate_join_of_column_order_tb modify column c2 
set stats ('row_count'='1', 'ndv'='1', 'num_nulls'='0', 'min_value'='1', 
'max_value'='1')"""
-    sql """alter table test_colocate_join_of_column_order_tc modify column c1 
set stats ('row_count'='1', 'ndv'='1', 'num_nulls'='0', 'min_value'='1', 
'max_value'='1')"""
-    sql """alter table test_colocate_join_of_column_order_tc modify column c2 
set stats ('row_count'='1', 'ndv'='1', 'num_nulls'='0', 'min_value'='1', 
'max_value'='1')"""
-
+    // parallel_pipeline_task_num=1 keeps the bucket-shuffle downgrade from 
firing: with the default
+    // fuzzed value (0 -> auto ~cores/2) the heuristic totalBucketNum(10) < 
backEndNum*paraNum*0.8 can
+    // trip on a single-BE cloud cluster, turning ta's NATURAL distribution 
into EXECUTION_BUCKETED and
+    // forbidding the downstream COLOCATE. Pinning paraNum=1 makes the 
condition always false.
     explain {
-        sql("""select /*+ set_var(disable_join_reorder=true) */ * from 
test_colocate_join_of_column_order_ta join [shuffle] (select cast((c2 + 1) as 
bigint) c2 from test_colocate_join_of_column_order_tb) 
test_colocate_join_of_column_order_tb  on 
test_colocate_join_of_column_order_ta.c1 = 
test_colocate_join_of_column_order_tb.c2 join [shuffle] 
test_colocate_join_of_column_order_tc on 
test_colocate_join_of_column_order_tb.c2 = 
test_colocate_join_of_column_order_tc.c1;""");
+        sql("""select /*+ 
set_var(disable_join_reorder=true,parallel_pipeline_task_num=1) */ * from 
test_colocate_join_of_column_order_ta join [shuffle] (select cast((c2 + 1) as 
bigint) c2 from test_colocate_join_of_column_order_tb) 
test_colocate_join_of_column_order_tb  on 
test_colocate_join_of_column_order_ta.c1 = 
test_colocate_join_of_column_order_tb.c2 join [shuffle] 
test_colocate_join_of_column_order_tc on 
test_colocate_join_of_column_order_tb.c2 = test_colocate_join_of_column_order 
[...]
         contains "COLOCATE"
     }
 
diff --git 
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_runtime_filter_partition_pruning.groovy
 
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_runtime_filter_partition_pruning.groovy
index 442a8a4c121..9bf238c517d 100644
--- 
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_runtime_filter_partition_pruning.groovy
+++ 
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_runtime_filter_partition_pruning.groovy
@@ -15,6 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
+import java.util.concurrent.TimeUnit
+import org.awaitility.Awaitility
+
 suite("test_iceberg_runtime_filter_partition_pruning", 
"p0,external,doris,external_docker,external_docker_doris") {
 
     String enabled = context.config.otherConfigs.get("enableIcebergTest")
@@ -40,6 +43,32 @@ suite("test_iceberg_runtime_filter_partition_pruning", 
"p0,external,doris,extern
         "s3.region" = "us-east-1"
     );"""
 
+    // All tables in partition_db are pre-created by the docker preinstalled 
script (run18.sql)
+    // and are only read here. A freshly created Iceberg REST catalog can 
occasionally return an
+    // incomplete table list on its first metadata fetch (REST client cold 
start), which made this
+    // test flaky with "Table xxx does not exist in database [partition_db]". 
Wait until all
+    // expected tables are visible (refresh to drop any cached partial list) 
before querying.
+    def expectedTables = ["date_partitioned", "int_partitioned", 
"float_partitioned",
+                          "string_partitioned", "timestamp_partitioned", 
"timestamp_ntz_partitioned",
+                          "boolean_partitioned", "decimal_partitioned", 
"binary_partitioned",
+                          "null_str_partition_table"] as Set
+    Awaitility.await("wait for ${db_name} tables to be visible")
+            .atMost(60, TimeUnit.SECONDS).pollInterval(1, 
TimeUnit.SECONDS).until {
+        try {
+            sql """refresh catalog ${catalog_name}"""
+            def actualTables = sql("""show tables from 
`${catalog_name}`.`${db_name}`""")
+                    .collect { it[0] as String } as Set
+            if (!actualTables.containsAll(expectedTables)) {
+                logger.warn("${db_name} not ready yet, missing tables: 
${expectedTables - actualTables}")
+                return false
+            }
+            return true
+        } catch (Exception e) {
+            logger.warn("waiting for ${db_name} tables to be visible: 
${e.getMessage()}")
+            return false
+        }
+    }
+
     sql """switch ${catalog_name}"""
     sql """use ${db_name}"""
 
diff --git 
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_runtime_filter_partition_pruning_transform.groovy
 
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_runtime_filter_partition_pruning_transform.groovy
index 7a2dbcfab92..f2b951366cc 100644
--- 
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_runtime_filter_partition_pruning_transform.groovy
+++ 
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_runtime_filter_partition_pruning_transform.groovy
@@ -15,6 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
+import java.util.concurrent.TimeUnit
+import org.awaitility.Awaitility
+
 suite("test_iceberg_runtime_filter_partition_pruning_transform", 
"p0,external,doris,external_docker,external_docker_doris") {
 
     String enabled = context.config.otherConfigs.get("enableIcebergTest")
@@ -40,6 +43,33 @@ 
suite("test_iceberg_runtime_filter_partition_pruning_transform", "p0,external,do
         "s3.region" = "us-east-1"
     );"""
 
+    // All tables in transform_partition_db are pre-created by the docker 
preinstalled script
+    // (run19.sql) and are only read here. A freshly created Iceberg REST 
catalog can occasionally
+    // return an incomplete table list on its first metadata fetch (REST 
client cold start), which can
+    // make this test flaky with "Table xxx does not exist in database 
[transform_partition_db]". Wait
+    // until all expected tables are visible (refresh to drop any cached 
partial list) before querying.
+    def expectedTables = ["bucket_int_4", "bucket_bigint_4", 
"bucket_string_4", "bucket_date_4",
+                          "bucket_timestamp_4", "bucket_timestamp_ntz_4", 
"bucket_binary_4",
+                          "truncate_string_3", "truncate_binary_4", 
"truncate_int_10",
+                          "truncate_bigint_100", "truncate_decimal_10", 
"day_partitioned",
+                          "year_partitioned", "month_partitioned", 
"hour_partitioned"] as Set
+    Awaitility.await("wait for ${db_name} tables to be visible")
+            .atMost(60, TimeUnit.SECONDS).pollInterval(1, 
TimeUnit.SECONDS).until {
+        try {
+            sql """refresh catalog ${catalog_name}"""
+            def actualTables = sql("""show tables from 
`${catalog_name}`.`${db_name}`""")
+                    .collect { it[0] as String } as Set
+            if (!actualTables.containsAll(expectedTables)) {
+                logger.warn("${db_name} not ready yet, missing tables: 
${expectedTables - actualTables}")
+                return false
+            }
+            return true
+        } catch (Exception e) {
+            logger.warn("waiting for ${db_name} tables to be visible: 
${e.getMessage()}")
+            return false
+        }
+    }
+
     sql """switch ${catalog_name}"""
     sql """use ${db_name}"""
 
diff --git 
a/regression-test/suites/external_table_p0/paimon/test_paimon_runtime_filter_partition_pruning.groovy
 
b/regression-test/suites/external_table_p0/paimon/test_paimon_runtime_filter_partition_pruning.groovy
index f7a666d2c83..325e9f242d0 100644
--- 
a/regression-test/suites/external_table_p0/paimon/test_paimon_runtime_filter_partition_pruning.groovy
+++ 
b/regression-test/suites/external_table_p0/paimon/test_paimon_runtime_filter_partition_pruning.groovy
@@ -15,6 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
+import java.util.concurrent.TimeUnit
+import org.awaitility.Awaitility
+
 suite("test_paimon_runtime_filter_partition_pruning", 
"p0,external,doris,external_docker,external_docker_doris") {
     String enabled = context.config.otherConfigs.get("enablePaimonTest")
     if (enabled != null && enabled.equalsIgnoreCase("true")) {
@@ -36,8 +39,34 @@ suite("test_paimon_runtime_filter_partition_pruning", 
"p0,external,doris,externa
                     's3.path.style.access' = 'true'
             );
         """
+        // All tables in partition_db are pre-created by the docker 
preinstalled paimon script and are
+        // only read here. A freshly created Paimon catalog can occasionally 
return an incomplete table
+        // list on its first metadata fetch (catalog cold start), which can 
make this test flaky with
+        // "Table xxx does not exist in database [partition_db]". Wait until 
all expected tables are
+        // visible (refresh to drop any cached partial list) before querying.
+        // Note: binary_partitioned is intentionally excluded (its queries are 
skipped below).
+        def expectedTables = ["decimal_partitioned", "int_partitioned", 
"string_partitioned",
+                              "date_partitioned", "timestamp_partitioned", 
"bigint_partitioned",
+                              "boolean_partitioned", "float_partitioned",
+                              "null_str_partition_table"] as Set
+        Awaitility.await("wait for ${db_name} tables to be visible")
+                .atMost(60, TimeUnit.SECONDS).pollInterval(1, 
TimeUnit.SECONDS).until {
+            try {
+                sql """refresh catalog ${catalog_name}"""
+                def actualTables = sql("""show tables from 
`${catalog_name}`.`${db_name}`""")
+                        .collect { it[0] as String } as Set
+                if (!actualTables.containsAll(expectedTables)) {
+                    logger.warn("${db_name} not ready yet, missing tables: 
${expectedTables - actualTables}")
+                    return false
+                }
+                return true
+            } catch (Exception e) {
+                logger.warn("waiting for ${db_name} tables to be visible: 
${e.getMessage()}")
+                return false
+            }
+        }
         sql """use `${catalog_name}`.`${db_name}`;"""
-        
+
         def test_runtime_filter_partition_pruning = {
             qt_runtime_filter_partition_pruning_decimal1 """
                 select count(*) from decimal_partitioned where partition_key =
diff --git 
a/regression-test/suites/insert_overwrite_p0/test_iot_auto_detect_fail.groovy 
b/regression-test/suites/insert_overwrite_p0/test_iot_auto_detect_fail.groovy
index 07c271bec47..30a4c39785a 100644
--- 
a/regression-test/suites/insert_overwrite_p0/test_iot_auto_detect_fail.groovy
+++ 
b/regression-test/suites/insert_overwrite_p0/test_iot_auto_detect_fail.groovy
@@ -152,16 +152,31 @@ PROPERTIES (
 );
     """
 
+    // The overwrite must FAIL because the source rows (dt='20241128') fall 
into a
+    // partition that does not exist in fail_tag and 
enable_auto_create_when_overwrite=false.
+    // With multiple parallel sink instances this single logical failure can 
surface as any
+    // of several equivalent messages (the semantic "Cannot found origin 
partitions" error and
+    // the collateral "no partition for this tuple" / strict-mode 
filtered-data error race to be
+    // reported). Accept all of them, same as 
insert_overwrite_auto_detect.groovy. The point is
+    // that the statement fails (not silently creating a partition / 
succeeding).
+    def checkOverwriteFail = { result, exception, startTime, endTime ->
+        assertTrue(exception != null && (
+                exception.getMessage().contains('Cannot found origin 
partitions')
+                || exception.getMessage().contains('no partition for this 
tuple')
+                || exception.getMessage().contains('Insert has filtered data 
in strict mode')),
+            "expect insert-overwrite auto-detect to fail (no matching origin 
partition), "
+                + "but got result=${result}, 
exception=${exception?.getMessage()}")
+    }
     test {
         sql "insert overwrite table fail_tag PARTITION(*) select 
qsrq,lsh,wth,khh,dt from fail_src where dt='20241128';"
-        exception "Cannot found origin partitions"
+        check checkOverwriteFail
     }
     test {
         sql "insert overwrite table fail_tag PARTITION(*) select 
qsrq,lsh,wth,khh,dt from fail_src where dt='20241128';"
-        exception "Cannot found origin partitions"
+        check checkOverwriteFail
     }
     test {
         sql "insert overwrite table fail_tag PARTITION(*) select 
qsrq,lsh,wth,khh,dt from fail_src where dt='20241128';"
-        exception "Cannot found origin partitions"
+        check checkOverwriteFail
     }
 }
diff --git a/regression-test/suites/query_profile/s3_load_profile_test.groovy 
b/regression-test/suites/query_profile/s3_load_profile_test.groovy
index 9692efde734..6faaf369a12 100644
--- a/regression-test/suites/query_profile/s3_load_profile_test.groovy
+++ b/regression-test/suites/query_profile/s3_load_profile_test.groovy
@@ -17,6 +17,8 @@
 
 import groovy.json.JsonSlurper
 import org.apache.doris.regression.action.ProfileAction
+import org.awaitility.Awaitility
+import static java.util.concurrent.TimeUnit.SECONDS
 
 def getProfile = { masterHTTPAddr, id ->
     def dst = 'http://' + masterHTTPAddr
@@ -197,7 +199,20 @@ PROPERTIES (
     def masterAddress = masterIP + ":" + masterHTTPPort
     logger.info("masterIP:masterHTTPPort is:${masterAddress}")
 
-    def profileString = getProfile(masterAddress, jobId.toString())
+    // The BE reports the detailed execution profile to FE asynchronously, 
after
+    // the load job has already finished (the Summary section is pushed
+    // synchronously on txn VISIBLE, but the Fragments/operators are filled 
only
+    // when the coordinator BE's report lands). Fetching too early yields a
+    // profile whose MergedProfile carries no operators, so the scan counters
+    // are not present yet. Poll until they show up, i.e. the execution profile
+    // has landed; the await times out and fails loudly if it never does.
+    def profileString = ""
+    Awaitility.await().atMost(60, SECONDS).pollInterval(1, SECONDS).until {
+        profileString = getProfile(masterAddress, jobId.toString())
+        return profileString.contains("NumScanners") &&
+                profileString.contains("RowsProduced") &&
+                profileString.contains("RowsRead")
+    }
     logger.info("profileDataString:" + profileString)
     assertTrue(profileString.contains("NumScanners"))
     assertTrue(profileString.contains("RowsProduced"))


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to