jonathanc-n opened a new pull request, #16660:
URL: https://github.com/apache/datafusion/pull/16660

   ## Rationale for this change
   `PiecewiseMergeJoin` is a useful precursor to the implementation of ASOF, inequality, and similar joins (multiple range predicates). `PiecewiseMergeJoin` is specialized for the case where there is exactly one range filter, and can be much faster in that case, especially for semi, anti, and mark joins.
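   To make the single-range-predicate case concrete, here is a minimal, hypothetical sketch (plain `i32` values and an illustrative helper name, not the PR's actual API): once the buffered side is sorted, each streamed row can find all of its matches for a predicate like `stream.v < buffered.v` with one binary search instead of scanning every buffered row.

   ```rust
   /// Hypothetical helper, simplified to `i32` columns: for the predicate
   /// `stream_val < buffered_val`, return the slice of sorted buffered
   /// values that match one streamed value.
   fn matches_for_row(stream_val: i32, sorted_buffered: &[i32]) -> &[i32] {
       // First index whose value satisfies `stream_val < b`; because the
       // buffered side is sorted, every row from there onward also matches.
       let start = sorted_buffered.partition_point(|&b| b <= stream_val);
       &sorted_buffered[start..]
   }

   fn main() {
       let buffered = [1, 3, 5, 7, 9]; // sorted buffered side
       assert_eq!(matches_for_row(4, &buffered), &[5, 7, 9]);
       assert_eq!(matches_for_row(9, &buffered), &[] as &[i32]);
   }
   ```

   Each streamed row costs O(log n) here instead of the O(n) per-row cost of a nested loop, which is where the speedup for classic joins comes from.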
   
   ## What changes are included in this PR?
   This PR contains the `PiecewiseMergeJoin` implementation only; there is no `physical planner -> PiecewiseMergeJoinExec` integration yet.

   `ExecutionPlan` has been implemented for `PiecewiseMergeJoinExec`:
   - `compute_properties` and `swap_inputs` are not yet implemented
   - Builds the execution plan for the piecewise merge join

   `PiecewiseMergeJoinStream` has also been implemented. Examples are provided for both the `PiecewiseMergeJoinExec` and `PiecewiseMergeJoinStream` implementations.
   
   ## Benchmark Results
   The benchmarks were run on a random batch of values (streamed side) joined against a sorted batch (buffered side).
   
   For the regular join tests I used the larger side as the left side, since it gave better performance. That result is surprising, because the left side needs to be sorted; I will investigate it in a follow-up pull request together with the input-swapping work.
   When compared to `NestedLoopJoin`, queries for classic joins (left, right, inner, full) were about 10x faster 🚀
    - However, when both sides were equally large, it performed slower than the nested loop join.
   
   For existence joins (semi, anti), the join performed about 1000x faster 🚀
    - To explain this dramatic speedup: instead of computing a cartesian product, we only need to find the max/min value of the unsorted stream side, then do an O(n) scan of the sorted buffered side to find the first match and emit all rows after it.
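   The steps described above can be sketched as a small standalone function (the name and plain `i32` columns are illustrative, not the PR's actual code); it covers a semi-join-style predicate `stream.v < buffered.v`, where a sorted-side row matches iff it is greater than the minimum streamed value:

   ```rust
   /// Hypothetical sketch: for a semi join on `stream.v < buffered.v`,
   /// a buffered row matches iff *some* streamed value is below it,
   /// i.e. iff min(stream) < buffered row.
   fn right_semi_matches<'a>(stream: &[i32], sorted_buffered: &'a [i32]) -> &'a [i32] {
       // One pass over the unsorted stream side to find its minimum.
       let Some(min) = stream.iter().min() else {
           return &[]; // empty stream side: nothing matches
       };
       // Linear scan of the sorted side for the first matching row; all
       // later rows also match because the side is sorted.
       match sorted_buffered.iter().position(|b| b > min) {
           Some(start) => &sorted_buffered[start..],
           None => &[],
       }
   }

   fn main() {
       let stream = [5, 2, 8];         // unsorted streamed side
       let buffered = [1, 3, 5, 7, 9]; // sorted buffered side
       assert_eq!(right_semi_matches(&stream, &buffered), &[3, 5, 7, 9]);
   }
   ```

   Both passes are linear, so the whole existence join costs O(n + m) rather than the O(n·m) of a nested loop, which is consistent with the ~1000x numbers below.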
   
   <details>
     <summary>Benchmark Results for normal joins</summary>
   
   ```
   
      joins/PiecewiseMergeJoin/l=1000_r=1000
                           time:   [345.69 µs 351.99 µs 361.11 µs]
                           change: [-4.0041% -2.4315% -0.4405%] (p = 0.01 < 
0.05)
                           Change within noise threshold.
   Found 10 outliers among 100 measurements (10.00%)
     7 (7.00%) high mild
     3 (3.00%) high severe
   joins/NestedLoopJoin/l=1000_r=1000
                           time:   [2.6237 ms 2.6518 ms 2.6870 ms]
                           change: [-4.0439% +0.5217% +4.4183%] (p = 0.84 > 
0.05)
                           No change in performance detected.
   Found 15 outliers among 100 measurements (15.00%)
     4 (4.00%) high mild
     11 (11.00%) high severe
   Benchmarking joins/PiecewiseMergeJoin/l=10000_r=10000: Warming up for 3.0000 
s
   Warning: Unable to complete 100 samples in 5.0s. You may wish to increase 
target time to 52.4s, or reduce sample count to 10.
   joins/PiecewiseMergeJoin/l=10000_r=10000
                           time:   [490.26 ms 501.24 ms 513.75 ms]
                           change: [-14.807% -9.4227% -4.1141%] (p = 0.00 < 
0.05)
                           Performance has improved.
   Found 12 outliers among 100 measurements (12.00%)
     8 (8.00%) high mild
     4 (4.00%) high severe
   Benchmarking joins/NestedLoopJoin/l=10000_r=10000: Warming up for 3.0000 s
   Warning: Unable to complete 100 samples in 5.0s. You may wish to increase 
target time to 32.9s, or reduce sample count to 10.
   joins/NestedLoopJoin/l=10000_r=10000
                           time:   [325.74 ms 330.41 ms 335.76 ms]
                           change: [-30.701% -25.545% -20.089%] (p = 0.00 < 
0.05)
                           Performance has improved.
   Found 9 outliers among 100 measurements (9.00%)
     1 (1.00%) high mild
     8 (8.00%) high severe
   joins/PiecewiseMergeJoin/l=100000_r=1000
                           time:   [46.738 ms 47.037 ms 47.348 ms]
                           change: [+6.8565% +7.8729% +8.8987%] (p = 0.00 < 
0.05)
                           Performance has regressed.
   Found 1 outliers among 100 measurements (1.00%)
     1 (1.00%) high mild
   Benchmarking joins/NestedLoopJoin/l=100000_r=1000: Warming up for 3.0000 s
   Warning: Unable to complete 100 samples in 5.0s. You may wish to increase 
target time to 34.7s, or reduce sample count to 10.
   joins/NestedLoopJoin/l=100000_r=1000
                           time:   [337.92 ms 355.00 ms 375.33 ms]
                           change: [+3.4274% +8.8931% +15.219%] (p = 0.00 < 
0.05)
                           Performance has regressed.
   Found 17 outliers among 100 measurements (17.00%)
     4 (4.00%) high mild
     13 (13.00%) high severe
   joins/PiecewiseMergeJoin/l=10000_r=100
                           time:   [353.07 µs 356.19 µs 359.16 µs]
                           change: [-20.427% -19.045% -17.788%] (p = 0.00 < 
0.05)
                           Performance has improved.
   Found 3 outliers among 100 measurements (3.00%)
     3 (3.00%) low mild
   joins/NestedLoopJoin/l=10000_r=100
                           time:   [2.4624 ms 2.4690 ms 2.4759 ms]
                           change: [-35.277% -26.644% -17.558%] (p = 0.00 < 
0.05)
                           Performance has improved.
   Found 2 outliers among 100 measurements (2.00%)
     1 (1.00%) high mild
     1 (1.00%) high severe
   joins/PiecewiseMergeJoin/l=1000000_r=100
                           time:   [49.569 ms 49.788 ms 50.071 ms]
                           change: [-11.268% -8.9464% -7.0861%] (p = 0.00 < 
0.05)
                           Performance has improved.
   Found 6 outliers among 100 measurements (6.00%)
     4 (4.00%) high mild
     2 (2.00%) high severe
   Benchmarking joins/NestedLoopJoin/l=1000000_r=100: Warming up for 3.0000 s
   Warning: Unable to complete 100 samples in 5.0s. You may wish to increase 
target time to 32.6s, or reduce sample count to 10.
   joins/NestedLoopJoin/l=1000000_r=100
                           time:   [318.73 ms 321.16 ms 324.12 ms]
                           change: [-2.3069% -0.7454% +0.6191%] (p = 0.35 > 
0.05)
                           No change in performance detected.
   Found 3 outliers among 100 measurements (3.00%)
     1 (1.00%) high mild
     2 (2.00%) high severe
   
   ```
     
   </details>
   
   <details>
   
   
     <summary>Benchmark Results for existence joins</summary>
     
   ```  
   
   joins/PiecewiseMergeJoin/l=1000_r=1000
                           time:   [17.562 µs 17.856 µs 18.368 µs]
                           change: [-95.034% -94.834% -94.578%] (p = 0.00 < 
0.05)
                           Performance has improved.
   Found 5 outliers among 100 measurements (5.00%)
     5 (5.00%) high severe
   joins/NestedLoopJoin/l=1000_r=1000
                           time:   [2.5747 ms 2.6143 ms 2.6718 ms]
                           change: [-3.5382% -1.4140% +1.1788%] (p = 0.24 > 
0.05)
                           No change in performance detected.
   Found 2 outliers among 100 measurements (2.00%)
     2 (2.00%) high severe
   joins/PiecewiseMergeJoin/l=10000_r=10000
                           time:   [126.97 µs 130.34 µs 133.60 µs]
                           change: [-99.975% -99.974% -99.973%] (p = 0.00 < 
0.05)
                           Performance has improved.
   Found 21 outliers among 100 measurements (21.00%)
     16 (16.00%) low mild
     5 (5.00%) high mild
   Benchmarking joins/NestedLoopJoin/l=10000_r=10000: Warming up for 3.0000 s
   Warning: Unable to complete 100 samples in 5.0s. You may wish to increase 
target time to 32.1s, or reduce sample count to 10.
   joins/NestedLoopJoin/l=10000_r=10000
                           time:   [324.45 ms 329.32 ms 335.64 ms]
                           change: [-2.5195% -0.3276% +2.0344%] (p = 0.79 > 
0.05)
                           No change in performance detected.
   Found 8 outliers among 100 measurements (8.00%)
     3 (3.00%) high mild
     5 (5.00%) high severe
   Benchmarking joins/PiecewiseMergeJoin/l=100000_r=1000: Warming up for 3.0000 
s
   Warning: Unable to complete 100 samples in 5.0s. You may wish to increase 
target time to 5.7s, enable flat sampling, or reduce sample count to 60.
     
      joins/PiecewiseMergeJoin/l=1000_r=10000
                           time:   [21.704 µs 21.851 µs 22.039 µs]
                           change: [-99.951% -99.951% -99.951%] (p = 0.00 < 
0.05)
                           Performance has improved.
   Found 4 outliers among 100 measurements (4.00%)
     1 (1.00%) low mild
     3 (3.00%) high severe
   joins/NestedLoopJoin/l=1000_r=10000
                           time:   [26.852 ms 27.166 ms 27.482 ms]
                           change: [-34.057% -28.293% -22.241%] (p = 0.00 < 
0.05)
                           Performance has improved.
   joins/PiecewiseMergeJoin/l=100_r=100000
                           time:   [74.249 µs 74.381 µs 74.516 µs]
                           change: [-99.952% -99.952% -99.951%] (p = 0.00 < 
0.05)
                           Performance has improved.
   Found 8 outliers among 100 measurements (8.00%)
     7 (7.00%) high mild
     1 (1.00%) high severe
   joins/NestedLoopJoin/l=100_r=100000
                           time:   [25.960 ms 26.343 ms 26.807 ms]
                           change: [-1.0541% +0.8379% +2.8866%] (p = 0.41 > 
0.05)
                           No change in performance detected.
   Found 2 outliers among 100 measurements (2.00%)
     2 (2.00%) high severe
   joins/PiecewiseMergeJoin/l=1000_r=100000
                           time:   [82.470 µs 83.025 µs 83.761 µs]
                           change: [-99.996% -99.996% -99.996%] (p = 0.00 < 
0.05)
                           Performance has improved.
   Found 12 outliers among 100 measurements (12.00%)
     6 (6.00%) high mild
     6 (6.00%) high severe
   Benchmarking joins/NestedLoopJoin/l=1000_r=100000: Warming up for 3.0000 s
   Warning: Unable to complete 100 samples in 5.0s. You may wish to increase 
target time to 33.0s, or reduce sample count to 10.
   joins/NestedLoopJoin/l=1000_r=100000
                           time:   [322.35 ms 323.86 ms 325.53 ms]
                           change: [-33.068% -26.620% -19.778%] (p = 0.00 < 
0.05)
                           Performance has improved.
   Found 2 outliers among 100 measurements (2.00%)
     1 (1.00%) high mild
     1 (1.00%) high severe
     
   ```
   
   </details>
   
   If you want to replicate:
   
   <details>
     <summary>Code</summary>
   
    ```rust

    use std::sync::Arc;
   
   use arrow::array::{
       ArrayRef, Date32Builder, Decimal128Builder, Int32Builder, RecordBatch, 
StringBuilder,
   };
   use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
   use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, 
Criterion};
   use datafusion_common::{JoinSide, Result};
   use datafusion_execution::TaskContext;
   use datafusion_expr::{JoinType, Operator};
   use datafusion_physical_expr::expressions::{BinaryExpr, Column};
   
   use datafusion_physical_expr::PhysicalExpr;
   use datafusion_physical_plan::joins::utils::{ColumnIndex, JoinFilter};
   use datafusion_physical_plan::joins::{NestedLoopJoinExec, 
PiecewiseMergeJoinExec};
   use datafusion_physical_plan::test::TestMemoryExec;
   use datafusion_physical_plan::{collect, ExecutionPlan};
   use rand::{rng, Rng};
   use tokio::runtime::Runtime;
   
   /// Creates a RecordBatch of `num_rows` with completely random values in [0, 
100_000].
   pub fn create_random_batch(num_rows: usize) -> RecordBatch {
       let schema = Arc::new(Schema::new(vec![
           Field::new("c0", DataType::Int32, true),
           Field::new("c1", DataType::Utf8, true),
           Field::new("c2", DataType::Date32, true),
           Field::new("c3", DataType::Decimal128(11, 2), true),
       ]));
   
       let mut rng = rng();
       let mut a = Int32Builder::new();
       let mut b = StringBuilder::new();
       let mut c = Date32Builder::new();
       let mut d = Decimal128Builder::new()
           .with_precision_and_scale(11, 2)
           .unwrap();
   
       for _ in 0..num_rows {
           let int_val = rng.random_range(0..=100_000);
           a.append_value(int_val);
           b.append_value(format!("string_{int_val}"));
           c.append_value(int_val);
           let dec_val = (rng.random_range(0..=100_000) as i128) * 100;
           d.append_value(dec_val);
       }
   
       let a = Arc::new(a.finish()) as ArrayRef;
       let b = Arc::new(b.finish()) as ArrayRef;
       let c = Arc::new(c.finish()) as ArrayRef;
       let d = Arc::new(d.finish()) as ArrayRef;
   
       RecordBatch::try_new(schema.clone(), vec![a, b, c, d]).unwrap()
   }
   
   pub fn create_sorted_batch(num_rows: usize, max_increment: i32) -> 
RecordBatch {
       let schema = Arc::new(Schema::new(vec![
           Field::new("c0", DataType::Int32, true),
           Field::new("c1", DataType::Utf8, true),
           Field::new("c2", DataType::Date32, true),
           Field::new("c3", DataType::Decimal128(11, 2), true),
       ]));
   
       let mut rng = rng();
       let mut a = Int32Builder::new();
       let mut b = StringBuilder::new();
       let mut c = Date32Builder::new();
       let mut d = Decimal128Builder::new()
           .with_precision_and_scale(11, 2)
           .unwrap();
   
       let mut current = rng.random_range(0..=max_increment);
       for _ in 0..num_rows {
           let inc = rng.random_range(0..=max_increment);
           current = current.saturating_add(inc);
           a.append_value(current);
           b.append_value(format!("string_{current}"));
           c.append_value(current);
           d.append_value((current as i128) * 100);
       }
   
       let a = Arc::new(a.finish()) as ArrayRef;
       let b = Arc::new(b.finish()) as ArrayRef;
       let c = Arc::new(c.finish()) as ArrayRef;
       let d = Arc::new(d.finish()) as ArrayRef;
   
       RecordBatch::try_new(schema.clone(), vec![a, b, c, d]).unwrap()
   }
   
   fn make_memory_execs(
       left_rows: usize,
       right_rows: usize,
   ) -> (Arc<dyn ExecutionPlan>, Arc<dyn ExecutionPlan>, SchemaRef) {
       let left_batch = create_random_batch(left_rows);
       let schema = left_batch.schema();
       let left_partitions = vec![vec![left_batch]];
   
       let right_batch = create_sorted_batch(right_rows, 10);
       let right_partitions = vec![vec![right_batch]];
   
       let left_mem =
           TestMemoryExec::try_new_exec(&left_partitions, schema.clone(), 
None).unwrap();
       let right_mem =
           TestMemoryExec::try_new_exec(&right_partitions, schema.clone(), 
None).unwrap();
   
       (left_mem, right_mem, schema)
   }
   
   fn build_two_joins(
       left: Arc<dyn ExecutionPlan>,
       right: Arc<dyn ExecutionPlan>,
   ) -> Result<(
       Arc<dyn ExecutionPlan>, // pwmj
       Arc<dyn ExecutionPlan>, // nlj
   )> {
       let left_on: Arc<dyn PhysicalExpr> = Arc::new(
           Column::new_with_schema("c0", &left.schema())
               .expect("left schema must contain 'c0'"),
       );
       let right_on: Arc<dyn PhysicalExpr> = Arc::new(
           Column::new_with_schema("c0", &right.schema())
               .expect("right schema must contain 'c0'"),
       );
   
       let hj = PiecewiseMergeJoinExec::try_new(
           left.clone(),
           right.clone(),
           (left_on.clone(), right_on.clone()),
           Operator::Lt,
           JoinType::Left,
       )?;
   
       let filter_expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
           left_on.clone(),
           Operator::Lt,
           right_on.clone(),
       ));
   
       let column_indices = vec![
           ColumnIndex {
               index: 0,
               side: JoinSide::Left,
           },
           ColumnIndex {
               index: 0,
               side: JoinSide::Right,
           },
       ];
   
       let intermediate_schema = Arc::new(Schema::new(vec![
           Field::new("c0_left", DataType::Int32, false),
           Field::new("c0_right", DataType::Int32, false),
       ]));
   
    let join_filter =
        JoinFilter::new(filter_expr, column_indices, intermediate_schema);
       let nlj = NestedLoopJoinExec::try_new(
           left,
           right,
           Some(join_filter),
           &JoinType::Left,
           None,
       )?;
   
       Ok((Arc::new(hj), Arc::new(nlj)))
   }
   
   fn bench_joins(c: &mut Criterion) {
       let rt = Runtime::new().unwrap();
       let mut group = c.benchmark_group("joins");
   
       // row pairs for each side in benchmarks
       let size_pairs = &[
           (1000, 1000),
           (10000, 10000),
           (100000, 1000),
           (10000, 100),
           (1000000, 100),
           (1000, 10000),
           (100, 100000),
           (1000, 100000),
       ];
   
       for &(left_rows, right_rows) in size_pairs.iter() {
           let (left_mem, right_mem, _schema) = make_memory_execs(left_rows, 
right_rows);
   
           let (pwmj_join, nested_loop_join) =
               build_two_joins(left_mem.clone(), right_mem.clone()).unwrap();
   
           group.bench_with_input(
               BenchmarkId::new(
                   "PiecewiseMergeJoin",
                   format!("l={}_r={}", left_rows, right_rows),
               ),
               &pwmj_join,
               |b, plan| {
                   b.iter_batched(
                       || (),
                       |_setup| {
                           let ctx = TaskContext::default();
                           let fut = collect(plan.clone(), Arc::new(ctx));
                           rt.block_on(async {
                               let _ = fut.await.unwrap();
                           });
                       },
                       BatchSize::SmallInput,
                   )
               },
           );
   
           group.bench_with_input(
               BenchmarkId::new(
                   "NestedLoopJoin",
                   format!("l={}_r={}", left_rows, right_rows),
               ),
               &nested_loop_join,
               |b, plan| {
                   b.iter_batched(
                       || (),
                       |_setup| {
                           let ctx = TaskContext::default();
                           let fut = collect(plan.clone(), Arc::new(ctx));
                           rt.block_on(async {
                               let _ = fut.await.unwrap();
                           });
                       },
                       BatchSize::SmallInput,
                   )
               },
           );
       }
   
       group.finish();
   }
   
   criterion_group!(benches, bench_joins);
   criterion_main!(benches);
   
   ```
   
   </details>
   
   
   ## Next Steps
   Pull request was getting large, here are the following steps for this:
    - Serialization
    - Mark join support
    - swap support + looking into the bottlenecks for why the performance is so 
bad when left side is larger
    - enforce sorting + physical planner 
    - fuzz tests
    - Implement metrics + memory reservation for rest of the join
   
   ## Are these changes tested?
   Yes, unit tests are included.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

