This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new bfa0ea8056 Hash join buffering on probe side (#19761)
bfa0ea8056 is described below

commit bfa0ea80567ddfdc9aed567c9d2acb9e23316a2d
Author: Gabriel <[email protected]>
AuthorDate: Sat Mar 7 12:30:21 2026 +0100

    Hash join buffering on probe side (#19761)
    
    ## Which issue does this PR close?
    
    <!--
    We generally require a GitHub issue to be filed for all bug fixes and
    enhancements and this helps us generate change logs for our releases.
    You can link an issue to this PR using the GitHub syntax. For example
    `Closes #123` indicates that this PR will close issue #123.
    -->
    
    It does not close any issue, but it's related to:
    - https://github.com/apache/datafusion/issues/17494
    - https://github.com/apache/datafusion/issues/15885
    - https://github.com/apache/datafusion/issues/18942
    
    ## Rationale for this change
    
    This is a PR from a batch of PRs that attempt to improve performance in
    hash joins:
    - https://github.com/apache/datafusion/pull/19759
    - https://github.com/apache/datafusion/pull/19760
    - This PR
    
    It adds the new `BufferExec` node at the top of the probe side of hash
    joins so that some work is eagerly performed before the build side of
    the hash join is completely finished.
    
    ### Why should this speed up joins?
    
    In order to better understand the impact of this PR, it's useful to
    understand how streams work in Rust: creating a stream does not perform
    any work, progress is just made if the stream gets polled.
    
    This means that whenever we call `.execute()` on an `ExecutionPlan`
    (like the probe side of a join), nothing happens, not even the most
    basic TCP connections or system calls are performed. Instead, all this
    work is delayed as much as possible until the first poll is made to the
    stream, losing the opportunity to make some early progress.
    
    This gets worst when multiple hash joins are chained together: they will
    get executed in cascade as if they were domino pieces, which has the
    benefit of leaving a small memory footprint, but underutilizes the
    resources of the machine for executing the query faster.
    
    > [!NOTE]
    > Even if this shows overall performance improvement in the benchmarks,
    it can show performance degradation on queries with dynamic filters, so
    hash join buffering is disabled by default, and users can opt in.
    > Follow up work will be needed in order to make this interact well with
    dynamic filters.
    
    <!--
    Why are you proposing this change? If this is already explained clearly
    in the issue then this section is not needed.
    Explaining clearly why changes are proposed helps reviewers understand
    your changes and offer better suggestions for fixes.
    -->
    
    ## What changes are included in this PR?
    
    <!--
    There is no need to duplicate the description in the issue here but it
    is sometimes worth providing a summary of the individual changes in this
    PR.
    -->
    
    Adds a new `HashJoinBuffering` physical optimizer rule that will
    idempotently place `BufferExec` nodes on the probe side of has joins:
    
    ```
                ┌───────────────────┐
                │   HashJoinExec    │
                └─────▲────────▲────┘
              ┌───────┘        └─────────┐
              │                          │
     ┌────────────────┐         ┌─────────────────┐
     │   Build side   │       + │   BufferExec    │
     └────────────────┘         └────────▲────────┘
                                         │
                                ┌────────┴────────┐
                                │   Probe side    │
                                └─────────────────┘
    ```
    
    ## Are these changes tested?
    
    <!--
    We typically require tests for all PRs in order to:
    1. Prevent the code from being accidentally broken by subsequent changes
    2. Serve as another way to document the expected behavior of the code
    
    If tests are not included in your PR, please explain why (for example,
    are they covered by existing tests)?
    -->
    
    yes, by existing tests
    
    ## Are there any user-facing changes?
    
    Not by default, users can now opt in to this feature with the
    hash_join_buffering_capacity config parameter.
    <!--
    If there are user-facing changes then we may require documentation to be
    updated before approving the PR.
    -->
    
    <!--
    If there are any breaking changes to public APIs, please add the `api
    change` label.
    -->
    
    ---
    
    # Results
    
    > [!NOTE]
    > Note that a small number of TPC-DS queries have regressed, this is
    because with eager buffering they do not benefit from dynamic filters as
    much. This is the main reason for leaving this config parameter disabled
    by default until we have a proper way for interacting with the dynamic
    filters inside the BufferExec node.
    
    ```
    ./bench.sh compare main hash-join-buffering-on-probe-side
    Comparing main and hash-join-buffering-on-probe-side
    --------------------
    Benchmark tpcds_sf1.json
    --------------------
    
┏━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
    ┃ Query     ┃       main ┃ hash-join-buffering-on-probe-side ┃        
Change ┃
    
┡━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
    │ QQuery 1  │   39.70 ms │                          18.30 ms │ +2.17x 
faster │
    │ QQuery 2  │  134.95 ms │                          57.64 ms │ +2.34x 
faster │
    │ QQuery 3  │  103.08 ms │                          89.88 ms │ +1.15x 
faster │
    │ QQuery 4  │ 1034.04 ms │                         340.84 ms │ +3.03x 
faster │
    │ QQuery 5  │  155.97 ms │                         139.44 ms │ +1.12x 
faster │
    │ QQuery 6  │  591.97 ms │                         523.17 ms │ +1.13x 
faster │
    │ QQuery 7  │  304.47 ms │                         235.52 ms │ +1.29x 
faster │
    │ QQuery 8  │  100.02 ms │                          90.65 ms │ +1.10x 
faster │
    │ QQuery 9  │   91.86 ms │                          92.25 ms │     no 
change │
    │ QQuery 10 │   90.97 ms │                          47.59 ms │ +1.91x 
faster │
    │ QQuery 11 │  649.77 ms │                         217.42 ms │ +2.99x 
faster │
    │ QQuery 12 │   38.12 ms │                          31.71 ms │ +1.20x 
faster │
    │ QQuery 13 │  337.93 ms │                         302.24 ms │ +1.12x 
faster │
    │ QQuery 14 │  797.93 ms │                         439.40 ms │ +1.82x 
faster │
    │ QQuery 15 │   28.14 ms │                          47.75 ms │  1.70x 
slower │
    │ QQuery 16 │   34.08 ms │                         103.14 ms │  3.03x 
slower │
    │ QQuery 17 │  225.43 ms │                         152.29 ms │ +1.48x 
faster │
    │ QQuery 18 │  118.09 ms │                         164.11 ms │  1.39x 
slower │
    │ QQuery 19 │  143.39 ms │                         120.07 ms │ +1.19x 
faster │
    │ QQuery 20 │   12.70 ms │                          52.15 ms │  4.11x 
slower │
    │ QQuery 21 │   16.74 ms │                         184.55 ms │ 11.02x 
slower │
    │ QQuery 22 │  311.97 ms │                         358.70 ms │  1.15x 
slower │
    │ QQuery 23 │  807.41 ms │                         531.22 ms │ +1.52x 
faster │
    │ QQuery 24 │  347.90 ms │                         279.01 ms │ +1.25x 
faster │
    │ QQuery 25 │  313.20 ms │                         183.26 ms │ +1.71x 
faster │
    │ QQuery 26 │   83.57 ms │                         124.28 ms │  1.49x 
slower │
    │ QQuery 27 │  300.93 ms │                         237.28 ms │ +1.27x 
faster │
    │ QQuery 28 │  130.79 ms │                         129.64 ms │     no 
change │
    │ QQuery 29 │  267.08 ms │                         157.55 ms │ +1.70x 
faster │
    │ QQuery 30 │   37.23 ms │                          25.98 ms │ +1.43x 
faster │
    │ QQuery 31 │  128.57 ms │                         102.96 ms │ +1.25x 
faster │
    │ QQuery 32 │   50.16 ms │                          42.77 ms │ +1.17x 
faster │
    │ QQuery 33 │  114.06 ms │                         110.83 ms │     no 
change │
    │ QQuery 34 │   89.27 ms │                          77.19 ms │ +1.16x 
faster │
    │ QQuery 35 │   86.66 ms │                          50.86 ms │ +1.70x 
faster │
    │ QQuery 36 │  173.00 ms │                         160.46 ms │ +1.08x 
faster │
    │ QQuery 37 │  157.69 ms │                         153.57 ms │     no 
change │
    │ QQuery 38 │   62.53 ms │                          52.28 ms │ +1.20x 
faster │
    │ QQuery 39 │   83.38 ms │                         394.28 ms │  4.73x 
slower │
    │ QQuery 40 │   87.64 ms │                          77.15 ms │ +1.14x 
faster │
    │ QQuery 41 │   16.23 ms │                          15.05 ms │ +1.08x 
faster │
    │ QQuery 42 │   93.24 ms │                          88.03 ms │ +1.06x 
faster │
    │ QQuery 43 │   72.64 ms │                          63.49 ms │ +1.14x 
faster │
    │ QQuery 44 │    9.06 ms │                           7.80 ms │ +1.16x 
faster │
    │ QQuery 45 │   55.46 ms │                          34.12 ms │ +1.63x 
faster │
    │ QQuery 46 │  185.75 ms │                         163.09 ms │ +1.14x 
faster │
    │ QQuery 47 │  529.01 ms │                         143.05 ms │ +3.70x 
faster │
    │ QQuery 48 │  236.59 ms │                         198.08 ms │ +1.19x 
faster │
    │ QQuery 49 │  208.83 ms │                         191.07 ms │ +1.09x 
faster │
    │ QQuery 50 │  176.04 ms │                         143.57 ms │ +1.23x 
faster │
    │ QQuery 51 │  140.97 ms │                          96.36 ms │ +1.46x 
faster │
    │ QQuery 52 │   92.83 ms │                          86.68 ms │ +1.07x 
faster │
    │ QQuery 53 │   90.46 ms │                          83.34 ms │ +1.09x 
faster │
    │ QQuery 54 │  135.74 ms │                         116.89 ms │ +1.16x 
faster │
    │ QQuery 55 │   91.55 ms │                          87.18 ms │     no 
change │
    │ QQuery 56 │  113.12 ms │                         111.00 ms │     no 
change │
    │ QQuery 57 │  129.43 ms │                          78.69 ms │ +1.64x 
faster │
    │ QQuery 58 │  229.68 ms │                         165.27 ms │ +1.39x 
faster │
    │ QQuery 59 │  161.24 ms │                         125.57 ms │ +1.28x 
faster │
    │ QQuery 60 │  116.86 ms │                         111.38 ms │     no 
change │
    │ QQuery 61 │  150.19 ms │                         143.00 ms │     no 
change │
    │ QQuery 62 │  426.70 ms │                         413.02 ms │     no 
change │
    │ QQuery 63 │   93.41 ms │                          81.94 ms │ +1.14x 
faster │
    │ QQuery 64 │  578.51 ms │                         442.41 ms │ +1.31x 
faster │
    │ QQuery 65 │  201.75 ms │                          87.46 ms │ +2.31x 
faster │
    │ QQuery 66 │  181.57 ms │                         184.28 ms │     no 
change │
    │ QQuery 67 │  246.39 ms │                         226.38 ms │ +1.09x 
faster │
    │ QQuery 68 │  230.40 ms │                         212.41 ms │ +1.08x 
faster │
    │ QQuery 69 │   91.30 ms │                          46.05 ms │ +1.98x 
faster │
    │ QQuery 70 │  270.46 ms │                         232.65 ms │ +1.16x 
faster │
    │ QQuery 71 │  111.93 ms │                         107.35 ms │     no 
change │
    │ QQuery 72 │  562.16 ms │                         435.56 ms │ +1.29x 
faster │
    │ QQuery 73 │   85.66 ms │                          81.05 ms │ +1.06x 
faster │
    │ QQuery 74 │  371.14 ms │                         148.67 ms │ +2.50x 
faster │
    │ QQuery 75 │  221.61 ms │                         170.13 ms │ +1.30x 
faster │
    │ QQuery 76 │  122.88 ms │                         107.23 ms │ +1.15x 
faster │
    │ QQuery 77 │  163.52 ms │                         140.98 ms │ +1.16x 
faster │
    │ QQuery 78 │  313.92 ms │                         205.72 ms │ +1.53x 
faster │
    │ QQuery 79 │  187.31 ms │                         163.73 ms │ +1.14x 
faster │
    │ QQuery 80 │  262.86 ms │                         240.16 ms │ +1.09x 
faster │
    │ QQuery 81 │   21.89 ms │                          18.13 ms │ +1.21x 
faster │
    │ QQuery 82 │  172.40 ms │                         159.50 ms │ +1.08x 
faster │
    │ QQuery 83 │   45.38 ms │                          22.20 ms │ +2.04x 
faster │
    │ QQuery 84 │   39.41 ms │                          30.58 ms │ +1.29x 
faster │
    │ QQuery 85 │  141.02 ms │                          73.78 ms │ +1.91x 
faster │
    │ QQuery 86 │   31.68 ms │                          28.44 ms │ +1.11x 
faster │
    │ QQuery 87 │   63.57 ms │                          53.78 ms │ +1.18x 
faster │
    │ QQuery 88 │   88.01 ms │                          74.09 ms │ +1.19x 
faster │
    │ QQuery 89 │  105.44 ms │                          85.16 ms │ +1.24x 
faster │
    │ QQuery 90 │   19.76 ms │                          16.27 ms │ +1.21x 
faster │
    │ QQuery 91 │   52.31 ms │                          33.46 ms │ +1.56x 
faster │
    │ QQuery 92 │   50.04 ms │                          25.38 ms │ +1.97x 
faster │
    │ QQuery 93 │  143.48 ms │                         130.62 ms │ +1.10x 
faster │
    │ QQuery 94 │   50.84 ms │                          45.57 ms │ +1.12x 
faster │
    │ QQuery 95 │  131.03 ms │                          57.60 ms │ +2.27x 
faster │
    │ QQuery 96 │   60.62 ms │                          53.24 ms │ +1.14x 
faster │
    │ QQuery 97 │   95.60 ms │                          67.33 ms │ +1.42x 
faster │
    │ QQuery 98 │  125.88 ms │                         103.03 ms │ +1.22x 
faster │
    │ QQuery 99 │ 4475.55 ms │                        4459.77 ms │     no 
change │
    
└───────────┴────────────┴───────────────────────────────────┴───────────────┘
    ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
    ┃ Benchmark Summary                                ┃            ┃
    ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
    │ Total Time (main)                                │ 22354.66ms │
    │ Total Time (hash-join-buffering-on-probe-side)   │ 18217.21ms │
    │ Average Time (main)                              │   225.80ms │
    │ Average Time (hash-join-buffering-on-probe-side) │   184.01ms │
    │ Queries Faster                                   │         79 │
    │ Queries Slower                                   │          8 │
    │ Queries with No Change                           │         12 │
    │ Queries with Failure                             │          0 │
    └──────────────────────────────────────────────────┴────────────┘
    --------------------
    Benchmark tpch_sf1.json
    --------------------
    
┏━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
    ┃ Query     ┃      main ┃ hash-join-buffering-on-probe-side ┃        Change 
┃
    
┡━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
    │ QQuery 1  │  42.94 ms │                          45.96 ms │  1.07x slower 
│
    │ QQuery 2  │  20.64 ms │                          13.00 ms │ +1.59x faster 
│
    │ QQuery 3  │  30.07 ms │                          24.52 ms │ +1.23x faster 
│
    │ QQuery 4  │  17.22 ms │                          16.39 ms │     no change 
│
    │ QQuery 5  │  98.91 ms │                          41.25 ms │ +2.40x faster 
│
    │ QQuery 6  │  18.67 ms │                          18.23 ms │     no change 
│
    │ QQuery 7  │ 104.82 ms │                          46.45 ms │ +2.26x faster 
│
    │ QQuery 8  │  97.98 ms │                          34.09 ms │ +2.87x faster 
│
    │ QQuery 9  │  86.25 ms │                          43.32 ms │ +1.99x faster 
│
    │ QQuery 10 │ 106.09 ms │                          41.49 ms │ +2.56x faster 
│
    │ QQuery 11 │  13.77 ms │                          11.15 ms │ +1.24x faster 
│
    │ QQuery 12 │  54.57 ms │                          30.04 ms │ +1.82x faster 
│
    │ QQuery 13 │  21.71 ms │                          21.74 ms │     no change 
│
    │ QQuery 14 │  51.38 ms │                          21.87 ms │ +2.35x faster 
│
    │ QQuery 15 │  35.13 ms │                          27.95 ms │ +1.26x faster 
│
    │ QQuery 16 │  13.04 ms │                          12.05 ms │ +1.08x faster 
│
    │ QQuery 17 │  82.94 ms │                          53.05 ms │ +1.56x faster 
│
    │ QQuery 18 │ 109.92 ms │                          61.16 ms │ +1.80x faster 
│
    │ QQuery 19 │  37.57 ms │                          37.79 ms │     no change 
│
    │ QQuery 20 │  60.77 ms │                          26.21 ms │ +2.32x faster 
│
    │ QQuery 21 │  78.28 ms │                          54.11 ms │ +1.45x faster 
│
    │ QQuery 22 │   8.48 ms │                           8.73 ms │     no change 
│
    
└───────────┴───────────┴───────────────────────────────────┴───────────────┘
    ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
    ┃ Benchmark Summary                                ┃           ┃
    ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
    │ Total Time (main)                                │ 1191.17ms │
    │ Total Time (hash-join-buffering-on-probe-side)   │  690.57ms │
    │ Average Time (main)                              │   54.14ms │
    │ Average Time (hash-join-buffering-on-probe-side) │   31.39ms │
    │ Queries Faster                                   │        16 │
    │ Queries Slower                                   │         1 │
    │ Queries with No Change                           │         5 │
    │ Queries with Failure                             │         0 │
    └──────────────────────────────────────────────────┴───────────┘
    --------------------
    Benchmark tpch_sf10.json
    --------------------
    
┏━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
    ┃ Query     ┃      main ┃ hash-join-buffering-on-probe-side ┃        Change 
┃
    
┡━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
    │ QQuery 1  │ 373.55 ms │                         331.39 ms │ +1.13x faster 
│
    │ QQuery 2  │ 150.76 ms │                          89.50 ms │ +1.68x faster 
│
    │ QQuery 3  │ 320.66 ms │                         271.20 ms │ +1.18x faster 
│
    │ QQuery 4  │ 132.21 ms │                         115.69 ms │ +1.14x faster 
│
    │ QQuery 5  │ 462.29 ms │                         401.73 ms │ +1.15x faster 
│
    │ QQuery 6  │ 150.85 ms │                         124.88 ms │ +1.21x faster 
│
    │ QQuery 7  │ 631.13 ms │                         547.52 ms │ +1.15x faster 
│
    │ QQuery 8  │ 593.22 ms │                         445.85 ms │ +1.33x faster 
│
    │ QQuery 9  │ 780.22 ms │                         657.02 ms │ +1.19x faster 
│
    │ QQuery 10 │ 495.89 ms │                         324.21 ms │ +1.53x faster 
│
    │ QQuery 11 │ 144.90 ms │                          88.97 ms │ +1.63x faster 
│
    │ QQuery 12 │ 263.50 ms │                         188.59 ms │ +1.40x faster 
│
    │ QQuery 13 │ 287.01 ms │                         217.33 ms │ +1.32x faster 
│
    │ QQuery 14 │ 248.88 ms │                         166.86 ms │ +1.49x faster 
│
    │ QQuery 15 │ 399.52 ms │                         280.62 ms │ +1.42x faster 
│
    │ QQuery 16 │  97.62 ms │                          65.14 ms │ +1.50x faster 
│
    │ QQuery 17 │ 780.01 ms │                         641.17 ms │ +1.22x faster 
│
    │ QQuery 18 │ 824.42 ms │                         696.09 ms │ +1.18x faster 
│
    │ QQuery 19 │ 367.17 ms │                         268.54 ms │ +1.37x faster 
│
    │ QQuery 20 │ 332.86 ms │                         241.19 ms │ +1.38x faster 
│
    │ QQuery 21 │ 856.49 ms │                         697.65 ms │ +1.23x faster 
│
    │ QQuery 22 │  89.72 ms │                          72.73 ms │ +1.23x faster 
│
    
└───────────┴───────────┴───────────────────────────────────┴───────────────┘
    ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
    ┃ Benchmark Summary                                ┃           ┃
    ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
    │ Total Time (main)                                │ 8782.89ms │
    │ Total Time (hash-join-buffering-on-probe-side)   │ 6933.87ms │
    │ Average Time (main)                              │  399.22ms │
    │ Average Time (hash-join-buffering-on-probe-side) │  315.18ms │
    │ Queries Faster                                   │        22 │
    │ Queries Slower                                   │         0 │
    │ Queries with No Change                           │         0 │
    │ Queries with Failure                             │         0 │
    └──────────────────────────────────────────────────┴───────────┘
    ```
---
 benchmarks/src/imdb/run.rs                         |   8 ++
 benchmarks/src/tpcds/run.rs                        |   6 ++
 benchmarks/src/tpch/run.rs                         |   8 ++
 datafusion/common/src/config.rs                    |  15 +++
 .../physical-optimizer/src/hash_join_buffering.rs  | 103 +++++++++++++++++++++
 datafusion/physical-optimizer/src/lib.rs           |   1 +
 datafusion/physical-optimizer/src/optimizer.rs     |   5 +
 datafusion/sqllogictest/test_files/explain.slt     |   4 +
 .../sqllogictest/test_files/information_schema.slt |   2 +
 datafusion/sqllogictest/test_files/struct.slt      |   2 +-
 docs/source/user-guide/configs.md                  |   1 +
 11 files changed, 154 insertions(+), 1 deletion(-)

diff --git a/benchmarks/src/imdb/run.rs b/benchmarks/src/imdb/run.rs
index 9ddea67148..29ca5249aa 100644
--- a/benchmarks/src/imdb/run.rs
+++ b/benchmarks/src/imdb/run.rs
@@ -92,6 +92,10 @@ pub struct RunOpt {
     /// True by default.
     #[arg(short = 'j', long = "prefer_hash_join", default_value = "true")]
     prefer_hash_join: BoolDefaultTrue,
+
+    /// How many bytes to buffer on the probe side of hash joins.
+    #[arg(long, default_value = "0")]
+    hash_join_buffering_capacity: usize,
 }
 
 fn map_query_id_to_str(query_id: usize) -> &'static str {
@@ -306,6 +310,8 @@ impl RunOpt {
             .config()?
             .with_collect_statistics(!self.disable_statistics);
         config.options_mut().optimizer.prefer_hash_join = 
self.prefer_hash_join;
+        config.options_mut().execution.hash_join_buffering_capacity =
+            self.hash_join_buffering_capacity;
         let rt_builder = self.common.runtime_env_builder()?;
         let ctx = SessionContext::new_with_config_rt(config, 
rt_builder.build_arc()?);
 
@@ -527,6 +533,7 @@ mod tests {
             output_path: None,
             disable_statistics: false,
             prefer_hash_join: true,
+            hash_join_buffering_capacity: 0,
         };
         opt.register_tables(&ctx).await?;
         let queries = get_query_sql(map_query_id_to_str(query))?;
@@ -563,6 +570,7 @@ mod tests {
             output_path: None,
             disable_statistics: false,
             prefer_hash_join: true,
+            hash_join_buffering_capacity: 0,
         };
         opt.register_tables(&ctx).await?;
         let queries = get_query_sql(map_query_id_to_str(query))?;
diff --git a/benchmarks/src/tpcds/run.rs b/benchmarks/src/tpcds/run.rs
index 586ee754d2..8e24b121b2 100644
--- a/benchmarks/src/tpcds/run.rs
+++ b/benchmarks/src/tpcds/run.rs
@@ -144,6 +144,10 @@ pub struct RunOpt {
     /// The tables should have been created with the `--sort` option for this 
to have any effect.
     #[arg(short = 't', long = "sorted")]
     sorted: bool,
+
+    /// How many bytes to buffer on the probe side of hash joins.
+    #[arg(long, default_value = "0")]
+    hash_join_buffering_capacity: usize,
 }
 
 impl RunOpt {
@@ -162,6 +166,8 @@ impl RunOpt {
         config.options_mut().optimizer.prefer_hash_join = 
self.prefer_hash_join;
         config.options_mut().optimizer.enable_piecewise_merge_join =
             self.enable_piecewise_merge_join;
+        config.options_mut().execution.hash_join_buffering_capacity =
+            self.hash_join_buffering_capacity;
         let rt_builder = self.common.runtime_env_builder()?;
         let ctx = SessionContext::new_with_config_rt(config, 
rt_builder.build_arc()?);
         // register tables
diff --git a/benchmarks/src/tpch/run.rs b/benchmarks/src/tpch/run.rs
index 9706296fea..392e02f847 100644
--- a/benchmarks/src/tpch/run.rs
+++ b/benchmarks/src/tpch/run.rs
@@ -105,6 +105,10 @@ pub struct RunOpt {
     /// The tables should have been created with the `--sort` option for this 
to have any effect.
     #[arg(short = 't', long = "sorted")]
     sorted: bool,
+
+    /// How many bytes to buffer on the probe side of hash joins.
+    #[arg(long, default_value = "0")]
+    hash_join_buffering_capacity: usize,
 }
 
 impl RunOpt {
@@ -123,6 +127,8 @@ impl RunOpt {
         config.options_mut().optimizer.prefer_hash_join = 
self.prefer_hash_join;
         config.options_mut().optimizer.enable_piecewise_merge_join =
             self.enable_piecewise_merge_join;
+        config.options_mut().execution.hash_join_buffering_capacity =
+            self.hash_join_buffering_capacity;
         let rt_builder = self.common.runtime_env_builder()?;
         let ctx = SessionContext::new_with_config_rt(config, 
rt_builder.build_arc()?);
         // register tables
@@ -392,6 +398,7 @@ mod tests {
             prefer_hash_join: true,
             enable_piecewise_merge_join: false,
             sorted: false,
+            hash_join_buffering_capacity: 0,
         };
         opt.register_tables(&ctx).await?;
         let queries = get_query_sql(query)?;
@@ -430,6 +437,7 @@ mod tests {
             prefer_hash_join: true,
             enable_piecewise_merge_join: false,
             sorted: false,
+            hash_join_buffering_capacity: 0,
         };
         opt.register_tables(&ctx).await?;
         let queries = get_query_sql(query)?;
diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index 7238d842e3..15eb279079 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -669,6 +669,21 @@ config_namespace! {
         /// # Default
         /// `false` — ANSI SQL mode is disabled by default.
         pub enable_ansi_mode: bool, default = false
+
+        /// How many bytes to buffer in the probe side of hash joins while the 
build side is
+        /// concurrently being built.
+        ///
+        /// Without this, hash joins will wait until the full materialization 
of the build side
+        /// before polling the probe side. This is useful in scenarios where 
the query is not
+        /// completely CPU bounded, allowing to do some early work 
concurrently and reducing the
+        /// latency of the query.
+        ///
+        /// Note that when hash join buffering is enabled, the probe side will 
start eagerly
+        /// polling data, not giving time for the producer side of dynamic 
filters to produce any
+        /// meaningful predicate. Queries with dynamic filters might see 
performance degradation.
+        ///
+        /// Disabled by default, set to a number greater than 0 for enabling 
it.
+        pub hash_join_buffering_capacity: usize, default = 0
     }
 }
 
diff --git a/datafusion/physical-optimizer/src/hash_join_buffering.rs 
b/datafusion/physical-optimizer/src/hash_join_buffering.rs
new file mode 100644
index 0000000000..3c29b46c0f
--- /dev/null
+++ b/datafusion/physical-optimizer/src/hash_join_buffering.rs
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::PhysicalOptimizerRule;
+use datafusion_common::JoinSide;
+use datafusion_common::config::ConfigOptions;
+use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
+use datafusion_physical_plan::ExecutionPlan;
+use datafusion_physical_plan::buffer::BufferExec;
+use datafusion_physical_plan::joins::HashJoinExec;
+use std::sync::Arc;
+
+/// Looks for all the [HashJoinExec]s in the plan and places a [BufferExec] 
node with the
+/// configured capacity in the probe side:
+///
+/// ```text
+///            ┌───────────────────┐
+///            │   HashJoinExec    │
+///            └─────▲────────▲────┘
+///          ┌───────┘        └─────────┐
+///          │                          │
+/// ┌────────────────┐         ┌─────────────────┐
+/// │   Build side   │       + │   BufferExec    │
+/// └────────────────┘         └────────▲────────┘
+///                                     │
+///                            ┌────────┴────────┐
+///                            │   Probe side    │
+///                            └─────────────────┘
+/// ```
+///
+/// Which allows eagerly pulling it even before the build side has completely 
finished.
+#[derive(Debug, Default)]
+pub struct HashJoinBuffering {}
+
+impl HashJoinBuffering {
+    pub fn new() -> Self {
+        Self::default()
+    }
+}
+
+impl PhysicalOptimizerRule for HashJoinBuffering {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        config: &ConfigOptions,
+    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
+        let capacity = config.execution.hash_join_buffering_capacity;
+        if capacity == 0 {
+            return Ok(plan);
+        }
+
+        plan.transform_down(|plan| {
+            let Some(node) = plan.as_any().downcast_ref::<HashJoinExec>() else 
{
+                return Ok(Transformed::no(plan));
+            };
+            let plan = Arc::clone(&plan);
+            Ok(Transformed::yes(
+                if HashJoinExec::probe_side() == JoinSide::Left {
+                    // Do not stack BufferExec nodes together.
+                    if 
node.left.as_any().downcast_ref::<BufferExec>().is_some() {
+                        return Ok(Transformed::no(plan));
+                    }
+                    plan.with_new_children(vec![
+                        Arc::new(BufferExec::new(Arc::clone(&node.left), 
capacity)),
+                        Arc::clone(&node.right),
+                    ])?
+                } else {
+                    // Do not stack BufferExec nodes together.
+                    if 
node.right.as_any().downcast_ref::<BufferExec>().is_some() {
+                        return Ok(Transformed::no(plan));
+                    }
+                    plan.with_new_children(vec![
+                        Arc::clone(&node.left),
+                        Arc::new(BufferExec::new(Arc::clone(&node.right), 
capacity)),
+                    ])?
+                },
+            ))
+        })
+        .data()
+    }
+
+    fn name(&self) -> &str {
+        "HashJoinBuffering"
+    }
+
+    fn schema_check(&self) -> bool {
+        true
+    }
+}
diff --git a/datafusion/physical-optimizer/src/lib.rs 
b/datafusion/physical-optimizer/src/lib.rs
index 3a0d79ae2d..98331a94e3 100644
--- a/datafusion/physical-optimizer/src/lib.rs
+++ b/datafusion/physical-optimizer/src/lib.rs
@@ -39,6 +39,7 @@ pub mod optimizer;
 pub mod output_requirements;
 pub mod projection_pushdown;
 pub use datafusion_pruning as pruning;
+pub mod hash_join_buffering;
 pub mod pushdown_sort;
 pub mod sanity_checker;
 pub mod topk_aggregation;
diff --git a/datafusion/physical-optimizer/src/optimizer.rs 
b/datafusion/physical-optimizer/src/optimizer.rs
index 49225db03a..f31a1f981f 100644
--- a/datafusion/physical-optimizer/src/optimizer.rs
+++ b/datafusion/physical-optimizer/src/optimizer.rs
@@ -35,6 +35,7 @@ use crate::sanity_checker::SanityCheckPlan;
 use crate::topk_aggregation::TopKAggregation;
 use crate::update_aggr_exprs::OptimizeAggregateOrder;
 
+use crate::hash_join_buffering::HashJoinBuffering;
 use crate::limit_pushdown_past_window::LimitPushPastWindows;
 use crate::pushdown_sort::PushdownSort;
 use datafusion_common::Result;
@@ -137,6 +138,10 @@ impl PhysicalOptimizer {
             // This can possibly be combined with [LimitPushdown]
             // It needs to come after [EnforceSorting]
             Arc::new(LimitPushPastWindows::new()),
+            // The HashJoinBuffering rule adds a BufferExec node with the 
configured capacity
+            // in the prob side of hash joins. That way, the probe side gets 
eagerly polled before
+            // the build side is completely finished.
+            Arc::new(HashJoinBuffering::new()),
             // The LimitPushdown rule tries to push limits down as far as 
possible,
             // replacing operators with fetching variants, or adding limits
             // past operators that support limit pushdown.
diff --git a/datafusion/sqllogictest/test_files/explain.slt 
b/datafusion/sqllogictest/test_files/explain.slt
index c5907d4975..6b50a1d1cb 100644
--- a/datafusion/sqllogictest/test_files/explain.slt
+++ b/datafusion/sqllogictest/test_files/explain.slt
@@ -243,6 +243,7 @@ physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
 physical_plan after OutputRequirements DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, 
c], file_type=csv, has_header=true
 physical_plan after LimitAggregation SAME TEXT AS ABOVE
 physical_plan after LimitPushPastWindows SAME TEXT AS ABOVE
+physical_plan after HashJoinBuffering SAME TEXT AS ABOVE
 physical_plan after LimitPushdown SAME TEXT AS ABOVE
 physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
 physical_plan after PushdownSort SAME TEXT AS ABOVE
@@ -323,6 +324,7 @@ physical_plan after OutputRequirements
 02)--DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, 
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, 
file_type=parquet, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]: 
ScanBytes=Exact(32)),(Col[1]: ScanBytes=Inexact(24)),(Col[2]: 
ScanBytes=Exact(32)),(Col[3]: ScanBytes=Exact(32)),(Col[4]: 
ScanBytes=Exact(32)),(Col[5]: ScanBytes= [...]
 physical_plan after LimitAggregation SAME TEXT AS ABOVE
 physical_plan after LimitPushPastWindows SAME TEXT AS ABOVE
+physical_plan after HashJoinBuffering SAME TEXT AS ABOVE
 physical_plan after LimitPushdown DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, 
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, 
file_type=parquet, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]: 
ScanBytes=Exact(32)),(Col[1]: ScanBytes=Inexact(24)),(Col[2]: 
ScanBytes=Exact(32)),(Col[3]: ScanBytes=Exact(32)),(Col[4]: ScanBytes=E [...]
 physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
 physical_plan after PushdownSort SAME TEXT AS ABOVE
@@ -367,6 +369,7 @@ physical_plan after OutputRequirements
 02)--DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, 
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, 
file_type=parquet
 physical_plan after LimitAggregation SAME TEXT AS ABOVE
 physical_plan after LimitPushPastWindows SAME TEXT AS ABOVE
+physical_plan after HashJoinBuffering SAME TEXT AS ABOVE
 physical_plan after LimitPushdown DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, 
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, 
file_type=parquet
 physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
 physical_plan after PushdownSort SAME TEXT AS ABOVE
@@ -608,6 +611,7 @@ physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
 physical_plan after OutputRequirements DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, 
c], file_type=csv, has_header=true
 physical_plan after LimitAggregation SAME TEXT AS ABOVE
 physical_plan after LimitPushPastWindows SAME TEXT AS ABOVE
+physical_plan after HashJoinBuffering SAME TEXT AS ABOVE
 physical_plan after LimitPushdown SAME TEXT AS ABOVE
 physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
 physical_plan after PushdownSort SAME TEXT AS ABOVE
diff --git a/datafusion/sqllogictest/test_files/information_schema.slt 
b/datafusion/sqllogictest/test_files/information_schema.slt
index b61ceecb24..18f6db9cdc 100644
--- a/datafusion/sqllogictest/test_files/information_schema.slt
+++ b/datafusion/sqllogictest/test_files/information_schema.slt
@@ -220,6 +220,7 @@ datafusion.execution.collect_statistics true
 datafusion.execution.enable_ansi_mode false
 datafusion.execution.enable_recursive_ctes true
 datafusion.execution.enforce_batch_size_in_joins false
+datafusion.execution.hash_join_buffering_capacity 0
 datafusion.execution.keep_partition_by_columns false
 datafusion.execution.listing_table_factory_infer_partitions true
 datafusion.execution.listing_table_ignore_subdirectory true
@@ -358,6 +359,7 @@ datafusion.execution.collect_statistics true Should 
DataFusion collect statistic
 datafusion.execution.enable_ansi_mode false Whether to enable ANSI SQL mode. 
The flag is experimental and relevant only for DataFusion Spark built-in 
functions When `enable_ansi_mode` is set to `true`, the query engine follows 
ANSI SQL semantics for expressions, casting, and error handling. This means: - 
**Strict type coercion rules:** implicit casts between incompatible types are 
disallowed. - **Standard SQL arithmetic behavior:** operations such as division 
by zero,   numeric overflow, [...]
 datafusion.execution.enable_recursive_ctes true Should DataFusion support 
recursive CTEs
 datafusion.execution.enforce_batch_size_in_joins false Should DataFusion 
enforce batch size in joins or not. By default, DataFusion will not enforce 
batch size in joins. Enforcing batch size in joins can reduce memory usage when 
joining large tables with a highly-selective join filter, but is also slightly 
slower.
+datafusion.execution.hash_join_buffering_capacity 0 How many bytes to buffer 
in the probe side of hash joins while the build side is concurrently being 
built. Without this, hash joins will wait until the full materialization of the 
build side before polling the probe side. This is useful in scenarios where the 
query is not completely CPU bounded, allowing to do some early work 
concurrently and reducing the latency of the query. Note that when hash join 
buffering is enabled, the probe sid [...]
 datafusion.execution.keep_partition_by_columns false Should DataFusion keep 
the columns used for partition_by in the output RecordBatches
 datafusion.execution.listing_table_factory_infer_partitions true Should a 
`ListingTable` created through the `ListingTableFactory` infer table partitions 
from Hive compliant directories. Defaults to true (partition columns are 
inferred and will be represented in the table schema).
 datafusion.execution.listing_table_ignore_subdirectory true Should sub 
directories be ignored when scanning directories for data files. Defaults to 
true (ignores subdirectories), consistent with Hive. Note that this setting 
does not affect reading partitioned tables (e.g. 
`/table/year=2021/month=01/data.parquet`).
diff --git a/datafusion/sqllogictest/test_files/struct.slt 
b/datafusion/sqllogictest/test_files/struct.slt
index e20815a58c..53a1bb4ec6 100644
--- a/datafusion/sqllogictest/test_files/struct.slt
+++ b/datafusion/sqllogictest/test_files/struct.slt
@@ -1666,4 +1666,4 @@ order by id;
 3 2 150
 
 statement ok
-drop table t_agg_window;
\ No newline at end of file
+drop table t_agg_window;
diff --git a/docs/source/user-guide/configs.md 
b/docs/source/user-guide/configs.md
index f33e6314d3..ede1652fdf 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -133,6 +133,7 @@ The following configuration settings are available:
 | datafusion.execution.enforce_batch_size_in_joins                        | 
false                     | Should DataFusion enforce batch size in joins or 
not. By default, DataFusion will not enforce batch size in joins. Enforcing 
batch size in joins can reduce memory usage when joining large tables with a 
highly-selective join filter, but is also slightly slower.                      
                                                                                
                           [...]
 | datafusion.execution.objectstore_writer_buffer_size                     | 
10485760                  | Size (bytes) of data buffer DataFusion uses when 
writing output files. This affects the size of the data chunks that are 
uploaded to remote object stores (e.g. AWS S3). If very large (>= 100 GiB) 
output files are being written, it may be necessary to increase this size to 
avoid errors from the remote end point.                                         
                                    [...]
 | datafusion.execution.enable_ansi_mode                                   | 
false                     | Whether to enable ANSI SQL mode. The flag is 
experimental and relevant only for DataFusion Spark built-in functions When 
`enable_ansi_mode` is set to `true`, the query engine follows ANSI SQL 
semantics for expressions, casting, and error handling. This means: - **Strict 
type coercion rules:** implicit casts between incompatible types are 
disallowed. - **Standard SQL arithmetic behavior [...]
+| datafusion.execution.hash_join_buffering_capacity                       | 0  
                       | How many bytes to buffer in the probe side of hash 
joins while the build side is concurrently being built. Without this, hash 
joins will wait until the full materialization of the build side before polling 
the probe side. This is useful in scenarios where the query is not completely 
CPU bounded, allowing to do some early work concurrently and reducing the 
latency of the query. Note tha [...]
 | datafusion.optimizer.enable_distinct_aggregation_soft_limit             | 
true                      | When set to true, the optimizer will push a limit 
operation into grouped aggregations which have no aggregate expressions, as a 
soft limit, emitting groups once the limit is reached, before all rows in the 
group are read.                                                                 
                                                                                
                       [...]
 | datafusion.optimizer.enable_round_robin_repartition                     | 
true                      | When set to true, the physical plan optimizer will 
try to add round robin repartitioning to increase parallelism to leverage more 
CPU cores                                                                       
                                                                                
                                                                                
                   [...]
 | datafusion.optimizer.enable_topk_aggregation                            | 
true                      | When set to true, the optimizer will attempt to 
perform limit operations during aggregations, if possible                       
                                                                                
                                                                                
                                                                                
                     [...]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to