haohuaijin opened a new issue, #20109:
URL: https://github.com/apache/datafusion/issues/20109
### Describe the bug
I have a plan like the one below with two consecutive FilterExec nodes, so I use
FilterPushdown to merge them, but after the merge, executing the plan reports an error.
If I do not apply FilterPushdown, the plan produces results correctly.
```
Physical plan before pushdown filter:
FilterExec: event@0 = Ingestion, projection=[size@1]
FilterExec: time@0 < 1770020321951000, projection=[event@1, size@2]
DataSourceExec: partitions=1, partition_sizes=[11]
Get result before pushdown filter, result size: 8
Physical plan after pushdown filter:
ProjectionExec: expr=[size@1 as size]
FilterExec: event@0 = Ingestion AND time@0 < 1770020321951000
DataSourceExec: partitions=1, partition_sizes=[11]
Error: ArrowError(InvalidArgumentError("Invalid comparison operation: Int64
== Utf8"), Some(""))
```
The column reference `event@0` is not correct; it should be `event@1`.
### To Reproduce
```rust
use std::sync::Arc;
use datafusion::arrow::array::{Int64Array, RecordBatch, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::common::Result;
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::physical_expr::expressions::{BinaryExpr, col, lit};
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_optimizer::filter_pushdown::FilterPushdown;
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::{ExecutionPlan, collect, displayable};
use datafusion::prelude::SessionContext;
use datafusion_expr::Operator;
#[tokio::main]
async fn main() -> Result<()> {
// 1. Create data source
let memory_exec = create_data_source();
// 2. First FilterExec: time < 1770020321951000 with
projection=[event@1, size@2]
let timestamp_col = col("time", &memory_exec.schema())?;
let upper_bound = lit(1770020321951000i64);
let lt_expr = Arc::new(BinaryExpr::new(timestamp_col, Operator::Lt,
upper_bound));
let filter1_exec: Arc<dyn ExecutionPlan> = Arc::new(
FilterExec::try_new(lt_expr,
memory_exec)?.with_projection(Some(vec![1, 2]))?, // projection=[event@1,
size@2]
);
// 3. Second FilterExec: event = 'Ingestion' with projection=[size]
let event_col = col("event", &filter1_exec.schema())?;
let ingestion_lit = lit("Ingestion");
let event_filter = Arc::new(BinaryExpr::new(event_col, Operator::Eq,
ingestion_lit));
let filter2_exec: Arc<dyn ExecutionPlan> = Arc::new(
FilterExec::try_new(event_filter, filter1_exec)?
.with_projection(Some(vec![1]))?, // projection=[size@1]
);
println!("Physical plan before pushdown filter:");
print!(
"{}",
displayable(filter2_exec.as_ref()).indent(true).to_string()
);
// Execute the plan
let ctx = SessionContext::new();
let result = collect(filter2_exec.clone(), ctx.task_ctx()).await?;
println!(
"Get result before pushdown filter, result size: {:?}\n",
result.iter().fold(0, |acc, x| acc + x.num_rows())
);
// After pushdown filter
let pushdown_filter = FilterPushdown::new();
let optimized_plan =
pushdown_filter.optimize(filter2_exec.clone(),
ctx.state().config_options())?;
println!("Physical plan after pushdown filter:");
print!(
"{}",
displayable(optimized_plan.as_ref())
.indent(true)
.to_string(),
);
let result = collect(optimized_plan.clone(), ctx.task_ctx()).await?;
println!("{:?}", result);
Ok(())
}
/// Builds an in-memory data source with schema (time: Int64, event: Utf8,
/// size: Int64), containing 11 single-row batches in one partition so the
/// plan displays as `partitions=1, partition_sizes=[11]`.
fn create_data_source() -> Arc<dyn ExecutionPlan> {
    // Three non-nullable columns: time, event, size.
    let schema = Arc::new(Schema::new(vec![
        Field::new("time", DataType::Int64, false),
        Field::new("event", DataType::Utf8, false),
        Field::new("size", DataType::Int64, false),
    ]));

    // Row data: 11 consecutive timestamps, event labels, and sizes 100..=1100.
    let timestamps: Vec<i64> = (0..11).map(|i| 1770020021951000i64 + i).collect();
    let events = [
        "Ingestion", "Ingestion", "Query", "Ingestion", "Ingestion", "Query",
        "Ingestion", "Ingestion", "Ingestion", "Query", "Ingestion",
    ];
    let sizes: Vec<i64> = (1..=11i64).map(|i| i * 100).collect();

    // Build one single-row RecordBatch per row.
    let batches: Vec<RecordBatch> = timestamps
        .iter()
        .zip(events.iter())
        .zip(sizes.iter())
        .map(|((&time, &event), &size)| {
            RecordBatch::try_new(
                Arc::clone(&schema),
                vec![
                    Arc::new(Int64Array::from(vec![time])),
                    Arc::new(StringArray::from(vec![event])),
                    Arc::new(Int64Array::from(vec![size])),
                ],
            )
            .unwrap()
        })
        .collect();

    // Wrap every batch into a single partition of the memory source.
    MemorySourceConfig::try_new_exec(&[batches], schema, None).unwrap()
}
```
### Expected behavior
The program should run successfully and produce the same results after the pushdown.
### Additional context
I tested every released version that includes `FilterPushdown`; none of them work.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]