This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 0ac434d0e5 Add case-heavy LEFT JOIN benchmark and debug timing/logging 
for PushDownFilter hot paths (#20664)
0ac434d0e5 is described below

commit 0ac434d0e5207b1e2d6fcc9c04b5a4de3b13bec8
Author: kosiew <[email protected]>
AuthorDate: Sat Mar 7 12:10:15 2026 +0800

    Add case-heavy LEFT JOIN benchmark and debug timing/logging for 
PushDownFilter hot paths (#20664)
    
    ## Which issue does this PR close?
    
    * Part of #20002.
    
    ## Rationale for this change
    
    The `PushDownFilter` optimizer rule shows a severe planner-time
    performance pathology in the `sql_planner_extended` benchmark, where
    profiling indicates it dominates total planning CPU time and repeatedly
    recomputes expression types.
    
    This PR adds a deterministic, CASE-heavy LEFT JOIN benchmark to reliably
    reproduce the worst-case behavior and introduces lightweight debug-only
    timing + counters inside `push_down_filter` to make it easier to
    pinpoint expensive sub-sections (e.g. predicate simplification and join
    predicate inference) during profiling.
    
    ## What changes are included in this PR?
    
    * **Benchmark: add a deterministic CASE-heavy LEFT JOIN workload**
    
    * Adds `build_case_heavy_left_join_query` and helpers to construct a
    CASE-nested predicate chain over a `LEFT JOIN`.
    * Adds a new benchmark `logical_plan_optimize_hotspot_case_heavy_left_join`
    to stress planning/optimization time.
    * Adds an A/B benchmark group
    `push_down_filter_hotspot_case_heavy_left_join_ab` (plus a non-CASE
    control group `push_down_filter_control_non_case_left_join_ab`) that
    sweeps predicate counts and CASE/nesting depth, comparing:

        * default optimizer with `push_down_filter` enabled
        * optimizer with `push_down_filter` removed
    
    * **Optimizer instrumentation (debug-only)**
    
    * Adds a small `with_debug_timing` helper gated by `log_enabled!(Debug)`
    to record microsecond timings for specific sections.
    * Instruments and logs:

        * time spent in `infer_join_predicates`
        * time spent in `simplify_predicates`
        * counts of parent predicates, `on_filters`, and inferred join
        predicates
        * before/after predicate counts for simplification
    
    ## Are these changes tested?
    
    * No new unit/integration tests were added because this PR is focused on
    **benchmarking and debug-only instrumentation** rather than changing
    optimizer semantics.
    * Coverage is provided by:
    
      * compiling/running the `sql_planner_extended` benchmark
      * validating both benchmark variants (with/without `push_down_filter`)
        produce optimized plans without errors
      * enabling `RUST_LOG=debug` to confirm timing sections and counters
        emit as expected
    
    ## Are there any user-facing changes?
    
    * No user-facing behavior changes.
    * The optimizer logic is unchanged; only **debug logging** is added
    (emits only when `RUST_LOG` enables Debug for the relevant modules).
    * Benchmark suite additions only affect developers running benches.
    
    ## LLM-generated code disclosure
    
    This PR includes LLM-generated code and comments. All LLM-generated
    content has been manually reviewed and tested.
---
 datafusion/core/benches/sql_planner_extended.rs | 239 +++++++++++++++++++++++-
 datafusion/optimizer/src/push_down_filter.rs    |  43 ++++-
 2 files changed, 276 insertions(+), 6 deletions(-)

diff --git a/datafusion/core/benches/sql_planner_extended.rs 
b/datafusion/core/benches/sql_planner_extended.rs
index adaf3e5911..d4955313c7 100644
--- a/datafusion/core/benches/sql_planner_extended.rs
+++ b/datafusion/core/benches/sql_planner_extended.rs
@@ -18,7 +18,7 @@
 use arrow::array::{ArrayRef, RecordBatch};
 use arrow_schema::DataType;
 use arrow_schema::TimeUnit::Nanosecond;
-use criterion::{Criterion, criterion_group, criterion_main};
+use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
 use datafusion::prelude::{DataFrame, SessionContext};
 use datafusion_catalog::MemTable;
 use datafusion_common::ScalarValue;
@@ -27,6 +27,7 @@ use datafusion_expr::{cast, col, lit, not, try_cast, when};
 use datafusion_functions::expr_fn::{
     btrim, length, regexp_like, regexp_replace, to_timestamp, upper,
 };
+use std::fmt::Write;
 use std::hint::black_box;
 use std::ops::Rem;
 use std::sync::Arc;
@@ -212,14 +213,127 @@ fn build_test_data_frame(ctx: &SessionContext, rt: 
&Runtime) -> DataFrame {
     })
 }
 
-fn criterion_benchmark(c: &mut Criterion) {
+/// Build a CASE-heavy dataframe over a non-inner join to stress
+/// planner-time filter pushdown and nullability/type inference.
+fn build_case_heavy_left_join_df(ctx: &SessionContext, rt: &Runtime) -> 
DataFrame {
+    register_string_table(ctx, 100, 1000);
+    let query = build_case_heavy_left_join_query(30, 1);
+    rt.block_on(async { ctx.sql(&query).await.unwrap() })
+}
+
+fn build_case_heavy_left_join_query(predicate_count: usize, case_depth: usize) 
-> String {
+    let mut query = String::from(
+        "SELECT l.c0, r.c0 AS rc0 FROM t l LEFT JOIN t r ON l.c0 = r.c0 WHERE 
",
+    );
+
+    if predicate_count == 0 {
+        query.push_str("TRUE");
+        return query;
+    }
+
+    // Keep this deterministic so comparisons between profiles are stable.
+    for i in 0..predicate_count {
+        if i > 0 {
+            query.push_str(" AND ");
+        }
+
+        let mut expr = format!("length(l.c{})", i % 20);
+        for depth in 0..case_depth {
+            let left_col = (i + depth + 1) % 20;
+            let right_col = (i + depth + 2) % 20;
+            expr = format!(
+                "CASE WHEN l.c{left_col} IS NOT NULL THEN {expr} ELSE 
length(r.c{right_col}) END"
+            );
+        }
+
+        let _ = write!(&mut query, "{expr} > 2");
+    }
+
+    query
+}
+
+fn build_case_heavy_left_join_df_with_push_down_filter(
+    rt: &Runtime,
+    predicate_count: usize,
+    case_depth: usize,
+    push_down_filter_enabled: bool,
+) -> DataFrame {
+    let ctx = SessionContext::new();
+    register_string_table(&ctx, 100, 1000);
+    if !push_down_filter_enabled {
+        let removed = ctx.remove_optimizer_rule("push_down_filter");
+        assert!(
+            removed,
+            "push_down_filter rule should be present in the default optimizer"
+        );
+    }
+
+    let query = build_case_heavy_left_join_query(predicate_count, case_depth);
+    rt.block_on(async { ctx.sql(&query).await.unwrap() })
+}
+
+fn build_non_case_left_join_query(
+    predicate_count: usize,
+    nesting_depth: usize,
+) -> String {
+    let mut query = String::from(
+        "SELECT l.c0, r.c0 AS rc0 FROM t l LEFT JOIN t r ON l.c0 = r.c0 WHERE 
",
+    );
+
+    if predicate_count == 0 {
+        query.push_str("TRUE");
+        return query;
+    }
+
+    // Keep this deterministic so comparisons between profiles are stable.
+    for i in 0..predicate_count {
+        if i > 0 {
+            query.push_str(" AND ");
+        }
+
+        let left_col = i % 20;
+        let mut expr = format!("l.c{left_col}");
+        for depth in 0..nesting_depth {
+            let right_col = (i + depth + 1) % 20;
+            expr = format!("coalesce({expr}, r.c{right_col})");
+        }
+
+        let _ = write!(&mut query, "length({expr}) > 2");
+    }
+
+    query
+}
+
+fn build_non_case_left_join_df_with_push_down_filter(
+    rt: &Runtime,
+    predicate_count: usize,
+    nesting_depth: usize,
+    push_down_filter_enabled: bool,
+) -> DataFrame {
     let ctx = SessionContext::new();
+    register_string_table(&ctx, 100, 1000);
+    if !push_down_filter_enabled {
+        let removed = ctx.remove_optimizer_rule("push_down_filter");
+        assert!(
+            removed,
+            "push_down_filter rule should be present in the default optimizer"
+        );
+    }
+
+    let query = build_non_case_left_join_query(predicate_count, nesting_depth);
+    rt.block_on(async { ctx.sql(&query).await.unwrap() })
+}
+
+fn criterion_benchmark(c: &mut Criterion) {
+    let baseline_ctx = SessionContext::new();
+    let case_heavy_ctx = SessionContext::new();
     let rt = Runtime::new().unwrap();
 
     // validate logical plan optimize performance
     // https://github.com/apache/datafusion/issues/17261
 
-    let df = build_test_data_frame(&ctx, &rt);
+    let df = build_test_data_frame(&baseline_ctx, &rt);
+    let case_heavy_left_join_df = 
build_case_heavy_left_join_df(&case_heavy_ctx, &rt);
 
     c.bench_function("logical_plan_optimize", |b| {
         b.iter(|| {
@@ -227,6 +341,125 @@ fn criterion_benchmark(c: &mut Criterion) {
             black_box(rt.block_on(async { 
df_clone.into_optimized_plan().unwrap() }));
         })
     });
+
+    c.bench_function("logical_plan_optimize_hotspot_case_heavy_left_join", |b| 
{
+        b.iter(|| {
+            let df_clone = case_heavy_left_join_df.clone();
+            black_box(rt.block_on(async { 
df_clone.into_optimized_plan().unwrap() }));
+        })
+    });
+
+    let predicate_sweep = [10, 20, 30, 40, 60];
+    let case_depth_sweep = [1, 2, 3];
+
+    let mut hotspot_group =
+        c.benchmark_group("push_down_filter_hotspot_case_heavy_left_join_ab");
+    for case_depth in case_depth_sweep {
+        for predicate_count in predicate_sweep {
+            let with_push_down_filter =
+                build_case_heavy_left_join_df_with_push_down_filter(
+                    &rt,
+                    predicate_count,
+                    case_depth,
+                    true,
+                );
+            let without_push_down_filter =
+                build_case_heavy_left_join_df_with_push_down_filter(
+                    &rt,
+                    predicate_count,
+                    case_depth,
+                    false,
+                );
+
+            let input_label =
+                
format!("predicates={predicate_count},case_depth={case_depth}");
+            // A/B interpretation:
+            // - with_push_down_filter: default optimizer path (rule enabled)
+            // - without_push_down_filter: control path with the rule removed
+            // Compare both IDs at the same sweep point to isolate rule impact.
+            hotspot_group.bench_with_input(
+                BenchmarkId::new("with_push_down_filter", &input_label),
+                &with_push_down_filter,
+                |b, df| {
+                    b.iter(|| {
+                        let df_clone = df.clone();
+                        black_box(
+                            rt.block_on(async {
+                                df_clone.into_optimized_plan().unwrap()
+                            }),
+                        );
+                    })
+                },
+            );
+            hotspot_group.bench_with_input(
+                BenchmarkId::new("without_push_down_filter", &input_label),
+                &without_push_down_filter,
+                |b, df| {
+                    b.iter(|| {
+                        let df_clone = df.clone();
+                        black_box(
+                            rt.block_on(async {
+                                df_clone.into_optimized_plan().unwrap()
+                            }),
+                        );
+                    })
+                },
+            );
+        }
+    }
+    hotspot_group.finish();
+
+    let mut control_group =
+        c.benchmark_group("push_down_filter_control_non_case_left_join_ab");
+    for nesting_depth in case_depth_sweep {
+        for predicate_count in predicate_sweep {
+            let with_push_down_filter = 
build_non_case_left_join_df_with_push_down_filter(
+                &rt,
+                predicate_count,
+                nesting_depth,
+                true,
+            );
+            let without_push_down_filter =
+                build_non_case_left_join_df_with_push_down_filter(
+                    &rt,
+                    predicate_count,
+                    nesting_depth,
+                    false,
+                );
+
+            let input_label =
+                
format!("predicates={predicate_count},nesting_depth={nesting_depth}");
+            control_group.bench_with_input(
+                BenchmarkId::new("with_push_down_filter", &input_label),
+                &with_push_down_filter,
+                |b, df| {
+                    b.iter(|| {
+                        let df_clone = df.clone();
+                        black_box(
+                            rt.block_on(async {
+                                df_clone.into_optimized_plan().unwrap()
+                            }),
+                        );
+                    })
+                },
+            );
+            control_group.bench_with_input(
+                BenchmarkId::new("without_push_down_filter", &input_label),
+                &without_push_down_filter,
+                |b, df| {
+                    b.iter(|| {
+                        let df_clone = df.clone();
+                        black_box(
+                            rt.block_on(async {
+                                df_clone.into_optimized_plan().unwrap()
+                            }),
+                        );
+                    })
+                },
+            );
+        }
+    }
+    control_group.finish();
 }
 
 criterion_group!(benches, criterion_benchmark);
diff --git a/datafusion/optimizer/src/push_down_filter.rs 
b/datafusion/optimizer/src/push_down_filter.rs
index f1664f267b..03a7a0b864 100644
--- a/datafusion/optimizer/src/push_down_filter.rs
+++ b/datafusion/optimizer/src/push_down_filter.rs
@@ -23,7 +23,9 @@ use std::sync::Arc;
 use arrow::datatypes::DataType;
 use indexmap::IndexSet;
 use itertools::Itertools;
+use log::{Level, debug, log_enabled};
 
+use datafusion_common::instant::Instant;
 use datafusion_common::tree_node::{
     Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
 };
@@ -525,8 +527,19 @@ fn push_down_join(
         .map_or_else(Vec::new, |filter| 
split_conjunction_owned(filter.clone()));
 
     // Are there any new join predicates that can be inferred from the filter 
expressions?
-    let inferred_join_predicates =
-        infer_join_predicates(&join, &predicates, &on_filters)?;
+    let inferred_join_predicates = with_debug_timing("infer_join_predicates", 
|| {
+        infer_join_predicates(&join, &predicates, &on_filters)
+    })?;
+
+    if log_enabled!(Level::Debug) {
+        debug!(
+            "push_down_filter: join_type={:?}, parent_predicates={}, 
on_filters={}, inferred_join_predicates={}",
+            join.join_type,
+            predicates.len(),
+            on_filters.len(),
+            inferred_join_predicates.len()
+        );
+    }
 
     if on_filters.is_empty()
         && predicates.is_empty()
@@ -765,7 +778,15 @@ impl OptimizerRule for PushDownFilter {
 
         let predicate = split_conjunction_owned(filter.predicate.clone());
         let old_predicate_len = predicate.len();
-        let new_predicates = simplify_predicates(predicate)?;
+        let new_predicates =
+            with_debug_timing("simplify_predicates", || 
simplify_predicates(predicate))?;
+        if log_enabled!(Level::Debug) {
+            debug!(
+                "push_down_filter: simplify_predicates old_count={}, 
new_count={}",
+                old_predicate_len,
+                new_predicates.len()
+            );
+        }
         if old_predicate_len != new_predicates.len() {
             let Some(new_predicate) = conjunction(new_predicates) else {
                 // new_predicates is empty - remove the filter entirely
@@ -1377,6 +1398,22 @@ impl PushDownFilter {
     }
 }
 
+fn with_debug_timing<T, F>(label: &'static str, f: F) -> Result<T>
+where
+    F: FnOnce() -> Result<T>,
+{
+    if !log_enabled!(Level::Debug) {
+        return f();
+    }
+    let start = Instant::now();
+    let result = f();
+    debug!(
+        "push_down_filter_timing: section={label}, elapsed_us={}",
+        start.elapsed().as_micros()
+    );
+    result
+}
+
 /// replaces columns by its name on the projection.
 pub fn replace_cols_by_name(
     e: Expr,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to