This is an automated email from the ASF dual-hosted git repository. 924060929 pushed a commit to branch fe_local_shuffle_optimize_pr in repository https://gitbox.apache.org/repos/asf/doris.git
commit 0abc26eb08f3d45e015aa712baba970b809bf2f1 Author: 924060929 <[email protected]> AuthorDate: Wed Jun 24 17:47:33 2026 +0800 [test](local shuffle) regression tests for bucket upgrade + RF + count(distinct) - test_local_shuffle_bucket_upgrade: make assertions independent of BE count (pin bucket_shuffle_downgrade_ratio=0, remove environment-sensitive ratio=1.5 cases) - Forced runtime-filter correctness case: 4-arm test (upgrade+RF vs bucket+RF vs shuffle+RF vs upgrade-no-RF) ensuring the force_local_merge signal keeps RF correct under bucket-to-hash upgrade - Pin DORIS-25413 regression: count(distinct) under serial exchange, verifying the FE planner's require framework inserts hash LE for non-streaming dedup --- .../test_local_shuffle_bucket_upgrade.groovy | 35 ++++++++++------ .../test_local_shuffle_rqg_bugs.groovy | 49 ++++++++++++++++++++++ 2 files changed, 72 insertions(+), 12 deletions(-) diff --git a/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_bucket_upgrade.groovy b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_bucket_upgrade.groovy index 0d9c21fe7d9..0e9c8ad85b4 100644 --- a/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_bucket_upgrade.groovy +++ b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_bucket_upgrade.groovy @@ -28,10 +28,13 @@ * Shape notes (verified against a live cluster): * - LocalExchangeNodes only appear in EXPLAIN DISTRIBUTED PLAN (plain EXPLAIN * renders the tree before AddLocalExchange runs). - * - Nereids bucket-shuffle downgrade: bucket shuffle only forms when - * totalBucketNum >= totalInstanceNum * 0.8, so BUCKETS 13 with - * parallel_pipeline_task_num=16 on 1 BE (13 >= 12.8) keeps the bucket join, - * and ratio=1.1 (16 > 13*1.1) enables the upgrade while default 1.5 does not. 
+ * - Nereids bucket-shuffle downgrade (bucket shuffle only forms when + * totalBucketNum >= totalInstanceNum * bucket_shuffle_downgrade_ratio) depends on + * the alive BE count, so the suite pins bucket_shuffle_downgrade_ratio=0 to keep + * the bucket join forming in any environment. ratio=1.1 fires on any machine + * with >= 5 cores: min(task_num=16, cores) / min(buckets=4, cores) is 4/4 = 1.0 + * at 4 cores (not above 1.1), and 5/4 = 1.25 > 1.1 at 5+ cores. Bucket counts are kept low + * (4/3/3) to avoid tripping the cores-aware gate on small machines. * - The aggregation above must NOT group by the bucket key: a colocate agg * requires bucket distribution of the join output and correctly blocks the * upgrade via the parentRequire gate. @@ -51,6 +54,7 @@ suite("test_local_shuffle_bucket_upgrade") { parallel_pipeline_task_num=16, parallel_exchange_instance_num=8, query_timeout=600, + bucket_shuffle_downgrade_ratio=0, local_shuffle_bucket_upgrade_ratio=${ratio}, enable_local_shuffle=${ls_on}, enable_local_shuffle_planner=${ls_on} @@ -61,13 +65,13 @@ suite("test_local_shuffle_bucket_upgrade") { sql "DROP TABLE IF EXISTS lsbu_probe" sql "DROP TABLE IF EXISTS lsbu_probe2" sql """CREATE TABLE lsbu_fact (k INT, v BIGINT) - ENGINE=OLAP DUPLICATE KEY(k) DISTRIBUTED BY HASH(k) BUCKETS 13 + ENGINE=OLAP DUPLICATE KEY(k) DISTRIBUTED BY HASH(k) BUCKETS 4 PROPERTIES ("replication_num"="1")""" sql """CREATE TABLE lsbu_probe (pk INT, k INT, w BIGINT) - ENGINE=OLAP DUPLICATE KEY(pk) DISTRIBUTED BY HASH(pk) BUCKETS 7 + ENGINE=OLAP DUPLICATE KEY(pk) DISTRIBUTED BY HASH(pk) BUCKETS 3 PROPERTIES ("replication_num"="1")""" sql """CREATE TABLE lsbu_probe2 (pk INT, k INT, w BIGINT) - ENGINE=OLAP DUPLICATE KEY(pk) DISTRIBUTED BY HASH(pk) BUCKETS 5 + ENGINE=OLAP DUPLICATE KEY(pk) DISTRIBUTED BY HASH(pk) BUCKETS 3 PROPERTIES ("replication_num"="1")""" sql """INSERT INTO lsbu_fact SELECT CAST(number%50 AS INT), number*10+1 @@ -107,11 +111,6 @@ suite("test_local_shuffle_bucket_upgrade") { 
assertFalse(ratioOnePlan.toString().contains("LOCAL_EXECUTION_HASH_SHUFFLE"), "ratio=1 must keep the upgrade off (<=1 disables)") - // default ratio 1.5 does not fire here: 16 < 13*1.5 (gate respects the threshold) - def ratioDefaultPlan = sql "EXPLAIN DISTRIBUTED PLAN ${singleJoin(hints('true', '1.5'))}" - assertFalse(ratioDefaultPlan.toString().contains("LOCAL_EXECUTION_HASH_SHUFFLE"), - "ratio=1.5 with 16 instances vs 13 buckets (16 < 19.5) must not upgrade") - // Note: whether a group-by-bucket-key agg blocks the upgrade depends on the agg // shape the optimizer picks (a colocate one-phase agg requires bucket distribution // and blocks it; a two-phase agg does not). That parentRequire gate is covered @@ -154,6 +153,18 @@ suite("test_local_shuffle_bucket_upgrade") { assertTrue(stackedUpgradedText.contains("LOCAL_EXECUTION_HASH_SHUFFLE"), "ratio=1.1 must upgrade the stacked bucket chain to LOCAL hash") + // Forced-RF killer case: with the upgrade, the join build is hash-sliced; the + // per-instance IN/MIN_MAX partial filters MUST be merged before application + // (TRuntimeFilterDesc.force_local_merge). Before that fix this query silently + // lost up to 96% of its rows. 
+ def rfHints = { ratio -> + hints('true', ratio).replace(")*/", + ", enable_runtime_filter_prune=false, runtime_filter_type='IN,MIN_MAX')*/") + } + def single_up_rf = sql "SELECT ${rfHints('1.1')} p.pk % 10 AS g, COUNT(*) c, SUM(f.v) sv, SUM(p.w) sw FROM lsbu_fact f JOIN lsbu_probe p ON p.k = f.k GROUP BY g ORDER BY g" + assertEquals(single_baseline, single_up_rf, + "upgraded bucket join with forced IN/MIN_MAX runtime filters must stay correct") + def stacked_baseline = sql stackedJoin(hints('false', '0')) def stacked_bucket = sql stackedJoin(hints('true', '0')) def stacked_upgraded = sql stackedJoin(hints('true', '1.1')) diff --git a/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_rqg_bugs.groovy b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_rqg_bugs.groovy index e369dc0f11e..31cc5720581 100644 --- a/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_rqg_bugs.groovy +++ b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_rqg_bugs.groovy @@ -1160,6 +1160,55 @@ suite("test_local_shuffle_rqg_bugs") { assertTrue(false, "Bug 20: Serial exchange + agg hang: ${t.message}") } + // ============================================================ + // DORIS-25413: count(distinct)+std + RIGHT JOIN returns inflated distinct count + // when use_serial_exchange=true + enable_local_exchange_before_agg=false. + // Root cause (BE-planned): AggSink early-return ignored that the serial exchange + // child breaks the HASH(s) invariant via PASSTHROUGH fan-out; fixed upstream by + // child_breaks_local_key_distribution (#63766). The FE planner fixes it + // structurally: requires are semantic, a hash LE is inserted instead of + // PASSTHROUGH. This case pins both paths. 
+ // ============================================================ + try { + logger.info("DORIS-25413: count(distinct) under serial exchange") + sql "DROP TABLE IF EXISTS rqg_25413_t1" + sql "DROP TABLE IF EXISTS rqg_25413_t2" + sql """CREATE TABLE rqg_25413_t1 (pk INT NOT NULL, s VARCHAR(64) NOT NULL, d DECIMAL(10,2) NOT NULL) + ENGINE=OLAP DUPLICATE KEY(pk) DISTRIBUTED BY HASH(pk) BUCKETS 5 + PROPERTIES ("replication_num"="1")""" + sql """CREATE TABLE rqg_25413_t2 (pk INT NOT NULL, dt DATETIME NOT NULL) + ENGINE=OLAP DUPLICATE KEY(pk) DISTRIBUTED BY HASH(pk) BUCKETS 5 + PROPERTIES ("replication_num"="1")""" + sql """INSERT INTO rqg_25413_t1 + SELECT CAST(number AS INT), concat('s', CAST(number % 29 AS INT)), + CAST(number * 13 % 1000 AS DECIMAL(10,2)) + FROM numbers("number"="200")""" + sql """INSERT INTO rqg_25413_t2 + SELECT CAST(number AS INT), + date_add('2000-01-01 00:00:00', INTERVAL CAST(number % 3000 AS INT) DAY) + FROM numbers("number"="200")""" + + def q25413 = { vars -> """ + SELECT /*+SET_VAR(${vars})*/ + count(distinct t1.s) AS cnt_distinct, std(t1.d) AS std_val + FROM rqg_25413_t1 t1 + RIGHT JOIN rqg_25413_t2 t2 ON t1.pk = t2.pk + WHERE t2.dt < '2005-01-01 00:00:00' + """ } + def base25413 = "enable_sql_cache=false, enable_local_exchange_before_agg=false, parallel_pipeline_task_num=4" + def expected25413 = sql q25413(base25413) + for (planner in ['false', 'true']) { + def actual = sql q25413( + "${base25413}, experimental_use_serial_exchange=true, enable_local_shuffle_planner=${planner}") + assertEquals(expected25413, actual, + "DORIS-25413 planner=${planner}: distinct count must not be inflated under serial exchange") + } + logger.info("DORIS-25413: PASSED") + } catch (Throwable t) { + logger.error("DORIS-25413 FAILED: ${t.message}") + assertTrue(false, "DORIS-25413: ${t.message}") + } + // ============================================================ // Bug 21: Multi-distinct COUNT on many-bucket table → COREDUMP // RQG build 186737/186929/186952: 
AggSinkOperatorX::sink → set_ready_to_read --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
