This is an automated email from the ASF dual-hosted git repository. 924060929 pushed a commit to branch fe_local_shuffle_optimize in repository https://gitbox.apache.org/repos/asf/doris.git
commit 9fc18602238de07eda011f284da39a1b3d051e73
Author: 924060929 <[email protected]>
AuthorDate: Thu Jun 4 20:00:33 2026 +0800

    [fix](exchange) orphan detection must use per-BE local task index and skip serial exchanges

    Two corrections to the bucket-shuffle orphan receiver fix, both found by 3-BE local testing (single-BE tests could not catch either):

    1. Index currency: the predicate compared per_fragment_instance_idx (= sender_id = indexInUnassignedJob, a GLOBAL index across all workers: be1 0-15, be2 16-31) against bucket_seq_to_instance_idx values (built per worker on FE, LOCAL 0-15 on every BE). On any worker after the first, every instance was misjudged as an orphan, its receiver started at EOS, and that worker's buckets were silently dropped (3-BE repro: exactly 1/4 of join rows lost, matching be2's single bucket). Use LocalStateInfo::task_idx instead — the per-BE local index, in the same currency as the map values and the local exchange channels.

    2. Serial gate: with a serial exchange (planner off / funnel mode), destinations go to the first instance per worker regardless of bucket ownership, so ownership-based orphan detection is conceptually invalid there; BE's serial-exchange mechanics already close the unaddressed receivers. The predicate now also requires !is_serial_operator().

    Verified on 3 BEs (4-bucket 20M x 20M single bucket-shuffle join, 200M-row output): all arms match the local-shuffle-off baseline. The upgraded 16-way plan (0.42s) beats both the bucket 4-way plan (0.47s) and the downgraded shuffle plan (0.47s) by ~11%.
--- be/src/exec/operator/exchange_source_operator.cpp | 3 ++- be/src/exec/operator/exchange_source_operator.h | 20 +++++++++++++++++--- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/be/src/exec/operator/exchange_source_operator.cpp b/be/src/exec/operator/exchange_source_operator.cpp index 8809e0002f9..03a60abe6d6 100644 --- a/be/src/exec/operator/exchange_source_operator.cpp +++ b/be/src/exec/operator/exchange_source_operator.cpp @@ -65,7 +65,7 @@ std::string ExchangeSourceOperatorX::debug_string(int indentation_level) const { void ExchangeLocalState::create_stream_recvr(RuntimeState* state) { auto& p = _parent->cast<ExchangeSourceOperatorX>(); int num_senders = p.num_senders(); - if (p.is_bucket_shuffle_orphan_instance(state->per_fragment_instance_idx())) { + if (p.is_bucket_shuffle_orphan_instance(local_task_idx)) { // Bucket-routed senders open one channel per destination entry (one per bucket), // so an instance owning no bucket never gets a channel — and never gets EOS. // Start its receiver with zero senders so it reports EOS immediately instead of @@ -83,6 +83,7 @@ Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_init_timer); + local_task_idx = info.task_idx; create_stream_recvr(state); const auto& queues = stream_recvr->sender_queues(); deps.resize(queues.size()); diff --git a/be/src/exec/operator/exchange_source_operator.h b/be/src/exec/operator/exchange_source_operator.h index fb7598269e3..b8726851615 100644 --- a/be/src/exec/operator/exchange_source_operator.h +++ b/be/src/exec/operator/exchange_source_operator.h @@ -78,6 +78,9 @@ public: doris::VExprContextSPtrs ordering_expr_ctxs; int64_t num_rows_skipped; bool is_ready; + // per-BE local instance index (LocalStateInfo::task_idx), used for bucket-shuffle + // orphan detection in create_stream_recvr — see is_bucket_shuffle_orphan_instance. 
+ int local_task_idx = 0; std::vector<std::shared_ptr<Dependency>> deps; @@ -125,9 +128,20 @@ public: _has_bucket_dest_instances = true; } - [[nodiscard]] bool is_bucket_shuffle_orphan_instance(int instance_idx) const { - return _partition_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED && - _has_bucket_dest_instances && !_bucket_dest_instances.contains(instance_idx); + // local_task_idx is the per-BE local instance index (LocalStateInfo::task_idx) — the + // same numbering as bucket_seq_to_instance_idx values (built per worker on FE). Do NOT + // pass per_fragment_instance_idx here: that is sender_id = the GLOBAL index across all + // workers, which only coincides with the local index on the first worker (single-BE + // tests pass, multi-BE silently drops every later worker's buckets). + // + // Ownership-based orphan detection is only valid when destinations follow bucket + // ownership, i.e. the non-serial (FE planner dest spread) mode. A serial exchange's + // destinations funnel to the first instance per worker regardless of bucket ownership, + // and BE's serial-exchange mechanics already close the other receivers. + [[nodiscard]] bool is_bucket_shuffle_orphan_instance(int local_task_idx) const { + return !is_serial_operator() && + _partition_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED && + _has_bucket_dest_instances && !_bucket_dest_instances.contains(local_task_idx); } DataDistribution required_data_distribution(RuntimeState* /*state*/) const override { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
