This is an automated email from the ASF dual-hosted git repository.

924060929 pushed a commit to branch fe_local_shuffle_optimize
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/fe_local_shuffle_optimize by 
this push:
     new 150113e25b7 [test](local shuffle) pin DORIS-25413: count(distinct) under serial exchange
150113e25b7 is described below

commit 150113e25b7e0e9d72b64027ed2e91d800d5dd6d
Author: 924060929 <[email protected]>
AuthorDate: Thu Jun 4 21:52:16 2026 +0800

    [test](local shuffle) pin DORIS-25413: count(distinct) under serial exchange
    
    DORIS-25413 (closed in April as 'fix after the FE local exchange planner'):
    count(distinct)+std with RIGHT JOIN under use_serial_exchange=true +
    enable_local_exchange_before_agg=false returned an inflated distinct count —
    the BE inserted PASSTHROUGH after the serial hash exchange, breaking the
    HASH(s) invariant before the merge agg.
    
    Both halves are now covered and this case pins them:
    - BE-planned path: fixed upstream by child_breaks_local_key_distribution
      (#63766), inherited via the master rebase.
    - FE-planned path: the require framework asks for hash semantically, so a
      hash local exchange is inserted instead of PASSTHROUGH by construction.
    
    Verified with the original reproduction recipe on a 3-BE cluster: both
    planner modes match the no-serial baseline (29 distinct values).
---
 .../test_local_shuffle_rqg_bugs.groovy             | 49 ++++++++++++++++++++++
 1 file changed, 49 insertions(+)

diff --git a/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_rqg_bugs.groovy b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_rqg_bugs.groovy
index b7120b17637..e6b3b3ba4d6 100644
--- a/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_rqg_bugs.groovy
+++ b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_rqg_bugs.groovy
@@ -1149,6 +1149,55 @@ suite("test_local_shuffle_rqg_bugs") {
         assertTrue(false, "Bug 20: Serial exchange + agg hang: ${t.message}")
     }
 
+    // ============================================================
+    //  DORIS-25413: count(distinct)+std + RIGHT JOIN returns inflated distinct count
+    //  when use_serial_exchange=true + enable_local_exchange_before_agg=false.
+    //  Root cause (BE-planned): AggSink early-return ignored that the serial exchange
+    //  child breaks the HASH(s) invariant via PASSTHROUGH fan-out; fixed upstream by
+    //  child_breaks_local_key_distribution (#63766). The FE planner fixes it
+    //  structurally: requires are semantic, a hash LE is inserted instead of
+    //  PASSTHROUGH. This case pins both paths.
+    // ============================================================
+    try {
+        logger.info("DORIS-25413: count(distinct) under serial exchange")
+        sql "DROP TABLE IF EXISTS rqg_25413_t1"
+        sql "DROP TABLE IF EXISTS rqg_25413_t2"
+        sql """CREATE TABLE rqg_25413_t1 (pk INT NOT NULL, s VARCHAR(64) NOT NULL, d DECIMAL(10,2) NOT NULL)
+               ENGINE=OLAP DUPLICATE KEY(pk) DISTRIBUTED BY HASH(pk) BUCKETS 5
+               PROPERTIES ("replication_num"="1")"""
+        sql """CREATE TABLE rqg_25413_t2 (pk INT NOT NULL, dt DATETIME NOT NULL)
+               ENGINE=OLAP DUPLICATE KEY(pk) DISTRIBUTED BY HASH(pk) BUCKETS 5
+               PROPERTIES ("replication_num"="1")"""
+        sql """INSERT INTO rqg_25413_t1
+               SELECT CAST(number AS INT), concat('s', CAST(number % 29 AS INT)),
+                      CAST(number * 13 % 1000 AS DECIMAL(10,2))
+               FROM numbers("number"="200")"""
+        sql """INSERT INTO rqg_25413_t2
+               SELECT CAST(number AS INT),
+                      date_add('2000-01-01 00:00:00', INTERVAL CAST(number % 3000 AS INT) DAY)
+               FROM numbers("number"="200")"""
+
+        def q25413 = { vars -> """
+            SELECT /*+SET_VAR(${vars})*/
+                count(distinct t1.s) AS cnt_distinct, std(t1.d) AS std_val
+            FROM rqg_25413_t1 t1
+            RIGHT JOIN rqg_25413_t2 t2 ON t1.pk = t2.pk
+            WHERE t2.dt < '2005-01-01 00:00:00'
+        """ }
+        def base25413 = "enable_sql_cache=false, enable_local_exchange_before_agg=false, parallel_pipeline_task_num=4"
+        def expected25413 = sql q25413(base25413)
+        for (planner in ['false', 'true']) {
+            def actual = sql q25413(
+                "${base25413}, experimental_use_serial_exchange=true, enable_local_shuffle_planner=${planner}")
+            assertEquals(expected25413, actual,
+                "DORIS-25413 planner=${planner}: distinct count must not be inflated under serial exchange")
+        }
+        logger.info("DORIS-25413: PASSED")
+    } catch (Throwable t) {
+        logger.error("DORIS-25413 FAILED: ${t.message}")
+        assertTrue(false, "DORIS-25413: ${t.message}")
+    }
+
     // ============================================================
     //  Bug 21: Multi-distinct COUNT on many-bucket table → COREDUMP
     //  RQG build 186737/186929/186952: AggSinkOperatorX::sink → set_ready_to_read


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to