jonathanc-n opened a new pull request, #16660: URL: https://github.com/apache/datafusion/pull/16660
## Rationale for this change

`PiecewiseMergeJoin` is a natural precursor to the implementation of ASOF, inequality, and similar joins (multiple range predicates). It is specialized for the case where there is exactly one range predicate (e.g. `left.a < right.b`) and can perform much faster in that case, especially for semi, anti, and mark joins.

## What changes are included in this PR?

The `PiecewiseMergeJoin` implementation only; there is no `physical planner -> PiecewiseMergeJoinExec` rule yet.

- `ExecutionPlan` has been implemented for `PiecewiseMergeJoinExec`
  - `compute_properties` and `swap_inputs` are not implemented yet
  - Builds the execution plan for the piecewise merge join
- `PiecewiseMergeJoinStream` has been implemented
- Examples have been provided for the `PiecewiseMergeJoinExec` and `PiecewiseMergeJoinStream` implementations

## Benchmark Results

The benchmarks were run with a random batch of values (streamed side) against a sorted batch (buffered side). For the classic join tests I used the larger side as the left side because it gave better performance; that should not be the case, since the left side is the one that has to be sorted. I will look into this in a follow-up pull request, coupled with the input-swapping work.

Compared to `NestedLoopJoin`, the queries for classic joins (left, right, inner, full) were about 10x faster 🚀

- However, when both sides were equally large, it performed slower than the nested loop join.

For existence joins (semi, anti), the join performed about 1000x faster 🚀

- To explain the dramatic speedup: instead of computing a cartesian product, all we need to do is find the max/min value of the unsorted streamed side, then do an O(n) scan of the sorted buffered side to find the first match and emit all rows after it; see the sketch below.
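As a minimal, self-contained sketch of that fast path (illustrative only; `semi_join_matches` is a hypothetical helper, not the actual `PiecewiseMergeJoinStream` code), consider a semi join with the predicate `stream.v < buffered.v` over plain `i32` keys:

```rust
/// Illustrative sketch of the existence-join fast path described above, NOT
/// the real implementation: for `stream.v < buffered.v`, a buffered row
/// matches some streamed row iff it exceeds the streamed side's minimum, so a
/// single O(n) scan of the sorted buffered side replaces the cartesian product.
fn semi_join_matches(stream: &[i32], buffered_sorted: &[i32]) -> Vec<i32> {
    // Min of the unsorted streamed side (a `>` predicate would use the max).
    let Some(&min_stream) = stream.iter().min() else {
        return Vec::new(); // empty streamed side -> no matches
    };
    // Linear scan for the first buffered row satisfying the predicate; since
    // the buffered side is sorted ascending, every later row matches too.
    let start = buffered_sorted
        .iter()
        .position(|&v| v > min_stream)
        .unwrap_or(buffered_sorted.len());
    buffered_sorted[start..].to_vec()
}

fn main() {
    let stream = [7, 3, 9]; // unsorted streamed side
    let buffered = [1, 2, 3, 4, 8]; // sorted buffered side
    // min(stream) = 3, so exactly the buffered rows 4 and 8 have some s < b.
    assert_eq!(semi_join_matches(&stream, &buffered), vec![4, 8]);
}
```

The same reasoning applies symmetrically: an anti join would emit the complement (the rows before the first match), and `>`-style predicates use the streamed side's maximum instead of its minimum.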
<details>
<summary>Benchmark Results for normal joins</summary>

```
joins/PiecewiseMergeJoin/l=1000_r=1000
                        time:   [345.69 µs 351.99 µs 361.11 µs]
                        change: [-4.0041% -2.4315% -0.4405%] (p = 0.01 < 0.05)
                        Change within noise threshold.
Found 10 outliers among 100 measurements (10.00%)
  7 (7.00%) high mild
  3 (3.00%) high severe

joins/NestedLoopJoin/l=1000_r=1000
                        time:   [2.6237 ms 2.6518 ms 2.6870 ms]
                        change: [-4.0439% +0.5217% +4.4183%] (p = 0.84 > 0.05)
                        No change in performance detected.
Found 15 outliers among 100 measurements (15.00%)
  4 (4.00%) high mild
  11 (11.00%) high severe

Benchmarking joins/PiecewiseMergeJoin/l=10000_r=10000: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 52.4s, or reduce sample count to 10.
joins/PiecewiseMergeJoin/l=10000_r=10000
                        time:   [490.26 ms 501.24 ms 513.75 ms]
                        change: [-14.807% -9.4227% -4.1141%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 12 outliers among 100 measurements (12.00%)
  8 (8.00%) high mild
  4 (4.00%) high severe

Benchmarking joins/NestedLoopJoin/l=10000_r=10000: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 32.9s, or reduce sample count to 10.
joins/NestedLoopJoin/l=10000_r=10000
                        time:   [325.74 ms 330.41 ms 335.76 ms]
                        change: [-30.701% -25.545% -20.089%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 9 outliers among 100 measurements (9.00%)
  1 (1.00%) high mild
  8 (8.00%) high severe

joins/PiecewiseMergeJoin/l=100000_r=1000
                        time:   [46.738 ms 47.037 ms 47.348 ms]
                        change: [+6.8565% +7.8729% +8.8987%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high mild

Benchmarking joins/NestedLoopJoin/l=100000_r=1000: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 34.7s, or reduce sample count to 10.
joins/NestedLoopJoin/l=100000_r=1000
                        time:   [337.92 ms 355.00 ms 375.33 ms]
                        change: [+3.4274% +8.8931% +15.219%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 17 outliers among 100 measurements (17.00%)
  4 (4.00%) high mild
  13 (13.00%) high severe

joins/PiecewiseMergeJoin/l=10000_r=100
                        time:   [353.07 µs 356.19 µs 359.16 µs]
                        change: [-20.427% -19.045% -17.788%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 3 outliers among 100 measurements (3.00%)
  3 (3.00%) low mild

joins/NestedLoopJoin/l=10000_r=100
                        time:   [2.4624 ms 2.4690 ms 2.4759 ms]
                        change: [-35.277% -26.644% -17.558%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 2 outliers among 100 measurements (2.00%)
  1 (1.00%) high mild
  1 (1.00%) high severe

joins/PiecewiseMergeJoin/l=1000000_r=100
                        time:   [49.569 ms 49.788 ms 50.071 ms]
                        change: [-11.268% -8.9464% -7.0861%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 6 outliers among 100 measurements (6.00%)
  4 (4.00%) high mild
  2 (2.00%) high severe

Benchmarking joins/NestedLoopJoin/l=1000000_r=100: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 32.6s, or reduce sample count to 10.
joins/NestedLoopJoin/l=1000000_r=100
                        time:   [318.73 ms 321.16 ms 324.12 ms]
                        change: [-2.3069% -0.7454% +0.6191%] (p = 0.35 > 0.05)
                        No change in performance detected.
Found 3 outliers among 100 measurements (3.00%)
  1 (1.00%) high mild
  2 (2.00%) high severe
```
</details>

<details>
<summary>Benchmark Results for existence joins</summary>

```
joins/PiecewiseMergeJoin/l=1000_r=1000
                        time:   [17.562 µs 17.856 µs 18.368 µs]
                        change: [-95.034% -94.834% -94.578%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 5 outliers among 100 measurements (5.00%)
  5 (5.00%) high severe

joins/NestedLoopJoin/l=1000_r=1000
                        time:   [2.5747 ms 2.6143 ms 2.6718 ms]
                        change: [-3.5382% -1.4140% +1.1788%] (p = 0.24 > 0.05)
                        No change in performance detected.
Found 2 outliers among 100 measurements (2.00%)
  2 (2.00%) high severe

joins/PiecewiseMergeJoin/l=10000_r=10000
                        time:   [126.97 µs 130.34 µs 133.60 µs]
                        change: [-99.975% -99.974% -99.973%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 21 outliers among 100 measurements (21.00%)
  16 (16.00%) low mild
  5 (5.00%) high mild

Benchmarking joins/NestedLoopJoin/l=10000_r=10000: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 32.1s, or reduce sample count to 10.
joins/NestedLoopJoin/l=10000_r=10000
                        time:   [324.45 ms 329.32 ms 335.64 ms]
                        change: [-2.5195% -0.3276% +2.0344%] (p = 0.79 > 0.05)
                        No change in performance detected.
Found 8 outliers among 100 measurements (8.00%)
  3 (3.00%) high mild
  5 (5.00%) high severe

Benchmarking joins/PiecewiseMergeJoin/l=100000_r=1000: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 5.7s, enable flat sampling, or reduce sample count to 60.
joins/PiecewiseMergeJoin/l=1000_r=10000
                        time:   [21.704 µs 21.851 µs 22.039 µs]
                        change: [-99.951% -99.951% -99.951%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 4 outliers among 100 measurements (4.00%)
  1 (1.00%) low mild
  3 (3.00%) high severe

joins/NestedLoopJoin/l=1000_r=10000
                        time:   [26.852 ms 27.166 ms 27.482 ms]
                        change: [-34.057% -28.293% -22.241%] (p = 0.00 < 0.05)
                        Performance has improved.

joins/PiecewiseMergeJoin/l=100_r=100000
                        time:   [74.249 µs 74.381 µs 74.516 µs]
                        change: [-99.952% -99.952% -99.951%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 8 outliers among 100 measurements (8.00%)
  7 (7.00%) high mild
  1 (1.00%) high severe

joins/NestedLoopJoin/l=100_r=100000
                        time:   [25.960 ms 26.343 ms 26.807 ms]
                        change: [-1.0541% +0.8379% +2.8866%] (p = 0.41 > 0.05)
                        No change in performance detected.
Found 2 outliers among 100 measurements (2.00%)
  2 (2.00%) high severe

joins/PiecewiseMergeJoin/l=1000_r=100000
                        time:   [82.470 µs 83.025 µs 83.761 µs]
                        change: [-99.996% -99.996% -99.996%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 12 outliers among 100 measurements (12.00%)
  6 (6.00%) high mild
  6 (6.00%) high severe

Benchmarking joins/NestedLoopJoin/l=1000_r=100000: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 33.0s, or reduce sample count to 10.
joins/NestedLoopJoin/l=1000_r=100000
                        time:   [322.35 ms 323.86 ms 325.53 ms]
                        change: [-33.068% -26.620% -19.778%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 2 outliers among 100 measurements (2.00%)
  1 (1.00%) high mild
  1 (1.00%) high severe
```
</details>

If you want to replicate:

<details>
<summary>Code</summary>

```rust
use std::sync::Arc;

use arrow::array::{
    ArrayRef, Date32Builder, Decimal128Builder, Int32Builder, RecordBatch, StringBuilder,
};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion};
use datafusion_common::{JoinSide, Result};
use datafusion_execution::TaskContext;
use datafusion_expr::{JoinType, Operator};
use datafusion_physical_expr::expressions::{BinaryExpr, Column};
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_plan::joins::utils::{ColumnIndex, JoinFilter};
use datafusion_physical_plan::joins::{NestedLoopJoinExec, PiecewiseMergeJoinExec};
use datafusion_physical_plan::test::TestMemoryExec;
use datafusion_physical_plan::{collect, ExecutionPlan};
use rand::{rng, Rng};
use tokio::runtime::Runtime;
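// Benchmark data layout, as described in "Benchmark Results" above: the
// left/streamed input is a batch of completely random values, while the
// right/buffered input is a batch sorted ascending on every column.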
/// Creates a RecordBatch of `num_rows` with completely random values in [0, 100_000].
pub fn create_random_batch(num_rows: usize) -> RecordBatch {
    let schema = Arc::new(Schema::new(vec![
        Field::new("c0", DataType::Int32, true),
        Field::new("c1", DataType::Utf8, true),
        Field::new("c2", DataType::Date32, true),
        Field::new("c3", DataType::Decimal128(11, 2), true),
    ]));

    let mut rng = rng();
    let mut a = Int32Builder::new();
    let mut b = StringBuilder::new();
    let mut c = Date32Builder::new();
    let mut d = Decimal128Builder::new()
        .with_precision_and_scale(11, 2)
        .unwrap();

    for _ in 0..num_rows {
        let int_val = rng.random_range(0..=100_000);
        a.append_value(int_val);
        b.append_value(format!("string_{int_val}"));
        c.append_value(int_val);
        let dec_val = (rng.random_range(0..=100_000) as i128) * 100;
        d.append_value(dec_val);
    }

    let a = Arc::new(a.finish()) as ArrayRef;
    let b = Arc::new(b.finish()) as ArrayRef;
    let c = Arc::new(c.finish()) as ArrayRef;
    let d = Arc::new(d.finish()) as ArrayRef;

    RecordBatch::try_new(schema.clone(), vec![a, b, c, d]).unwrap()
}

/// Creates a RecordBatch of `num_rows` sorted ascending, advancing by a random
/// increment in [0, max_increment] per row.
pub fn create_sorted_batch(num_rows: usize, max_increment: i32) -> RecordBatch {
    let schema = Arc::new(Schema::new(vec![
        Field::new("c0", DataType::Int32, true),
        Field::new("c1", DataType::Utf8, true),
        Field::new("c2", DataType::Date32, true),
        Field::new("c3", DataType::Decimal128(11, 2), true),
    ]));

    let mut rng = rng();
    let mut a = Int32Builder::new();
    let mut b = StringBuilder::new();
    let mut c = Date32Builder::new();
    let mut d = Decimal128Builder::new()
        .with_precision_and_scale(11, 2)
        .unwrap();

    let mut current = rng.random_range(0..=max_increment);
    for _ in 0..num_rows {
        let inc = rng.random_range(0..=max_increment);
        current = current.saturating_add(inc);
        a.append_value(current);
        b.append_value(format!("string_{current}"));
        c.append_value(current);
        d.append_value((current as i128) * 100);
    }

    let a = Arc::new(a.finish()) as ArrayRef;
    let b = Arc::new(b.finish()) as ArrayRef;
    let c = Arc::new(c.finish()) as ArrayRef;
    let d = Arc::new(d.finish()) as ArrayRef;

    RecordBatch::try_new(schema.clone(), vec![a, b, c, d]).unwrap()
}

/// Wraps the generated batches in `TestMemoryExec`s: random batch on the left
/// (streamed), sorted batch on the right (buffered).
fn make_memory_execs(
    left_rows: usize,
    right_rows: usize,
) -> (Arc<dyn ExecutionPlan>, Arc<dyn ExecutionPlan>, SchemaRef) {
    let left_batch = create_random_batch(left_rows);
    let schema = left_batch.schema();
    let left_partitions = vec![vec![left_batch]];
    let right_batch = create_sorted_batch(right_rows, 10);
    let right_partitions = vec![vec![right_batch]];
    let left_mem =
        TestMemoryExec::try_new_exec(&left_partitions, schema.clone(), None).unwrap();
    let right_mem =
        TestMemoryExec::try_new_exec(&right_partitions, schema.clone(), None).unwrap();
    (left_mem, right_mem, schema)
}

/// Builds the same `c0 < c0` left join as a `PiecewiseMergeJoinExec` and as a
/// `NestedLoopJoinExec` so the two can be benchmarked against each other.
fn build_two_joins(
    left: Arc<dyn ExecutionPlan>,
    right: Arc<dyn ExecutionPlan>,
) -> Result<(
    Arc<dyn ExecutionPlan>, // pwmj
    Arc<dyn ExecutionPlan>, // nlj
)> {
    let left_on: Arc<dyn PhysicalExpr> = Arc::new(
        Column::new_with_schema("c0", &left.schema())
            .expect("left schema must contain 'c0'"),
    );
    let right_on: Arc<dyn PhysicalExpr> = Arc::new(
        Column::new_with_schema("c0", &right.schema())
            .expect("right schema must contain 'c0'"),
    );

    let pwmj = PiecewiseMergeJoinExec::try_new(
        left.clone(),
        right.clone(),
        (left_on.clone(), right_on.clone()),
        Operator::Lt,
        JoinType::Left,
    )?;

    // The nested loop join expresses the same range predicate as a join filter.
    let filter_expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
        left_on.clone(),
        Operator::Lt,
        right_on.clone(),
    ));
    let column_indices = vec![
        ColumnIndex {
            index: 0,
            side: JoinSide::Left,
        },
        ColumnIndex {
            index: 0,
            side: JoinSide::Right,
        },
    ];
    let intermediate_schema = Arc::new(Schema::new(vec![
        Field::new("c0_left", DataType::Int32, false),
        Field::new("c0_right", DataType::Int32, false),
    ]));
    let join_filter = JoinFilter::new(filter_expr, column_indices, intermediate_schema);

    let nlj = NestedLoopJoinExec::try_new(
        left,
        right,
        Some(join_filter),
        &JoinType::Left,
        None,
    )?;

    Ok((Arc::new(pwmj), Arc::new(nlj)))
}

fn bench_joins(c: &mut Criterion) {
    let rt = Runtime::new().unwrap();
    let mut group = c.benchmark_group("joins");

    // Row counts for each side in the benchmarks.
    let size_pairs = &[
        (1000, 1000),
        (10000, 10000),
        (100000, 1000),
        (10000, 100),
        (1000000, 100),
        (1000, 10000),
        (100, 100000),
        (1000, 100000),
    ];

    for &(left_rows, right_rows) in size_pairs.iter() {
        let (left_mem, right_mem, _schema) = make_memory_execs(left_rows, right_rows);
        let (pwmj_join, nested_loop_join) =
            build_two_joins(left_mem.clone(), right_mem.clone()).unwrap();

        group.bench_with_input(
            BenchmarkId::new(
                "PiecewiseMergeJoin",
                format!("l={}_r={}", left_rows, right_rows),
            ),
            &pwmj_join,
            |b, plan| {
                b.iter_batched(
                    || (),
                    |_setup| {
                        let ctx = TaskContext::default();
                        let fut = collect(plan.clone(), Arc::new(ctx));
                        rt.block_on(async {
                            let _ = fut.await.unwrap();
                        });
                    },
                    BatchSize::SmallInput,
                )
            },
        );

        group.bench_with_input(
            BenchmarkId::new(
                "NestedLoopJoin",
                format!("l={}_r={}", left_rows, right_rows),
            ),
            &nested_loop_join,
            |b, plan| {
                b.iter_batched(
                    || (),
                    |_setup| {
                        let ctx = TaskContext::default();
                        let fut = collect(plan.clone(), Arc::new(ctx));
                        rt.block_on(async {
                            let _ = fut.await.unwrap();
                        });
                    },
                    BatchSize::SmallInput,
                )
            },
        );
    }
    group.finish();
}

criterion_group!(benches, bench_joins);
criterion_main!(benches);
```
</details>
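To run it: assuming the file is saved in the DataFusion repo, e.g. as `datafusion/physical-plan/benches/piecewise_merge_join.rs` (hypothetical path/name) with a matching `[[bench]]` entry (`harness = false`) in that crate's `Cargo.toml`, `cargo bench --bench piecewise_merge_join` should produce output like the runs above.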
## Next Steps

This pull request was getting large, so the following work is deferred to follow-ups:

- Serialization
- Mark join support
- Input swap support + investigating the bottlenecks that make performance so poor when the left side is larger
- Enforcing sort order + physical planner integration
- Fuzz tests
- Implementing metrics + memory reservation for the rest of the join

## Are these changes tested?

Yes, with unit tests.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org