This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 022da71 feat: push down filters to parquet read path (#208)
022da71 is described below
commit 022da71dde98e5ff17638720e62e1c643e1b48ed
Author: Zach <[email protected]>
AuthorDate: Mon Apr 6 15:34:57 2026 +0800
feat: push down filters to parquet read path (#208)
---
Cargo.toml | 3 +-
bindings/c/src/table.rs | 6 +-
crates/integrations/datafusion/Cargo.toml | 6 +
.../datafusion/src/physical_plan/scan.rs | 123 +++-
crates/integrations/datafusion/src/table/mod.rs | 36 +-
crates/paimon/Cargo.toml | 1 +
crates/paimon/src/arrow/filtering.rs | 74 ++
crates/paimon/src/arrow/mod.rs | 1 +
crates/paimon/src/arrow/reader.rs | 810 ++++++++++++++++++++-
crates/paimon/src/io/file_io.rs | 62 +-
crates/paimon/src/lib.rs | 1 +
crates/paimon/src/predicate_stats.rs | 195 +++++
crates/paimon/src/table/read_builder.rs | 395 +++++++++-
crates/paimon/src/table/stats_filter.rs | 266 ++-----
crates/paimon/src/table/table_scan.rs | 109 ++-
crates/test_utils.rs | 101 +++
16 files changed, 1855 insertions(+), 334 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 004fb9d..cfba61f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -32,7 +32,8 @@ arrow = "57.0"
arrow-array = { version = "57.0", features = ["ffi"] }
arrow-schema = "57.0"
arrow-cast = "57.0"
+arrow-ord = "57.0"
datafusion = "52.3.0"
datafusion-ffi = "52.3.0"
parquet = "57.0"
-tokio = "1.39.2"
\ No newline at end of file
+tokio = "1.39.2"
diff --git a/bindings/c/src/table.rs b/bindings/c/src/table.rs
index c3f57fe..6f6637d 100644
--- a/bindings/c/src/table.rs
+++ b/bindings/c/src/table.rs
@@ -349,8 +349,10 @@ pub unsafe extern "C" fn paimon_table_read_to_arrow(
let end = (offset.saturating_add(length)).min(all_splits.len());
let selected = &all_splits[start..end];
- // Create TableRead with the stored read_type (projection)
- let table_read = paimon::table::TableRead::new(&state.table,
state.read_type.clone());
+ // C bindings currently persist only the projection, so reconstructing the
+ // read uses an empty predicate set.
+ let table_read =
+ paimon::table::TableRead::new(&state.table, state.read_type.clone(),
Vec::new());
match table_read.to_arrow(selected) {
Ok(stream) => {
diff --git a/crates/integrations/datafusion/Cargo.toml
b/crates/integrations/datafusion/Cargo.toml
index 4fdff40..370279c 100644
--- a/crates/integrations/datafusion/Cargo.toml
+++ b/crates/integrations/datafusion/Cargo.toml
@@ -35,4 +35,10 @@ futures = "0.3"
tokio = { workspace = true, features = ["rt", "time", "fs"] }
[dev-dependencies]
+arrow-array = { workspace = true }
+arrow-schema = { workspace = true }
+parquet = { workspace = true }
+serde = "1"
+serde_json = "1"
+tempfile = "3"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs
b/crates/integrations/datafusion/src/physical_plan/scan.rs
index a8c9aea..d7dfc7a 100644
--- a/crates/integrations/datafusion/src/physical_plan/scan.rs
+++ b/crates/integrations/datafusion/src/physical_plan/scan.rs
@@ -26,6 +26,7 @@ use datafusion::physical_plan::execution_plan::{Boundedness,
EmissionType};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning,
PlanProperties};
use futures::{StreamExt, TryStreamExt};
+use paimon::spec::Predicate;
use paimon::table::Table;
use paimon::DataSplit;
@@ -41,6 +42,9 @@ pub struct PaimonTableScan {
table: Table,
/// Projected column names (if None, reads all columns).
projected_columns: Option<Vec<String>>,
+ /// Filter translated from DataFusion expressions and reused during
execute()
+ /// so reader-side pruning reaches the actual read path.
+ pushed_predicate: Option<Predicate>,
/// Pre-planned partition assignments: `planned_partitions[i]` contains the
/// Paimon splits that DataFusion partition `i` will read.
/// Wrapped in `Arc` to avoid deep-cloning `DataSplit` metadata in
`execute()`.
@@ -55,6 +59,7 @@ impl PaimonTableScan {
schema: ArrowSchemaRef,
table: Table,
projected_columns: Option<Vec<String>>,
+ pushed_predicate: Option<Predicate>,
planned_partitions: Vec<Arc<[DataSplit]>>,
limit: Option<usize>,
) -> Self {
@@ -67,6 +72,7 @@ impl PaimonTableScan {
Self {
table,
projected_columns,
+ pushed_predicate,
planned_partitions,
plan_properties,
limit,
@@ -82,6 +88,11 @@ impl PaimonTableScan {
&self.planned_partitions
}
+ /// Returns a reference to the predicate stored on this scan, or `None`
+ /// when no predicate was supplied.
+ ///
+ /// Only compiled for tests, which use it to inspect what the scan kept.
+ #[cfg(test)]
+ pub(crate) fn pushed_predicate(&self) -> Option<&Predicate> {
+ self.pushed_predicate.as_ref()
+ }
+
pub fn limit(&self) -> Option<usize> {
self.limit
}
@@ -126,6 +137,7 @@ impl ExecutionPlan for PaimonTableScan {
let table = self.table.clone();
let schema = self.schema();
let projected_columns = self.projected_columns.clone();
+ let pushed_predicate = self.pushed_predicate.clone();
let fut = async move {
let mut read_builder = table.new_read_builder();
@@ -134,6 +146,9 @@ impl ExecutionPlan for PaimonTableScan {
let col_refs: Vec<&str> = columns.iter().map(|s|
s.as_str()).collect();
read_builder.with_projection(&col_refs);
}
+ if let Some(filter) = pushed_predicate {
+ read_builder.with_filter(filter);
+ }
let read = read_builder.new_read().map_err(to_datafusion_error)?;
let stream = read.to_arrow(&splits).map_err(to_datafusion_error)?;
@@ -173,11 +188,26 @@ impl DisplayAs for PaimonTableScan {
#[cfg(test)]
mod tests {
use super::*;
- use datafusion::arrow::datatypes::{DataType as ArrowDataType, Field,
Schema};
+ mod test_utils {
+ include!(concat!(env!("CARGO_MANIFEST_DIR"), "/../../test_utils.rs"));
+ }
+
+ use datafusion::arrow::array::Int32Array;
+ use datafusion::arrow::datatypes::{DataType as ArrowDataType, Field,
Schema as ArrowSchema};
use datafusion::physical_plan::ExecutionPlan;
+ use datafusion::prelude::SessionContext;
+ use futures::TryStreamExt;
+ use paimon::catalog::Identifier;
+ use paimon::io::FileIOBuilder;
+ use paimon::spec::{
+ BinaryRow, DataType, Datum, IntType, PredicateBuilder, Schema as
PaimonSchema, TableSchema,
+ };
+ use std::fs;
+ use tempfile::tempdir;
+ use test_utils::{local_file_path, test_data_file, write_int_parquet_file};
fn test_schema() -> ArrowSchemaRef {
- Arc::new(Schema::new(vec![Field::new(
+ Arc::new(ArrowSchema::new(vec![Field::new(
"id",
ArrowDataType::Int32,
false,
@@ -191,6 +221,7 @@ mod tests {
schema,
dummy_table(),
None,
+ None,
vec![Arc::from(Vec::new())],
None,
);
@@ -205,19 +236,16 @@ mod tests {
Arc::from(Vec::new()),
Arc::from(Vec::new()),
];
- let scan = PaimonTableScan::new(schema, dummy_table(), None,
planned_partitions, None);
+ let scan =
+ PaimonTableScan::new(schema, dummy_table(), None, None,
planned_partitions, None);
assert_eq!(scan.properties().output_partitioning().partition_count(),
3);
}
/// Constructs a minimal Table for testing (no real files needed since we
/// only test PlanProperties, not actual reads).
fn dummy_table() -> Table {
- use paimon::catalog::Identifier;
- use paimon::io::FileIOBuilder;
- use paimon::spec::{Schema, TableSchema};
-
let file_io = FileIOBuilder::new("file").build().unwrap();
- let schema = Schema::builder().build().unwrap();
+ let schema = PaimonSchema::builder().build().unwrap();
let table_schema = TableSchema::new(0, &schema);
Table::new(
file_io,
@@ -226,4 +254,83 @@ mod tests {
table_schema,
)
}
+
+ /// Runs `execute()` against a Parquet file written to a temporary
+ /// directory and checks that the pushed predicate removes rows during
+ /// the read, even though the filtered column (`value`) is not part of
+ /// the projection (`id` only).
+ #[tokio::test]
+ async fn test_execute_applies_pushed_filter_during_read() {
+ let tempdir = tempdir().unwrap();
+ let table_path = local_file_path(tempdir.path());
+ let bucket_dir = tempdir.path().join("bucket-0");
+ fs::create_dir_all(&bucket_dir).unwrap();
+
+ // Four rows; only the first one (id = 1) has value < 10.
+ // NOTE(review): `Some(2)` presumably caps the row-group size so the
+ // file holds more than one row group — confirm in test_utils.rs.
+ write_int_parquet_file(
+ &bucket_dir.join("data.parquet"),
+ vec![("id", vec![1, 2, 3, 4]), ("value", vec![5, 20, 30, 40])],
+ Some(2),
+ );
+
+ let file_io = FileIOBuilder::new("file").build().unwrap();
+ let table_schema = TableSchema::new(
+ 0,
+ &paimon::spec::Schema::builder()
+ .column("id", DataType::Int(IntType::new()))
+ .column("value", DataType::Int(IntType::new()))
+ .build()
+ .unwrap(),
+ );
+ let table = Table::new(
+ file_io,
+ Identifier::new("default", "t"),
+ table_path,
+ table_schema,
+ );
+
+ // A single hand-built split that points at the file written above.
+ let split = paimon::DataSplitBuilder::new()
+ .with_snapshot(1)
+ .with_partition(BinaryRow::new(0))
+ .with_bucket(0)
+ .with_bucket_path(local_file_path(&bucket_dir))
+ .with_total_buckets(1)
+ .with_data_files(vec![test_data_file("data.parquet", 4)])
+ .with_raw_convertible(true)
+ .build()
+ .unwrap();
+
+ // Predicate under test: `value >= 10`.
+ let pushed_predicate = PredicateBuilder::new(table.schema().fields())
+ .greater_or_equal("value", Datum::Int(10))
+ .unwrap();
+
+ // The output schema exposes only `id`; `value` is referenced solely
+ // by the predicate.
+ let schema = Arc::new(ArrowSchema::new(vec![Field::new(
+ "id",
+ ArrowDataType::Int32,
+ false,
+ )]));
+ let scan = PaimonTableScan::new(
+ schema,
+ table,
+ Some(vec!["id".to_string()]),
+ Some(pushed_predicate),
+ vec![Arc::from(vec![split])],
+ None,
+ );
+
+ let ctx = SessionContext::new();
+ let stream = scan
+ .execute(0, ctx.task_ctx())
+ .expect("execute should succeed");
+ let batches = stream.try_collect::<Vec<_>>().await.unwrap();
+
+ // Flatten the `id` column of every returned batch into one vector.
+ let actual_ids: Vec<i32> = batches
+ .iter()
+ .flat_map(|batch| {
+ let ids = batch
+ .column(0)
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .expect("id column should be Int32Array");
+ (0..ids.len()).map(|idx| ids.value(idx)).collect::<Vec<_>>()
+ })
+ .collect();
+
+ // The row with id = 1 carries value = 5 and must not survive
+ // `value >= 10`.
+ assert_eq!(actual_ids, vec![2, 3, 4]);
+ }
}
diff --git a/crates/integrations/datafusion/src/table/mod.rs
b/crates/integrations/datafusion/src/table/mod.rs
index 3c4a98c..a76174d 100644
--- a/crates/integrations/datafusion/src/table/mod.rs
+++ b/crates/integrations/datafusion/src/table/mod.rs
@@ -36,8 +36,13 @@ use crate::runtime::await_with_runtime;
/// Read-only table provider for a Paimon table.
///
-/// Supports full table scan, column projection, and partition predicate
pushdown.
-/// Data-level filtering remains a residual DataFusion filter.
+/// Supports full table scan, column projection, and predicate pushdown for
+/// planning. Partition predicates prune splits eagerly, while supported
+/// non-partition data predicates may also be reused by the Parquet read path
+/// for row-group pruning and partial decode-time filtering.
+///
+/// DataFusion still treats pushed filters as inexact because unsupported
+/// predicates and non-Parquet reads remain residual filters.
#[derive(Debug, Clone)]
pub struct PaimonTableProvider {
table: Table,
@@ -103,8 +108,9 @@ impl TableProvider for PaimonTableProvider {
};
// Plan splits eagerly so we know partition count upfront.
+ let pushed_predicate = build_pushed_predicate(filters,
self.table.schema().fields());
let mut read_builder = self.table.new_read_builder();
- if let Some(filter) = build_pushed_predicate(filters,
self.table.schema().fields()) {
+ if let Some(filter) = pushed_predicate.clone() {
read_builder.with_filter(filter);
}
// Push the limit hint to paimon-core planning to reduce splits when
possible.
@@ -141,6 +147,7 @@ impl TableProvider for PaimonTableProvider {
projected_schema,
self.table.clone(),
projected_columns,
+ pushed_predicate,
planned_partitions,
limit,
)))
@@ -318,4 +325,27 @@ mod tests {
BTreeSet::from([("2024-01-01".to_string(), 10)]),
);
}
+
+ /// Checks that `scan()` stores the translated filter on the returned
+ /// `PaimonTableScan`, so the same predicate is available when the plan
+ /// is executed.
+ #[tokio::test]
+ async fn test_scan_keeps_pushed_predicate_for_execute() {
+ let provider = create_provider("partitioned_log_table").await;
+ let filter = col("id").gt(lit(1));
+
+ let config = SessionConfig::new().with_target_partitions(8);
+ let ctx = SessionContext::new_with_config(config);
+ let state = ctx.state();
+ let plan = provider
+ .scan(&state, None, std::slice::from_ref(&filter), None)
+ .await
+ .expect("scan() should succeed");
+ let scan = plan
+ .as_any()
+ .downcast_ref::<PaimonTableScan>()
+ .expect("Expected PaimonTableScan");
+
+ // Translate the same DataFusion expression independently; the scan
+ // must hold an equal predicate.
+ let expected = build_pushed_predicate(&[filter],
provider.table().schema().fields())
+ .expect("data filter should translate");
+
+ assert_eq!(scan.pushed_predicate(), Some(&expected));
+ }
}
diff --git a/crates/paimon/Cargo.toml b/crates/paimon/Cargo.toml
index a4a4ae4..d5d75ae 100644
--- a/crates/paimon/Cargo.toml
+++ b/crates/paimon/Cargo.toml
@@ -57,6 +57,7 @@ indexmap = "2.5.0"
roaring = "0.11"
arrow-array = { workspace = true }
arrow-cast = { workspace = true }
+arrow-ord = { workspace = true }
arrow-schema = { workspace = true }
futures = "0.3"
parquet = { workspace = true, features = ["async", "zstd"] }
diff --git a/crates/paimon/src/arrow/filtering.rs
b/crates/paimon/src/arrow/filtering.rs
new file mode 100644
index 0000000..da2da14
--- /dev/null
+++ b/crates/paimon/src/arrow/filtering.rs
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::schema_evolution::create_index_mapping;
+pub(crate) use crate::predicate_stats::{predicates_may_match_with_schema,
StatsAccessor};
+use crate::spec::{DataField, Predicate, PredicateOperator};
+
+/// Keeps only the predicates that the reader can use for pruning.
+///
+/// A predicate is kept when `predicate_supported_for_reader_pruning` accepts
+/// it. The relative order of the surviving predicates is unchanged.
+pub(crate) fn reader_pruning_predicates(data_predicates: Vec<Predicate>) -> Vec<Predicate> {
+    let mut supported = data_predicates;
+    supported.retain(predicate_supported_for_reader_pruning);
+    supported
+}
+
+/// Builds the field mapping between `table_fields` and `file_fields`.
+///
+/// The raw result of `create_index_mapping` is passed through
+/// `normalize_field_mapping`, using the number of table fields as the length
+/// of the fallback identity mapping.
+pub(crate) fn build_field_mapping(
+    table_fields: &[DataField],
+    file_fields: &[DataField],
+) -> Vec<Option<usize>> {
+    let raw_mapping = create_index_mapping(table_fields, file_fields);
+    let table_field_count = table_fields.len();
+    normalize_field_mapping(raw_mapping, table_field_count)
+}
+
+/// Decides whether a single predicate may be handed to the reader for pruning.
+///
+/// `AlwaysFalse` is accepted, as is any leaf predicate whose operator is one
+/// of the listed null-check, comparison, or set-membership operators.
+/// `AlwaysTrue` and the compound forms (`And`, `Or`, `Not`) are rejected.
+fn predicate_supported_for_reader_pruning(predicate: &Predicate) -> bool {
+    use PredicateOperator as Op;
+
+    match predicate {
+        Predicate::AlwaysTrue | Predicate::And(_) | Predicate::Or(_) | Predicate::Not(_) => false,
+        Predicate::AlwaysFalse => true,
+        Predicate::Leaf { op, .. } => matches!(
+            op,
+            Op::Eq
+                | Op::NotEq
+                | Op::Lt
+                | Op::LtEq
+                | Op::Gt
+                | Op::GtEq
+                | Op::In
+                | Op::NotIn
+                | Op::IsNull
+                | Op::IsNotNull
+        ),
+    }
+}
+
+/// Builds the mapping in which every position `i` in `0..num_fields` maps to
+/// `Some(i)`.
+fn identity_field_mapping(num_fields: usize) -> Vec<Option<usize>> {
+    let mut mapping = Vec::with_capacity(num_fields);
+    mapping.extend((0..num_fields).map(Some));
+    mapping
+}
+
+/// Converts an optional raw index mapping into per-position optional indices.
+///
+/// * `Some(mapping)`: every entry is converted with `usize::try_from`; an
+///   entry that does not fit into `usize` (a negative value) becomes `None`.
+/// * `None`: the identity mapping over `num_fields` positions is returned.
+fn normalize_field_mapping(mapping: Option<Vec<i32>>, num_fields: usize) -> Vec<Option<usize>> {
+    match mapping {
+        Some(raw_indices) => raw_indices
+            .into_iter()
+            .map(|raw_index| usize::try_from(raw_index).ok())
+            .collect(),
+        None => identity_field_mapping(num_fields),
+    }
+}
diff --git a/crates/paimon/src/arrow/mod.rs b/crates/paimon/src/arrow/mod.rs
index e823c90..f524d59 100644
--- a/crates/paimon/src/arrow/mod.rs
+++ b/crates/paimon/src/arrow/mod.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+pub(crate) mod filtering;
mod reader;
pub(crate) mod schema_evolution;
diff --git a/crates/paimon/src/arrow/reader.rs
b/crates/paimon/src/arrow/reader.rs
index a676e61..91a67c6 100644
--- a/crates/paimon/src/arrow/reader.rs
+++ b/crates/paimon/src/arrow/reader.rs
@@ -16,25 +16,39 @@
// under the License.
use crate::arrow::build_target_arrow_schema;
+use crate::arrow::filtering::{
+ build_field_mapping, predicates_may_match_with_schema, StatsAccessor,
+};
use crate::arrow::schema_evolution::{create_index_mapping, NULL_FIELD_INDEX};
use crate::deletion_vector::{DeletionVector, DeletionVectorFactory};
use crate::io::{FileIO, FileRead, FileStatus};
-use crate::spec::{DataField, DataFileMeta};
+use crate::spec::{DataField, DataFileMeta, DataType, Datum, Predicate,
PredicateOperator};
use crate::table::schema_manager::SchemaManager;
use crate::table::ArrowRecordBatchStream;
use crate::{DataSplit, Error};
-use arrow_array::RecordBatch;
+use arrow_array::{
+ Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Decimal128Array,
Float32Array,
+ Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, RecordBatch,
Scalar, StringArray,
+};
use arrow_cast::cast;
+use arrow_ord::cmp::{
+ eq as arrow_eq, gt as arrow_gt, gt_eq as arrow_gt_eq, lt as arrow_lt,
lt_eq as arrow_lt_eq,
+ neq as arrow_neq,
+};
+use arrow_schema::ArrowError;
use async_stream::try_stream;
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::{StreamExt, TryFutureExt};
-use parquet::arrow::arrow_reader::{ArrowReaderOptions, RowSelection,
RowSelector};
+use parquet::arrow::arrow_reader::{
+ ArrowPredicate, ArrowPredicateFn, ArrowReaderOptions, RowFilter,
RowSelection, RowSelector,
+};
use parquet::arrow::async_reader::{AsyncFileReader, MetadataFetch};
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
use parquet::file::metadata::ParquetMetaDataReader;
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
+use parquet::file::statistics::Statistics as ParquetStatistics;
use std::collections::HashMap;
use std::ops::Range;
use std::sync::Arc;
@@ -46,6 +60,8 @@ pub struct ArrowReaderBuilder {
file_io: FileIO,
schema_manager: SchemaManager,
table_schema_id: i64,
+ predicates: Vec<Predicate>,
+ table_fields: Vec<DataField>,
}
impl ArrowReaderBuilder {
@@ -60,9 +76,24 @@ impl ArrowReaderBuilder {
file_io,
schema_manager,
table_schema_id,
+ predicates: Vec::new(),
+ table_fields: Vec::new(),
}
}
+ /// Set data predicates used for Parquet row-group pruning and partial
+ /// decode-time filtering.
+ ///
+ /// Replaces whatever predicates were stored before and returns the
+ /// builder so calls can be chained.
+ pub(crate) fn with_predicates(mut self, predicates: Vec<Predicate>) ->
Self {
+ self.predicates = predicates;
+ self
+ }
+
+ /// Set the full table schema fields used for filter-to-file field mapping.
+ ///
+ /// Replaces whatever fields were stored before and returns the builder
+ /// so calls can be chained.
+ pub(crate) fn with_table_fields(mut self, table_fields: Vec<DataField>) ->
Self {
+ self.table_fields = table_fields;
+ self
+ }
+
/// Build the ArrowReader with the given read type (logical row type or
projected subset).
/// Used to clip Parquet schema to requested columns only.
pub fn build(self, read_type: Vec<DataField>) -> ArrowReader {
@@ -71,6 +102,8 @@ impl ArrowReaderBuilder {
file_io: self.file_io,
schema_manager: self.schema_manager,
table_schema_id: self.table_schema_id,
+ predicates: self.predicates,
+ table_fields: self.table_fields,
read_type,
}
}
@@ -83,6 +116,8 @@ pub struct ArrowReader {
file_io: FileIO,
schema_manager: SchemaManager,
table_schema_id: i64,
+ predicates: Vec<Predicate>,
+ table_fields: Vec<DataField>,
read_type: Vec<DataField>,
}
@@ -100,6 +135,8 @@ impl ArrowReader {
let batch_size = self.batch_size;
let splits: Vec<DataSplit> = data_splits.to_vec();
let read_type = self.read_type;
+ let predicates = self.predicates;
+ let table_fields = self.table_fields;
let schema_manager = self.schema_manager;
let table_schema_id = self.table_schema_id;
Ok(try_stream! {
@@ -137,12 +174,16 @@ impl ArrowReader {
let mut stream = read_single_file_stream(
file_io.clone(),
- split.clone(),
- file_meta,
- read_type.clone(),
- data_fields,
- batch_size,
- dv,
+ SingleFileReadRequest {
+ split: split.clone(),
+ file_meta,
+ read_type: read_type.clone(),
+ table_fields: table_fields.clone(),
+ data_fields,
+ predicates: predicates.clone(),
+ batch_size,
+ dv,
+ },
)?;
while let Some(batch) = stream.next().await {
yield batch?;
@@ -186,8 +227,17 @@ impl ArrowReader {
};
let mut stream = read_single_file_stream(
- file_io.clone(), split.clone(), file_meta,
read_type.clone(),
- data_fields, batch_size, None,
+ file_io.clone(),
+ SingleFileReadRequest {
+ split: split.clone(),
+ file_meta,
+ read_type: read_type.clone(),
+ table_fields: table_fields.clone(),
+ data_fields,
+ predicates: Vec::new(),
+ batch_size,
+ dv: None,
+ },
)?;
while let Some(batch) = stream.next().await {
yield batch?;
@@ -214,6 +264,17 @@ impl ArrowReader {
}
}
+/// Bundles every input of `read_single_file_stream` apart from the `FileIO`,
+/// so call sites name each argument instead of passing a long positional list.
+struct SingleFileReadRequest {
+ /// Split that the file belongs to.
+ split: DataSplit,
+ /// Metadata of the data file to read.
+ file_meta: DataFileMeta,
+ /// Fields to produce; the target Arrow schema is built from them.
+ read_type: Vec<DataField>,
+ /// Table schema fields. Used for predicate field mapping and as the
+ /// file-field fallback when `data_fields` is `None`.
+ table_fields: Vec<DataField>,
+ /// Fields of the file's own schema, when available.
+ data_fields: Option<Vec<DataField>>,
+ /// Predicates used to build the Parquet row filter and the row-group
+ /// selection. Call sites pass an empty vector when no filtering is wanted.
+ predicates: Vec<Predicate>,
+ /// Optional batch size applied to the Parquet stream builder.
+ batch_size: Option<usize>,
+ /// Optional deletion vector; when present and non-empty, its row selection
+ /// is intersected with the predicate-based selection.
+ dv: Option<Arc<DeletionVector>>,
+}
+
/// Read a single parquet file from a split, returning a lazy stream of
batches.
/// Optionally applies a deletion vector.
///
@@ -226,14 +287,21 @@ impl ArrowReader {
/// Reference:
[RawFileSplitRead.createFileReader](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java)
fn read_single_file_stream(
file_io: FileIO,
- split: DataSplit,
- file_meta: DataFileMeta,
- read_type: Vec<DataField>,
- data_fields: Option<Vec<DataField>>,
- batch_size: Option<usize>,
- dv: Option<Arc<DeletionVector>>,
+ request: SingleFileReadRequest,
) -> crate::Result<ArrowRecordBatchStream> {
+ let SingleFileReadRequest {
+ split,
+ file_meta,
+ read_type,
+ table_fields,
+ data_fields,
+ predicates,
+ batch_size,
+ dv,
+ } = request;
+
let target_schema = build_target_arrow_schema(&read_type)?;
+ let file_fields = data_fields.clone().unwrap_or_else(||
table_fields.clone());
// Compute index mapping and determine which columns to read from the
parquet file.
// If data_fields is provided, use field-ID-based mapping; otherwise use
read_type names directly.
@@ -301,14 +369,33 @@ fn read_single_file_stream(
let mask = ProjectionMask::roots(&parquet_schema, root_indices);
batch_stream_builder = batch_stream_builder.with_projection(mask);
+ let row_filter =
+ build_parquet_row_filter(&parquet_schema, &predicates,
&table_fields, &file_fields)?;
+ if let Some(row_filter) = row_filter {
+ batch_stream_builder =
batch_stream_builder.with_row_filter(row_filter);
+ }
+
+ let predicate_row_selection = build_predicate_row_selection(
+ batch_stream_builder.metadata().row_groups(),
+ &predicates,
+ &table_fields,
+ &file_fields,
+ )?;
+ let mut row_selection = predicate_row_selection;
if let Some(ref dv) = dv {
if !dv.is_empty() {
- let row_selection =
+ let delete_row_selection =
build_deletes_row_selection(batch_stream_builder.metadata().row_groups(), dv)?;
- batch_stream_builder =
batch_stream_builder.with_row_selection(row_selection);
+ row_selection = intersect_optional_row_selections(
+ row_selection,
+ Some(delete_row_selection),
+ );
}
}
+ if let Some(row_selection) = row_selection {
+ batch_stream_builder =
batch_stream_builder.with_row_selection(row_selection);
+ }
if let Some(size) = batch_size {
batch_stream_builder = batch_stream_builder.with_batch_size(size);
}
@@ -551,12 +638,16 @@ fn merge_files_by_columns(
let stream = read_single_file_stream(
file_io.clone(),
- split.clone(),
- data_files[file_idx].clone(),
- file_read_type,
- data_fields.clone(),
- batch_size,
- None,
+ SingleFileReadRequest {
+ split: split.clone(),
+ file_meta: data_files[file_idx].clone(),
+ read_type: file_read_type,
+ table_fields: table_fields.clone(),
+ data_fields: data_fields.clone(),
+ predicates: Vec::new(),
+ batch_size,
+ dv: None,
+ },
)?;
file_streams.insert(file_idx, stream);
}
@@ -647,6 +738,623 @@ fn merge_files_by_columns(
.boxed())
}
+/// Combines two optional row selections.
+///
+/// When both are present the result is their intersection. When only one is
+/// present it is returned unchanged, and when neither is present the result
+/// is `None`.
+fn intersect_optional_row_selections(
+    left: Option<RowSelection>,
+    right: Option<RowSelection>,
+) -> Option<RowSelection> {
+    let Some(left) = left else {
+        return right;
+    };
+    let Some(right) = right else {
+        return Some(left);
+    };
+    Some(left.intersection(&right))
+}
+
+/// Turns a possibly-null boolean mask into one without nulls.
+///
+/// A mask that already has no nulls is returned as is. Otherwise every null
+/// slot becomes `false` and every valid slot keeps its value.
+fn sanitize_filter_mask(mask: BooleanArray) -> BooleanArray {
+    if mask.null_count() > 0 {
+        let definite: Vec<bool> = mask.iter().map(|slot| slot.unwrap_or(false)).collect();
+        BooleanArray::from(definite)
+    } else {
+        mask
+    }
+}
+
+/// Evaluates one leaf predicate against a decoded column and returns a mask
+/// without nulls.
+///
+/// * `IsNull` / `IsNotNull` look only at the validity of each row.
+/// * `In` / `NotIn` are delegated to `evaluate_set_membership_predicate`.
+/// * The six comparison operators compare the column with the first literal.
+///   When there is no literal, or it cannot be converted to an Arrow scalar
+///   for `data_type`, every row is kept (all `true`).
+///
+/// # Errors
+///
+/// A literal-conversion failure is reported as `ArrowError::ComputeError`;
+/// errors from the comparison kernel are passed through.
+fn evaluate_exact_leaf_predicate(
+    array: &ArrayRef,
+    data_type: &DataType,
+    op: PredicateOperator,
+    literals: &[Datum],
+) -> Result<BooleanArray, ArrowError> {
+    let row_count = array.len();
+    match op {
+        PredicateOperator::Eq
+        | PredicateOperator::NotEq
+        | PredicateOperator::Lt
+        | PredicateOperator::LtEq
+        | PredicateOperator::Gt
+        | PredicateOperator::GtEq => {
+            let scalar = match literals.first() {
+                Some(literal) => literal_scalar_for_parquet_filter(literal, data_type)
+                    .map_err(|e| ArrowError::ComputeError(e.to_string()))?,
+                None => None,
+            };
+            match scalar {
+                Some(scalar) => {
+                    evaluate_column_predicate(array, &scalar, op).map(sanitize_filter_mask)
+                }
+                // Nothing to compare against: keep every row.
+                None => Ok(BooleanArray::from(vec![true; row_count])),
+            }
+        }
+        PredicateOperator::In | PredicateOperator::NotIn => {
+            evaluate_set_membership_predicate(array, data_type, op, literals)
+        }
+        PredicateOperator::IsNotNull => Ok(boolean_mask_from_predicate(row_count, |row| {
+            array.is_valid(row)
+        })),
+        PredicateOperator::IsNull => Ok(boolean_mask_from_predicate(row_count, |row| {
+            array.is_null(row)
+        })),
+    }
+}
+
+/// Evaluates an `In` or `NotIn` predicate against a decoded column.
+///
+/// * `In` starts from an all-`false` mask and ORs in an equality comparison
+///   for every literal.
+/// * `NotIn` starts from the column's validity (null rows never match) and
+///   ANDs in an inequality comparison for every literal.
+///
+/// With no literals the starting mask is returned unchanged. If any literal
+/// cannot be converted to an Arrow scalar for `data_type`, every row is kept
+/// (all `true`).
+///
+/// # Errors
+///
+/// A literal-conversion failure is reported as `ArrowError::ComputeError`;
+/// errors from the comparison kernel are passed through.
+///
+/// # Panics
+///
+/// Panics when `op` is neither `In` nor `NotIn`.
+fn evaluate_set_membership_predicate(
+    array: &ArrayRef,
+    data_type: &DataType,
+    op: PredicateOperator,
+    literals: &[Datum],
+) -> Result<BooleanArray, ArrowError> {
+    // Resolve the operator once: the per-literal comparison, and whether the
+    // per-literal masks are OR-ed (`In`) or AND-ed (`NotIn`).
+    let (comparison_op, use_or) = match op {
+        PredicateOperator::In => (PredicateOperator::Eq, true),
+        PredicateOperator::NotIn => (PredicateOperator::NotEq, false),
+        PredicateOperator::IsNull
+        | PredicateOperator::IsNotNull
+        | PredicateOperator::Eq
+        | PredicateOperator::NotEq
+        | PredicateOperator::Lt
+        | PredicateOperator::LtEq
+        | PredicateOperator::Gt
+        | PredicateOperator::GtEq => {
+            unreachable!("set-membership evaluation requires In or NotIn")
+        }
+    };
+
+    // Seed for the fold below; it is also the final answer when `literals`
+    // is empty, so no separate empty-list branch is needed.
+    let mut combined = if use_or {
+        BooleanArray::from(vec![false; array.len()])
+    } else {
+        boolean_mask_from_predicate(array.len(), |row_index| array.is_valid(row_index))
+    };
+
+    for literal in literals {
+        let Some(scalar) = literal_scalar_for_parquet_filter(literal, data_type)
+            .map_err(|e| ArrowError::ComputeError(e.to_string()))?
+        else {
+            // An unconvertible literal means the predicate cannot be decided
+            // here, so keep every row.
+            return Ok(BooleanArray::from(vec![true; array.len()]));
+        };
+        let mask = sanitize_filter_mask(evaluate_column_predicate(array, &scalar, comparison_op)?);
+        combined = combine_filter_masks(&combined, &mask, use_or);
+    }
+
+    Ok(combined)
+}
+
+/// Combines two masks row by row, with OR when `use_or` is `true` and AND
+/// otherwise.
+///
+/// Only the value bits are read; validity is ignored. The output has
+/// `left.len()` rows. In debug builds the two lengths are asserted to be
+/// equal.
+fn combine_filter_masks(left: &BooleanArray, right: &BooleanArray, use_or: bool) -> BooleanArray {
+    debug_assert_eq!(left.len(), right.len());
+    let row_count = left.len();
+    if use_or {
+        boolean_mask_from_predicate(row_count, |row| left.value(row) || right.value(row))
+    } else {
+        boolean_mask_from_predicate(row_count, |row| left.value(row) && right.value(row))
+    }
+}
+
+/// Builds a boolean mask of `len` rows by calling `predicate` once for each
+/// row index in ascending order.
+fn boolean_mask_from_predicate(len: usize, predicate: impl FnMut(usize) -> bool) -> BooleanArray {
+    let values: Vec<bool> = (0..len).map(predicate).collect();
+    BooleanArray::from(values)
+}
+
+/// View over one Parquet row group that serves its statistics through
+/// `StatsAccessor`.
+struct ParquetRowGroupStats<'a> {
+ /// Metadata of the row group whose row count and column statistics are read.
+ row_group: &'a RowGroupMetaData,
+ /// Row-group column position for each index passed to the accessor.
+ /// A `None` entry, or an index past the end, yields no statistics.
+ column_indices: &'a [Option<usize>],
+}
+
+impl ParquetRowGroupStats<'_> {
+    /// Looks up the statistics of the column mapped to `index` and converts
+    /// one bound to a `Datum`. `is_min` is forwarded to
+    /// `parquet_stats_to_datum` (`true` for `min_value`, `false` for
+    /// `max_value`).
+    ///
+    /// Returns `None` when the index has no mapped column, the column has no
+    /// statistics, or the conversion yields nothing.
+    fn column_bound(&self, index: usize, data_type: &DataType, is_min: bool) -> Option<Datum> {
+        let column_index = self.column_indices.get(index).copied().flatten()?;
+        let statistics = self.row_group.column(column_index).statistics()?;
+        parquet_stats_to_datum(statistics, data_type, is_min)
+    }
+}
+
+impl StatsAccessor for ParquetRowGroupStats<'_> {
+    /// Number of rows in the row group.
+    fn row_count(&self) -> i64 {
+        self.row_group.num_rows()
+    }
+
+    /// Always `None`: null counts are never reported for row groups.
+    fn null_count(&self, _index: usize) -> Option<i64> {
+        // parquet::Statistics::null_count_opt() may return Some(0) even when
+        // the null-count statistic is absent, so treating it as authoritative
+        // would make IS NULL / IS NOT NULL pruning unsafe. Fail open here.
+        None
+    }
+
+    /// Minimum value recorded for the column mapped to `index`, if available.
+    fn min_value(&self, index: usize, data_type: &DataType) -> Option<Datum> {
+        self.column_bound(index, data_type, true)
+    }
+
+    /// Maximum value recorded for the column mapped to `index`, if available.
+    fn max_value(&self, index: usize, data_type: &DataType) -> Option<Datum> {
+        self.column_bound(index, data_type, false)
+    }
+}
+
+/// Builds a row selection that skips every row group whose statistics show
+/// that the predicates cannot match.
+///
+/// Returns `Ok(None)` when there are no predicates, no row groups, or when
+/// every row group may match, so the caller applies no selection at all.
+/// Otherwise the selection holds one selector per row group, in order:
+/// `select` for groups that may match and `skip` for the rest.
+fn build_predicate_row_selection(
+    row_groups: &[RowGroupMetaData],
+    predicates: &[Predicate],
+    table_fields: &[DataField],
+    file_fields: &[DataField],
+) -> crate::Result<Option<RowSelection>> {
+    let Some(first_row_group) = row_groups.first() else {
+        return Ok(None);
+    };
+    if predicates.is_empty() {
+        return Ok(None);
+    }
+
+    let field_mapping = build_field_mapping(table_fields, file_fields);
+    // Column positions are resolved once, from the first row group.
+    let column_indices = build_row_group_column_indices(first_row_group.columns(), file_fields);
+
+    let verdicts: Vec<bool> = row_groups
+        .iter()
+        .map(|row_group| {
+            let stats = ParquetRowGroupStats {
+                row_group,
+                column_indices: &column_indices,
+            };
+            predicates_may_match_with_schema(predicates, &stats, &field_mapping, file_fields)
+        })
+        .collect();
+
+    if verdicts.iter().all(|&may_match| may_match) {
+        return Ok(None);
+    }
+
+    let selectors: Vec<RowSelector> = row_groups
+        .iter()
+        .zip(verdicts)
+        .map(|(row_group, may_match)| {
+            let rows = row_group.num_rows() as usize;
+            if may_match {
+                RowSelector::select(rows)
+            } else {
+                RowSelector::skip(rows)
+            }
+        })
+        .collect();
+    Ok(Some(selectors.into()))
+}
+
+/// Builds the decode-time row filter for a Parquet file.
+///
+/// Each predicate is offered to `build_parquet_arrow_predicate`; predicates
+/// it declines are left out. Decode-time filtering is kept deliberately
+/// narrow: unsupported predicate shapes or types remain residual filters
+/// above the reader.
+///
+/// Returns `Ok(None)` when there are no predicates or none could be
+/// translated.
+///
+/// # Errors
+///
+/// Returns the first error raised while translating a predicate.
+fn build_parquet_row_filter(
+    parquet_schema: &parquet::schema::types::SchemaDescriptor,
+    predicates: &[Predicate],
+    table_fields: &[DataField],
+    file_fields: &[DataField],
+) -> crate::Result<Option<RowFilter>> {
+    if predicates.is_empty() {
+        return Ok(None);
+    }
+
+    let field_mapping = build_field_mapping(table_fields, file_fields);
+    let filters = predicates
+        .iter()
+        .filter_map(|predicate| {
+            build_parquet_arrow_predicate(parquet_schema, predicate, &field_mapping, file_fields)
+                .transpose()
+        })
+        .collect::<crate::Result<Vec<Box<dyn ArrowPredicate>>>>()?;
+
+    Ok((!filters.is_empty()).then(|| RowFilter::new(filters)))
+}
+
+/// Translates one predicate into a Parquet decode-time predicate, if possible.
+///
+/// Returns `Ok(None)`, leaving the predicate as a residual filter, when:
+/// * the predicate is not a leaf,
+/// * its operator is not supported for row filtering,
+/// * its field has no counterpart in `file_fields`,
+/// * the file field's name is not a root column of `parquet_schema`, or
+/// * its literals cannot be converted for the file field's data type.
+///
+/// Otherwise the returned predicate projects only the referenced root column
+/// and evaluates the leaf with `evaluate_exact_leaf_predicate`.
+///
+/// # Errors
+///
+/// Propagates errors raised while checking literal convertibility.
+fn build_parquet_arrow_predicate(
+    parquet_schema: &parquet::schema::types::SchemaDescriptor,
+    predicate: &Predicate,
+    field_mapping: &[Option<usize>],
+    file_fields: &[DataField],
+) -> crate::Result<Option<Box<dyn ArrowPredicate>>> {
+    let Predicate::Leaf {
+        index,
+        op,
+        literals,
+        ..
+    } = predicate
+    else {
+        return Ok(None);
+    };
+    if !predicate_supported_for_parquet_row_filter(*op) {
+        return Ok(None);
+    }
+
+    let Some(file_index) = field_mapping.get(*index).copied().flatten() else {
+        return Ok(None);
+    };
+    let Some(file_field) = file_fields.get(file_index) else {
+        return Ok(None);
+    };
+    let Some(root_index) = parquet_root_index(parquet_schema, file_field.name()) else {
+        return Ok(None);
+    };
+    if !parquet_row_filter_literals_supported(*op, literals, file_field.data_type())? {
+        return Ok(None);
+    }
+
+    let projection = ProjectionMask::roots(parquet_schema, [root_index]);
+    let op = *op;
+    let data_type = file_field.data_type().clone();
+    let literals = literals.to_vec();
+    Ok(Some(Box::new(ArrowPredicateFn::new(
+        projection,
+        move |batch: RecordBatch| {
+            let Some(column) = batch.columns().first() else {
+                // The Parquet reader drops rows whose mask entry is null, so
+                // an all-null mask would silently discard the whole batch.
+                // Keep every row instead, matching the fail-open fallbacks
+                // used elsewhere for predicates that cannot be evaluated.
+                return Ok(BooleanArray::from(vec![true; batch.num_rows()]));
+            };
+            evaluate_exact_leaf_predicate(column, &data_type, op, &literals)
+        },
+    ))))
+}
+
+/// Reports whether `op` can be evaluated by the Parquet decode-time row
+/// filter: the null checks, the six comparisons, and `In` / `NotIn`.
+fn predicate_supported_for_parquet_row_filter(op: PredicateOperator) -> bool {
+    use PredicateOperator as Op;
+
+    matches!(
+        op,
+        Op::Eq
+            | Op::NotEq
+            | Op::Lt
+            | Op::LtEq
+            | Op::Gt
+            | Op::GtEq
+            | Op::In
+            | Op::NotIn
+            | Op::IsNull
+            | Op::IsNotNull
+    )
+}
+
+/// Checks that every literal the operator will read converts to the file column type.
+///
+/// Null checks need no literal. Comparison operators require their first
+/// literal to exist and convert. Membership operators require all literals to
+/// convert; an empty literal list is accepted.
+///
+/// # Errors
+///
+/// Propagates conversion errors from `literal_scalar_for_parquet_filter`.
+fn parquet_row_filter_literals_supported(
+    op: PredicateOperator,
+    literals: &[Datum],
+    file_data_type: &DataType,
+) -> crate::Result<bool> {
+    let converts = |literal: &Datum| -> crate::Result<bool> {
+        Ok(literal_scalar_for_parquet_filter(literal, file_data_type)?.is_some())
+    };
+
+    match op {
+        PredicateOperator::IsNull | PredicateOperator::IsNotNull => Ok(true),
+        PredicateOperator::Eq
+        | PredicateOperator::NotEq
+        | PredicateOperator::Lt
+        | PredicateOperator::LtEq
+        | PredicateOperator::Gt
+        | PredicateOperator::GtEq => match literals.first() {
+            Some(literal) => converts(literal),
+            None => Ok(false),
+        },
+        PredicateOperator::In | PredicateOperator::NotIn => {
+            for literal in literals {
+                if !converts(literal)? {
+                    return Ok(false);
+                }
+            }
+            Ok(true)
+        }
+    }
+}
+
+/// Returns the position of the first root-level Parquet field named `root_name`.
+fn parquet_root_index(
+    parquet_schema: &parquet::schema::types::SchemaDescriptor,
+    root_name: &str,
+) -> Option<usize> {
+    let roots = parquet_schema.root_schema().get_fields();
+    roots
+        .iter()
+        .enumerate()
+        .find_map(|(position, field)| (field.name() == root_name).then_some(position))
+}
+
+/// Applies a binary comparison between `column` and `scalar` with Arrow kernels.
+///
+/// Null-check and membership operators have no single-scalar comparison; for
+/// those an all-null mask of the column's length is returned.
+fn evaluate_column_predicate(
+    column: &ArrayRef,
+    scalar: &Scalar<ArrayRef>,
+    op: PredicateOperator,
+) -> Result<BooleanArray, ArrowError> {
+    match op {
+        PredicateOperator::IsNull
+        | PredicateOperator::IsNotNull
+        | PredicateOperator::In
+        | PredicateOperator::NotIn => Ok(BooleanArray::new_null(column.len())),
+        PredicateOperator::Eq => arrow_eq(column, scalar),
+        PredicateOperator::NotEq => arrow_neq(column, scalar),
+        PredicateOperator::Lt => arrow_lt(column, scalar),
+        PredicateOperator::LtEq => arrow_lt_eq(column, scalar),
+        PredicateOperator::Gt => arrow_gt(column, scalar),
+        PredicateOperator::GtEq => arrow_gt_eq(column, scalar),
+    }
+}
+
+/// Converts a predicate literal into an Arrow scalar typed like the file column.
+///
+/// Returns `Ok(None)` when the literal cannot be represented exactly in
+/// `file_data_type` (wrong literal kind, integer out of range, lossy float
+/// narrowing, decimal with a different scale or larger precision) or when the
+/// column type is not handled here (time, timestamps and nested types).
+///
+/// # Errors
+///
+/// Returns `Error::Unsupported` when the decimal precision or scale does not
+/// fit Arrow's `u8`/`i8` parameters, and `Error::UnexpectedError` when Arrow
+/// rejects the precision/scale pair.
+fn literal_scalar_for_parquet_filter(
+    literal: &Datum,
+    file_data_type: &DataType,
+) -> crate::Result<Option<Scalar<ArrayRef>>> {
+    let array: Option<ArrayRef> = match file_data_type {
+        DataType::Boolean(_) => match literal {
+            Datum::Bool(value) => {
+                Some(Arc::new(BooleanArray::new_scalar(*value).into_inner()) as ArrayRef)
+            }
+            _ => None,
+        },
+        // Integer literals of any width are accepted as long as the value fits
+        // the file column's width.
+        DataType::TinyInt(_) => integer_literal(literal)
+            .and_then(|value| i8::try_from(value).ok())
+            .map(|value| Arc::new(Int8Array::new_scalar(value).into_inner()) as ArrayRef),
+        DataType::SmallInt(_) => integer_literal(literal)
+            .and_then(|value| i16::try_from(value).ok())
+            .map(|value| Arc::new(Int16Array::new_scalar(value).into_inner()) as ArrayRef),
+        DataType::Int(_) => integer_literal(literal)
+            .and_then(|value| i32::try_from(value).ok())
+            .map(|value| Arc::new(Int32Array::new_scalar(value).into_inner()) as ArrayRef),
+        DataType::BigInt(_) => integer_literal(literal)
+            .and_then(|value| i64::try_from(value).ok())
+            .map(|value| Arc::new(Int64Array::new_scalar(value).into_inner()) as ArrayRef),
+        DataType::Float(_) => float32_literal(literal)
+            .map(|value| Arc::new(Float32Array::new_scalar(value).into_inner()) as ArrayRef),
+        DataType::Double(_) => float64_literal(literal)
+            .map(|value| Arc::new(Float64Array::new_scalar(value).into_inner()) as ArrayRef),
+        DataType::Char(_) | DataType::VarChar(_) => match literal {
+            Datum::String(value) => {
+                Some(Arc::new(StringArray::new_scalar(value.as_str()).into_inner()) as ArrayRef)
+            }
+            _ => None,
+        },
+        DataType::Binary(_) | DataType::VarBinary(_) => match literal {
+            Datum::Bytes(value) => {
+                Some(Arc::new(BinaryArray::new_scalar(value.as_slice()).into_inner()) as ArrayRef)
+            }
+            _ => None,
+        },
+        DataType::Date(_) => match literal {
+            Datum::Date(value) => {
+                Some(Arc::new(Date32Array::new_scalar(*value).into_inner()) as ArrayRef)
+            }
+            _ => None,
+        },
+        DataType::Decimal(decimal) => match literal {
+            Datum::Decimal {
+                unscaled,
+                precision,
+                scale,
+            } if *precision <= decimal.precision() && *scale == decimal.scale() => {
+                let precision =
+                    u8::try_from(decimal.precision()).map_err(|_| Error::Unsupported {
+                        message: "Decimal precision exceeds Arrow decimal128 range".to_string(),
+                    })?;
+                // Narrow with a checked conversion directly; an intermediate
+                // `as` cast could wrap an out-of-range scale into a valid one.
+                let scale = i8::try_from(decimal.scale()).map_err(|_| Error::Unsupported {
+                    message: "Decimal scale exceeds Arrow decimal128 range".to_string(),
+                })?;
+                let typed = Decimal128Array::new_scalar(*unscaled)
+                    .into_inner()
+                    .with_precision_and_scale(precision, scale)
+                    .map_err(|e| Error::UnexpectedError {
+                        message: format!(
+                            "Failed to build decimal scalar for parquet row filter: {e}"
+                        ),
+                        source: Some(Box::new(e)),
+                    })?;
+                Some(Arc::new(typed) as ArrayRef)
+            }
+            _ => None,
+        },
+        DataType::Time(_)
+        | DataType::Timestamp(_)
+        | DataType::LocalZonedTimestamp(_)
+        | DataType::Array(_)
+        | DataType::Map(_)
+        | DataType::Multiset(_)
+        | DataType::Row(_) => None,
+    };
+
+    Ok(array.map(Scalar::new))
+}
+
+/// Widens any integer-kind literal to `i128`; other literal kinds yield `None`.
+fn integer_literal(literal: &Datum) -> Option<i128> {
+    let widened: i128 = match literal {
+        Datum::TinyInt(value) => (*value).into(),
+        Datum::SmallInt(value) => (*value).into(),
+        Datum::Int(value) => (*value).into(),
+        Datum::Long(value) => (*value).into(),
+        _ => return None,
+    };
+    Some(widened)
+}
+
+/// Extracts an `f32` from a float literal.
+///
+/// A double literal is accepted only when narrowing to `f32` and widening back
+/// reproduces the same value, so the comparison stays exact.
+fn float32_literal(literal: &Datum) -> Option<f32> {
+    match literal {
+        Datum::Float(value) => Some(*value),
+        Datum::Double(value) => {
+            let narrowed = *value as f32;
+            if f64::from(narrowed) == *value {
+                Some(narrowed)
+            } else {
+                None
+            }
+        }
+        _ => None,
+    }
+}
+
+/// Extracts an `f64` from a float or double literal; widening `f32` is lossless.
+fn float64_literal(literal: &Datum) -> Option<f64> {
+    match literal {
+        Datum::Double(value) => Some(*value),
+        Datum::Float(value) => Some((*value).into()),
+        _ => None,
+    }
+}
+
+/// Maps each file field to the index of its column chunk inside a row group.
+///
+/// A field resolves to `Some(index)` only when exactly one column chunk has
+/// that root name. Roots that own several chunks, and fields with no chunk at
+/// all, resolve to `None`.
+fn build_row_group_column_indices(
+    columns: &[parquet::file::metadata::ColumnChunkMetaData],
+    file_fields: &[DataField],
+) -> Vec<Option<usize>> {
+    let mut by_root_name: HashMap<&str, Option<usize>> = HashMap::new();
+    for (column_index, column) in columns.iter().enumerate() {
+        let Some(root_name) = column.column_path().parts().first() else {
+            continue;
+        };
+        match by_root_name.entry(root_name.as_str()) {
+            std::collections::hash_map::Entry::Vacant(slot) => {
+                slot.insert(Some(column_index));
+            }
+            std::collections::hash_map::Entry::Occupied(mut slot) => {
+                // A second chunk under the same root makes the mapping ambiguous.
+                if slot.get().is_some_and(|first| first != column_index) {
+                    slot.insert(None);
+                }
+            }
+        }
+    }
+
+    file_fields
+        .iter()
+        .map(|field| match by_root_name.get(field.name()) {
+            Some(Some(column_index)) => Some(*column_index),
+            _ => None,
+        })
+        .collect()
+}
+
+/// Converts the min (or max) bound of Parquet column statistics into a `Datum`.
+///
+/// Returns `None` when the requested bound is not flagged exact, is absent,
+/// does not fit the target type, is not valid UTF-8 for string columns, or
+/// when the physical/logical type pair is not handled (for example decimals
+/// and timestamps with precision above 3).
+fn parquet_stats_to_datum(
+    stats: &ParquetStatistics,
+    data_type: &DataType,
+    is_min: bool,
+) -> Option<Datum> {
+    let bound_is_exact = match is_min {
+        true => stats.min_is_exact(),
+        false => stats.max_is_exact(),
+    };
+    if !bound_is_exact {
+        return None;
+    }
+
+    match (stats, data_type) {
+        (ParquetStatistics::Boolean(s), DataType::Boolean(_)) => {
+            exact_parquet_value(is_min, s.min_opt(), s.max_opt()).map(|v| Datum::Bool(*v))
+        }
+        (ParquetStatistics::Int32(s), DataType::TinyInt(_)) => {
+            let raw = exact_parquet_value(is_min, s.min_opt(), s.max_opt())?;
+            i8::try_from(*raw).ok().map(Datum::TinyInt)
+        }
+        (ParquetStatistics::Int32(s), DataType::SmallInt(_)) => {
+            let raw = exact_parquet_value(is_min, s.min_opt(), s.max_opt())?;
+            i16::try_from(*raw).ok().map(Datum::SmallInt)
+        }
+        (ParquetStatistics::Int32(s), DataType::Int(_)) => {
+            exact_parquet_value(is_min, s.min_opt(), s.max_opt()).map(|v| Datum::Int(*v))
+        }
+        (ParquetStatistics::Int32(s), DataType::Date(_)) => {
+            exact_parquet_value(is_min, s.min_opt(), s.max_opt()).map(|v| Datum::Date(*v))
+        }
+        (ParquetStatistics::Int32(s), DataType::Time(_)) => {
+            exact_parquet_value(is_min, s.min_opt(), s.max_opt()).map(|v| Datum::Time(*v))
+        }
+        (ParquetStatistics::Int64(s), DataType::BigInt(_)) => {
+            exact_parquet_value(is_min, s.min_opt(), s.max_opt()).map(|v| Datum::Long(*v))
+        }
+        (ParquetStatistics::Int64(s), DataType::Timestamp(ts)) if ts.precision() <= 3 => {
+            exact_parquet_value(is_min, s.min_opt(), s.max_opt()).map(|v| Datum::Timestamp {
+                millis: *v,
+                nanos: 0,
+            })
+        }
+        (ParquetStatistics::Int64(s), DataType::LocalZonedTimestamp(ts))
+            if ts.precision() <= 3 =>
+        {
+            exact_parquet_value(is_min, s.min_opt(), s.max_opt()).map(|v| {
+                Datum::LocalZonedTimestamp {
+                    millis: *v,
+                    nanos: 0,
+                }
+            })
+        }
+        (ParquetStatistics::Float(s), DataType::Float(_)) => {
+            exact_parquet_value(is_min, s.min_opt(), s.max_opt()).map(|v| Datum::Float(*v))
+        }
+        (ParquetStatistics::Double(s), DataType::Double(_)) => {
+            exact_parquet_value(is_min, s.min_opt(), s.max_opt()).map(|v| Datum::Double(*v))
+        }
+        (ParquetStatistics::ByteArray(s), DataType::Char(_) | DataType::VarChar(_)) => {
+            let raw = exact_parquet_value(is_min, s.min_opt(), s.max_opt())?;
+            let text = std::str::from_utf8(raw.data()).ok()?;
+            Some(Datum::String(text.to_string()))
+        }
+        (ParquetStatistics::ByteArray(s), DataType::Binary(_) | DataType::VarBinary(_)) => {
+            exact_parquet_value(is_min, s.min_opt(), s.max_opt())
+                .map(|v| Datum::Bytes(v.data().to_vec()))
+        }
+        (
+            ParquetStatistics::FixedLenByteArray(s),
+            DataType::Binary(_) | DataType::VarBinary(_),
+        ) => exact_parquet_value(is_min, s.min_opt(), s.max_opt())
+            .map(|v| Datum::Bytes(v.data().to_vec())),
+        _ => None,
+    }
+}
+
+/// Picks `min` when `is_min` is set and `max` otherwise.
+fn exact_parquet_value<'a, T>(
+    is_min: bool,
+    min: Option<&'a T>,
+    max: Option<&'a T>,
+) -> Option<&'a T> {
+    match is_min {
+        true => min,
+        false => max,
+    }
+}
+
/// Builds a Parquet [RowSelection] from deletion vector.
/// Only rows not in the deletion vector are selected; deleted rows are
skipped at read time.
/// todo: Uses [DeletionVectorIterator] with
[advance_to](DeletionVectorIterator::advance_to) when skipping row groups
similar to iceberg-rust
@@ -772,3 +1480,55 @@ impl<R: FileRead> AsyncFileReader for ArrowFileReader<R> {
})
}
}
+
+#[cfg(test)]
+mod tests {
+    use super::build_parquet_row_filter;
+    use crate::spec::{DataField, DataType, Datum, IntType, PredicateBuilder};
+    use parquet::schema::{parser::parse_message_type, types::SchemaDescriptor};
+    use std::sync::Arc;
+
+    /// Two nullable INT columns used as both table and file schema.
+    fn test_fields() -> Vec<DataField> {
+        [(0, "id"), (1, "score")]
+            .into_iter()
+            .map(|(id, name)| DataField::new(id, name.to_string(), DataType::Int(IntType::new())))
+            .collect()
+    }
+
+    /// Parquet schema matching `test_fields`.
+    fn test_parquet_schema() -> SchemaDescriptor {
+        let message = parse_message_type(
+            "
+            message test_schema {
+                OPTIONAL INT32 id;
+                OPTIONAL INT32 score;
+            }
+            ",
+        )
+        .expect("test schema should parse");
+        SchemaDescriptor::new(Arc::new(message))
+    }
+
+    /// Null checks and IN / NOT IN predicates must be accepted for pushdown.
+    #[test]
+    fn test_build_parquet_row_filter_supports_null_and_membership_predicates() {
+        let fields = test_fields();
+        let builder = PredicateBuilder::new(&fields);
+        let id_is_null = builder
+            .is_null("id")
+            .expect("is null predicate should build");
+        let score_in = builder
+            .is_in("score", vec![Datum::Int(7)])
+            .expect("in predicate should build");
+        let score_not_in = builder
+            .is_not_in("score", vec![Datum::Int(9)])
+            .expect("not in predicate should build");
+        let predicates = vec![id_is_null, score_in, score_not_in];
+
+        let parquet_schema = test_parquet_schema();
+        let row_filter = build_parquet_row_filter(&parquet_schema, &predicates, &fields, &fields)
+            .expect("parquet row filter should build");
+
+        assert!(row_filter.is_some());
+    }
+}
diff --git a/crates/paimon/src/io/file_io.rs b/crates/paimon/src/io/file_io.rs
index aba327f..4b0ffab 100644
--- a/crates/paimon/src/io/file_io.rs
+++ b/crates/paimon/src/io/file_io.rs
@@ -54,15 +54,22 @@ impl FileIO {
///
/// Otherwise will return parsing error.
pub fn from_path(path: impl AsRef<str>) -> crate::Result<FileIOBuilder> {
- let url = Url::parse(path.as_ref())
- .map_err(|_| Error::ConfigInvalid {
- message: format!("Invalid URL: {}", path.as_ref()),
- })
- .or_else(|_| {
- Url::from_file_path(path.as_ref()).map_err(|_|
Error::ConfigInvalid {
- message: format!("Input {} is neither a valid url nor
path", path.as_ref()),
+ let path = path.as_ref();
+ let url = if looks_like_windows_drive_path(path) {
+ Url::from_file_path(path).map_err(|_| Error::ConfigInvalid {
+ message: format!("Input {path} is neither a valid url nor
path"),
+ })?
+ } else {
+ Url::parse(path)
+ .map_err(|_| Error::ConfigInvalid {
+ message: format!("Invalid URL: {path}"),
})
- })?;
+ .or_else(|_| {
+ Url::from_file_path(path).map_err(|_| Error::ConfigInvalid
{
+ message: format!("Input {path} is neither a valid url
nor path"),
+ })
+ })?
+ };
Ok(FileIOBuilder::new(url.scheme()))
}
@@ -223,6 +230,14 @@ impl FileIO {
}
}
+/// Returns `true` for paths that start with a drive letter, a colon and a
+/// slash or backslash (for example `C:\data` or `d:/data`).
+///
+/// Such inputs are checked before URL parsing so the drive letter is not
+/// taken for a URL scheme.
+fn looks_like_windows_drive_path(path: &str) -> bool {
+    matches!(
+        path.as_bytes(),
+        [drive, b':', b'\\' | b'/', ..] if drive.is_ascii_alphabetic()
+    )
+}
+
#[derive(Debug)]
pub struct FileIOBuilder {
scheme_str: Option<String>,
@@ -385,6 +400,7 @@ impl OutputFile {
mod file_action_test {
use std::collections::BTreeSet;
use std::fs;
+ use tempfile::tempdir;
use super::*;
use bytes::Bytes;
@@ -397,6 +413,15 @@ mod file_action_test {
FileIOBuilder::new("file").build().unwrap()
}
+    /// Renders a local path as a `file:` URI with forward slashes, inserting
+    /// a leading `/` when the path does not already start with one.
+    fn local_file_path(path: &std::path::Path) -> String {
+        let normalized = path.to_string_lossy().replace('\\', "/");
+        let separator = if normalized.starts_with('/') { "" } else { "/" };
+        format!("file:{separator}{normalized}")
+    }
+
async fn common_test_get_status(file_io: &FileIO, path: &str) {
let output = file_io.new_output(path).unwrap();
let mut writer = output.writer().await.unwrap();
@@ -593,6 +618,27 @@ mod file_action_test {
let file_io = setup_fs_file_io();
common_test_list_status_paths(&file_io,
"file:/tmp/test_list_status_paths_fs/").await;
}
+
+    /// A bare local directory path must yield a `FileIO` that can write and
+    /// then see a file under that directory.
+    #[test]
+    fn test_from_path_detects_local_fs_path() {
+        let root = tempdir().unwrap();
+        let builder = FileIO::from_path(root.path().to_string_lossy()).unwrap();
+        let file_io = builder.build().unwrap();
+        let target = local_file_path(&root.path().join("from_path_detects_local_fs_path.txt"));
+
+        // The test is synchronous, so the async calls run on a dedicated runtime.
+        let runtime = tokio::runtime::Runtime::new().unwrap();
+        runtime.block_on(async {
+            file_io
+                .new_output(&target)
+                .unwrap()
+                .write(Bytes::from("data"))
+                .await
+                .unwrap();
+            let exists = file_io.exists(&target).await.unwrap();
+            assert!(exists);
+        });
+    }
}
#[cfg(test)]
diff --git a/crates/paimon/src/lib.rs b/crates/paimon/src/lib.rs
index 082f8f9..fe340f3 100644
--- a/crates/paimon/src/lib.rs
+++ b/crates/paimon/src/lib.rs
@@ -30,6 +30,7 @@ pub mod catalog;
mod deletion_vector;
pub mod file_index;
pub mod io;
+mod predicate_stats;
pub mod spec;
pub mod table;
diff --git a/crates/paimon/src/predicate_stats.rs
b/crates/paimon/src/predicate_stats.rs
new file mode 100644
index 0000000..cb44c72
--- /dev/null
+++ b/crates/paimon/src/predicate_stats.rs
@@ -0,0 +1,195 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::spec::{DataField, DataType, Datum, Predicate, PredicateOperator};
+use std::cmp::Ordering;
+
+/// Read-only view over per-column statistics consumed by the pruning helpers
+/// in this module. `None` from any accessor means the statistic is unknown.
+pub(crate) trait StatsAccessor {
+ /// Number of rows the statistics cover.
+ fn row_count(&self) -> i64;
+ /// Null count of the column at `index`, if known.
+ fn null_count(&self, index: usize) -> Option<i64>;
+ /// Minimum value of the column at `index`, decoded as `data_type`, if known.
+ fn min_value(&self, index: usize, data_type: &DataType) -> Option<Datum>;
+ /// Maximum value of the column at `index`, decoded as `data_type`, if known.
+ fn max_value(&self, index: usize, data_type: &DataType) -> Option<Datum>;
+}
+
+/// Returns `false` as soon as one predicate is proven unable to match the
+/// given statistics, and `true` otherwise (including for an empty slice).
+///
+/// `field_mapping` translates predicate (table) field indices into indices
+/// of `file_fields`.
+pub(crate) fn predicates_may_match_with_schema<T: StatsAccessor>(
+    predicates: &[Predicate],
+    stats: &T,
+    field_mapping: &[Option<usize>],
+    file_fields: &[DataField],
+) -> bool {
+    for predicate in predicates {
+        if !predicate_may_match_with_schema(predicate, stats, field_mapping, file_fields) {
+            return false;
+        }
+    }
+    true
+}
+
+/// Decides whether a leaf predicate can match any row described by `stats`.
+///
+/// The answer is conservative: `false` is returned only when the statistics
+/// rule a match out, and `true` whenever they are missing or inconclusive.
+///
+/// * `index` - column position passed to the `stats` accessors.
+/// * `stats_data_type` - type used to decode the min/max statistics.
+/// * `predicate_data_type` - type the literals are expressed in; decoded
+///   statistics are coerced to it before comparing.
+pub(crate) fn data_leaf_may_match<T: StatsAccessor>(
+    index: usize,
+    stats_data_type: &DataType,
+    predicate_data_type: &DataType,
+    op: PredicateOperator,
+    literals: &[Datum],
+    stats: &T,
+) -> bool {
+    let row_count = stats.row_count();
+    if row_count <= 0 {
+        return false;
+    }
+
+    let null_count = stats.null_count(index);
+    let only_nulls = null_count == Some(row_count);
+
+    match op {
+        PredicateOperator::IsNull => {
+            return match null_count {
+                Some(count) => count > 0,
+                None => true,
+            };
+        }
+        PredicateOperator::IsNotNull => return !only_nulls,
+        // Membership is not pruned on min/max bounds here.
+        PredicateOperator::In | PredicateOperator::NotIn => return true,
+        PredicateOperator::Eq
+        | PredicateOperator::NotEq
+        | PredicateOperator::Lt
+        | PredicateOperator::LtEq
+        | PredicateOperator::Gt
+        | PredicateOperator::GtEq => {}
+    }
+
+    // From here on only comparison operators remain; they never match nulls.
+    if only_nulls {
+        return false;
+    }
+    let Some(literal) = literals.first() else {
+        return true;
+    };
+
+    let coerce = |bound: Option<Datum>| {
+        bound.and_then(|datum| coerce_stats_datum_for_predicate(datum, predicate_data_type))
+    };
+    let Some(min_value) = coerce(stats.min_value(index, stats_data_type)) else {
+        return true;
+    };
+    let Some(max_value) = coerce(stats.max_value(index, stats_data_type)) else {
+        return true;
+    };
+
+    // An incomparable pair (`partial_cmp` yields `None`) keeps the data.
+    match op {
+        PredicateOperator::Eq => {
+            literal.partial_cmp(&min_value) != Some(Ordering::Less)
+                && literal.partial_cmp(&max_value) != Some(Ordering::Greater)
+        }
+        PredicateOperator::NotEq => {
+            let constant_at_literal = min_value == *literal && max_value == *literal;
+            !constant_at_literal
+        }
+        PredicateOperator::Lt => match min_value.partial_cmp(literal) {
+            Some(Ordering::Greater | Ordering::Equal) => false,
+            _ => true,
+        },
+        PredicateOperator::LtEq => min_value.partial_cmp(literal) != Some(Ordering::Greater),
+        PredicateOperator::Gt => match max_value.partial_cmp(literal) {
+            Some(Ordering::Less | Ordering::Equal) => false,
+            _ => true,
+        },
+        PredicateOperator::GtEq => max_value.partial_cmp(literal) != Some(Ordering::Less),
+        PredicateOperator::IsNull
+        | PredicateOperator::IsNotNull
+        | PredicateOperator::In
+        | PredicateOperator::NotIn => true,
+    }
+}
+
+/// Pruning rule for a predicate whose column has no mapped file field: with
+/// at least one row, only `IsNull` may match.
+pub(crate) fn missing_field_may_match(op: PredicateOperator, row_count: i64) -> bool {
+    row_count > 0 && matches!(op, PredicateOperator::IsNull)
+}
+
+/// Evaluates one predicate tree against statistics.
+///
+/// `And` requires every child to possibly match. `Or` and `Not` are not
+/// analysed and always count as a possible match. Leaves are resolved through
+/// `field_mapping` and delegated to the leaf helpers.
+fn predicate_may_match_with_schema<T: StatsAccessor>(
+    predicate: &Predicate,
+    stats: &T,
+    field_mapping: &[Option<usize>],
+    file_fields: &[DataField],
+) -> bool {
+    match predicate {
+        Predicate::AlwaysTrue => true,
+        Predicate::AlwaysFalse => false,
+        Predicate::Or(_) | Predicate::Not(_) => true,
+        Predicate::And(children) => !children.iter().any(|child| {
+            !predicate_may_match_with_schema(child, stats, field_mapping, file_fields)
+        }),
+        Predicate::Leaf {
+            index,
+            data_type,
+            op,
+            literals,
+            ..
+        } => {
+            let Some(file_index) = field_mapping.get(*index).copied().flatten() else {
+                return missing_field_may_match(*op, stats.row_count());
+            };
+            match file_fields.get(file_index) {
+                Some(file_field) => data_leaf_may_match(
+                    file_index,
+                    file_field.data_type(),
+                    data_type,
+                    *op,
+                    literals,
+                    stats,
+                ),
+                // A mapping that points outside `file_fields` keeps the data.
+                None => true,
+            }
+        }
+    }
+}
+
+/// Re-types a statistics value so it can be compared with predicate literals.
+///
+/// A value whose kind already matches `predicate_data_type` is returned
+/// unchanged. Narrower integers and `Float` are widened losslessly to the
+/// predicate type. Every other combination yields `None`.
+fn coerce_stats_datum_for_predicate(datum: Datum, predicate_data_type: &DataType) -> Option<Datum> {
+    let already_matching = matches!(
+        (&datum, predicate_data_type),
+        (Datum::Bool(_), DataType::Boolean(_))
+            | (Datum::TinyInt(_), DataType::TinyInt(_))
+            | (Datum::SmallInt(_), DataType::SmallInt(_))
+            | (Datum::Int(_), DataType::Int(_))
+            | (Datum::Long(_), DataType::BigInt(_))
+            | (Datum::Float(_), DataType::Float(_))
+            | (Datum::Double(_), DataType::Double(_))
+            | (Datum::String(_), DataType::VarChar(_))
+            | (Datum::String(_), DataType::Char(_))
+            | (Datum::Bytes(_), DataType::Binary(_))
+            | (Datum::Bytes(_), DataType::VarBinary(_))
+            | (Datum::Date(_), DataType::Date(_))
+            | (Datum::Time(_), DataType::Time(_))
+            | (Datum::Timestamp { .. }, DataType::Timestamp(_))
+            | (Datum::LocalZonedTimestamp { .. }, DataType::LocalZonedTimestamp(_))
+            | (Datum::Decimal { .. }, DataType::Decimal(_))
+    );
+    if already_matching {
+        return Some(datum);
+    }
+
+    // Lossless widenings only.
+    match (datum, predicate_data_type) {
+        (Datum::TinyInt(value), DataType::SmallInt(_)) => Some(Datum::SmallInt(i16::from(value))),
+        (Datum::TinyInt(value), DataType::Int(_)) => Some(Datum::Int(i32::from(value))),
+        (Datum::TinyInt(value), DataType::BigInt(_)) => Some(Datum::Long(i64::from(value))),
+        (Datum::SmallInt(value), DataType::Int(_)) => Some(Datum::Int(i32::from(value))),
+        (Datum::SmallInt(value), DataType::BigInt(_)) => Some(Datum::Long(i64::from(value))),
+        (Datum::Int(value), DataType::BigInt(_)) => Some(Datum::Long(i64::from(value))),
+        (Datum::Float(value), DataType::Double(_)) => Some(Datum::Double(f64::from(value))),
+        _ => None,
+    }
+}
diff --git a/crates/paimon/src/table/read_builder.rs
b/crates/paimon/src/table/read_builder.rs
index d55ecc1..4a405ea 100644
--- a/crates/paimon/src/table/read_builder.rs
+++ b/crates/paimon/src/table/read_builder.rs
@@ -20,13 +20,81 @@
//! Reference: [Java
ReadBuilder.withProjection](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java)
//! and
[TypeUtils.project](https://github.com/apache/paimon/blob/master/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java).
+use super::bucket_filter::{extract_predicate_for_keys,
split_partition_and_data_predicates};
use super::{ArrowRecordBatchStream, Table, TableScan};
+use crate::arrow::filtering::reader_pruning_predicates;
use crate::arrow::ArrowReaderBuilder;
use crate::spec::{CoreOptions, DataField, Predicate};
use crate::Result;
use crate::{DataSplit, Error};
use std::collections::{HashMap, HashSet};
+/// A user filter split once into the parts that `ReadBuilder` passes to
+/// `TableScan::new` and `TableRead::new`. The default value carries no filter.
+#[derive(Debug, Clone, Default)]
+struct NormalizedFilter {
+ /// Partition part of the filter, as returned by `split_scan_predicates`.
+ partition_predicate: Option<Predicate>,
+ /// Data (non-partition) predicates, as returned by `split_scan_predicates`.
+ data_predicates: Vec<Predicate>,
+ /// Predicate over the bucket keys, as returned by `bucket_predicate`.
+ bucket_predicate: Option<Predicate>,
+}
+
+/// Splits `filter` into an optional partition predicate and data predicates.
+///
+/// For a table without partition keys the filter is only split on `AND` and
+/// no partition predicate is produced.
+fn split_scan_predicates(table: &Table, filter: Predicate) -> (Option<Predicate>, Vec<Predicate>) {
+    let schema = table.schema();
+    let partition_keys = schema.partition_keys();
+    if !partition_keys.is_empty() {
+        return split_partition_and_data_predicates(filter, schema.fields(), partition_keys);
+    }
+    (None, filter.split_and())
+}
+
+/// Extracts the part of `filter` that constrains the table's bucket keys.
+///
+/// Returns `None` when the table does not use the default bucket function,
+/// has no bucket keys (explicit, or falling back to the primary keys), or
+/// names a bucket key that is not a schema field.
+fn bucket_predicate(table: &Table, filter: &Predicate) -> Option<Predicate> {
+    let schema = table.schema();
+    let core_options = CoreOptions::new(schema.options());
+    if !core_options.is_default_bucket_function() {
+        return None;
+    }
+
+    // Without an explicit bucket key the primary keys are used; an empty
+    // primary-key list simply collects into an empty vector.
+    let bucket_keys = core_options.bucket_key().unwrap_or_else(|| {
+        schema
+            .primary_keys()
+            .iter()
+            .map(|key| key.to_string())
+            .collect()
+    });
+    if bucket_keys.is_empty() {
+        return None;
+    }
+
+    let fields = schema.fields();
+    for key in &bucket_keys {
+        let known = fields.iter().any(|field| field.name() == key);
+        if !known {
+            return None;
+        }
+    }
+
+    extract_predicate_for_keys(filter, fields, &bucket_keys)
+}
+
+/// Splits a user filter into partition, data and bucket parts in one pass.
+fn normalize_filter(table: &Table, filter: Predicate) -> NormalizedFilter {
+    // Derive the bucket predicate from a borrow first, so `filter` can then be
+    // moved into the split instead of cloning the whole predicate tree.
+    let bucket = bucket_predicate(table, &filter);
+    let (partition_predicate, data_predicates) = split_scan_predicates(table, filter);
+    NormalizedFilter {
+        partition_predicate,
+        data_predicates,
+        bucket_predicate: bucket,
+    }
+}
+
+/// Keeps only the data predicates of `filter` and passes them through
+/// `reader_pruning_predicates` for use on the read path.
+fn read_data_predicates(table: &Table, filter: Predicate) -> Vec<Predicate> {
+    let data_predicates = split_scan_predicates(table, filter).1;
+    reader_pruning_predicates(data_predicates)
+}
+
/// Builder for table scan and table read (new_scan, new_read).
///
/// Rust keeps a names-based projection API for ergonomics, while aligning the
@@ -35,7 +103,7 @@ use std::collections::{HashMap, HashSet};
pub struct ReadBuilder<'a> {
table: &'a Table,
projected_fields: Option<Vec<String>>,
- filter: Option<Predicate>,
+ filter: NormalizedFilter,
limit: Option<usize>,
}
@@ -44,7 +112,7 @@ impl<'a> ReadBuilder<'a> {
Self {
table,
projected_fields: None,
- filter: None,
+ filter: NormalizedFilter::default(),
limit: None,
}
}
@@ -57,7 +125,7 @@ impl<'a> ReadBuilder<'a> {
self
}
- /// Set a filter predicate for scan planning.
+ /// Set a filter predicate for scan planning and conservative read pruning.
///
/// The predicate should use table schema field indices (as produced by
/// [`PredicateBuilder`]). During [`TableScan::plan`], partition-only
@@ -67,10 +135,13 @@ impl<'a> ReadBuilder<'a> {
/// Stats pruning is per file. Files with a different `schema_id`,
/// incompatible stats layout, or inconclusive stats are kept.
///
- /// [`TableRead`] does not evaluate row-level filters; callers must apply
- /// any remaining predicates themselves.
+ /// [`TableRead`] may use supported non-partition data predicates only on
+ /// the regular Parquet read path for conservative row-group pruning and
+ /// native Parquet row filtering. Unsupported predicates, non-Parquet
+ /// reads, and data-evolution reads remain residual and should still be
+ /// applied by the caller if exact filtering semantics are required.
pub fn with_filter(&mut self, filter: Predicate) -> &mut Self {
- self.filter = Some(filter);
+ self.filter = normalize_filter(self.table, filter);
self
}
@@ -89,7 +160,13 @@ impl<'a> ReadBuilder<'a> {
/// Create a table scan. Call [TableScan::plan] to get splits.
pub fn new_scan(&self) -> TableScan<'a> {
- TableScan::new(self.table, self.filter.clone(), self.limit)
+ TableScan::new(
+ self.table,
+ self.filter.partition_predicate.clone(),
+ self.filter.data_predicates.clone(),
+ self.filter.bucket_predicate.clone(),
+ self.limit,
+ )
}
/// Create a table read for consuming splits (e.g. from a scan plan).
@@ -99,7 +176,11 @@ impl<'a> ReadBuilder<'a> {
Some(projected) => self.resolve_projected_fields(projected)?,
};
- Ok(TableRead::new(self.table, read_type))
+ Ok(TableRead::new(
+ self.table,
+ read_type,
+ reader_pruning_predicates(self.filter.data_predicates.clone()),
+ ))
}
fn resolve_projected_fields(&self, projected_fields: &[String]) ->
Result<Vec<DataField>> {
@@ -146,12 +227,21 @@ impl<'a> ReadBuilder<'a> {
pub struct TableRead<'a> {
table: &'a Table,
read_type: Vec<DataField>,
+ data_predicates: Vec<Predicate>,
}
impl<'a> TableRead<'a> {
/// Create a new TableRead with a specific read type (projected fields).
- pub fn new(table: &'a Table, read_type: Vec<DataField>) -> Self {
- Self { table, read_type }
+ pub fn new(
+ table: &'a Table,
+ read_type: Vec<DataField>,
+ data_predicates: Vec<Predicate>,
+ ) -> Self {
+ Self {
+ table,
+ read_type,
+ data_predicates,
+ }
}
/// Schema (fields) that this read will produce.
@@ -164,6 +254,19 @@ impl<'a> TableRead<'a> {
self.table
}
+    /// Replaces the read-side pruning predicates with those derived from `filter`.
+    ///
+    /// This is the direct-`TableRead` equivalent of [`ReadBuilder::with_filter`].
+    /// Supported non-partition data predicates may be used only on the regular
+    /// Parquet read path, for row-group pruning and native Parquet row
+    /// filtering. Callers should still apply residual filtering at the query
+    /// layer for unsupported predicates, non-Parquet files and data-evolution
+    /// reads.
+    pub fn with_filter(self, filter: Predicate) -> Self {
+        let data_predicates = read_data_predicates(self.table, filter);
+        Self {
+            data_predicates,
+            ..self
+        }
+    }
+
+
/// Returns an [`ArrowRecordBatchStream`].
pub fn to_arrow(&self, data_splits: &[DataSplit]) ->
crate::Result<ArrowRecordBatchStream> {
// todo: consider get read batch size from table
@@ -186,6 +289,8 @@ impl<'a> TableRead<'a> {
self.table.schema_manager().clone(),
self.table.schema().id(),
)
+ .with_predicates(self.data_predicates.clone())
+ .with_table_fields(self.table.schema.fields().to_vec())
.build(self.read_type().to_vec());
if data_evolution {
@@ -195,3 +300,273 @@ impl<'a> TableRead<'a> {
}
}
}
+
+#[cfg(test)]
+mod tests {
+    use super::TableRead;
+    mod test_utils {
+        include!(concat!(env!("CARGO_MANIFEST_DIR"), "/../test_utils.rs"));
+    }
+
+    use crate::catalog::Identifier;
+    use crate::io::FileIOBuilder;
+    use crate::spec::{
+        BinaryRow, DataType, Datum, IntType, Predicate, PredicateBuilder, Schema, TableSchema,
+        VarCharType,
+    };
+    use crate::table::{DataSplitBuilder, Table};
+    use crate::DataSplit;
+    use arrow_array::{Int32Array, RecordBatch};
+    use futures::TryStreamExt;
+    use std::fs;
+    use tempfile::{tempdir, TempDir};
+    use test_utils::{local_file_path, test_data_file, write_int_parquet_file};
+
+    /// A table rooted in a temporary directory with one Parquet data file,
+    /// plus the split that reads it.
+    struct ReadFixture {
+        /// Held so the directory outlives the read.
+        _tempdir: TempDir,
+        table: Table,
+        split: DataSplit,
+    }
+
+    /// Writes `id = [1, 2, 3, 4]` and the given `value` column into
+    /// `<tempdir>/<bucket_dir_parts...>/data.parquet` (passing `Some(2)` as the
+    /// writer's last argument, as every test here does) and builds the table
+    /// and split around it.
+    fn read_fixture(
+        schema: Schema,
+        bucket_dir_parts: &[&str],
+        partition: BinaryRow,
+        values: Vec<i32>,
+    ) -> ReadFixture {
+        let tempdir = tempdir().unwrap();
+        let table_path = local_file_path(tempdir.path());
+        let bucket_dir = bucket_dir_parts
+            .iter()
+            .fold(tempdir.path().to_path_buf(), |dir, part| dir.join(part));
+        fs::create_dir_all(&bucket_dir).unwrap();
+
+        let parquet_path = bucket_dir.join("data.parquet");
+        write_int_parquet_file(
+            &parquet_path,
+            vec![("id", vec![1, 2, 3, 4]), ("value", values)],
+            Some(2),
+        );
+
+        let file_io = FileIOBuilder::new("file").build().unwrap();
+        let table = Table::new(
+            file_io,
+            Identifier::new("default", "t"),
+            table_path,
+            TableSchema::new(0, &schema),
+        );
+
+        let split = DataSplitBuilder::new()
+            .with_snapshot(1)
+            .with_partition(partition)
+            .with_bucket(0)
+            .with_bucket_path(local_file_path(&bucket_dir))
+            .with_total_buckets(1)
+            .with_data_files(vec![test_data_file("data.parquet", 4)])
+            .with_raw_convertible(true)
+            .build()
+            .unwrap();
+
+        ReadFixture {
+            _tempdir: tempdir,
+            table,
+            split,
+        }
+    }
+
+    /// Unpartitioned `(id INT, value INT)` schema.
+    fn id_value_schema() -> Schema {
+        Schema::builder()
+            .column("id", DataType::Int(IntType::new()))
+            .column("value", DataType::Int(IntType::new()))
+            .build()
+            .unwrap()
+    }
+
+    /// The `value >= 10` predicate shared by all tests.
+    fn value_at_least_ten(table: &Table) -> Predicate {
+        PredicateBuilder::new(table.schema().fields())
+            .greater_or_equal("value", Datum::Int(10))
+            .unwrap()
+    }
+
+    fn collect_int_column(batches: &[RecordBatch], column_name: &str) -> Vec<i32> {
+        batches
+            .iter()
+            .flat_map(|batch| {
+                let column_index = batch.schema().index_of(column_name).unwrap();
+                let array = batch.column(column_index);
+                let values = array.as_any().downcast_ref::<Int32Array>().unwrap();
+                (0..values.len())
+                    .map(|index| values.value(index))
+                    .collect::<Vec<_>>()
+            })
+            .collect()
+    }
+
+    /// Reads `split` through `read` and returns the `id` column.
+    async fn read_ids(read: &TableRead<'_>, split: DataSplit) -> Vec<i32> {
+        let batches = read
+            .to_arrow(&[split])
+            .unwrap()
+            .try_collect::<Vec<_>>()
+            .await
+            .unwrap();
+        collect_int_column(&batches, "id")
+    }
+
+    /// Projects `id`, applies `filter` through the read builder, and reads `split`.
+    async fn read_ids_with_builder(table: &Table, split: DataSplit, filter: Predicate) -> Vec<i32> {
+        let mut builder = table.new_read_builder();
+        builder.with_projection(&["id"]).with_filter(filter);
+        let read = builder.new_read().unwrap();
+        read_ids(&read, split).await
+    }
+
+    #[tokio::test]
+    async fn test_new_read_pushes_filter_to_reader_when_filter_column_not_projected() {
+        let ReadFixture {
+            _tempdir,
+            table,
+            split,
+        } = read_fixture(
+            id_value_schema(),
+            &["bucket-0"],
+            BinaryRow::new(0),
+            vec![1, 2, 20, 30],
+        );
+
+        let filter = value_at_least_ten(&table);
+        let ids = read_ids_with_builder(&table, split, filter).await;
+
+        assert_eq!(ids, vec![3, 4]);
+    }
+
+    #[tokio::test]
+    async fn test_direct_table_read_with_filter_pushes_filter_to_reader() {
+        let ReadFixture {
+            _tempdir,
+            table,
+            split,
+        } = read_fixture(
+            id_value_schema(),
+            &["bucket-0"],
+            BinaryRow::new(0),
+            vec![1, 2, 20, 30],
+        );
+
+        let read = TableRead::new(&table, vec![table.schema().fields()[0].clone()], Vec::new())
+            .with_filter(value_at_least_ten(&table));
+        let ids = read_ids(&read, split).await;
+
+        assert_eq!(ids, vec![3, 4]);
+    }
+
+    #[tokio::test]
+    async fn test_new_read_row_filter_filters_rows_within_matching_row_group() {
+        // `value = 5` (id 1) shares the first written pair of rows with a
+        // matching row, so it can only be dropped by the row-level filter.
+        let ReadFixture {
+            _tempdir,
+            table,
+            split,
+        } = read_fixture(
+            id_value_schema(),
+            &["bucket-0"],
+            BinaryRow::new(0),
+            vec![5, 20, 30, 40],
+        );
+
+        let filter = value_at_least_ten(&table);
+        let ids = read_ids_with_builder(&table, split, filter).await;
+
+        assert_eq!(ids, vec![2, 3, 4]);
+    }
+
+    #[tokio::test]
+    async fn test_reader_pruning_ignores_partition_conjuncts() {
+        let schema = Schema::builder()
+            .column("dt", DataType::VarChar(VarCharType::string_type()))
+            .column("id", DataType::Int(IntType::new()))
+            .column("value", DataType::Int(IntType::new()))
+            .partition_keys(["dt"])
+            .build()
+            .unwrap();
+        let ReadFixture {
+            _tempdir,
+            table,
+            split,
+        } = read_fixture(
+            schema,
+            &["dt=2024-01-01", "bucket-0"],
+            BinaryRow::new(1),
+            vec![1, 2, 20, 30],
+        );
+
+        let filter = Predicate::and(vec![
+            PredicateBuilder::new(table.schema().fields())
+                .equal("dt", Datum::String("2024-01-01".to_string()))
+                .unwrap(),
+            value_at_least_ten(&table),
+        ]);
+        let ids = read_ids_with_builder(&table, split, filter).await;
+
+        assert_eq!(ids, vec![3, 4]);
+    }
+}
diff --git a/crates/paimon/src/table/stats_filter.rs
b/crates/paimon/src/table/stats_filter.rs
index 171b00d..9796bf4 100644
--- a/crates/paimon/src/table/stats_filter.rs
+++ b/crates/paimon/src/table/stats_filter.rs
@@ -19,11 +19,10 @@
use super::Table;
use crate::arrow::schema_evolution::create_index_mapping;
-use crate::spec::{
- extract_datum, BinaryRow, DataField, DataFileMeta, DataType, Datum,
Predicate,
- PredicateOperator,
+use crate::predicate_stats::{
+ data_leaf_may_match, missing_field_may_match,
predicates_may_match_with_schema, StatsAccessor,
};
-use std::cmp::Ordering;
+use crate::spec::{extract_datum, BinaryRow, DataField, DataFileMeta, DataType,
Datum, Predicate};
use std::collections::HashMap;
use std::sync::Arc;
@@ -97,11 +96,36 @@ impl FileStatsRows {
}
}
- fn null_count(&self, stats_index: usize) -> Option<i64> {
+ fn stats_null_count(&self, stats_index: usize) -> Option<i64> {
self.null_counts.get(stats_index).copied().flatten()
}
}
+impl StatsAccessor for FileStatsRows { // Serves the row count, per-column null counts and min/max values held by this FileStatsRows.
+ fn row_count(&self) -> i64 {
+ self.row_count
+ }
+
+ fn null_count(&self, index: usize) -> Option<i64> { // None when `stats_index(index)` is None or no null count is stored at the resolved position
+ let stats_index = self.stats_index(index)?; // resolve the caller's index via `stats_index`; return None early if it does not resolve
+ self.stats_null_count(stats_index)
+ }
+
+ fn min_value(&self, index: usize, data_type: &DataType) -> Option<Datum> { // None when the index does not resolve, `min_values` is absent, or `extract_stats_datum` returns None
+ let stats_index = self.stats_index(index)?;
+ self.min_values
+ .as_ref()
+ .and_then(|row| extract_stats_datum(row, stats_index, data_type))
+ }
+
+ fn max_value(&self, index: usize, data_type: &DataType) -> Option<Datum> { // same lookup as `min_value`, reading `max_values` instead
+ let stats_index = self.stats_index(index)?;
+ self.max_values
+ .as_ref()
+ .and_then(|row| extract_stats_datum(row, stats_index, data_type))
+ }
+}
+
#[derive(Debug)]
pub(super) struct ResolvedStatsSchema {
file_fields: Vec<DataField>,
@@ -156,10 +180,8 @@ pub(super) fn data_file_matches_predicates(
}
let stats = FileStatsRows::from_data_file(file, schema_fields);
-
- predicates
- .iter()
- .all(|predicate| data_predicate_may_match(predicate, &stats))
+ let field_mapping = identity_field_mapping(schema_fields.len());
+ predicates_may_match_with_schema(predicates, &stats, &field_mapping,
schema_fields)
}
async fn resolve_stats_schema(
@@ -218,203 +240,12 @@ pub(super) async fn
data_file_matches_predicates_for_table(
};
let stats = FileStatsRows::from_data_file(file, &resolved.file_fields);
-
- predicates.iter().all(|predicate| {
- data_predicate_may_match_with_schema(
- predicate,
- &stats,
- &resolved.field_mapping,
- &resolved.file_fields,
- )
- })
-}
-
-fn data_predicate_may_match(predicate: &Predicate, stats: &FileStatsRows) ->
bool {
- match predicate {
- Predicate::AlwaysTrue => true,
- Predicate::AlwaysFalse => false,
- Predicate::And(children) => children
- .iter()
- .all(|child| data_predicate_may_match(child, stats)),
- Predicate::Or(_) | Predicate::Not(_) => true,
- Predicate::Leaf {
- index,
- data_type,
- op,
- literals,
- ..
- } => {
- let Some(stats_idx) = stats.stats_index(*index) else {
- return true;
- };
- data_leaf_may_match(stats_idx, data_type, data_type, *op,
literals, stats)
- }
- }
-}
-
-fn data_predicate_may_match_with_schema(
- predicate: &Predicate,
- stats: &FileStatsRows,
- field_mapping: &[Option<usize>],
- file_fields: &[DataField],
-) -> bool {
- match predicate {
- Predicate::AlwaysTrue => true,
- Predicate::AlwaysFalse => false,
- Predicate::And(children) => children.iter().all(|child| {
- data_predicate_may_match_with_schema(child, stats, field_mapping,
file_fields)
- }),
- Predicate::Or(_) | Predicate::Not(_) => true,
- Predicate::Leaf {
- index,
- data_type,
- op,
- literals,
- ..
- } => match field_mapping.get(*index).copied().flatten() {
- Some(file_index) => {
- let Some(file_field) = file_fields.get(file_index) else {
- return true;
- };
- let Some(stats_idx) = stats.stats_index(file_index) else {
- return true;
- };
- data_leaf_may_match(
- stats_idx,
- file_field.data_type(),
- data_type,
- *op,
- literals,
- stats,
- )
- }
- None => missing_field_may_match(*op, stats.row_count),
- },
- }
-}
-
-pub(super) fn data_leaf_may_match(
- index: usize,
- stats_data_type: &DataType,
- predicate_data_type: &DataType,
- op: PredicateOperator,
- literals: &[Datum],
- stats: &FileStatsRows,
-) -> bool {
- let row_count = stats.row_count;
- if row_count <= 0 {
- return false;
- }
-
- let null_count = stats.null_count(index);
- let all_null = null_count.map(|count| count == row_count);
-
- match op {
- PredicateOperator::IsNull => {
- return null_count.is_none_or(|count| count > 0);
- }
- PredicateOperator::IsNotNull => {
- return all_null != Some(true);
- }
- PredicateOperator::In | PredicateOperator::NotIn => {
- return true;
- }
- PredicateOperator::Eq
- | PredicateOperator::NotEq
- | PredicateOperator::Lt
- | PredicateOperator::LtEq
- | PredicateOperator::Gt
- | PredicateOperator::GtEq => {}
- }
-
- if all_null == Some(true) {
- return false;
- }
-
- let literal = match literals.first() {
- Some(literal) => literal,
- None => return true,
- };
-
- let min_value = match stats
- .min_values
- .as_ref()
- .and_then(|row| extract_stats_datum(row, index, stats_data_type))
- .and_then(|datum| coerce_stats_datum_for_predicate(datum,
predicate_data_type))
- {
- Some(value) => value,
- None => return true,
- };
- let max_value = match stats
- .max_values
- .as_ref()
- .and_then(|row| extract_stats_datum(row, index, stats_data_type))
- .and_then(|datum| coerce_stats_datum_for_predicate(datum,
predicate_data_type))
- {
- Some(value) => value,
- None => return true,
- };
-
- match op {
- PredicateOperator::Eq => {
- !matches!(literal.partial_cmp(&min_value), Some(Ordering::Less))
- && !matches!(literal.partial_cmp(&max_value),
Some(Ordering::Greater))
- }
- PredicateOperator::NotEq => !(min_value == *literal && max_value ==
*literal),
- PredicateOperator::Lt => !matches!(
- min_value.partial_cmp(literal),
- Some(Ordering::Greater | Ordering::Equal)
- ),
- PredicateOperator::LtEq => {
- !matches!(min_value.partial_cmp(literal), Some(Ordering::Greater))
- }
- PredicateOperator::Gt => !matches!(
- max_value.partial_cmp(literal),
- Some(Ordering::Less | Ordering::Equal)
- ),
- PredicateOperator::GtEq => !matches!(max_value.partial_cmp(literal),
Some(Ordering::Less)),
- PredicateOperator::IsNull
- | PredicateOperator::IsNotNull
- | PredicateOperator::In
- | PredicateOperator::NotIn => true,
- }
-}
-
-fn missing_field_may_match(op: PredicateOperator, row_count: i64) -> bool {
- if row_count <= 0 {
- return false;
- }
-
- matches!(op, PredicateOperator::IsNull)
-}
-
-fn coerce_stats_datum_for_predicate(datum: Datum, predicate_data_type:
&DataType) -> Option<Datum> {
- match (datum, predicate_data_type) {
- (datum @ Datum::Bool(_), DataType::Boolean(_))
- | (datum @ Datum::TinyInt(_), DataType::TinyInt(_))
- | (datum @ Datum::SmallInt(_), DataType::SmallInt(_))
- | (datum @ Datum::Int(_), DataType::Int(_))
- | (datum @ Datum::Long(_), DataType::BigInt(_))
- | (datum @ Datum::Float(_), DataType::Float(_))
- | (datum @ Datum::Double(_), DataType::Double(_))
- | (datum @ Datum::String(_), DataType::VarChar(_))
- | (datum @ Datum::String(_), DataType::Char(_))
- | (datum @ Datum::Bytes(_), DataType::Binary(_))
- | (datum @ Datum::Bytes(_), DataType::VarBinary(_))
- | (datum @ Datum::Date(_), DataType::Date(_))
- | (datum @ Datum::Time(_), DataType::Time(_))
- | (datum @ Datum::Timestamp { .. }, DataType::Timestamp(_))
- | (datum @ Datum::LocalZonedTimestamp { .. },
DataType::LocalZonedTimestamp(_))
- | (datum @ Datum::Decimal { .. }, DataType::Decimal(_)) => Some(datum),
- (Datum::TinyInt(value), DataType::SmallInt(_)) =>
Some(Datum::SmallInt(value as i16)),
- (Datum::TinyInt(value), DataType::Int(_)) => Some(Datum::Int(value as
i32)),
- (Datum::TinyInt(value), DataType::BigInt(_)) => Some(Datum::Long(value
as i64)),
- (Datum::SmallInt(value), DataType::Int(_)) => Some(Datum::Int(value as
i32)),
- (Datum::SmallInt(value), DataType::BigInt(_)) =>
Some(Datum::Long(value as i64)),
- (Datum::Int(value), DataType::BigInt(_)) => Some(Datum::Long(value as
i64)),
- (Datum::Float(value), DataType::Double(_)) => Some(Datum::Double(value
as f64)),
- _ => None,
- }
+ predicates_may_match_with_schema(
+ predicates,
+ &stats,
+ &resolved.field_mapping,
+ &resolved.file_fields,
+ )
}
fn extract_stats_datum(row: &BinaryRow, index: usize, data_type: &DataType) ->
Option<Datum> {
@@ -463,16 +294,18 @@ pub(super) fn data_evolution_group_matches_predicates(
let mut sorted_files: Vec<&DataFileMeta> = group.iter().collect();
sorted_files.sort_by(|a, b|
b.max_sequence_number.cmp(&a.max_sequence_number));
- // For each table field, find which file (index in sorted_files) provides
it,
- // and the field's offset within that file's stats.
+ // For each table field, find which file (index in sorted_files) provides
it.
+ // The field index remains a table-field index so FileStatsRows can resolve
+ // it through its own schema-to-stats mapping.
let field_sources: Vec<Option<(usize, usize)>> = table_fields
.iter()
- .map(|field| {
+ .enumerate()
+ .map(|(field_idx, field)| {
for (file_idx, file) in sorted_files.iter().enumerate() {
let file_columns = file_stats_columns(file, table_fields);
- for (stats_idx, col_name) in file_columns.iter().enumerate() {
+ for col_name in &file_columns {
if *col_name == field.name() {
- return Some((file_idx, stats_idx));
+ return Some((file_idx, field_idx));
}
}
}
@@ -544,13 +377,20 @@ fn data_evolution_predicate_may_match(
let Some(source) = field_sources.get(*index).copied().flatten()
else {
return missing_field_may_match(*op, row_count);
};
- let (file_idx, stats_idx) = source;
+ let (file_idx, field_index) = source;
let stats = &file_stats[file_idx];
let stats_data_type = table_fields
.get(*index)
.map(|f| f.data_type())
.unwrap_or(data_type);
- data_leaf_may_match(stats_idx, stats_data_type, data_type, *op,
literals, stats)
+ data_leaf_may_match(
+ field_index,
+ stats_data_type,
+ data_type,
+ *op,
+ literals,
+ stats,
+ )
}
}
}
diff --git a/crates/paimon/src/table/table_scan.rs
b/crates/paimon/src/table/table_scan.rs
index 9f2ebf9..da2dc24 100644
--- a/crates/paimon/src/table/table_scan.rs
+++ b/crates/paimon/src/table/table_scan.rs
@@ -20,16 +20,15 @@
//! Reference:
[pypaimon.read.table_scan.TableScan](https://github.com/apache/paimon/blob/release-1.3/paimon-python/pypaimon/read/table_scan.py)
//! and
[FullStartingScanner](https://github.com/apache/paimon/blob/release-1.3/paimon-python/pypaimon/read/scanner/full_starting_scanner.py).
-use super::bucket_filter::{
- compute_target_buckets, extract_predicate_for_keys,
split_partition_and_data_predicates,
-};
+use super::bucket_filter::compute_target_buckets;
use super::stats_filter::{
data_evolution_group_matches_predicates, data_file_matches_predicates,
- data_file_matches_predicates_for_table, data_leaf_may_match,
group_by_overlapping_row_id,
- FileStatsRows, ResolvedStatsSchema,
+ data_file_matches_predicates_for_table, group_by_overlapping_row_id,
FileStatsRows,
+ ResolvedStatsSchema,
};
use super::Table;
use crate::io::FileIO;
+use crate::predicate_stats::data_leaf_may_match;
use crate::spec::{
eval_row, BinaryRow, CoreOptions, DataField, DataFileMeta, FileKind,
IndexManifest,
ManifestEntry, ManifestFileMeta, PartitionComputer, Predicate, Snapshot,
@@ -299,17 +298,27 @@ fn partition_matches_predicate(
#[derive(Debug, Clone)]
pub struct TableScan<'a> {
table: &'a Table,
- filter: Option<Predicate>,
+ partition_predicate: Option<Predicate>,
+ data_predicates: Vec<Predicate>,
+ bucket_predicate: Option<Predicate>,
/// Optional limit on the number of rows to return.
/// When set, the scan will try to return only enough splits to satisfy
the limit.
limit: Option<usize>,
}
impl<'a> TableScan<'a> {
- pub fn new(table: &'a Table, filter: Option<Predicate>, limit:
Option<usize>) -> Self {
+ pub fn new(
+ table: &'a Table,
+ partition_predicate: Option<Predicate>,
+ data_predicates: Vec<Predicate>,
+ bucket_predicate: Option<Predicate>,
+ limit: Option<usize>,
+ ) -> Self {
Self {
table,
- filter,
+ partition_predicate,
+ data_predicates,
+ bucket_predicate,
limit,
}
}
@@ -416,24 +425,12 @@ impl<'a> TableScan<'a> {
let target_split_size = core_options.source_split_target_size();
let open_file_cost = core_options.source_split_open_file_cost();
- // Compute predicates before reading manifests so they can be pushed
down.
- let partition_keys = self.table.schema().partition_keys();
- let (partition_predicate, data_predicates) = if let Some(filter) =
self.filter.clone() {
- if partition_keys.is_empty() {
- (None, filter.split_and())
- } else {
- split_partition_and_data_predicates(
- filter,
- self.table.schema().fields(),
- partition_keys,
- )
- }
- } else {
- (None, Vec::new())
- };
-
// Resolve partition fields for manifest-file-level stats pruning.
- let partition_fields: Vec<DataField> = partition_keys
+ let partition_keys = self.table.schema().partition_keys();
+ let partition_fields: Vec<DataField> = self
+ .table
+ .schema()
+ .partition_keys()
.iter()
.filter_map(|key| {
self.table
@@ -449,17 +446,17 @@ impl<'a> TableScan<'a> {
let pushdown_data_predicates = if data_evolution_enabled {
&[][..]
} else {
- &data_predicates
+ self.data_predicates.as_slice()
};
let has_primary_keys = !self.table.schema().primary_keys().is_empty();
// Compute bucket predicate and key fields for per-entry bucket
pruning.
// Only supported for the default bucket function (MurmurHash3-based).
- let (bucket_predicate, bucket_key_fields): (Option<Predicate>,
Vec<DataField>) =
- if !core_options.is_default_bucket_function() {
- (None, Vec::new())
- } else if let Some(filter) = &self.filter {
+ let bucket_key_fields: Vec<DataField> =
+ if self.bucket_predicate.is_none() ||
!core_options.is_default_bucket_function() {
+ Vec::new()
+ } else {
let bucket_keys = core_options.bucket_key().unwrap_or_else(|| {
if has_primary_keys {
self.table
@@ -472,33 +469,17 @@ impl<'a> TableScan<'a> {
Vec::new()
}
});
- if bucket_keys.is_empty() {
- (None, Vec::new())
- } else {
- let fields: Vec<DataField> = bucket_keys
- .iter()
- .filter_map(|key| {
- self.table
- .schema()
- .fields()
- .iter()
- .find(|f| f.name() == key)
- .cloned()
- })
- .collect();
- if fields.len() == bucket_keys.len() {
- let pred = extract_predicate_for_keys(
- filter,
- self.table.schema().fields(),
- &bucket_keys,
- );
- (pred, fields)
- } else {
- (None, Vec::new())
- }
- }
- } else {
- (None, Vec::new())
+ bucket_keys
+ .iter()
+ .filter_map(|key| {
+ self.table
+ .schema()
+ .fields()
+ .iter()
+ .find(|f| f.name() == key)
+ .cloned()
+ })
+ .collect::<Vec<_>>()
};
let entries = read_all_manifest_entries(
@@ -507,12 +488,12 @@ impl<'a> TableScan<'a> {
&snapshot,
deletion_vectors_enabled,
has_primary_keys,
- partition_predicate.as_ref(),
+ self.partition_predicate.as_ref(),
&partition_fields,
pushdown_data_predicates,
self.table.schema().id(),
self.table.schema().fields(),
- bucket_predicate.as_ref(),
+ self.bucket_predicate.as_ref(),
&bucket_key_fields,
)
.await?;
@@ -523,7 +504,7 @@ impl<'a> TableScan<'a> {
// For non-data-evolution tables, cross-schema files were kept
(fail-open)
// by the pushdown. Apply the full schema-aware filter for those files.
- let entries = if data_predicates.is_empty() || data_evolution_enabled {
+ let entries = if self.data_predicates.is_empty() ||
data_evolution_enabled {
entries
} else {
let current_schema_id = self.table.schema().id();
@@ -541,7 +522,7 @@ impl<'a> TableScan<'a> {
|| data_file_matches_predicates_for_table(
self.table,
entry.file(),
- &data_predicates,
+ &self.data_predicates,
&mut schema_cache,
)
.await
@@ -625,7 +606,7 @@ impl<'a> TableScan<'a> {
let row_id_groups = group_by_overlapping_row_id(data_files);
// Filter groups by merged stats before splitting.
- let row_id_groups: Vec<Vec<DataFileMeta>> = if
data_predicates.is_empty() {
+ let row_id_groups: Vec<Vec<DataFileMeta>> = if
self.data_predicates.is_empty() {
row_id_groups
} else {
row_id_groups
@@ -633,7 +614,7 @@ impl<'a> TableScan<'a> {
.filter(|group| {
data_evolution_group_matches_predicates(
group,
- &data_predicates,
+ &self.data_predicates,
self.table.schema().fields(),
)
})
@@ -688,7 +669,7 @@ impl<'a> TableScan<'a> {
// Apply limit pushdown only when there are no data predicates.
// With data predicates, merged_row_count() reflects pre-filter row
counts,
// so stopping early could return fewer rows than the limit after
filtering.
- let splits = if data_predicates.is_empty() {
+ let splits = if self.data_predicates.is_empty() {
self.apply_limit_pushdown(splits)
} else {
splits
diff --git a/crates/test_utils.rs b/crates/test_utils.rs
new file mode 100644
index 0000000..48f6fa2
--- /dev/null
+++ b/crates/test_utils.rs
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fs::File;
+use std::path::Path;
+use std::sync::Arc;
+
+use arrow_array::{Array, Int32Array, RecordBatch};
+use arrow_schema::{DataType as ArrowDataType, Field as ArrowField, Schema as
ArrowSchema};
+use chrono::Utc;
+use parquet::arrow::ArrowWriter;
+use parquet::file::properties::WriterProperties;
+use serde::de::DeserializeOwned;
+
+pub(crate) fn write_int_parquet_file( // Test helper: writes the given named i32 columns as a single record batch into a parquet file at `path`.
+ path: &Path,
+ columns: Vec<(&str, Vec<i32>)>, // (column name, values) pairs; schema fields and arrays are built in this order
+ max_row_group_size: Option<usize>, // when Some, forwarded to WriterProperties::set_max_row_group_size
+) {
+ let schema = Arc::new(ArrowSchema::new(
+ columns
+ .iter()
+ .map(|(name, _)| ArrowField::new(*name, ArrowDataType::Int32,
false)) // `false` is the `nullable` argument of ArrowField::new: every column is a non-nullable Int32
+ .collect::<Vec<_>>(),
+ ));
+ let arrays: Vec<Arc<dyn Array>> = columns
+ .iter()
+ .map(|(_, values)| Arc::new(Int32Array::from(values.clone())) as
Arc<dyn Array>)
+ .collect();
+ let batch = RecordBatch::try_new(schema.clone(), arrays).unwrap(); // panics if the batch cannot be built from these columns
+
+ let props = max_row_group_size.map(|size| { // stays None when no size was requested, and that None is handed to ArrowWriter::try_new
+ WriterProperties::builder()
+ .set_max_row_group_size(size)
+ .build()
+ });
+ let file = File::create(path).unwrap(); // every fallible step from here on is unwrapped, so any failure panics
+ let mut writer = ArrowWriter::try_new(file, schema, props).unwrap();
+ writer.write(&batch).unwrap();
+ writer.close().unwrap();
+}
+
+pub(crate) fn local_file_path(path: &Path) -> String { // Test helper: renders a filesystem path as a `file:`-prefixed location string.
+ let normalized = path.to_string_lossy().replace('\\', "/"); // lossy UTF-8 conversion, then every backslash becomes a forward slash
+ if normalized.starts_with('/') {
+ format!("file:{normalized}") // already starts with '/': prepend `file:` only
+ } else {
+ format!("file:/{normalized}") // no leading '/': insert one, e.g. "C:/tmp" becomes "file:/C:/tmp"
+ }
+}
+
+pub(crate) fn test_data_file<T>(file_name: &str, row_count: i64) -> T // Test helper: deserializes a fixed JSON object into the caller-chosen type `T`; only the file name and row count vary.
+where
+ T: DeserializeOwned,
+{
+ serde_json::from_value(serde_json::json!({
+ "_FILE_NAME": file_name,
+ "_FILE_SIZE": 0,
+ "_ROW_COUNT": row_count,
+ "_MIN_KEY": [],
+ "_MAX_KEY": [],
+ "_KEY_STATS": { // key stats are left empty
+ "_MIN_VALUES": [],
+ "_MAX_VALUES": [],
+ "_NULL_COUNTS": []
+ },
+ "_VALUE_STATS": { // value stats are left empty as well
+ "_MIN_VALUES": [],
+ "_MAX_VALUES": [],
+ "_NULL_COUNTS": []
+ },
+ "_MIN_SEQUENCE_NUMBER": 0,
+ "_MAX_SEQUENCE_NUMBER": 0,
+ "_SCHEMA_ID": 0,
+ "_LEVEL": 1,
+ "_EXTRA_FILES": [],
+ "_CREATION_TIME": Utc::now().timestamp_millis(), // taken from the clock at call time, so this value differs between calls
+ "_DELETE_ROW_COUNT": null,
+ "_EMBEDDED_FILE_INDEX": null,
+ "_FILE_SOURCE": null,
+ "_VALUE_STATS_COLS": null,
+ "_FIRST_ROW_ID": null,
+ "_WRITE_COLS": null,
+ "_EXTERNAL_PATH": null
+ }))
+ .unwrap() // panics if `T` cannot be deserialized from the object above
+}