jonathanc-n commented on PR #16210: URL: https://github.com/apache/datafusion/pull/16210#issuecomment-2945598547
@2010YOUY01 I was doing some benchmarks on NLJs vs. HJs, and the results looked bad even for cases where one table is very small, which is exactly where NLJs should excel. One thing to note is that the NLJ does seem slightly faster when both tables are very small, especially when the outer table is the larger one (which makes sense, since HJ has to hash the build side; I should probably test this against SMJ cases too). DuckDB does something interesting here: if one of the tables is under a certain row threshold (5), it chooses a nested loop join even for `on.is_some()` cases: [threshold](https://github.com/duckdb/duckdb/blob/a8a377580cc5d26ae90f16b41affad24ac6ee833/src/include/duckdb/main/client_config.hpp#L102) (we could probably do the same once performance is better).

### Benchmarks

Here are some experimental benchmarks I ran, if you're curious. A couple of caveats: these benchmarks run against record batches created in memory rather than read from disk, and the joins are constructed directly as physical plans instead of being passed through the optimizer (which would have swapped the build/probe sides):

<details>
<summary>Benchmark</summary>

```
joins/HashJoin/l=5_r=10165536
    time: [24.433 ms 24.499 ms 24.567 ms]   change: [-26.227% -21.563% -17.138%] (p = 0.00 < 0.05) Performance has improved.
    5 outliers (5 high mild)

joins/NestedLoopJoin/l=5_r=10165536
    time: [399.05 ms 400.87 ms 403.07 ms]   change: [+0.2540% +0.7221% +1.3508%] (p = 0.00 < 0.05) Change within noise threshold.
```
```
joins/NestedLoopJoin/l=5_r=10165536
    10 outliers (3 high mild, 7 high severe)

joins/HashJoin/l=5_r=1165536
    time: [2.4954 ms 2.5147 ms 2.5488 ms]   change: [+0.3564% +1.1539% +2.5753%] (p = 0.03 < 0.05) Change within noise threshold.
    6 outliers (2 high mild, 4 high severe)

joins/NestedLoopJoin/l=5_r=1165536
    time: [43.398 ms 43.540 ms 43.721 ms]   change: [-3.4531% -2.3846% -1.4403%] (p = 0.00 < 0.05) Performance has improved.
    8 outliers (4 high mild, 4 high severe)

joins/HashJoin/l=5_r=536
    time: [9.9513 µs 10.019 µs 10.090 µs]   change: [-0.8239% -0.0294% +0.8130%] (p = 0.95 > 0.05) No change in performance detected.
    2 outliers (1 high mild, 1 high severe)

joins/NestedLoopJoin/l=5_r=536
    time: [25.074 µs 25.114 µs 25.160 µs]   change: [-79.602% -79.115% -78.699%] (p = 0.00 < 0.05) Performance has improved.
    4 outliers (1 high mild, 3 high severe)

joins/HashJoin/l=5_r=36
    time: [9.1466 µs 9.2176 µs 9.2864 µs]   change: [+0.5504% +2.4160% +5.0403%] (p = 0.02 < 0.05) Change within noise threshold.
    3 outliers (1 high mild, 2 high severe)

joins/NestedLoopJoin/l=5_r=36
    time: [7.7842 µs 7.8208 µs 7.8561 µs]   change: [-92.509% -92.452% -92.375%] (p = 0.00 < 0.05) Performance has improved.
    4 outliers (1 low mild, 2 high mild, 1 high severe)

joins/HashJoin/l=1_r=10165536
    time: [21.778 ms 21.825 ms 21.875 ms]   change: [-0.7761% -0.2678% +0.1717%] (p = 0.28 > 0.05) No change in performance detected.
    4 outliers (3 high mild, 1 high severe)

joins/NestedLoopJoin/l=1_r=10165536
    time: [76.483 ms 76.543 ms 76.607 ms]   change: [-0.8023% -0.6128% -0.4446%] (p = 0.00 < 0.05) Change within noise threshold.
    3 outliers (3 high mild)

joins/HashJoin/l=1_r=1165536
    time: [2.2005 ms 2.2029 ms 2.2057 ms]   change: [+0.4246% +0.5813% +0.7394%] (p = 0.00 < 0.05) Change within noise threshold.
    1 outlier (1 high severe)

joins/NestedLoopJoin/l=1_r=1165536
    time: [8.2899 ms 8.3103 ms 8.3466 ms]   change: [-7.8900% -5.3165% -3.4247%] (p = 0.00 < 0.05) Performance has improved.
    4 outliers (1 high mild, 3 high severe)

joins/HashJoin/l=1_r=536
    time: [9.2227 µs 9.2730 µs 9.3264 µs]   change: [-0.9104% -0.0683% +0.6914%] (p = 0.86 > 0.05) No change in performance detected.

joins/NestedLoopJoin/l=1_r=536
    time: [9.6997 µs 9.7260 µs 9.7531 µs]   change: [-91.393% -91.304% -91.222%] (p = 0.00 < 0.05) Performance has improved.
    2 outliers (1 high mild, 1 high severe)

joins/HashJoin/l=1_r=36
    time: [8.3172 µs 8.3724 µs 8.4360 µs]   change: [-4.5295% -2.7882% -1.4567%] (p = 0.00 < 0.05) Performance has improved.
    2 outliers (1 high mild, 1 high severe)

joins/NestedLoopJoin/l=1_r=36
    time: [6.1413 µs 6.1649 µs 6.1893 µs]   change: [-95.940% -95.689% -95.431%] (p = 0.00 < 0.05) Performance has improved.
    2 outliers (1 low mild, 1 high mild)

joins/HashJoin/l=10165536_r=5
    time: [139.17 ms 141.39 ms 144.00 ms]   change: [-9.4179% -7.2756% -4.9624%] (p = 0.00 < 0.05) Performance has improved.
    11 outliers (7 high mild, 4 high severe)

joins/NestedLoopJoin/l=10165536_r=5
    time: [397.64 ms 399.12 ms 400.92 ms]   change: [-2.0605% -0.9166% -0.0093%] (p = 0.08 > 0.05) No change in performance detected.
    10 outliers (2 high mild, 8 high severe)

joins/HashJoin/l=1165536_r=5
    time: [8.5434 ms 8.7560 ms 9.0212 ms]   change: [-5.8975% -3.1249% -0.3413%] (p = 0.04 < 0.05) Change within noise threshold.
    7 outliers (3 high mild, 4 high severe)

joins/NestedLoopJoin/l=1165536_r=5
    time: [43.161 ms 43.425 ms 43.768 ms]   change: [-0.8075% -0.2119% +0.5729%] (p = 0.60 > 0.05) No change in performance detected.
    8 outliers (5 high mild, 3 high severe)

joins/HashJoin/l=536_r=5
    time: [11.736 µs 12.520 µs 13.378 µs]   change: [+7.4909% +12.306% +17.627%] (p = 0.00 < 0.05) Performance has regressed.
    7 outliers (5 high mild, 2 high severe)

joins/NestedLoopJoin/l=536_r=5
    time: [25.173 µs 25.299 µs 25.448 µs]   change: [-1.5034% -0.4593% +0.3076%] (p = 0.38 > 0.05) No change in performance detected.
    12 outliers (2 low mild, 1 high mild, 9 high severe)

joins/HashJoin/l=36_r=5
    time: [9.0865 µs 9.1404 µs 9.2022 µs]   change: [+0.5672% +2.5431% +3.9872%] (p = 0.00 < 0.05) Change within noise threshold.
    4 outliers (4 high mild)

joins/NestedLoopJoin/l=36_r=5
    time: [7.9047 µs 7.9530 µs 7.9958 µs]   change: [+1.7475% +2.8065% +3.8190%] (p = 0.00 < 0.05) Performance has regressed.
    2 outliers (1 low mild, 1 high severe)

joins/HashJoin/l=10165536_r=1
    time: [170.17 ms 181.86 ms 194.52 ms]   change: [+17.971% +27.731% +36.798%] (p = 0.00 < 0.05) Performance has regressed.
    9 outliers (7 high mild, 2 high severe)

joins/NestedLoopJoin/l=10165536_r=1
    time: [80.408 ms 81.251 ms 82.168 ms]   change: [+5.9184% +6.9256% +8.1615%] (p = 0.00 < 0.05) Performance has regressed.
    8 outliers (5 high mild, 3 high severe)

joins/HashJoin/l=1165536_r=1
    time: [10.917 ms 11.369 ms 11.918 ms]   change: [+24.789% +29.850% +37.372%] (p = 0.00 < 0.05) Performance has regressed.
    6 outliers (4 high mild, 2 high severe)

joins/NestedLoopJoin/l=1165536_r=1
    time: [9.6901 ms 9.9011 ms 10.182 ms]   change: [+13.021% +15.449% +18.574%] (p = 0.00 < 0.05) Performance has regressed.
    12 outliers (8 high mild, 4 high severe)

joins/HashJoin/l=536_r=1
    time: [10.219 µs 10.340 µs 10.447 µs]   change: [-3.4348% -2.5041% -1.5280%] (p = 0.00 < 0.05) Performance has improved.

joins/NestedLoopJoin/l=536_r=1
    time: [9.8366 µs 9.8801 µs 9.9240 µs]   change: [+0.4580% +1.1528% +2.0755%] (p = 0.00 < 0.05) Change within noise threshold.
    3 outliers (2 low mild, 1 high severe)

joins/HashJoin/l=36_r=1
    time: [8.9515 µs 9.4730 µs 10.132 µs]   change: [-0.6109% +4.0642% +10.933%] (p = 0.18 > 0.05) No change in performance detected.
    10 outliers (2 high mild, 8 high severe)

joins/NestedLoopJoin/l=36_r=1
    time: [6.2231 µs 6.3843 µs 6.6209 µs]   change: [-1.2056% +0.5125% +2.7669%] (p = 0.63 > 0.05) No change in performance detected.
```
```
joins/NestedLoopJoin/l=36_r=1
    6 outliers (2 high mild, 4 high severe)
```

</details>

The tests seem to be a bit inconsistent when I run them locally, so if someone would like to run them on their machine, here is the code (put it in a new file under `datafusion/physical-plan/benches`):

<details>
<summary>Code</summary>

```rust
use std::sync::Arc;

use arrow::array::{Date32Builder, Decimal128Builder, Int32Builder, RecordBatch, StringBuilder};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion};
use datafusion_common::Result;
use datafusion_execution::TaskContext;
use datafusion_expr::JoinType;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_plan::joins::{HashJoinExec, NestedLoopJoinExec, PartitionMode};
use datafusion_physical_plan::test::TestMemoryExec;
use datafusion_physical_plan::{collect, ExecutionPlan};
use tokio::runtime::Runtime;

pub fn create_batch(num_rows: usize, allow_nulls: bool) -> RecordBatch {
    let schema = Arc::new(Schema::new(vec![
        Field::new("c0", DataType::Int32, true),
        Field::new("c1", DataType::Utf8, true),
        Field::new("c2", DataType::Date32, true),
        Field::new("c3", DataType::Decimal128(11, 2), true),
    ]));
    let mut a = Int32Builder::new();
    let mut b = StringBuilder::new();
    let mut c = Date32Builder::new();
    let mut d = Decimal128Builder::new()
        .with_precision_and_scale(11, 2)
        .unwrap();
    for i in 0..num_rows {
        a.append_value(i as i32);
        c.append_value(i as i32);
        d.append_value((i as i128) * 1_000_000);
        if allow_nulls && i % 10 == 0 {
            b.append_null();
        } else {
            b.append_value(format!("string_{i}"));
        }
    }
    let a = Arc::new(a.finish());
    let b = Arc::new(b.finish());
    let c = Arc::new(c.finish());
    let d = Arc::new(d.finish());
    RecordBatch::try_new(schema.clone(), vec![a as _, b as _, c as _, d as _]).unwrap()
}

fn make_memory_execs(
    left_rows: usize,
    right_rows: usize,
) -> (Arc<dyn ExecutionPlan>, Arc<dyn ExecutionPlan>, SchemaRef) {
    let left_batch = create_batch(left_rows, true);
    let schema = left_batch.schema();
    let left_partitions = vec![vec![left_batch]];
    let right_batch = create_batch(right_rows, true);
    let right_partitions = vec![vec![right_batch]];
    let left_mem = TestMemoryExec::try_new_exec(&left_partitions, schema.clone(), None).unwrap();
    let right_mem = TestMemoryExec::try_new_exec(&right_partitions, schema.clone(), None).unwrap();
    (left_mem, right_mem, schema)
}

fn build_two_joins(
    left: Arc<dyn ExecutionPlan>,
    right: Arc<dyn ExecutionPlan>,
) -> Result<(
    Arc<dyn ExecutionPlan>, // hj
    Arc<dyn ExecutionPlan>, // nlj
)> {
    let hj = HashJoinExec::try_new(
        left.clone(),
        right.clone(),
        vec![(Arc::new(Column::new("c0", 0)), Arc::new(Column::new("c0", 0)))],
        None,
        &JoinType::Inner,
        None,
        PartitionMode::Partitioned,
        false, // null_equals_null = false
    )?;
    let nlj = NestedLoopJoinExec::try_new(
        left,
        right,
        None,
        &JoinType::Inner,
        None,
        false,
        Some(vec![(Arc::new(Column::new("c0", 0)), Arc::new(Column::new("c0", 0)))]),
    )?;
    Ok((Arc::new(hj), Arc::new(nlj)))
}

fn bench_joins(c: &mut Criterion) {
    let rt = Runtime::new().unwrap();
    let mut group = c.benchmark_group("joins");
    // row counts for (left, right) in each benchmark
    let size_pairs = &[
        (5, 10_165_536),
        (5, 1_165_536),
        (5, 536),
        (5, 36),
        (1, 10_165_536),
        (1, 1_165_536),
        (1, 536),
        (1, 36),
        (10_165_536, 5),
        (1_165_536, 5),
        (536, 5),
        (36, 5),
        (10_165_536, 1),
        (1_165_536, 1),
        (536, 1),
        (36, 1),
    ];
    for &(left_rows, right_rows) in size_pairs.iter() {
        let (left_mem, right_mem, _schema) = make_memory_execs(left_rows, right_rows);
        let (hash_join, nested_loop_join) =
            build_two_joins(left_mem.clone(), right_mem.clone()).unwrap();
        group.bench_with_input(
            BenchmarkId::new("HashJoin", format!("l={}_r={}", left_rows, right_rows)),
            &hash_join,
            |b, plan| {
                b.iter_batched(
                    || (),
                    |_setup| {
                        let ctx = TaskContext::default();
                        let fut = collect(plan.clone(), Arc::new(ctx));
                        rt.block_on(async {
                            let _ = fut.await.unwrap();
                        });
                    },
                    BatchSize::SmallInput,
                )
            },
        );
        group.bench_with_input(
            BenchmarkId::new("NestedLoopJoin", format!("l={}_r={}", left_rows, right_rows)),
            &nested_loop_join,
            |b, plan| {
                b.iter_batched(
                    || (),
                    |_setup| {
                        let ctx = TaskContext::default();
                        let fut = collect(plan.clone(), Arc::new(ctx));
                        rt.block_on(async {
                            let _ = fut.await.unwrap();
                        });
                    },
                    BatchSize::SmallInput,
                )
            },
        );
    }
    group.finish();
}

criterion_group!(benches, bench_joins);
criterion_main!(benches);
```

</details>

I think this benchmark should also be run with batches of different sizes, though.

### Consensus

I think we can add this to DataFusion as a feature users can experiment with (users pass a flag such as `datafusion.optimizer.use_nested_loop_join`), and then refactor the NLJ equijoin path for performance. This seems fine since the current implementation doesn't affect existing performance, but I'm not sure whether DataFusion accepts features that exist purely for user experimentation (@alamb, would you be willing to fact-check me on this?).
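The DuckDB-style threshold mentioned above could be sketched as a tiny planner heuristic. This is illustrative only: `JoinAlgo`, `choose_join`, and the threshold parameter are names I made up for the sketch, not DataFusion (or DuckDB) API.

```rust
// Hypothetical sketch of a DuckDB-style join-choice heuristic:
// below a small row threshold, prefer a nested loop join even when
// an equijoin condition (`on.is_some()`) exists.

#[derive(Debug, PartialEq)]
enum JoinAlgo {
    Hash,
    NestedLoop,
}

/// Pick NLJ when the smaller side is at or below the threshold
/// (DuckDB's default is 5 rows); otherwise pick the hash join.
fn choose_join(left_rows: usize, right_rows: usize, nlj_threshold: usize) -> JoinAlgo {
    if left_rows.min(right_rows) <= nlj_threshold {
        JoinAlgo::NestedLoop
    } else {
        JoinAlgo::Hash
    }
}

fn main() {
    // A 5-row build side against a huge probe side: NLJ avoids the
    // hash-build overhead entirely.
    assert_eq!(choose_join(5, 10_165_536, 5), JoinAlgo::NestedLoop);
    // Both sides large: hash join wins.
    assert_eq!(choose_join(1_165_536, 536, 5), JoinAlgo::Hash);
    println!("ok");
}
```

In a real optimizer rule this decision would also have to consult the row-count statistics and the proposed `datafusion.optimizer.use_nested_loop_join` flag, but the shape of the check would stay this simple.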
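On running with different batches: one way to vary the batching would be to split each side's rows across several smaller record batches instead of one giant batch per partition. A minimal, std-only sketch of the size computation (`batch_row_counts` is a hypothetical helper, not part of the benchmark above):

```rust
/// Split `total_rows` into chunk sizes of at most `batch_size` rows,
/// so `create_batch` could be called once per chunk instead of once
/// for the whole side. Name and shape are assumptions for this sketch.
fn batch_row_counts(total_rows: usize, batch_size: usize) -> Vec<usize> {
    assert!(batch_size > 0);
    let full = total_rows / batch_size;
    let rem = total_rows % batch_size;
    let mut sizes = vec![batch_size; full];
    if rem > 0 {
        sizes.push(rem);
    }
    sizes
}

fn main() {
    // 536 rows fit in a single batch at an 8192-row batch size...
    assert_eq!(batch_row_counts(536, 8192), vec![536]);
    // ...while 20_000 rows split into two full batches plus a remainder.
    assert_eq!(batch_row_counts(20_000, 8192), vec![8192, 8192, 3616]);
    println!("ok");
}
```

Feeding the resulting per-chunk batches into `TestMemoryExec` (a `Vec<RecordBatch>` per partition) would exercise the joins' per-batch overhead, which the single-batch setup above hides.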