This is an automated email from the ASF dual-hosted git repository.
morrySnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a183718e000 [fix](coordinator) fix computeDestIdToInstanceId picking
wrong ExchangeNode for multi-input fragments (#63615)
a183718e000 is described below
commit a183718e000d0f09b6cb2e12c4dc7c59d5417894
Author: 924060929 <[email protected]>
AuthorDate: Thu May 28 16:43:03 2026 +0800
[fix](coordinator) fix computeDestIdToInstanceId picking wrong ExchangeNode
for multi-input fragments (#63615)
## Proposed changes
Fix the `Rows mismatched! Data may be lost` error that occurs when a fragment
receives data from multiple ExchangeNode inputs with different partition types
(e.g. an NLJ with a HASH-partitioned probe + BROADCAST build).
### Root cause
`ThriftPlansBuilder.filterInstancesWhichReceiveDataFromRemote` used
`.iterator().next()` to pick the first input ExchangeNode. The iteration order
over a `Set<Entry>` is non-deterministic. When it happens to pick the BROADCAST
input (1 destination per BE), `shuffle_idx_to_instance_idx` has only 1 entry,
while the HASH LOCAL_EXCHANGE expects N entries (one per pipeline task). Most
hash partition indices then find no mapping, and the BE reports the error.
Reproduction: a CTE query with `MultiCastDataSinks` sending UNPARTITIONED (to a
BROADCAST build) and HASH_PARTITIONED (to an INNER JOIN build) into the same
scan-free fragment. The bug is non-deterministic because it depends on Set
iteration order.
### Fix
Iterate over all input exchanges and select the one with the most destinations
on the target worker. This correctly identifies the main data-carrying
(HASH-partitioned) exchange, ensuring the map is complete.
---
.../doris/qe/runtime/ThriftPlansBuilder.java | 29 +++++--
.../test_multicast_sink_multi_exchange.groovy | 96 ++++++++++++++++++++++
2 files changed, 120 insertions(+), 5 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
index 4f8499970c9..2ccb96912d7 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
@@ -594,13 +594,32 @@ public class ThriftPlansBuilder {
PipelineDistributedPlan receivePlan, DistributedPlanWorker
filterWorker,
BiConsumer<AssignedJob, Integer> computeFn) {
- // current only support all input plans have same destination with
same order,
- // so we can get first input plan to compute shuffle index to instance
id
- Set<Entry<ExchangeNode, DistributedPlan>> exchangeToChildPlanSet =
receivePlan.getInputs().entries();
- if (exchangeToChildPlanSet.isEmpty()) {
+ // When a fragment has multiple ExchangeNode inputs (e.g. NLJ with
probe + BROADCAST
+ // build), pick the one with the most destinations on this worker. A
BROADCAST input has
+ // 1 dest per BE while HASH-partitioned has N; using BROADCAST would
produce a 1-entry
+ // map and cause 'Rows mismatched' for GLOBAL_HASH LOCAL_EXCHANGE.
+ Entry<ExchangeNode, DistributedPlan> exchangeToChildPlan = null;
+ int maxDestsOnWorker = -1;
+ for (Entry<ExchangeNode, DistributedPlan> entry :
receivePlan.getInputs().entries()) {
+ ExchangeNode exchNode = entry.getKey();
+ PipelineDistributedPlan childPlan = (PipelineDistributedPlan)
entry.getValue();
+ for (Entry<DataSink, List<AssignedJob>> kv :
childPlan.getDestinations().entrySet()) {
+ if (kv.getKey().getExchNodeId().asInt() !=
exchNode.getId().asInt()) {
+ continue;
+ }
+ int destsOnWorker = (int) kv.getValue().stream()
+ .filter(j -> j.getAssignedWorker().id() ==
filterWorker.id())
+ .count();
+ if (destsOnWorker > maxDestsOnWorker) {
+ maxDestsOnWorker = destsOnWorker;
+ exchangeToChildPlan = entry;
+ }
+ break;
+ }
+ }
+ if (exchangeToChildPlan == null) {
return;
}
- Entry<ExchangeNode, DistributedPlan> exchangeToChildPlan =
exchangeToChildPlanSet.iterator().next();
ExchangeNode linkNode = exchangeToChildPlan.getKey();
PipelineDistributedPlan firstInputPlan = (PipelineDistributedPlan)
exchangeToChildPlan.getValue();
Map<DataSink, List<AssignedJob>> sinkToDestInstances =
firstInputPlan.getDestinations();
diff --git
a/regression-test/suites/nereids_syntax_p0/distribute/test_multicast_sink_multi_exchange.groovy
b/regression-test/suites/nereids_syntax_p0/distribute/test_multicast_sink_multi_exchange.groovy
new file mode 100644
index 00000000000..a2a670d50d4
--- /dev/null
+++
b/regression-test/suites/nereids_syntax_p0/distribute/test_multicast_sink_multi_exchange.groovy
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Regression test for computeDestIdToInstanceId picking wrong ExchangeNode
+// when a fragment has multiple inputs with different partition types
+// (BROADCAST + HASH_PARTITIONED). Before the fix, .iterator().next() could
+// pick the BROADCAST input (1 dest per BE), producing a 1-entry shuffle map
+// and causing "Rows mismatched! Data may be lost".
+suite("test_multicast_sink_multi_exchange") {
+ sql "DROP TABLE IF EXISTS mse_fact"
+ sql "DROP TABLE IF EXISTS mse_dim"
+
+ sql """
+ CREATE TABLE mse_fact (
+ k1 VARCHAR(64) NOT NULL,
+ k2 INT NOT NULL,
+ v1 INT NOT NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(k1, k2)
+ DISTRIBUTED BY HASH(k2) BUCKETS 8
+ PROPERTIES ("replication_num" = "1")
+ """
+ sql """
+ CREATE TABLE mse_dim (
+ k1 VARCHAR(64) NOT NULL,
+ v1 INT NOT NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(k1)
+ DISTRIBUTED BY HASH(v1) BUCKETS 7
+ PROPERTIES ("replication_num" = "1")
+ """
+
+ // Use numbers() to generate enough rows across buckets
+ sql """
+ INSERT INTO mse_dim SELECT CONCAT('key_', number), number
+ FROM numbers('number' = '100')
+ """
+ sql """
+ INSERT INTO mse_fact SELECT CONCAT('key_', number % 100), number % 10,
number
+ FROM numbers('number' = '500')
+ """
+
+ sql "SET enable_nereids_distribute_planner=true"
+ sql "SET disable_join_reorder=true"
+ sql "SET enable_local_shuffle=true"
+
+ // CTE consumed by LEFT SEMI JOIN [broadcast] (1 dest per BE) and
+ // INNER JOIN [shuffle] (N dests per BE), producing MultiCastDataSinks
+ // with different partition types to the same downstream fragment.
+ // Vary parallel_pipeline_task_num to change per-BE destination counts.
+ for (def ppt in [1, 2, 4, 8, 16]) {
+ def expected = sql """
+ SELECT /*+
SET_VAR(parallel_pipeline_task_num=${ppt},enable_nereids_distribute_planner=false)
*/
+ t2.k2, SUM(t1.v1)
+ FROM (
+ SELECT k1, k2 FROM mse_fact GROUP BY k1, k2
+ ) t2
+ LEFT SEMI JOIN mse_dim d ON t2.k1 = d.k1
+ INNER JOIN mse_dim t1 ON t2.k1 = t1.k1
+ GROUP BY t2.k2 ORDER BY t2.k2
+ """
+
+ test {
+ sql """
+ WITH /*+ SET_VAR(parallel_pipeline_task_num=${ppt}) */ dim_cte
AS (
+ SELECT k1, SUM(v1) as v1 FROM mse_dim GROUP BY k1
+ )
+ SELECT t2.k2, SUM(t1.v1)
+ FROM (
+ SELECT k1, k2 FROM mse_fact GROUP BY k1, k2
+ ) t2
+ LEFT SEMI JOIN [broadcast] dim_cte d ON t2.k1 = d.k1
+ INNER JOIN [shuffle] dim_cte t1 ON t2.k1 = t1.k1
+ GROUP BY t2.k2 ORDER BY t2.k2
+ """
+ result(expected)
+ }
+ }
+
+ sql "DROP TABLE IF EXISTS mse_fact"
+ sql "DROP TABLE IF EXISTS mse_dim"
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]