This is an automated email from the ASF dual-hosted git repository.
zclll pushed a commit to branch tpc_preview6
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/tpc_preview6 by this push:
new 398065043e0 add PASSTHROUGH in agg and join (#59302) (#60479)
398065043e0 is described below
commit 398065043e042f5ca9cee04b7e54424f7a2b2028
Author: Mryange <[email protected]>
AuthorDate: Tue Feb 3 21:12:34 2026 +0800
add PASSTHROUGH in agg and join (#59302) (#60479)
---
be/src/pipeline/exec/hashjoin_probe_operator.h | 2 ++
be/src/pipeline/exec/operator.h | 2 ++
be/src/pipeline/exec/streaming_aggregation_operator.h | 2 +-
3 files changed, 5 insertions(+), 1 deletion(-)
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h
b/be/src/pipeline/exec/hashjoin_probe_operator.h
index 45d08e7b021..61735129d1d 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -150,6 +150,8 @@ public:
}
bool is_broadcast_join() const { return _is_broadcast_join; }
+ bool is_hash_join_probe() const override { return true; }
+
bool is_shuffled_operator() const override {
return _join_distribution == TJoinDistributionType::PARTITIONED;
}
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 1b6afecd0e6..8079cdf2976 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -138,6 +138,8 @@ public:
return Status::OK();
}
+ virtual bool is_hash_join_probe() const { return false; }
+
/**
* Pipeline task is blockable means it will be blocked in the next run. So
we should put the
* pipeline task into the blocking task scheduler.
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.h
b/be/src/pipeline/exec/streaming_aggregation_operator.h
index 26ce294f8a9..25f5564978b 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.h
@@ -219,7 +219,7 @@ public:
_spill_streaming_agg_mem_limit = 1024 * 1024;
}
DataDistribution required_data_distribution(RuntimeState* state) const
override {
- if (state->enable_streaming_agg_force_passthrough()) {
+ if (_child && _child->is_hash_join_probe()) {
return DataDistribution(ExchangeType::PASSTHROUGH);
}
if (!state->get_query_ctx()->should_be_shuffled_agg(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]