This is an automated email from the ASF dual-hosted git repository.

924060929 pushed a commit to branch fe_local_shuffle_optimize
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 9fc18602238de07eda011f284da39a1b3d051e73
Author: 924060929 <[email protected]>
AuthorDate: Thu Jun 4 20:00:33 2026 +0800

    [fix](exchange) orphan detection must use per-BE local task index and skip serial exchanges
    
    Two corrections to the bucket-shuffle orphan-receiver fix, both found by 3-BE
    local testing (single-BE tests could catch neither):
    
    1. Index space: the predicate compared per_fragment_instance_idx (= sender_id
       = indexInUnassignedJob, a GLOBAL index across all workers: be1 0-15, be2
       16-31) against bucket_seq_to_instance_idx values (built per worker on FE,
       LOCAL 0-15 on every BE). On every worker after the first, each instance was
       misjudged as an orphan, its receiver started at EOS, and that worker's
       buckets were silently dropped (3-BE repro: exactly 1/4 of join rows lost,
       matching be2's single bucket). Use LocalStateInfo::task_idx instead — the
       per-BE local index, which uses the same numbering as the map values and the
       local exchange channels.
    
    2. Serial gate: with a serial exchange (planner off / funnel mode),
       destinations go to the first instance per worker regardless of bucket
       ownership, so ownership-based orphan detection is invalid there; BE's
       serial-exchange mechanics already close the unaddressed receivers. The
       predicate now requires !is_serial_operator().
    
    Verified on 3 BEs (4-bucket 20M x 20M single bucket-shuffle join, 200M-row
    output): all variants match the local-shuffle-off baseline; the upgraded
    16-way plan (0.42s) beats both the 4-way bucket plan (0.47s) and the
    downgraded shuffle plan (0.47s) by ~11%.
---
 be/src/exec/operator/exchange_source_operator.cpp |  3 ++-
 be/src/exec/operator/exchange_source_operator.h   | 20 +++++++++++++++++---
 2 files changed, 19 insertions(+), 4 deletions(-)

diff --git a/be/src/exec/operator/exchange_source_operator.cpp b/be/src/exec/operator/exchange_source_operator.cpp
index 8809e0002f9..03a60abe6d6 100644
--- a/be/src/exec/operator/exchange_source_operator.cpp
+++ b/be/src/exec/operator/exchange_source_operator.cpp
@@ -65,7 +65,7 @@ std::string ExchangeSourceOperatorX::debug_string(int indentation_level) const {
 void ExchangeLocalState::create_stream_recvr(RuntimeState* state) {
     auto& p = _parent->cast<ExchangeSourceOperatorX>();
     int num_senders = p.num_senders();
-    if 
(p.is_bucket_shuffle_orphan_instance(state->per_fragment_instance_idx())) {
+    if (p.is_bucket_shuffle_orphan_instance(local_task_idx)) {
         // Bucket-routed senders open one channel per destination entry (one 
per bucket),
         // so an instance owning no bucket never gets a channel — and never 
gets EOS.
         // Start its receiver with zero senders so it reports EOS immediately 
instead of
@@ -83,6 +83,7 @@ Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) {
     RETURN_IF_ERROR(Base::init(state, info));
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_init_timer);
+    local_task_idx = info.task_idx;
     create_stream_recvr(state);
     const auto& queues = stream_recvr->sender_queues();
     deps.resize(queues.size());
diff --git a/be/src/exec/operator/exchange_source_operator.h b/be/src/exec/operator/exchange_source_operator.h
index fb7598269e3..b8726851615 100644
--- a/be/src/exec/operator/exchange_source_operator.h
+++ b/be/src/exec/operator/exchange_source_operator.h
@@ -78,6 +78,9 @@ public:
     doris::VExprContextSPtrs ordering_expr_ctxs;
     int64_t num_rows_skipped;
     bool is_ready;
+    // per-BE local instance index (LocalStateInfo::task_idx), used for 
bucket-shuffle
+    // orphan detection in create_stream_recvr — see 
is_bucket_shuffle_orphan_instance.
+    int local_task_idx = 0;
 
     std::vector<std::shared_ptr<Dependency>> deps;
 
@@ -125,9 +128,20 @@ public:
         _has_bucket_dest_instances = true;
     }
 
-    [[nodiscard]] bool is_bucket_shuffle_orphan_instance(int instance_idx) 
const {
-        return _partition_type == 
TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED &&
-               _has_bucket_dest_instances && 
!_bucket_dest_instances.contains(instance_idx);
+    // local_task_idx is the per-BE local instance index 
(LocalStateInfo::task_idx) — the
+    // same numbering as bucket_seq_to_instance_idx values (built per worker 
on FE). Do NOT
+    // pass per_fragment_instance_idx here: that is sender_id = the GLOBAL 
index across all
+    // workers, which only coincides with the local index on the first worker 
(single-BE
+    // tests pass, multi-BE silently drops every later worker's buckets).
+    //
+    // Ownership-based orphan detection is only valid when destinations follow 
bucket
+    // ownership, i.e. the non-serial (FE planner dest spread) mode. A serial 
exchange's
+    // destinations funnel to the first instance per worker regardless of 
bucket ownership,
+    // and BE's serial-exchange mechanics already close the other receivers.
+    [[nodiscard]] bool is_bucket_shuffle_orphan_instance(int local_task_idx) 
const {
+        return !is_serial_operator() &&
+               _partition_type == 
TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED &&
+               _has_bucket_dest_instances && 
!_bucket_dest_instances.contains(local_task_idx);
     }
 
     DataDistribution required_data_distribution(RuntimeState* /*state*/) const 
override {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to