haohuaijin opened a new issue, #22941:
URL: https://github.com/apache/datafusion/issues/22941
### Describe the bug
`RowGroupAccessPlanFilter::prune_by_limit` counts a fully matched row group
by its full `RowGroupMetaData::num_rows()`.
This is wrong when the row group uses `RowGroupAccess::Selection(...)`,
because only the selected rows should count toward the `LIMIT`.
`prune_by_limit` also rebuilds the plan with:
```rust
new_access_plan.scan(idx);
```
This drops the original `RowSelection` and changes the row group back to a
full scan.
### To Reproduce
```rust
use std::fs::File;
use std::sync::Arc;
use arrow::array::{Array, Int32Array, RecordBatch};
use arrow::datatypes::{DataType, Field, Schema};
use datafusion::common::{DFSchema, Result};
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::memory::DataSourceExec;
use datafusion::datasource::physical_plan::parquet::ParquetAccessPlan;
use datafusion::datasource::physical_plan::{FileScanConfigBuilder,
ParquetSource};
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::parquet::arrow::ArrowWriter;
use datafusion::parquet::arrow::arrow_reader::{RowSelection, RowSelector};
use datafusion::parquet::file::properties::WriterProperties;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::{SessionContext, col, lit};
use tempfile::NamedTempFile;
/// Reproduction driver for the `prune_by_limit` / `RowSelection` issue.
///
/// Builds a Parquet scan over the file produced by `write_test_file`, attaches
/// a user-supplied `ParquetAccessPlan` that selects only the first 10 rows of
/// row group 0, applies a `LIMIT 50`, and asserts that the returned ids are
/// the 10 selected rows of row group 0 followed by the first 40 rows of row
/// group 1.
///
/// # Errors
///
/// Propagates any error from writing the test file, building the physical
/// predicate, or executing the plan.
///
/// # Panics
///
/// Panics (via `assert_eq!`) when the collected ids differ from the expected
/// ids, which is how the bug described in the issue manifests.
#[tokio::main]
async fn main() -> Result<()> {
    let test_file = write_test_file()?;
    let schema = test_file.schema;

    // Access plan over two row groups: row group 0 is narrowed to its first
    // 10 rows (select 10, skip 990); row group 1 keeps the default from
    // `new_all`.
    let mut access_plan = ParquetAccessPlan::new_all(2);
    access_plan.scan_selection(
        0,
        RowSelection::from(vec![RowSelector::select(10), RowSelector::skip(990)]),
    );
    let partitioned_file =
        PartitionedFile::new(test_file.path, test_file.size).with_extension(access_plan);

    // `id >= 0` holds for every id written by `write_test_file` (0..2000).
    let ctx = SessionContext::new();
    let predicate_expr = col("id").gt_eq(lit(0_i32));
    let df_schema = DFSchema::try_from(Arc::clone(&schema))?;
    let predicate = ctx.create_physical_expr(predicate_expr, &df_schema)?;
    let source = Arc::new(ParquetSource::new(Arc::clone(&schema)).with_predicate(predicate));

    let config = FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), source)
        .with_file(partitioned_file)
        .with_limit(Some(50))
        .build();
    let plan: Arc<dyn ExecutionPlan> = DataSourceExec::from_data_source(config);
    let batches = datafusion::physical_plan::collect(plan, ctx.task_ctx()).await?;

    let actual_ids = collect_ids(&batches);
    // 10 selected rows from row group 0, then 40 rows from row group 1 to
    // reach the limit of 50.
    let expected_ids = (0..10).chain(1000..1040).collect::<Vec<_>>();
    println!("actual ids: {actual_ids:?}");
    println!("expected ids: {expected_ids:?}");
    // The message is kept on one line so it contains no embedded newline.
    assert_eq!(
        actual_ids, expected_ids,
        "LIMIT pruning should preserve RowSelection and count only selected rows"
    );
    Ok(())
}
/// Description of the temporary Parquet file created by `write_test_file`.
struct TestFile {
    /// Path of the temporary file, as a (lossily converted) string.
    path: String,
    /// Length of the file in bytes, read from its metadata after writing.
    size: u64,
    /// Arrow schema the file was written with.
    schema: Arc<Schema>,
    /// Handle to the temporary file, held here but never read.
    _temp_file: NamedTempFile,
}
/// Writes the Parquet fixture used by the reproduction and describes it.
///
/// The file has a single non-nullable `Int32` column named `id` and receives
/// two batches built by `batch`, starting at ids 0 and 1000. The writer is
/// configured with a maximum row-group row count of 1000.
///
/// # Errors
///
/// Propagates failures from creating the temporary file, writing or closing
/// the Parquet writer, and reading the file metadata.
fn write_test_file() -> Result<TestFile> {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let writer_props = WriterProperties::builder()
        .set_max_row_group_row_count(Some(1000))
        .build();

    let temp_file = NamedTempFile::new()?;
    let path = temp_file.path().to_string_lossy().into_owned();

    let mut parquet_writer = ArrowWriter::try_new(
        File::create(&path)?,
        Arc::clone(&schema),
        Some(writer_props),
    )?;
    // One batch per starting id, written in ascending order.
    for first_id in [0, 1000] {
        parquet_writer.write(&batch(first_id, Arc::clone(&schema))?)?;
    }
    parquet_writer.close()?;

    // Read the size only after the writer is closed.
    let size = temp_file.path().metadata()?.len();
    Ok(TestFile {
        path,
        size,
        schema,
        _temp_file: temp_file,
    })
}
fn batch(start: i32, schema: Arc<Schema>) -> Result<RecordBatch> {
let values = (start..start + 1000).collect::<Vec<_>>();
Ok(RecordBatch::try_new(
schema,
vec![Arc::new(Int32Array::from(values))],
)?)
}
/// Flattens the first column of every batch into one vector of ids,
/// preserving batch order and row order.
///
/// # Panics
///
/// Panics when a batch's first column is not an `Int32Array` or when it
/// contains a null value.
fn collect_ids(batches: &[RecordBatch]) -> Vec<i32> {
    let mut ids = Vec::new();
    for record_batch in batches {
        let id_column = record_batch
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .expect("id column should be Int32");
        ids.extend(
            id_column
                .iter()
                .map(|id| id.expect("id should be non-null")),
        );
    }
    ids
}
```
### Expected behavior
_No response_
### Additional context
_No response_
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]