zhuqi-lucas opened a new issue, #22949:
URL: https://github.com/apache/datafusion/issues/22949

   ### Describe the bug
   
   `PhysicalExprSimplifier` does not recognize `DynamicFilterPhysicalExpr` 
as a live mutable wrapper. When run over an expression that contains a 
`DynamicFilterPhysicalExpr`, the simplifier folds the wrapper into a stale 
literal snapshot, so subsequent `update()` calls on the original wrapper are 
no longer visible through the simplified output.
   
   By contrast, the schema adapter (`DefaultPhysicalExprAdapterFactory`) 
uses `TreeNode::rewrite` + `with_new_children` and correctly preserves the 
wrapper (and its `Arc<Inner>`).
   
   ### To Reproduce
   
   ```rust
   use std::sync::Arc;
   use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
   use datafusion::physical_expr::expressions::{
       BinaryExpr, Column, DynamicFilterPhysicalExpr, lit,
   };
   use datafusion::physical_expr_adapter::{
       DefaultPhysicalExprAdapterFactory, PhysicalExprAdapterFactory,
   };
   use datafusion::physical_expr::simplifier::PhysicalExprSimplifier;
   use datafusion::physical_plan::PhysicalExpr;
   use datafusion_expr::Operator;
   
   // Schemas with a column-position mismatch so the adapter has work to do.
   let logical: SchemaRef = Arc::new(Schema::new(vec![
       Field::new("ticker", DataType::Int32, false),
   ]));
   let physical: SchemaRef = Arc::new(Schema::new(vec![
       Field::new("extra", DataType::Int64, false),
       Field::new("ticker", DataType::Int32, false),
   ]));
   
   // Build a DynamicFilterPhysicalExpr wrapping ticker > 100.
   let ticker_col: Arc<dyn PhysicalExpr> =
       Arc::new(Column::new_with_schema("ticker", logical.as_ref()).unwrap());
   let inner: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
       Arc::clone(&ticker_col),
       Operator::Gt,
       lit(100_i32),
   ));
   let original: Arc<DynamicFilterPhysicalExpr> = Arc::new(
       DynamicFilterPhysicalExpr::new(vec![Arc::clone(&ticker_col)], inner),
   );
   
   // Path 1: adapter only -- wrapper survives, live link preserved.
   let adapter = DefaultPhysicalExprAdapterFactory
       .create(Arc::clone(&logical), Arc::clone(&physical))
       .unwrap();
   let adapted = adapter
       .rewrite(Arc::clone(&original) as Arc<dyn PhysicalExpr>)
       .unwrap();
   assert!(
       adapted.as_any().is::<DynamicFilterPhysicalExpr>(),
       "adapter should preserve DynamicFilterPhysicalExpr wrapper",
   );
   
   // Path 2: adapter + simplifier -- wrapper is folded into a stale snapshot.
   let simplifier = PhysicalExprSimplifier::new(&physical);
   let adapted_and_simplified = simplifier
       .simplify(
           adapter
               .rewrite(Arc::clone(&original) as Arc<dyn PhysicalExpr>)
               .unwrap(),
       )
       .unwrap();
   
   // Update the original AFTER simplification.
   let new_inner: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
       Arc::clone(&ticker_col),
       Operator::Gt,
       lit(500_i32),
   ));
   original.update(new_inner).unwrap();
   
   // Path 1's output sees the update; path 2's output is stuck at the
   // pre-update snapshot, so anything downstream (e.g. PruningPredicate
   // built from it) silently uses the stale threshold.
   ```
   
   ### Expected behavior
   
   `PhysicalExprSimplifier` should treat `DynamicFilterPhysicalExpr` as 
opaque (return it unchanged from the simplification pass), the same way 
`TreeNode::rewrite` does via `with_new_children`. The wrapper's 
`Arc<Inner>` is the point of contact for live updates from TopK / HashJoin 
runtime feedback, and folding it breaks that contract.
   
   ### Actual behavior
   
   The simplifier descends into the wrapper, fetches the current snapshot, and 
folds based on that snapshot. The returned expression is no longer a 
`DynamicFilterPhysicalExpr` and is decoupled from any subsequent `update()` 
on the original wrapper. Any downstream consumer (e.g. 
`build_pruning_predicate`) then sees a stale literal where it expected a live 
filter.
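
The decoupling can be illustrated without DataFusion. The following is a toy model using only the standard library, not DataFusion's implementation: `LiveFilter`, `rewrite_preserving`, `fold_to_snapshot`, and `observe_after_update` are hypothetical names chosen for illustration, and a plain `i32` threshold stands in for the inner expression. It only shows why sharing the `Arc` keeps the link alive while copying the current value does not.

```rust
use std::sync::{Arc, RwLock};

/// Toy stand-in for a live filter: a threshold shared behind an `Arc`.
struct LiveFilter {
    inner: Arc<RwLock<i32>>,
}

impl LiveFilter {
    fn new(threshold: i32) -> Self {
        Self { inner: Arc::new(RwLock::new(threshold)) }
    }

    /// Replaces the threshold; every holder of the same `Arc` observes it.
    fn update(&self, threshold: i32) {
        *self.inner.write().unwrap() = threshold;
    }

    /// Reads the threshold as it is right now.
    fn current(&self) -> i32 {
        *self.inner.read().unwrap()
    }
}

/// Models the adapter path: a new wrapper that shares the same `Arc`.
fn rewrite_preserving(filter: &LiveFilter) -> LiveFilter {
    LiveFilter { inner: Arc::clone(&filter.inner) }
}

/// Models the folding path: copies the current value and drops the link.
fn fold_to_snapshot(filter: &LiveFilter) -> i32 {
    filter.current()
}

/// Returns (value seen through the preserved wrapper, folded value),
/// both read after the original has been updated from 100 to 500.
fn observe_after_update() -> (i32, i32) {
    let original = LiveFilter::new(100);
    let preserved = rewrite_preserving(&original);
    let folded = fold_to_snapshot(&original);
    original.update(500);
    (preserved.current(), folded)
}
```

In this model the preserved wrapper reports the updated threshold, while the folded value stays at the threshold that was current when folding happened.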
   
   ### Why this matters
   
   This came up in a real downstream setup where a custom parquet opener uses 
`DynamicFilterPhysicalExpr` from a TopK SortExec as the input to incremental 
row-group pruning. The pruner rebuilds its `PruningPredicate` whenever 
`snapshot_generation()` changes; if the predicate is fed through the standard 
schema-adaptation helper (which composes adapter + simplifier), the resulting 
pruning predicate is a no-op against the live filter. Observed end-to-end 
impact:
   
   - `row_groups_pruned_statistics`: 0 (expected ≈ 99% pruned)
   - `bytes_scanned`: 91 MB (expected ≈ 136 KB)
   - `time_elapsed_processing`: 31 s (expected ≈ 100 ms)
   
   The dynamic filter itself was correctly populated and visible in `EXPLAIN 
ANALYZE`; only the simplifier-folded copy used by the pruner was stale.
   
   ### Suggested fix
   
   Either:
   
   1. Add a special case in `PhysicalExprSimplifier` that recognizes 
`DynamicFilterPhysicalExpr` and returns it untouched.
   2. Expose a marker trait or method on `PhysicalExpr` (e.g. 
`is_live_mutable() -> bool` defaulting to `false`) that the simplifier 
checks before descending, and have `DynamicFilterPhysicalExpr` override it to 
`true`. This is more general and would cover any future live wrappers.
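
A minimal sketch of how option 2 could look, written against a toy trait rather than the real `PhysicalExpr`. Everything here is an assumption for illustration: `ToyExpr`, `Literal`, `ConstSum`, `LiveThreshold`, and this `simplify` function are hypothetical, and `is_live_mutable` is only the name proposed above, not an existing DataFusion method. A real pass would apply the same check at each node while descending the tree.

```rust
use std::sync::{Arc, RwLock};

/// Toy expression trait. `is_live_mutable` defaults to `false`, so
/// ordinary expressions need no changes.
trait ToyExpr {
    fn value(&self) -> i64;

    fn is_live_mutable(&self) -> bool {
        false
    }
}

/// A constant.
struct Literal(i64);

impl ToyExpr for Literal {
    fn value(&self) -> i64 {
        self.0
    }
}

/// A sum of constants, which is safe to fold into a single literal.
struct ConstSum(Vec<i64>);

impl ToyExpr for ConstSum {
    fn value(&self) -> i64 {
        self.0.iter().sum()
    }
}

/// A live wrapper whose value can change after planning.
struct LiveThreshold(Arc<RwLock<i64>>);

impl ToyExpr for LiveThreshold {
    fn value(&self) -> i64 {
        *self.0.read().unwrap()
    }

    // Opt out of folding so the shared state stays reachable.
    fn is_live_mutable(&self) -> bool {
        true
    }
}

/// Folds an expression to a literal unless it is marked live, in which
/// case the original `Arc` is returned unchanged.
fn simplify(expr: Arc<dyn ToyExpr>) -> Arc<dyn ToyExpr> {
    if expr.is_live_mutable() {
        return expr;
    }
    Arc::new(Literal(expr.value()))
}
```

With this shape the simplifier does not need to know about any specific wrapper type. The trade-off is one more method on a widely implemented trait, whereas option 1 keeps the change local to the simplifier.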
   
   Happy to open a PR once we agree on direction.
   
   ### Workaround
   
   Use the schema adapter alone 
(`DefaultPhysicalExprAdapterFactory::create().rewrite(...)`) without 
composing it with `PhysicalExprSimplifier` when the predicate may contain a 
`DynamicFilterPhysicalExpr`. The adapter handles column-ref remapping and 
casts correctly; the simplifier's null-folding step is not necessary when the 
input predicate is a dynamic filter (its column set is fixed by construction so 
no missing-column substitution can fire).
   
   ### Additional context
   
   DataFusion version: 53 (also reproduces on main as of 2026-06-15).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

