This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new bfa0ea8056 Hash join buffering on probe side (#19761)
bfa0ea8056 is described below
commit bfa0ea80567ddfdc9aed567c9d2acb9e23316a2d
Author: Gabriel <[email protected]>
AuthorDate: Sat Mar 7 12:30:21 2026 +0100
Hash join buffering on probe side (#19761)
## Which issue does this PR close?
<!--
We generally require a GitHub issue to be filed for all bug fixes and
enhancements and this helps us generate change logs for our releases.
You can link an issue to this PR using the GitHub syntax. For example
`Closes #123` indicates that this PR will close issue #123.
-->
It does not close any issue, but it's related to:
- https://github.com/apache/datafusion/issues/17494
- https://github.com/apache/datafusion/issues/15885
- https://github.com/apache/datafusion/issues/18942
## Rationale for this change
This is one of a batch of PRs that attempt to improve hash join
performance:
- https://github.com/apache/datafusion/pull/19759
- https://github.com/apache/datafusion/pull/19760
- This PR
It adds the new `BufferExec` node at the top of the probe side of hash
joins so that some work is eagerly performed before the build side of
the hash join is completely finished.
### Why should this speed up joins?
In order to better understand the impact of this PR, it's useful to
understand how streams work in Rust: creating a stream does not perform
any work; progress is only made when the stream is polled.
This means that when we call `.execute()` on an `ExecutionPlan`
(like the probe side of a join), nothing happens: not even the most
basic TCP connections or system calls are performed. Instead, all of this
work is delayed as much as possible until the first poll on the
stream, losing the opportunity to make some early progress.
This gets worse when multiple hash joins are chained together: they
execute in cascade, like falling dominoes, which has the benefit of a
small memory footprint but underutilizes the resources of the machine
that could execute the query faster.
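The laziness described above can be sketched with a minimal, self-contained stand-in for a stream (this is not the `futures::Stream` trait or any DataFusion API, just an illustration of the "no work until polled" behavior):

```rust
// Hypothetical sketch: a stream-like type that performs NO work at
// construction time. Expensive setup only runs on the first poll.
struct LazyStream {
    started: bool,
}

impl LazyStream {
    fn new() -> Self {
        // Nothing happens here: no TCP connections, no system calls.
        LazyStream { started: false }
    }

    // Stand-in for `Stream::poll_next`: the work starts only when the
    // consumer actually polls.
    fn poll_next(&mut self) -> Option<i32> {
        if !self.started {
            self.started = true;
            // Expensive setup (opening files, connecting sockets)
            // would happen here, on the first poll.
            Some(42)
        } else {
            None
        }
    }
}

fn main() {
    let mut s = LazyStream::new(); // no work has been performed yet
    assert_eq!(s.poll_next(), Some(42)); // first poll triggers the work
    assert_eq!(s.poll_next(), None);
    println!("ok");
}
```

A `BufferExec`-style node effectively polls such a stream early on behalf of the join, so the setup cost is paid while the build side is still running.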
> [!NOTE]
> Even though this shows an overall performance improvement in the
> benchmarks, it can cause performance degradation on queries with dynamic
> filters, so hash join buffering is disabled by default and users can opt in.
> Follow-up work will be needed to make this interact well with dynamic
> filters.
<!--
Why are you proposing this change? If this is already explained clearly
in the issue then this section is not needed.
Explaining clearly why changes are proposed helps reviewers understand
your changes and offer better suggestions for fixes.
-->
## What changes are included in this PR?
<!--
There is no need to duplicate the description in the issue here but it
is sometimes worth providing a summary of the individual changes in this
PR.
-->
Adds a new `HashJoinBuffering` physical optimizer rule that will
idempotently place `BufferExec` nodes on the probe side of hash joins:
```
┌───────────────────┐
│ HashJoinExec │
└─────▲────────▲────┘
┌───────┘ └─────────┐
│ │
┌────────────────┐ ┌─────────────────┐
│ Build side │ + │ BufferExec │
└────────────────┘ └────────▲────────┘
│
┌────────┴────────┐
│ Probe side │
└─────────────────┘
```
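The buffering idea can be sketched with a bounded channel and a producer thread (a hypothetical stand-in, not the actual `BufferExec` implementation): the producer eagerly pulls "batches" into a fixed-capacity queue while the consumer is still busy, and blocks once the buffer is full so memory stays bounded.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

fn main() {
    // The capacity bounds how much is buffered eagerly, analogous to
    // the hash_join_buffering_capacity setting (value here is illustrative).
    let (tx, rx) = sync_channel::<i32>(2);

    // Producer: stands in for the probe side being polled eagerly.
    let producer = thread::spawn(move || {
        for batch in 0..5 {
            // Blocks when the buffer is full, keeping memory bounded.
            tx.send(batch).unwrap();
        }
    });

    // Consumer: stands in for the hash join, which only starts reading
    // once the build side would have finished.
    let collected: Vec<i32> = rx.iter().collect();
    producer.join().unwrap();
    assert_eq!(collected, vec![0, 1, 2, 3, 4]);
    println!("ok");
}
```

The design trade-off is the same as in the PR: a larger capacity allows more early work but increases peak memory usage.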
## Are these changes tested?
<!--
We typically require tests for all PRs in order to:
1. Prevent the code from being accidentally broken by subsequent changes
2. Serve as another way to document the expected behavior of the code
If tests are not included in your PR, please explain why (for example,
are they covered by existing tests)?
-->
Yes, by existing tests.
## Are there any user-facing changes?
Not by default; users can now opt in to this feature with the
`hash_join_buffering_capacity` config parameter.
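For illustration, opting in could look like this (the 64 MiB value is an arbitrary example, not a recommendation from this PR):

```sql
-- Opt in by setting a non-zero buffering capacity in bytes (64 MiB here):
SET datafusion.execution.hash_join_buffering_capacity = 67108864;

-- Setting it back to 0 disables hash join buffering again (the default):
SET datafusion.execution.hash_join_buffering_capacity = 0;
```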
<!--
If there are user-facing changes then we may require documentation to be
updated before approving the PR.
-->
<!--
If there are any breaking changes to public APIs, please add the `api
change` label.
-->
---
# Results
> [!NOTE]
> Note that a small number of TPC-DS queries have regressed: with eager
> buffering they do not benefit as much from dynamic filters. This is the
> main reason for leaving this config parameter disabled by default until
> there is a proper way of interacting with the dynamic filters inside the
> `BufferExec` node.
```
./bench.sh compare main hash-join-buffering-on-probe-side
Comparing main and hash-join-buffering-on-probe-side
--------------------
Benchmark tpcds_sf1.json
--------------------
┏━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query ┃ main ┃ hash-join-buffering-on-probe-side ┃ Change ┃
┡━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1 │ 39.70 ms │ 18.30 ms │ +2.17x faster │
│ QQuery 2 │ 134.95 ms │ 57.64 ms │ +2.34x faster │
│ QQuery 3 │ 103.08 ms │ 89.88 ms │ +1.15x faster │
│ QQuery 4 │ 1034.04 ms │ 340.84 ms │ +3.03x faster │
│ QQuery 5 │ 155.97 ms │ 139.44 ms │ +1.12x faster │
│ QQuery 6 │ 591.97 ms │ 523.17 ms │ +1.13x faster │
│ QQuery 7 │ 304.47 ms │ 235.52 ms │ +1.29x faster │
│ QQuery 8 │ 100.02 ms │ 90.65 ms │ +1.10x faster │
│ QQuery 9 │ 91.86 ms │ 92.25 ms │ no change │
│ QQuery 10 │ 90.97 ms │ 47.59 ms │ +1.91x faster │
│ QQuery 11 │ 649.77 ms │ 217.42 ms │ +2.99x faster │
│ QQuery 12 │ 38.12 ms │ 31.71 ms │ +1.20x faster │
│ QQuery 13 │ 337.93 ms │ 302.24 ms │ +1.12x faster │
│ QQuery 14 │ 797.93 ms │ 439.40 ms │ +1.82x faster │
│ QQuery 15 │ 28.14 ms │ 47.75 ms │ 1.70x slower │
│ QQuery 16 │ 34.08 ms │ 103.14 ms │ 3.03x slower │
│ QQuery 17 │ 225.43 ms │ 152.29 ms │ +1.48x faster │
│ QQuery 18 │ 118.09 ms │ 164.11 ms │ 1.39x slower │
│ QQuery 19 │ 143.39 ms │ 120.07 ms │ +1.19x faster │
│ QQuery 20 │ 12.70 ms │ 52.15 ms │ 4.11x slower │
│ QQuery 21 │ 16.74 ms │ 184.55 ms │ 11.02x slower │
│ QQuery 22 │ 311.97 ms │ 358.70 ms │ 1.15x slower │
│ QQuery 23 │ 807.41 ms │ 531.22 ms │ +1.52x faster │
│ QQuery 24 │ 347.90 ms │ 279.01 ms │ +1.25x faster │
│ QQuery 25 │ 313.20 ms │ 183.26 ms │ +1.71x faster │
│ QQuery 26 │ 83.57 ms │ 124.28 ms │ 1.49x slower │
│ QQuery 27 │ 300.93 ms │ 237.28 ms │ +1.27x faster │
│ QQuery 28 │ 130.79 ms │ 129.64 ms │ no change │
│ QQuery 29 │ 267.08 ms │ 157.55 ms │ +1.70x faster │
│ QQuery 30 │ 37.23 ms │ 25.98 ms │ +1.43x faster │
│ QQuery 31 │ 128.57 ms │ 102.96 ms │ +1.25x faster │
│ QQuery 32 │ 50.16 ms │ 42.77 ms │ +1.17x faster │
│ QQuery 33 │ 114.06 ms │ 110.83 ms │ no change │
│ QQuery 34 │ 89.27 ms │ 77.19 ms │ +1.16x faster │
│ QQuery 35 │ 86.66 ms │ 50.86 ms │ +1.70x faster │
│ QQuery 36 │ 173.00 ms │ 160.46 ms │ +1.08x faster │
│ QQuery 37 │ 157.69 ms │ 153.57 ms │ no change │
│ QQuery 38 │ 62.53 ms │ 52.28 ms │ +1.20x faster │
│ QQuery 39 │ 83.38 ms │ 394.28 ms │ 4.73x slower │
│ QQuery 40 │ 87.64 ms │ 77.15 ms │ +1.14x faster │
│ QQuery 41 │ 16.23 ms │ 15.05 ms │ +1.08x faster │
│ QQuery 42 │ 93.24 ms │ 88.03 ms │ +1.06x faster │
│ QQuery 43 │ 72.64 ms │ 63.49 ms │ +1.14x faster │
│ QQuery 44 │ 9.06 ms │ 7.80 ms │ +1.16x faster │
│ QQuery 45 │ 55.46 ms │ 34.12 ms │ +1.63x faster │
│ QQuery 46 │ 185.75 ms │ 163.09 ms │ +1.14x faster │
│ QQuery 47 │ 529.01 ms │ 143.05 ms │ +3.70x faster │
│ QQuery 48 │ 236.59 ms │ 198.08 ms │ +1.19x faster │
│ QQuery 49 │ 208.83 ms │ 191.07 ms │ +1.09x faster │
│ QQuery 50 │ 176.04 ms │ 143.57 ms │ +1.23x faster │
│ QQuery 51 │ 140.97 ms │ 96.36 ms │ +1.46x faster │
│ QQuery 52 │ 92.83 ms │ 86.68 ms │ +1.07x faster │
│ QQuery 53 │ 90.46 ms │ 83.34 ms │ +1.09x faster │
│ QQuery 54 │ 135.74 ms │ 116.89 ms │ +1.16x faster │
│ QQuery 55 │ 91.55 ms │ 87.18 ms │ no change │
│ QQuery 56 │ 113.12 ms │ 111.00 ms │ no change │
│ QQuery 57 │ 129.43 ms │ 78.69 ms │ +1.64x faster │
│ QQuery 58 │ 229.68 ms │ 165.27 ms │ +1.39x faster │
│ QQuery 59 │ 161.24 ms │ 125.57 ms │ +1.28x faster │
│ QQuery 60 │ 116.86 ms │ 111.38 ms │ no change │
│ QQuery 61 │ 150.19 ms │ 143.00 ms │ no change │
│ QQuery 62 │ 426.70 ms │ 413.02 ms │ no change │
│ QQuery 63 │ 93.41 ms │ 81.94 ms │ +1.14x faster │
│ QQuery 64 │ 578.51 ms │ 442.41 ms │ +1.31x faster │
│ QQuery 65 │ 201.75 ms │ 87.46 ms │ +2.31x faster │
│ QQuery 66 │ 181.57 ms │ 184.28 ms │ no change │
│ QQuery 67 │ 246.39 ms │ 226.38 ms │ +1.09x faster │
│ QQuery 68 │ 230.40 ms │ 212.41 ms │ +1.08x faster │
│ QQuery 69 │ 91.30 ms │ 46.05 ms │ +1.98x faster │
│ QQuery 70 │ 270.46 ms │ 232.65 ms │ +1.16x faster │
│ QQuery 71 │ 111.93 ms │ 107.35 ms │ no change │
│ QQuery 72 │ 562.16 ms │ 435.56 ms │ +1.29x faster │
│ QQuery 73 │ 85.66 ms │ 81.05 ms │ +1.06x faster │
│ QQuery 74 │ 371.14 ms │ 148.67 ms │ +2.50x faster │
│ QQuery 75 │ 221.61 ms │ 170.13 ms │ +1.30x faster │
│ QQuery 76 │ 122.88 ms │ 107.23 ms │ +1.15x faster │
│ QQuery 77 │ 163.52 ms │ 140.98 ms │ +1.16x faster │
│ QQuery 78 │ 313.92 ms │ 205.72 ms │ +1.53x faster │
│ QQuery 79 │ 187.31 ms │ 163.73 ms │ +1.14x faster │
│ QQuery 80 │ 262.86 ms │ 240.16 ms │ +1.09x faster │
│ QQuery 81 │ 21.89 ms │ 18.13 ms │ +1.21x faster │
│ QQuery 82 │ 172.40 ms │ 159.50 ms │ +1.08x faster │
│ QQuery 83 │ 45.38 ms │ 22.20 ms │ +2.04x faster │
│ QQuery 84 │ 39.41 ms │ 30.58 ms │ +1.29x faster │
│ QQuery 85 │ 141.02 ms │ 73.78 ms │ +1.91x faster │
│ QQuery 86 │ 31.68 ms │ 28.44 ms │ +1.11x faster │
│ QQuery 87 │ 63.57 ms │ 53.78 ms │ +1.18x faster │
│ QQuery 88 │ 88.01 ms │ 74.09 ms │ +1.19x faster │
│ QQuery 89 │ 105.44 ms │ 85.16 ms │ +1.24x faster │
│ QQuery 90 │ 19.76 ms │ 16.27 ms │ +1.21x faster │
│ QQuery 91 │ 52.31 ms │ 33.46 ms │ +1.56x faster │
│ QQuery 92 │ 50.04 ms │ 25.38 ms │ +1.97x faster │
│ QQuery 93 │ 143.48 ms │ 130.62 ms │ +1.10x faster │
│ QQuery 94 │ 50.84 ms │ 45.57 ms │ +1.12x faster │
│ QQuery 95 │ 131.03 ms │ 57.60 ms │ +2.27x faster │
│ QQuery 96 │ 60.62 ms │ 53.24 ms │ +1.14x faster │
│ QQuery 97 │ 95.60 ms │ 67.33 ms │ +1.42x faster │
│ QQuery 98 │ 125.88 ms │ 103.03 ms │ +1.22x faster │
│ QQuery 99 │ 4475.55 ms │ 4459.77 ms │ no change │
└───────────┴────────────┴───────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary ┃ ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (main) │ 22354.66ms │
│ Total Time (hash-join-buffering-on-probe-side) │ 18217.21ms │
│ Average Time (main) │ 225.80ms │
│ Average Time (hash-join-buffering-on-probe-side) │ 184.01ms │
│ Queries Faster │ 79 │
│ Queries Slower │ 8 │
│ Queries with No Change │ 12 │
│ Queries with Failure │ 0 │
└──────────────────────────────────────────────────┴────────────┘
--------------------
Benchmark tpch_sf1.json
--------------------
┏━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query ┃ main ┃ hash-join-buffering-on-probe-side ┃ Change ┃
┡━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1 │ 42.94 ms │ 45.96 ms │ 1.07x slower │
│ QQuery 2 │ 20.64 ms │ 13.00 ms │ +1.59x faster │
│ QQuery 3 │ 30.07 ms │ 24.52 ms │ +1.23x faster │
│ QQuery 4 │ 17.22 ms │ 16.39 ms │ no change │
│ QQuery 5 │ 98.91 ms │ 41.25 ms │ +2.40x faster │
│ QQuery 6 │ 18.67 ms │ 18.23 ms │ no change │
│ QQuery 7 │ 104.82 ms │ 46.45 ms │ +2.26x faster │
│ QQuery 8 │ 97.98 ms │ 34.09 ms │ +2.87x faster │
│ QQuery 9 │ 86.25 ms │ 43.32 ms │ +1.99x faster │
│ QQuery 10 │ 106.09 ms │ 41.49 ms │ +2.56x faster │
│ QQuery 11 │ 13.77 ms │ 11.15 ms │ +1.24x faster │
│ QQuery 12 │ 54.57 ms │ 30.04 ms │ +1.82x faster │
│ QQuery 13 │ 21.71 ms │ 21.74 ms │ no change │
│ QQuery 14 │ 51.38 ms │ 21.87 ms │ +2.35x faster │
│ QQuery 15 │ 35.13 ms │ 27.95 ms │ +1.26x faster │
│ QQuery 16 │ 13.04 ms │ 12.05 ms │ +1.08x faster │
│ QQuery 17 │ 82.94 ms │ 53.05 ms │ +1.56x faster │
│ QQuery 18 │ 109.92 ms │ 61.16 ms │ +1.80x faster │
│ QQuery 19 │ 37.57 ms │ 37.79 ms │ no change │
│ QQuery 20 │ 60.77 ms │ 26.21 ms │ +2.32x faster │
│ QQuery 21 │ 78.28 ms │ 54.11 ms │ +1.45x faster │
│ QQuery 22 │ 8.48 ms │ 8.73 ms │ no change │
└───────────┴───────────┴───────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary ┃ ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (main) │ 1191.17ms │
│ Total Time (hash-join-buffering-on-probe-side) │ 690.57ms │
│ Average Time (main) │ 54.14ms │
│ Average Time (hash-join-buffering-on-probe-side) │ 31.39ms │
│ Queries Faster │ 16 │
│ Queries Slower │ 1 │
│ Queries with No Change │ 5 │
│ Queries with Failure │ 0 │
└──────────────────────────────────────────────────┴───────────┘
--------------------
Benchmark tpch_sf10.json
--------------------
┏━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query ┃ main ┃ hash-join-buffering-on-probe-side ┃ Change ┃
┡━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1 │ 373.55 ms │ 331.39 ms │ +1.13x faster │
│ QQuery 2 │ 150.76 ms │ 89.50 ms │ +1.68x faster │
│ QQuery 3 │ 320.66 ms │ 271.20 ms │ +1.18x faster │
│ QQuery 4 │ 132.21 ms │ 115.69 ms │ +1.14x faster │
│ QQuery 5 │ 462.29 ms │ 401.73 ms │ +1.15x faster │
│ QQuery 6 │ 150.85 ms │ 124.88 ms │ +1.21x faster │
│ QQuery 7 │ 631.13 ms │ 547.52 ms │ +1.15x faster │
│ QQuery 8 │ 593.22 ms │ 445.85 ms │ +1.33x faster │
│ QQuery 9 │ 780.22 ms │ 657.02 ms │ +1.19x faster │
│ QQuery 10 │ 495.89 ms │ 324.21 ms │ +1.53x faster │
│ QQuery 11 │ 144.90 ms │ 88.97 ms │ +1.63x faster │
│ QQuery 12 │ 263.50 ms │ 188.59 ms │ +1.40x faster │
│ QQuery 13 │ 287.01 ms │ 217.33 ms │ +1.32x faster │
│ QQuery 14 │ 248.88 ms │ 166.86 ms │ +1.49x faster │
│ QQuery 15 │ 399.52 ms │ 280.62 ms │ +1.42x faster │
│ QQuery 16 │ 97.62 ms │ 65.14 ms │ +1.50x faster │
│ QQuery 17 │ 780.01 ms │ 641.17 ms │ +1.22x faster │
│ QQuery 18 │ 824.42 ms │ 696.09 ms │ +1.18x faster │
│ QQuery 19 │ 367.17 ms │ 268.54 ms │ +1.37x faster │
│ QQuery 20 │ 332.86 ms │ 241.19 ms │ +1.38x faster │
│ QQuery 21 │ 856.49 ms │ 697.65 ms │ +1.23x faster │
│ QQuery 22 │ 89.72 ms │ 72.73 ms │ +1.23x faster │
└───────────┴───────────┴───────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary ┃ ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (main) │ 8782.89ms │
│ Total Time (hash-join-buffering-on-probe-side) │ 6933.87ms │
│ Average Time (main) │ 399.22ms │
│ Average Time (hash-join-buffering-on-probe-side) │ 315.18ms │
│ Queries Faster │ 22 │
│ Queries Slower │ 0 │
│ Queries with No Change │ 0 │
│ Queries with Failure │ 0 │
└──────────────────────────────────────────────────┴───────────┘
```
---
benchmarks/src/imdb/run.rs | 8 ++
benchmarks/src/tpcds/run.rs | 6 ++
benchmarks/src/tpch/run.rs | 8 ++
datafusion/common/src/config.rs | 15 +++
.../physical-optimizer/src/hash_join_buffering.rs | 103 +++++++++++++++++++++
datafusion/physical-optimizer/src/lib.rs | 1 +
datafusion/physical-optimizer/src/optimizer.rs | 5 +
datafusion/sqllogictest/test_files/explain.slt | 4 +
.../sqllogictest/test_files/information_schema.slt | 2 +
datafusion/sqllogictest/test_files/struct.slt | 2 +-
docs/source/user-guide/configs.md | 1 +
11 files changed, 154 insertions(+), 1 deletion(-)
diff --git a/benchmarks/src/imdb/run.rs b/benchmarks/src/imdb/run.rs
index 9ddea67148..29ca5249aa 100644
--- a/benchmarks/src/imdb/run.rs
+++ b/benchmarks/src/imdb/run.rs
@@ -92,6 +92,10 @@ pub struct RunOpt {
/// True by default.
#[arg(short = 'j', long = "prefer_hash_join", default_value = "true")]
prefer_hash_join: BoolDefaultTrue,
+
+ /// How many bytes to buffer on the probe side of hash joins.
+ #[arg(long, default_value = "0")]
+ hash_join_buffering_capacity: usize,
}
fn map_query_id_to_str(query_id: usize) -> &'static str {
@@ -306,6 +310,8 @@ impl RunOpt {
.config()?
.with_collect_statistics(!self.disable_statistics);
config.options_mut().optimizer.prefer_hash_join =
self.prefer_hash_join;
+ config.options_mut().execution.hash_join_buffering_capacity =
+ self.hash_join_buffering_capacity;
let rt_builder = self.common.runtime_env_builder()?;
let ctx = SessionContext::new_with_config_rt(config,
rt_builder.build_arc()?);
@@ -527,6 +533,7 @@ mod tests {
output_path: None,
disable_statistics: false,
prefer_hash_join: true,
+ hash_join_buffering_capacity: 0,
};
opt.register_tables(&ctx).await?;
let queries = get_query_sql(map_query_id_to_str(query))?;
@@ -563,6 +570,7 @@ mod tests {
output_path: None,
disable_statistics: false,
prefer_hash_join: true,
+ hash_join_buffering_capacity: 0,
};
opt.register_tables(&ctx).await?;
let queries = get_query_sql(map_query_id_to_str(query))?;
diff --git a/benchmarks/src/tpcds/run.rs b/benchmarks/src/tpcds/run.rs
index 586ee754d2..8e24b121b2 100644
--- a/benchmarks/src/tpcds/run.rs
+++ b/benchmarks/src/tpcds/run.rs
@@ -144,6 +144,10 @@ pub struct RunOpt {
/// The tables should have been created with the `--sort` option for this
to have any effect.
#[arg(short = 't', long = "sorted")]
sorted: bool,
+
+ /// How many bytes to buffer on the probe side of hash joins.
+ #[arg(long, default_value = "0")]
+ hash_join_buffering_capacity: usize,
}
impl RunOpt {
@@ -162,6 +166,8 @@ impl RunOpt {
config.options_mut().optimizer.prefer_hash_join =
self.prefer_hash_join;
config.options_mut().optimizer.enable_piecewise_merge_join =
self.enable_piecewise_merge_join;
+ config.options_mut().execution.hash_join_buffering_capacity =
+ self.hash_join_buffering_capacity;
let rt_builder = self.common.runtime_env_builder()?;
let ctx = SessionContext::new_with_config_rt(config,
rt_builder.build_arc()?);
// register tables
diff --git a/benchmarks/src/tpch/run.rs b/benchmarks/src/tpch/run.rs
index 9706296fea..392e02f847 100644
--- a/benchmarks/src/tpch/run.rs
+++ b/benchmarks/src/tpch/run.rs
@@ -105,6 +105,10 @@ pub struct RunOpt {
/// The tables should have been created with the `--sort` option for this
to have any effect.
#[arg(short = 't', long = "sorted")]
sorted: bool,
+
+ /// How many bytes to buffer on the probe side of hash joins.
+ #[arg(long, default_value = "0")]
+ hash_join_buffering_capacity: usize,
}
impl RunOpt {
@@ -123,6 +127,8 @@ impl RunOpt {
config.options_mut().optimizer.prefer_hash_join =
self.prefer_hash_join;
config.options_mut().optimizer.enable_piecewise_merge_join =
self.enable_piecewise_merge_join;
+ config.options_mut().execution.hash_join_buffering_capacity =
+ self.hash_join_buffering_capacity;
let rt_builder = self.common.runtime_env_builder()?;
let ctx = SessionContext::new_with_config_rt(config,
rt_builder.build_arc()?);
// register tables
@@ -392,6 +398,7 @@ mod tests {
prefer_hash_join: true,
enable_piecewise_merge_join: false,
sorted: false,
+ hash_join_buffering_capacity: 0,
};
opt.register_tables(&ctx).await?;
let queries = get_query_sql(query)?;
@@ -430,6 +437,7 @@ mod tests {
prefer_hash_join: true,
enable_piecewise_merge_join: false,
sorted: false,
+ hash_join_buffering_capacity: 0,
};
opt.register_tables(&ctx).await?;
let queries = get_query_sql(query)?;
diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index 7238d842e3..15eb279079 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -669,6 +669,21 @@ config_namespace! {
/// # Default
/// `false` — ANSI SQL mode is disabled by default.
pub enable_ansi_mode: bool, default = false
+
+    /// How many bytes to buffer in the probe side of hash joins while the build side is
+    /// concurrently being built.
+    ///
+    /// Without this, hash joins will wait until the full materialization of the build side
+    /// before polling the probe side. This is useful in scenarios where the query is not
+    /// completely CPU bounded, allowing to do some early work concurrently and reducing the
+    /// latency of the query.
+    ///
+    /// Note that when hash join buffering is enabled, the probe side will start eagerly
+    /// polling data, not giving time for the producer side of dynamic filters to produce any
+    /// meaningful predicate. Queries with dynamic filters might see performance degradation.
+    ///
+    /// Disabled by default, set to a number greater than 0 for enabling it.
+    pub hash_join_buffering_capacity: usize, default = 0
}
}
diff --git a/datafusion/physical-optimizer/src/hash_join_buffering.rs b/datafusion/physical-optimizer/src/hash_join_buffering.rs
new file mode 100644
index 0000000000..3c29b46c0f
--- /dev/null
+++ b/datafusion/physical-optimizer/src/hash_join_buffering.rs
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::PhysicalOptimizerRule;
+use datafusion_common::JoinSide;
+use datafusion_common::config::ConfigOptions;
+use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
+use datafusion_physical_plan::ExecutionPlan;
+use datafusion_physical_plan::buffer::BufferExec;
+use datafusion_physical_plan::joins::HashJoinExec;
+use std::sync::Arc;
+
+/// Looks for all the [HashJoinExec]s in the plan and places a [BufferExec] node
+/// with the configured capacity in the probe side:
+///
+/// ```text
+/// ┌───────────────────┐
+/// │ HashJoinExec │
+/// └─────▲────────▲────┘
+/// ┌───────┘ └─────────┐
+/// │ │
+/// ┌────────────────┐ ┌─────────────────┐
+/// │ Build side │ + │ BufferExec │
+/// └────────────────┘ └────────▲────────┘
+/// │
+/// ┌────────┴────────┐
+/// │ Probe side │
+/// └─────────────────┘
+/// ```
+///
+/// Which allows eagerly pulling it even before the build side has completely finished.
+#[derive(Debug, Default)]
+pub struct HashJoinBuffering {}
+
+impl HashJoinBuffering {
+ pub fn new() -> Self {
+ Self::default()
+ }
+}
+
+impl PhysicalOptimizerRule for HashJoinBuffering {
+ fn optimize(
+ &self,
+ plan: Arc<dyn ExecutionPlan>,
+ config: &ConfigOptions,
+ ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
+ let capacity = config.execution.hash_join_buffering_capacity;
+ if capacity == 0 {
+ return Ok(plan);
+ }
+
+        plan.transform_down(|plan| {
+            let Some(node) = plan.as_any().downcast_ref::<HashJoinExec>() else {
+                return Ok(Transformed::no(plan));
+            };
+            let plan = Arc::clone(&plan);
+            Ok(Transformed::yes(
+                if HashJoinExec::probe_side() == JoinSide::Left {
+                    // Do not stack BufferExec nodes together.
+                    if node.left.as_any().downcast_ref::<BufferExec>().is_some() {
+                        return Ok(Transformed::no(plan));
+                    }
+                    plan.with_new_children(vec![
+                        Arc::new(BufferExec::new(Arc::clone(&node.left), capacity)),
+                        Arc::clone(&node.right),
+                    ])?
+                } else {
+                    // Do not stack BufferExec nodes together.
+                    if node.right.as_any().downcast_ref::<BufferExec>().is_some() {
+                        return Ok(Transformed::no(plan));
+                    }
+                    plan.with_new_children(vec![
+                        Arc::clone(&node.left),
+                        Arc::new(BufferExec::new(Arc::clone(&node.right), capacity)),
+                    ])?
+                },
+            ))
+        })
+        .data()
+    }
+
+ fn name(&self) -> &str {
+ "HashJoinBuffering"
+ }
+
+ fn schema_check(&self) -> bool {
+ true
+ }
+}
diff --git a/datafusion/physical-optimizer/src/lib.rs b/datafusion/physical-optimizer/src/lib.rs
index 3a0d79ae2d..98331a94e3 100644
--- a/datafusion/physical-optimizer/src/lib.rs
+++ b/datafusion/physical-optimizer/src/lib.rs
@@ -39,6 +39,7 @@ pub mod optimizer;
pub mod output_requirements;
pub mod projection_pushdown;
pub use datafusion_pruning as pruning;
+pub mod hash_join_buffering;
pub mod pushdown_sort;
pub mod sanity_checker;
pub mod topk_aggregation;
diff --git a/datafusion/physical-optimizer/src/optimizer.rs b/datafusion/physical-optimizer/src/optimizer.rs
index 49225db03a..f31a1f981f 100644
--- a/datafusion/physical-optimizer/src/optimizer.rs
+++ b/datafusion/physical-optimizer/src/optimizer.rs
@@ -35,6 +35,7 @@ use crate::sanity_checker::SanityCheckPlan;
use crate::topk_aggregation::TopKAggregation;
use crate::update_aggr_exprs::OptimizeAggregateOrder;
+use crate::hash_join_buffering::HashJoinBuffering;
use crate::limit_pushdown_past_window::LimitPushPastWindows;
use crate::pushdown_sort::PushdownSort;
use datafusion_common::Result;
@@ -137,6 +138,10 @@ impl PhysicalOptimizer {
// This can possibly be combined with [LimitPushdown]
// It needs to come after [EnforceSorting]
Arc::new(LimitPushPastWindows::new()),
+        // The HashJoinBuffering rule adds a BufferExec node with the configured
+        // capacity in the probe side of hash joins. That way, the probe side gets
+        // eagerly polled before the build side is completely finished.
+        Arc::new(HashJoinBuffering::new()),
// The LimitPushdown rule tries to push limits down as far as
possible,
// replacing operators with fetching variants, or adding limits
// past operators that support limit pushdown.
diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt
index c5907d4975..6b50a1d1cb 100644
--- a/datafusion/sqllogictest/test_files/explain.slt
+++ b/datafusion/sqllogictest/test_files/explain.slt
@@ -243,6 +243,7 @@ physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
physical_plan after OutputRequirements DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b,
c], file_type=csv, has_header=true
physical_plan after LimitAggregation SAME TEXT AS ABOVE
physical_plan after LimitPushPastWindows SAME TEXT AS ABOVE
+physical_plan after HashJoinBuffering SAME TEXT AS ABOVE
physical_plan after LimitPushdown SAME TEXT AS ABOVE
physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
physical_plan after PushdownSort SAME TEXT AS ABOVE
@@ -323,6 +324,7 @@ physical_plan after OutputRequirements
02)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]},
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col,
float_col, double_col, date_string_col, string_col, timestamp_col], limit=10,
file_type=parquet, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:
ScanBytes=Exact(32)),(Col[1]: ScanBytes=Inexact(24)),(Col[2]:
ScanBytes=Exact(32)),(Col[3]: ScanBytes=Exact(32)),(Col[4]:
ScanBytes=Exact(32)),(Col[5]: ScanBytes= [...]
physical_plan after LimitAggregation SAME TEXT AS ABOVE
physical_plan after LimitPushPastWindows SAME TEXT AS ABOVE
+physical_plan after HashJoinBuffering SAME TEXT AS ABOVE
physical_plan after LimitPushdown DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]},
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col,
float_col, double_col, date_string_col, string_col, timestamp_col], limit=10,
file_type=parquet, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:
ScanBytes=Exact(32)),(Col[1]: ScanBytes=Inexact(24)),(Col[2]:
ScanBytes=Exact(32)),(Col[3]: ScanBytes=Exact(32)),(Col[4]: ScanBytes=E [...]
physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
physical_plan after PushdownSort SAME TEXT AS ABOVE
@@ -367,6 +369,7 @@ physical_plan after OutputRequirements
02)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]},
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col,
float_col, double_col, date_string_col, string_col, timestamp_col], limit=10,
file_type=parquet
physical_plan after LimitAggregation SAME TEXT AS ABOVE
physical_plan after LimitPushPastWindows SAME TEXT AS ABOVE
+physical_plan after HashJoinBuffering SAME TEXT AS ABOVE
physical_plan after LimitPushdown DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]},
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col,
float_col, double_col, date_string_col, string_col, timestamp_col], limit=10,
file_type=parquet
physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
physical_plan after PushdownSort SAME TEXT AS ABOVE
@@ -608,6 +611,7 @@ physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
physical_plan after OutputRequirements DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b,
c], file_type=csv, has_header=true
physical_plan after LimitAggregation SAME TEXT AS ABOVE
physical_plan after LimitPushPastWindows SAME TEXT AS ABOVE
+physical_plan after HashJoinBuffering SAME TEXT AS ABOVE
physical_plan after LimitPushdown SAME TEXT AS ABOVE
physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
physical_plan after PushdownSort SAME TEXT AS ABOVE
diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt
index b61ceecb24..18f6db9cdc 100644
--- a/datafusion/sqllogictest/test_files/information_schema.slt
+++ b/datafusion/sqllogictest/test_files/information_schema.slt
@@ -220,6 +220,7 @@ datafusion.execution.collect_statistics true
datafusion.execution.enable_ansi_mode false
datafusion.execution.enable_recursive_ctes true
datafusion.execution.enforce_batch_size_in_joins false
+datafusion.execution.hash_join_buffering_capacity 0
datafusion.execution.keep_partition_by_columns false
datafusion.execution.listing_table_factory_infer_partitions true
datafusion.execution.listing_table_ignore_subdirectory true
@@ -358,6 +359,7 @@ datafusion.execution.collect_statistics true Should
DataFusion collect statistic
datafusion.execution.enable_ansi_mode false Whether to enable ANSI SQL mode.
The flag is experimental and relevant only for DataFusion Spark built-in
functions When `enable_ansi_mode` is set to `true`, the query engine follows
ANSI SQL semantics for expressions, casting, and error handling. This means: -
**Strict type coercion rules:** implicit casts between incompatible types are
disallowed. - **Standard SQL arithmetic behavior:** operations such as division
by zero, numeric overflow, [...]
datafusion.execution.enable_recursive_ctes true Should DataFusion support
recursive CTEs
datafusion.execution.enforce_batch_size_in_joins false Should DataFusion
enforce batch size in joins or not. By default, DataFusion will not enforce
batch size in joins. Enforcing batch size in joins can reduce memory usage when
joining large tables with a highly-selective join filter, but is also slightly
slower.
+datafusion.execution.hash_join_buffering_capacity 0 How many bytes to buffer
in the probe side of hash joins while the build side is concurrently being
built. Without this, hash joins will wait until the full materialization of the
build side before polling the probe side. This is useful in scenarios where the
query is not completely CPU bounded, allowing to do some early work
concurrently and reducing the latency of the query. Note that when hash join
buffering is enabled, the probe sid [...]
datafusion.execution.keep_partition_by_columns false Should DataFusion keep
the columns used for partition_by in the output RecordBatches
datafusion.execution.listing_table_factory_infer_partitions true Should a
`ListingTable` created through the `ListingTableFactory` infer table partitions
from Hive compliant directories. Defaults to true (partition columns are
inferred and will be represented in the table schema).
datafusion.execution.listing_table_ignore_subdirectory true Should sub
directories be ignored when scanning directories for data files. Defaults to
true (ignores subdirectories), consistent with Hive. Note that this setting
does not affect reading partitioned tables (e.g.
`/table/year=2021/month=01/data.parquet`).
diff --git a/datafusion/sqllogictest/test_files/struct.slt b/datafusion/sqllogictest/test_files/struct.slt
index e20815a58c..53a1bb4ec6 100644
--- a/datafusion/sqllogictest/test_files/struct.slt
+++ b/datafusion/sqllogictest/test_files/struct.slt
@@ -1666,4 +1666,4 @@ order by id;
3 2 150
statement ok
-drop table t_agg_window;
\ No newline at end of file
+drop table t_agg_window;
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index f33e6314d3..ede1652fdf 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -133,6 +133,7 @@ The following configuration settings are available:
| datafusion.execution.enforce_batch_size_in_joins |
false | Should DataFusion enforce batch size in joins or
not. By default, DataFusion will not enforce batch size in joins. Enforcing
batch size in joins can reduce memory usage when joining large tables with a
highly-selective join filter, but is also slightly slower.
[...]
| datafusion.execution.objectstore_writer_buffer_size |
10485760 | Size (bytes) of data buffer DataFusion uses when
writing output files. This affects the size of the data chunks that are
uploaded to remote object stores (e.g. AWS S3). If very large (>= 100 GiB)
output files are being written, it may be necessary to increase this size to
avoid errors from the remote end point.
[...]
| datafusion.execution.enable_ansi_mode |
false | Whether to enable ANSI SQL mode. The flag is
experimental and relevant only for DataFusion Spark built-in functions When
`enable_ansi_mode` is set to `true`, the query engine follows ANSI SQL
semantics for expressions, casting, and error handling. This means: - **Strict
type coercion rules:** implicit casts between incompatible types are
disallowed. - **Standard SQL arithmetic behavior [...]
+| datafusion.execution.hash_join_buffering_capacity | 0
| How many bytes to buffer in the probe side of hash
joins while the build side is concurrently being built. Without this, hash
joins will wait until the full materialization of the build side before polling
the probe side. This is useful in scenarios where the query is not completely
CPU bounded, allowing to do some early work concurrently and reducing the
latency of the query. Note tha [...]
| datafusion.optimizer.enable_distinct_aggregation_soft_limit |
true | When set to true, the optimizer will push a limit
operation into grouped aggregations which have no aggregate expressions, as a
soft limit, emitting groups once the limit is reached, before all rows in the
group are read.
[...]
| datafusion.optimizer.enable_round_robin_repartition |
true | When set to true, the physical plan optimizer will
try to add round robin repartitioning to increase parallelism to leverage more
CPU cores
[...]
| datafusion.optimizer.enable_topk_aggregation |
true | When set to true, the optimizer will attempt to
perform limit operations during aggregations, if possible
[...]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]