haohuaijin opened a new issue, #21069:
URL: https://github.com/apache/datafusion/issues/21069
### Describe the bug
When testing branch-53, I found that when `FilterPushdown` merges two `FilterExec` nodes and the outer `FilterExec` has a `fetch`, the `fetch` is discarded after the rule runs.
```
before pushdown plan:
FilterExec: event@0 = Ingestion, projection=[size@1], fetch=10
  FilterExec: time@0 < 350, projection=[event@1, size@2]
    DataSourceExec: partitions=1, partition_sizes=[1]

after pushdown plan:
ProjectionExec: expr=[size@1 as size]
  FilterExec: event@1 = Ingestion AND time@0 < 350, projection=[event@1, size@2]
    DataSourceExec: partitions=1, partition_sizes=[1]
```
### To Reproduce
```rust
use std::sync::Arc;

use arrow::array::{Int64Array, RecordBatch, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use datafusion::common::config::ConfigOptions;
use datafusion::physical_expr::expressions::{col, BinaryExpr, Literal};
use datafusion::physical_optimizer::filter_pushdown::FilterPushdown;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::displayable;
use datafusion::physical_plan::filter::FilterExecBuilder;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_common::ScalarValue;
use datafusion_expr::Operator;

#[tokio::main]
async fn main() {
    // Create schema: [time, event, size]
    let schema = Arc::new(Schema::new(vec![
        Field::new("time", DataType::Int64, false),
        Field::new("event", DataType::Utf8, false),
        Field::new("size", DataType::Int64, false),
    ]));

    // Create sample data
    let timestamps = vec![100i64, 200, 300, 400, 500];
    let events = vec!["Ingestion", "Ingestion", "Query", "Ingestion", "Query"];
    let sizes = vec![10i64, 20, 30, 40, 50];
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(Int64Array::from(timestamps)),
            Arc::new(StringArray::from(events)),
            Arc::new(Int64Array::from(sizes)),
        ],
    )
    .unwrap();

    // Create data source
    let memory_exec = datafusion::catalog::memory::MemorySourceConfig::try_new_exec(
        &[vec![batch]],
        schema.clone(),
        None,
    )
    .unwrap();

    // First FilterExec: time < 350 with projection=[event@1, size@2]
    let time_col = col("time", &memory_exec.schema()).unwrap();
    let time_filter = Arc::new(BinaryExpr::new(
        time_col,
        Operator::Lt,
        Arc::new(Literal::new(ScalarValue::Int64(Some(350)))),
    ));
    let filter1 = Arc::new(
        FilterExecBuilder::new(time_filter, memory_exec)
            .apply_projection(Some(vec![1, 2]))
            .unwrap()
            .build()
            .unwrap(),
    );

    // Second FilterExec: event = 'Ingestion' with projection=[size@1] and fetch=10
    let event_col = col("event", &filter1.schema()).unwrap();
    let event_filter = Arc::new(BinaryExpr::new(
        event_col,
        Operator::Eq,
        Arc::new(Literal::new(ScalarValue::Utf8(Some(
            "Ingestion".to_string(),
        )))),
    ));
    let filter2 = Arc::new(
        FilterExecBuilder::new(event_filter, filter1)
            .apply_projection(Some(vec![1]))
            .unwrap()
            .with_fetch(Some(10))
            .build()
            .unwrap(),
    );

    let display = displayable(filter2.as_ref());
    println!("before pushdown plan:\n{}", display.indent(true));

    // Apply filter pushdown optimization
    let config = ConfigOptions::default();
    let optimized_plan = FilterPushdown::new()
        .optimize(Arc::clone(&filter2) as Arc<dyn ExecutionPlan>, &config)
        .unwrap();

    // Print the optimized plan as an indented tree; note the fetch is gone
    let display = displayable(optimized_plan.as_ref());
    println!("after pushdown plan:\n{}", display.indent(true));
    // println!("Tree render:\n{}", display.tree_render());
}
```
### Expected behavior
The fetch should be kept: after pushdown, the merged `FilterExec` should still carry `fetch=10`.
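For reference, this is a sketch of the plan I would expect after pushdown (same as the actual buggy output, but with `fetch=10` retained on the merged `FilterExec`; exact column indices may differ):

```
after pushdown plan:
ProjectionExec: expr=[size@1 as size]
  FilterExec: event@1 = Ingestion AND time@0 < 350, projection=[event@1, size@2], fetch=10
    DataSourceExec: partitions=1, partition_sizes=[1]
```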
### Additional context
_No response_