This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 1f4aac6  feat(scan): support data-level stats pruning in TableScan 
(#196)
1f4aac6 is described below

commit 1f4aac69a2a932c2d2a0821d4d5921d100898db9
Author: Zach <[email protected]>
AuthorDate: Sat Apr 4 10:43:16 2026 +0800

    feat(scan): support data-level stats pruning in TableScan (#196)
---
 crates/integration_tests/tests/read_tables.rs | 207 ++++++-
 crates/paimon/src/spec/mod.rs                 |   1 +
 crates/paimon/src/spec/predicate.rs           | 184 +++++-
 crates/paimon/src/table/read_builder.rs       |  16 +-
 crates/paimon/src/table/table_scan.rs         | 860 +++++++++++++++++++++++---
 5 files changed, 1142 insertions(+), 126 deletions(-)

diff --git a/crates/integration_tests/tests/read_tables.rs 
b/crates/integration_tests/tests/read_tables.rs
index 2775978..2411a9c 100644
--- a/crates/integration_tests/tests/read_tables.rs
+++ b/crates/integration_tests/tests/read_tables.rs
@@ -543,7 +543,6 @@ async fn test_read_partitioned_table_with_filter() {
 
     let catalog = create_file_system_catalog();
     let table = get_table_from_catalog(&catalog, 
"partitioned_log_table").await;
-    // Build a filter: dt = '2024-01-01'
     let schema = table.schema();
     let pb = PredicateBuilder::new(schema.fields());
     let filter = pb
@@ -577,7 +576,6 @@ async fn test_read_multi_partitioned_table_with_filter() {
     let schema = table.schema();
     let pb = PredicateBuilder::new(schema.fields());
 
-    // Filter: dt = '2024-01-01' AND hr = 10
     let filter = Predicate::and(vec![
         pb.equal("dt", Datum::String("2024-01-01".into())).unwrap(),
         pb.equal("hr", Datum::Int(10)).unwrap(),
@@ -600,7 +598,7 @@ async fn test_read_multi_partitioned_table_with_filter() {
 }
 
 #[tokio::test]
-async fn 
test_read_partitioned_table_data_only_filter_preserves_all_partitions() {
+async fn test_read_partitioned_table_data_only_filter_prunes_all_files() {
     use paimon::spec::{Datum, PredicateBuilder};
 
     let catalog = create_file_system_catalog();
@@ -608,8 +606,6 @@ async fn 
test_read_partitioned_table_data_only_filter_preserves_all_partitions()
     let schema = table.schema();
     let pb = PredicateBuilder::new(schema.fields());
 
-    // Data-only filter: id > 10 — should NOT prune any partitions,
-    // and is still ignored at read level in Phase 2.
     let filter = pb
         .greater_than("id", Datum::Int(10))
         .expect("Failed to build predicate");
@@ -618,24 +614,18 @@ async fn 
test_read_partitioned_table_data_only_filter_preserves_all_partitions()
     let seen_partitions = extract_plan_partitions(&plan);
     assert_eq!(
         seen_partitions,
-        HashSet::from(["2024-01-01".into(), "2024-01-02".into()]),
-        "Data-only filter should not prune any partitions"
+        HashSet::<String>::new(),
+        "Data-only filter should prune all files when stats prove no match"
     );
 
     let actual = extract_id_name(&batches);
     assert_eq!(
         actual,
-        vec![
-            (1, "alice".to_string()),
-            (2, "bob".to_string()),
-            (3, "carol".to_string()),
-        ],
-        "Data predicate is not applied at read level; all rows are still 
returned"
+        Vec::<(i32, String)>::new(),
+        "No rows should be planned when stats prove the predicate is 
unsatisfiable"
     );
 }
 
-/// Mixed AND: partition predicate prunes partitions, but data predicate is
-/// silently ignored — all rows from the matching partition are returned.
 #[tokio::test]
 async fn test_read_partitioned_table_mixed_and_filter() {
     use paimon::spec::{Datum, Predicate, PredicateBuilder};
@@ -645,8 +635,6 @@ async fn test_read_partitioned_table_mixed_and_filter() {
     let schema = table.schema();
     let pb = PredicateBuilder::new(schema.fields());
 
-    // dt = '2024-01-01' AND id > 10
-    // Partition conjunct (dt) is applied; data conjunct (id) is NOT.
     let filter = Predicate::and(vec![
         pb.equal("dt", Datum::String("2024-01-01".into())).unwrap(),
         pb.greater_than("id", Datum::Int(10)).unwrap(),
@@ -656,20 +644,92 @@ async fn test_read_partitioned_table_mixed_and_filter() {
     let seen_partitions = extract_plan_partitions(&plan);
     assert_eq!(
         seen_partitions,
-        HashSet::from(["2024-01-01".into()]),
-        "Only dt=2024-01-01 should survive"
+        HashSet::<String>::new(),
+        "The matching partition should also be pruned when file stats prove no 
match"
     );
 
     let actual = extract_id_name(&batches);
     assert_eq!(
         actual,
-        vec![(1, "alice".to_string()), (2, "bob".to_string())],
-        "Data predicate (id > 10) is NOT applied — all rows from matching 
partition returned"
+        Vec::<(i32, String)>::new(),
+        "No rows should remain after partition pruning and data stats pruning"
+    );
+}
+
+#[tokio::test]
+async fn 
test_read_partitioned_table_data_only_filter_keeps_matching_partition() {
+    use paimon::spec::{Datum, PredicateBuilder};
+
+    let catalog = create_file_system_catalog();
+    let table = get_table_from_catalog(&catalog, 
"partitioned_log_table").await;
+    let schema = table.schema();
+    let pb = PredicateBuilder::new(schema.fields());
+
+    let filter = pb
+        .greater_than("id", Datum::Int(2))
+        .expect("Failed to build predicate");
+
+    let (plan, batches) = scan_and_read_with_filter(&table, filter).await;
+    let seen_partitions = extract_plan_partitions(&plan);
+    assert_eq!(
+        seen_partitions,
+        HashSet::from(["2024-01-02".into()]),
+        "Only files whose stats may satisfy the predicate should remain in the 
plan"
+    );
+
+    let actual = extract_id_name(&batches);
+    assert_eq!(
+        actual,
+        vec![(3, "carol".to_string())],
+        "Only rows from files that survive stats pruning should be returned"
     );
 }
 
-/// Mixed OR: `dt = '...' OR id > 10` cannot be split into a pure partition
-/// predicate, so no partitions should be pruned.
+/// Java-style inclusive projection can still extract partition predicates from
+/// an OR of mixed AND branches.
+#[tokio::test]
+async fn 
test_read_multi_partitioned_table_or_of_mixed_ands_prunes_partitions() {
+    use paimon::spec::{Datum, Predicate, PredicateBuilder};
+
+    let catalog = create_file_system_catalog();
+    let table = get_table_from_catalog(&catalog, 
"multi_partitioned_log_table").await;
+    let schema = table.schema();
+    let pb = PredicateBuilder::new(schema.fields());
+
+    let filter = Predicate::or(vec![
+        Predicate::and(vec![
+            pb.equal("dt", Datum::String("2024-01-01".into())).unwrap(),
+            pb.equal("hr", Datum::Int(10)).unwrap(),
+            pb.greater_than("id", Datum::Int(10)).unwrap(),
+        ]),
+        Predicate::and(vec![
+            pb.equal("dt", Datum::String("2024-01-01".into())).unwrap(),
+            pb.equal("hr", Datum::Int(20)).unwrap(),
+        ]),
+    ]);
+
+    let (plan, batches) = scan_and_read_with_filter(&table, filter).await;
+    let seen_partitions = extract_plan_multi_partitions(&plan);
+    assert_eq!(
+        seen_partitions,
+        HashSet::from([("2024-01-01".into(), 10), ("2024-01-01".into(), 20)]),
+        "Inclusive projection should prune the dt=2024-01-02 partition"
+    );
+
+    let actual = extract_id_name(&batches);
+    assert_eq!(
+        actual,
+        vec![
+            (1, "alice".to_string()),
+            (2, "bob".to_string()),
+            (3, "carol".to_string()),
+        ],
+        "All rows from the surviving partitions should be returned"
+    );
+}
+
+/// A directly mixed OR like `dt = '...' OR id > 10` is still not safely
+/// splittable into a partition predicate, so no partitions should be pruned.
 #[tokio::test]
 async fn test_read_partitioned_table_mixed_or_filter_preserves_all() {
     use paimon::spec::{Datum, Predicate, PredicateBuilder};
@@ -679,7 +739,6 @@ async fn 
test_read_partitioned_table_mixed_or_filter_preserves_all() {
     let schema = table.schema();
     let pb = PredicateBuilder::new(schema.fields());
 
-    // dt = '2024-01-01' OR id > 10 — mixed OR is not safely splittable.
     let filter = Predicate::or(vec![
         pb.equal("dt", Datum::String("2024-01-01".into())).unwrap(),
         pb.greater_than("id", Datum::Int(10)).unwrap(),
@@ -705,7 +764,7 @@ async fn 
test_read_partitioned_table_mixed_or_filter_preserves_all() {
     );
 }
 
-/// Filter that matches no existing partition — all entries pruned, 0 splits.
+/// A filter that matches no partition should produce no splits.
 #[tokio::test]
 async fn test_read_partitioned_table_filter_matches_no_partition() {
     use paimon::spec::{Datum, PredicateBuilder};
@@ -715,7 +774,6 @@ async fn 
test_read_partitioned_table_filter_matches_no_partition() {
     let schema = table.schema();
     let pb = PredicateBuilder::new(schema.fields());
 
-    // dt = '9999-12-31' matches no partition.
     let filter = pb
         .equal("dt", Datum::String("9999-12-31".into()))
         .expect("Failed to build predicate");
@@ -744,8 +802,7 @@ async fn 
test_read_partitioned_table_eval_row_error_fails_plan() {
         .position(|f| f.name() == "dt")
         .expect("dt partition column should exist");
 
-    // Use an unsupported DataType in a partition leaf so remapping succeeds
-    // but `eval_row` fails during partition pruning.
+    // Use an unsupported partition type so remapping succeeds but `eval_row` 
fails.
     let filter = Predicate::Leaf {
         column: "dt".into(),
         index: dt_index,
@@ -1086,6 +1143,100 @@ async fn test_read_schema_evolution_type_promotion() {
     );
 }
 
+/// Stats pruning should treat a newly added column as all-NULL for old files.
+#[tokio::test]
+async fn 
test_stats_pruning_schema_evolution_added_column_eq_prunes_old_files() {
+    use paimon::spec::{Datum, PredicateBuilder};
+
+    let catalog = create_file_system_catalog();
+    let table = get_table_from_catalog(&catalog, 
"schema_evolution_add_column").await;
+    let pb = PredicateBuilder::new(table.schema().fields());
+    let filter = pb
+        .equal("age", Datum::Int(30))
+        .expect("Failed to build predicate");
+
+    let (plan, batches) = scan_and_read_with_filter(&table, filter).await;
+    assert_eq!(
+        plan.splits().len(),
+        1,
+        "Only the file written after ADD COLUMN should survive stats pruning"
+    );
+
+    let actual = extract_id_name(&batches);
+    assert_eq!(
+        actual,
+        vec![(3, "carol".to_string())],
+        "Old files missing 'age' and rows with age != 30 should be pruned"
+    );
+}
+
+/// Stats pruning should keep only old files for IS NULL on a newly added 
column.
+#[tokio::test]
+async fn 
test_stats_pruning_schema_evolution_added_column_is_null_prunes_new_files() {
+    use paimon::spec::PredicateBuilder;
+
+    let catalog = create_file_system_catalog();
+    let table = get_table_from_catalog(&catalog, 
"schema_evolution_add_column").await;
+    let pb = PredicateBuilder::new(table.schema().fields());
+    let filter = pb.is_null("age").expect("Failed to build predicate");
+
+    let (plan, batches) = scan_and_read_with_filter(&table, filter).await;
+    assert_eq!(
+        plan.splits().len(),
+        1,
+        "Only files missing 'age' should survive stats pruning for age IS NULL"
+    );
+
+    let actual = extract_id_name(&batches);
+    assert_eq!(
+        actual,
+        vec![(1, "alice".to_string()), (2, "bob".to_string())],
+        "New files with non-null age should be pruned for age IS NULL"
+    );
+}
+
+/// Stats pruning should still work after INT -> BIGINT type promotion.
+#[tokio::test]
+async fn 
test_stats_pruning_schema_evolution_type_promotion_prunes_old_int_files() {
+    use paimon::spec::{Datum, PredicateBuilder};
+
+    let catalog = create_file_system_catalog();
+    let table = get_table_from_catalog(&catalog, 
"schema_evolution_type_promotion").await;
+    let pb = PredicateBuilder::new(table.schema().fields());
+    let filter = pb
+        .greater_than("value", Datum::Long(250))
+        .expect("Failed to build predicate");
+
+    let (plan, batches) = scan_and_read_with_filter(&table, filter).await;
+    assert_eq!(
+        plan.splits().len(),
+        1,
+        "Old INT files should still be pruned using promoted BIGINT predicates"
+    );
+
+    let mut rows: Vec<(i32, i64)> = Vec::new();
+    for batch in &batches {
+        let id = batch
+            .column_by_name("id")
+            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+            .expect("id");
+        let value = batch
+            .column_by_name("value")
+            .and_then(|c| c.as_any().downcast_ref::<Int64Array>())
+            .expect("value");
+        for i in 0..batch.num_rows() {
+            rows.push((id.value(i), value.value(i)));
+        }
+    }
+    rows.sort_by_key(|(id, _)| *id);
+
+    assert_eq!(
+        rows,
+        vec![(3, 3_000_000_000i64)],
+        "Only the BIGINT file should remain after value > 250 pruning"
+    );
+}
+
 /// Test reading a data-evolution table after ALTER TABLE ADD COLUMNS.
 /// Old files lack the new column; reader should fill nulls even in data 
evolution mode.
 #[tokio::test]
diff --git a/crates/paimon/src/spec/mod.rs b/crates/paimon/src/spec/mod.rs
index 970fb5e..bcfe5ad 100644
--- a/crates/paimon/src/spec/mod.rs
+++ b/crates/paimon/src/spec/mod.rs
@@ -57,6 +57,7 @@ mod partition_utils;
 pub(crate) use partition_utils::PartitionComputer;
 mod predicate;
 pub(crate) use predicate::eval_row;
+pub(crate) use predicate::extract_datum;
 pub use predicate::{
     field_idx_to_partition_idx, Datum, Predicate, PredicateBuilder, 
PredicateOperator,
 };
diff --git a/crates/paimon/src/spec/predicate.rs 
b/crates/paimon/src/spec/predicate.rs
index 4c89820..953b92c 100644
--- a/crates/paimon/src/spec/predicate.rs
+++ b/crates/paimon/src/spec/predicate.rs
@@ -391,7 +391,7 @@ impl Predicate {
     /// a mixed predicate is never partially remapped.
     ///
     /// `mapping` is the output of `field_idx_to_partition_idx`.
-    pub(crate) fn remap_field_index(self, mapping: &[Option<usize>]) -> 
Option<Predicate> {
+    pub(crate) fn remap_field_index(&self, mapping: &[Option<usize>]) -> 
Option<Predicate> {
         match self {
             Predicate::Leaf {
                 column,
@@ -400,25 +400,25 @@ impl Predicate {
                 op,
                 literals,
             } => {
-                let new_index = (*mapping.get(index)?)?;
+                let new_index = (*mapping.get(*index)?)?;
                 Some(Predicate::Leaf {
-                    column,
+                    column: column.clone(),
                     index: new_index,
-                    data_type,
-                    op,
-                    literals,
+                    data_type: data_type.clone(),
+                    op: *op,
+                    literals: literals.clone(),
                 })
             }
             Predicate::And(children) => {
                 let remapped: Option<Vec<_>> = children
-                    .into_iter()
+                    .iter()
                     .map(|c| c.remap_field_index(mapping))
                     .collect();
                 Some(Predicate::and(remapped?))
             }
             Predicate::Or(children) => {
                 let remapped: Option<Vec<_>> = children
-                    .into_iter()
+                    .iter()
                     .map(|c| c.remap_field_index(mapping))
                     .collect();
                 Some(Predicate::or(remapped?))
@@ -431,6 +431,77 @@ impl Predicate {
             Predicate::AlwaysFalse => Some(Predicate::AlwaysFalse),
         }
     }
+
+    /// Check whether every leaf field in this subtree is present in `mapping`.
+    ///
+    /// This is used to decide whether the original conjunct still needs to be
+    /// retained as a residual data predicate after partition projection.
+    pub(crate) fn references_only_mapped_fields(&self, mapping: 
&[Option<usize>]) -> bool {
+        match self {
+            Predicate::Leaf { index, .. } => 
mapping.get(*index).is_some_and(Option::is_some),
+            Predicate::And(children) | Predicate::Or(children) => children
+                .iter()
+                .all(|child| child.references_only_mapped_fields(mapping)),
+            Predicate::Not(inner) => 
inner.references_only_mapped_fields(mapping),
+            Predicate::AlwaysTrue | Predicate::AlwaysFalse => true,
+        }
+    }
+
+    /// Project leaf field indices from table schema space into a smaller 
field space.
+    ///
+    /// Unlike [`Self::remap_field_index`], mixed `AND` subtrees keep the 
children
+    /// that can be projected and drop the rest. `OR` and `NOT` still require 
all
+    /// children to be projectable to preserve correctness.
+    ///
+    /// This matches the partition predicate extraction semantics used by Java
+    /// `splitPartitionPredicatesAndDataPredicates`.
+    pub(crate) fn project_field_index_inclusive(
+        &self,
+        mapping: &[Option<usize>],
+    ) -> Option<Predicate> {
+        match self {
+            Predicate::Leaf {
+                column,
+                index,
+                data_type,
+                op,
+                literals,
+            } => {
+                let new_index = (*mapping.get(*index)?)?;
+                Some(Predicate::Leaf {
+                    column: column.clone(),
+                    index: new_index,
+                    data_type: data_type.clone(),
+                    op: *op,
+                    literals: literals.clone(),
+                })
+            }
+            Predicate::And(children) => {
+                let projected: Vec<_> = children
+                    .iter()
+                    .filter_map(|c| c.project_field_index_inclusive(mapping))
+                    .collect();
+                if projected.is_empty() {
+                    None
+                } else {
+                    Some(Predicate::and(projected))
+                }
+            }
+            Predicate::Or(children) => {
+                let projected: Option<Vec<_>> = children
+                    .iter()
+                    .map(|c| c.project_field_index_inclusive(mapping))
+                    .collect();
+                Some(Predicate::or(projected?))
+            }
+            Predicate::Not(inner) => {
+                let projected = inner.remap_field_index(mapping)?;
+                Some(Predicate::negate(projected))
+            }
+            Predicate::AlwaysTrue => Some(Predicate::AlwaysTrue),
+            Predicate::AlwaysFalse => Some(Predicate::AlwaysFalse),
+        }
+    }
 }
 
 impl fmt::Display for Predicate {
@@ -1713,4 +1784,101 @@ mod tests {
         // NOT(partition AND data) → mixed under NOT → None
         assert!(negated.remap_field_index(&mapping).is_none());
     }
+
+    // ================== project_field_index_inclusive ==================
+
+    #[test]
+    fn test_project_inclusive_and_keeps_partition_children() {
+        let pb = PredicateBuilder::new(&test_fields());
+        let mixed = Predicate::and(vec![
+            pb.equal("dt", Datum::Date(19723)).unwrap(),
+            pb.greater_than("id", Datum::Int(10)).unwrap(),
+        ]);
+        let mapping = vec![None, None, Some(0), Some(1)];
+
+        let projected = mixed.project_field_index_inclusive(&mapping).unwrap();
+        match projected {
+            Predicate::Leaf { column, index, .. } => {
+                assert_eq!(column, "dt");
+                assert_eq!(index, 0);
+            }
+            other => panic!("expected projected partition leaf, got 
{other:?}"),
+        }
+    }
+
+    #[test]
+    fn test_project_inclusive_and_all_data_returns_none() {
+        let pb = PredicateBuilder::new(&test_fields());
+        let data_only = Predicate::and(vec![
+            pb.equal("id", Datum::Int(1)).unwrap(),
+            pb.equal("name", Datum::String("alice".into())).unwrap(),
+        ]);
+        let mapping = vec![None, None, Some(0), Some(1)];
+
+        assert!(data_only.project_field_index_inclusive(&mapping).is_none());
+    }
+
+    #[test]
+    fn test_project_inclusive_or_with_mixed_returns_none() {
+        let pb = PredicateBuilder::new(&test_fields());
+        let p_partition = pb.equal("dt", Datum::Date(19723)).unwrap();
+        let p_data = pb.equal("id", Datum::Int(1)).unwrap();
+        let combined = Predicate::or(vec![p_partition, p_data]);
+        let mapping = vec![None, None, Some(0), Some(1)];
+
+        assert!(combined.project_field_index_inclusive(&mapping).is_none());
+    }
+
+    #[test]
+    fn test_project_inclusive_or_of_mixed_ands_projects_each_branch() {
+        let pb = PredicateBuilder::new(&test_fields());
+        let left = Predicate::and(vec![
+            pb.equal("dt", Datum::Date(19723)).unwrap(),
+            pb.greater_than("id", Datum::Int(10)).unwrap(),
+        ]);
+        let right = Predicate::and(vec![
+            pb.equal("hr", Datum::Int(10)).unwrap(),
+            pb.equal("name", Datum::String("alice".into())).unwrap(),
+        ]);
+        let combined = Predicate::or(vec![left, right]);
+        let mapping = vec![None, None, Some(0), Some(1)];
+
+        let projected = 
combined.project_field_index_inclusive(&mapping).unwrap();
+        match projected {
+            Predicate::Or(children) => {
+                assert_eq!(children.len(), 2);
+                assert!(matches!(
+                    &children[0],
+                    Predicate::Leaf {
+                        column,
+                        index: 0,
+                        ..
+                    } if column == "dt"
+                ));
+                assert!(matches!(
+                    &children[1],
+                    Predicate::Leaf {
+                        column,
+                        index: 1,
+                        ..
+                    } if column == "hr"
+                ));
+            }
+            other => panic!("expected projected OR, got {other:?}"),
+        }
+    }
+
+    #[test]
+    fn test_project_inclusive_not_with_mixed_returns_none() {
+        let pb = PredicateBuilder::new(&test_fields());
+        let inner = Predicate::and(vec![
+            pb.equal("dt", Datum::Date(19723)).unwrap(),
+            pb.greater_than("id", Datum::Int(10)).unwrap(),
+        ]);
+        let mapping = vec![None, None, Some(0), Some(1)];
+
+        assert!(Predicate::negate(inner)
+            .project_field_index_inclusive(&mapping)
+            .is_none());
+    }
 }
diff --git a/crates/paimon/src/table/read_builder.rs 
b/crates/paimon/src/table/read_builder.rs
index db5c42c..d55ecc1 100644
--- a/crates/paimon/src/table/read_builder.rs
+++ b/crates/paimon/src/table/read_builder.rs
@@ -60,15 +60,15 @@ impl<'a> ReadBuilder<'a> {
     /// Set a filter predicate for scan planning.
     ///
     /// The predicate should use table schema field indices (as produced by
-    /// [`PredicateBuilder`]).  During [`TableScan::plan`] the filter is
-    /// decomposed at AND boundaries: partition-only conjuncts are extracted
-    /// and used to prune partitions; all other conjuncts are currently
-    /// **ignored** (neither `TableScan` nor `TableRead` applies data-level
-    /// predicates yet).
+    /// [`PredicateBuilder`]). During [`TableScan::plan`], partition-only
+    /// conjuncts are used for partition pruning and supported data conjuncts
+    /// may be used for conservative file-stats pruning.
     ///
-    /// This means rows returned by `TableRead` may **not** satisfy the full
-    /// filter — callers must apply remaining predicates themselves until
-    /// data-level pushdown is implemented.
+    /// Stats pruning is per file. Files with a different `schema_id`,
+    /// incompatible stats layout, or inconclusive stats are kept.
+    ///
+    /// [`TableRead`] does not evaluate row-level filters; callers must apply
+    /// any remaining predicates themselves.
     pub fn with_filter(&mut self, filter: Predicate) -> &mut Self {
         self.filter = Some(filter);
         self
diff --git a/crates/paimon/src/table/table_scan.rs 
b/crates/paimon/src/table/table_scan.rs
index 60d32a5..2f230ce 100644
--- a/crates/paimon/src/table/table_scan.rs
+++ b/crates/paimon/src/table/table_scan.rs
@@ -21,17 +21,21 @@
 //! and 
[FullStartingScanner](https://github.com/apache/paimon/blob/release-1.3/paimon-python/pypaimon/read/scanner/full_starting_scanner.py).
 
 use super::Table;
+use crate::arrow::schema_evolution::create_index_mapping;
 use crate::io::FileIO;
 use crate::spec::{
-    eval_row, field_idx_to_partition_idx, BinaryRow, CoreOptions, 
DataFileMeta, FileKind,
-    IndexManifest, ManifestEntry, PartitionComputer, Predicate, Snapshot,
+    eval_row, extract_datum, field_idx_to_partition_idx, BinaryRow, 
CoreOptions, DataField,
+    DataFileMeta, DataType, Datum, FileKind, IndexManifest, ManifestEntry, 
PartitionComputer,
+    Predicate, PredicateOperator, Snapshot,
 };
 use crate::table::bin_pack::split_for_batch;
 use crate::table::source::{DataSplit, DataSplitBuilder, DeletionFile, 
PartitionBucket, Plan};
 use crate::table::SnapshotManager;
 use crate::table::TagManager;
 use crate::Error;
+use std::cmp::Ordering;
 use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
 
 /// Path segment for manifest directory under table.
 const MANIFEST_DIR: &str = "manifest";
@@ -219,6 +223,411 @@ pub(crate) fn group_by_overlapping_row_id(mut files: 
Vec<DataFileMeta>) -> Vec<V
     result
 }
 
+#[derive(Debug, Clone)]
+struct FileStatsRows {
+    row_count: i64,
+    min_values: Option<BinaryRow>,
+    max_values: Option<BinaryRow>,
+    null_counts: Vec<i64>,
+}
+
+impl FileStatsRows {
+    /// Build file stats only when they are compatible with the expected file 
schema.
+    fn try_from_data_file(file: &DataFileMeta, expected_fields: usize) -> 
Option<Self> {
+        let stats = Self {
+            row_count: file.row_count,
+            min_values: 
BinaryRow::from_serialized_bytes(file.value_stats.min_values()).ok(),
+            max_values: 
BinaryRow::from_serialized_bytes(file.value_stats.max_values()).ok(),
+            null_counts: file.value_stats.null_counts().clone(),
+        };
+
+        stats.arity_matches(expected_fields).then_some(stats)
+    }
+
+    fn null_count(&self, index: usize) -> Option<i64> {
+        self.null_counts.get(index).copied()
+    }
+
+    /// Check whether the stats rows have the expected number of fields.
+    ///
+    /// If either min or max BinaryRow has an arity different from
+    /// `expected_fields`, the stats were likely written in dense mode or
+    /// under a different schema — making index-based access unsafe.
+    fn arity_matches(&self, expected_fields: usize) -> bool {
+        let min_ok = self
+            .min_values
+            .as_ref()
+            .is_none_or(|r| r.arity() as usize == expected_fields);
+        let max_ok = self
+            .max_values
+            .as_ref()
+            .is_none_or(|r| r.arity() as usize == expected_fields);
+        let null_ok = self.null_counts.is_empty() || self.null_counts.len() == 
expected_fields;
+        min_ok && max_ok && null_ok
+    }
+}
+
+#[derive(Debug)]
+struct ResolvedStatsSchema {
+    file_fields: Vec<DataField>,
+    field_mapping: Vec<Option<usize>>,
+}
+
+fn identity_field_mapping(num_fields: usize) -> Vec<Option<usize>> {
+    (0..num_fields).map(Some).collect()
+}
+
+fn normalize_field_mapping(mapping: Option<Vec<i32>>, num_fields: usize) -> 
Vec<Option<usize>> {
+    mapping
+        .map(|field_mapping| {
+            field_mapping
+                .into_iter()
+                .map(|index| usize::try_from(index).ok())
+                .collect()
+        })
+        .unwrap_or_else(|| identity_field_mapping(num_fields))
+}
+
+fn split_partition_and_data_predicates(
+    filter: Predicate,
+    fields: &[DataField],
+    partition_keys: &[String],
+) -> (Option<Predicate>, Vec<Predicate>) {
+    let mapping = field_idx_to_partition_idx(fields, partition_keys);
+    let mut partition_predicates = Vec::new();
+    let mut data_predicates = Vec::new();
+
+    for conjunct in filter.split_and() {
+        let strict_partition_only = 
conjunct.references_only_mapped_fields(&mapping);
+
+        if let Some(projected) = 
conjunct.project_field_index_inclusive(&mapping) {
+            partition_predicates.push(projected);
+        }
+
+        // Keep any conjunct that is not fully partition-only for data-level
+        // stats pruning, even if part of it contributed to partition pruning.
+        if !strict_partition_only {
+            data_predicates.push(conjunct);
+        }
+    }
+
+    let partition_predicate = if partition_predicates.is_empty() {
+        None
+    } else {
+        Some(Predicate::and(partition_predicates))
+    };
+
+    (partition_predicate, data_predicates)
+}
+
+/// Check whether a data file *may* contain rows matching all `predicates`.
+///
+/// Pruning is evaluated per file and fails open when stats cannot be
+/// interpreted safely, including schema mismatches, incompatible stats arity,
+/// and missing or corrupted stats. Mixed-schema tables can still prune files
+/// written with the current schema; unsupported or inconclusive predicates are
+/// conservatively kept.
+fn data_file_matches_predicates(
+    file: &DataFileMeta,
+    predicates: &[Predicate],
+    current_schema_id: i64,
+    num_fields: usize,
+) -> bool {
+    if predicates.is_empty() {
+        return true;
+    }
+
+    // Evaluate constant predicates before consulting stats.
+    if predicates
+        .iter()
+        .any(|p| matches!(p, Predicate::AlwaysFalse))
+    {
+        return false;
+    }
+    if predicates
+        .iter()
+        .all(|p| matches!(p, Predicate::AlwaysTrue))
+    {
+        return true;
+    }
+
+    if file.schema_id != current_schema_id {
+        return true;
+    }
+
+    // Fail open if schema evolution or stats layout make index-based access 
unsafe.
+    let Some(stats) = FileStatsRows::try_from_data_file(file, num_fields) else 
{
+        return true;
+    };
+
+    predicates
+        .iter()
+        .all(|predicate| data_predicate_may_match(predicate, &stats))
+}
+
+async fn resolve_stats_schema(
+    table: &Table,
+    file_schema_id: i64,
+    schema_cache: &mut HashMap<i64, Option<Arc<ResolvedStatsSchema>>>,
+) -> Option<Arc<ResolvedStatsSchema>> {
+    if let Some(cached) = schema_cache.get(&file_schema_id) {
+        return cached.clone();
+    }
+
+    let table_schema = table.schema();
+    let current_fields = table_schema.fields();
+    let resolved = if file_schema_id == table_schema.id() {
+        Some(Arc::new(ResolvedStatsSchema {
+            file_fields: current_fields.to_vec(),
+            field_mapping: identity_field_mapping(current_fields.len()),
+        }))
+    } else {
+        let file_schema = 
table.schema_manager().schema(file_schema_id).await.ok()?;
+        let file_fields = file_schema.fields().to_vec();
+        Some(Arc::new(ResolvedStatsSchema {
+            field_mapping: normalize_field_mapping(
+                create_index_mapping(current_fields, &file_fields),
+                current_fields.len(),
+            ),
+            file_fields,
+        }))
+    };
+
+    schema_cache.insert(file_schema_id, resolved.clone());
+    resolved
+}
+
+async fn data_file_matches_predicates_for_table(
+    table: &Table,
+    file: &DataFileMeta,
+    predicates: &[Predicate],
+    schema_cache: &mut HashMap<i64, Option<Arc<ResolvedStatsSchema>>>,
+) -> bool {
+    if predicates.is_empty() {
+        return true;
+    }
+
+    if file.schema_id == table.schema().id() {
+        return data_file_matches_predicates(
+            file,
+            predicates,
+            table.schema().id(),
+            table.schema().fields().len(),
+        );
+    }
+
+    let Some(resolved) = resolve_stats_schema(table, file.schema_id, 
schema_cache).await else {
+        return true;
+    };
+
+    let Some(stats) = FileStatsRows::try_from_data_file(file, 
resolved.file_fields.len()) else {
+        return true;
+    };
+
+    predicates.iter().all(|predicate| {
+        data_predicate_may_match_with_schema(
+            predicate,
+            &stats,
+            &resolved.field_mapping,
+            &resolved.file_fields,
+        )
+    })
+}
+
+fn data_predicate_may_match(predicate: &Predicate, stats: &FileStatsRows) -> 
bool {
+    match predicate {
+        Predicate::AlwaysTrue => true,
+        Predicate::AlwaysFalse => false,
+        Predicate::And(children) => children
+            .iter()
+            .all(|child| data_predicate_may_match(child, stats)),
+        // Keep the first version conservative: only prune simple leaves and 
conjunctions.
+        Predicate::Or(_) | Predicate::Not(_) => true,
+        Predicate::Leaf {
+            index,
+            data_type,
+            op,
+            literals,
+            ..
+        } => data_leaf_may_match(*index, data_type, data_type, *op, literals, 
stats),
+    }
+}
+
+fn data_predicate_may_match_with_schema(
+    predicate: &Predicate,
+    stats: &FileStatsRows,
+    field_mapping: &[Option<usize>],
+    file_fields: &[DataField],
+) -> bool {
+    match predicate {
+        Predicate::AlwaysTrue => true,
+        Predicate::AlwaysFalse => false,
+        Predicate::And(children) => children.iter().all(|child| {
+            data_predicate_may_match_with_schema(child, stats, field_mapping, 
file_fields)
+        }),
+        // Keep the first version conservative: only prune simple leaves and 
conjunctions.
+        Predicate::Or(_) | Predicate::Not(_) => true,
+        Predicate::Leaf {
+            index,
+            data_type,
+            op,
+            literals,
+            ..
+        } => match field_mapping.get(*index).copied().flatten() {
+            Some(file_index) => {
+                let Some(file_field) = file_fields.get(file_index) else {
+                    return true;
+                };
+                data_leaf_may_match(
+                    file_index,
+                    file_field.data_type(),
+                    data_type,
+                    *op,
+                    literals,
+                    stats,
+                )
+            }
+            None => missing_field_may_match(*op, stats.row_count),
+        },
+    }
+}
+
+fn data_leaf_may_match(
+    index: usize,
+    stats_data_type: &DataType,
+    predicate_data_type: &DataType,
+    op: PredicateOperator,
+    literals: &[Datum],
+    stats: &FileStatsRows,
+) -> bool {
+    let row_count = stats.row_count;
+    if row_count <= 0 {
+        return false;
+    }
+
+    let null_count = stats.null_count(index);
+    let all_null = null_count.map(|count| count == row_count);
+
+    match op {
+        PredicateOperator::IsNull => {
+            return null_count.is_none_or(|count| count > 0);
+        }
+        PredicateOperator::IsNotNull => {
+            return all_null != Some(true);
+        }
+        PredicateOperator::In | PredicateOperator::NotIn => {
+            return true;
+        }
+        PredicateOperator::Eq
+        | PredicateOperator::NotEq
+        | PredicateOperator::Lt
+        | PredicateOperator::LtEq
+        | PredicateOperator::Gt
+        | PredicateOperator::GtEq => {}
+    }
+
+    if all_null == Some(true) {
+        return false;
+    }
+
+    let literal = match literals.first() {
+        Some(literal) => literal,
+        None => return true,
+    };
+
+    let min_value = match stats
+        .min_values
+        .as_ref()
+        .and_then(|row| extract_stats_datum(row, index, stats_data_type))
+        .and_then(|datum| coerce_stats_datum_for_predicate(datum, 
predicate_data_type))
+    {
+        Some(value) => value,
+        None => return true,
+    };
+    let max_value = match stats
+        .max_values
+        .as_ref()
+        .and_then(|row| extract_stats_datum(row, index, stats_data_type))
+        .and_then(|datum| coerce_stats_datum_for_predicate(datum, 
predicate_data_type))
+    {
+        Some(value) => value,
+        None => return true,
+    };
+
+    match op {
+        PredicateOperator::Eq => {
+            !matches!(literal.partial_cmp(&min_value), Some(Ordering::Less))
+                && !matches!(literal.partial_cmp(&max_value), 
Some(Ordering::Greater))
+        }
+        PredicateOperator::NotEq => !(min_value == *literal && max_value == 
*literal),
+        PredicateOperator::Lt => !matches!(
+            min_value.partial_cmp(literal),
+            Some(Ordering::Greater | Ordering::Equal)
+        ),
+        PredicateOperator::LtEq => {
+            !matches!(min_value.partial_cmp(literal), Some(Ordering::Greater))
+        }
+        PredicateOperator::Gt => !matches!(
+            max_value.partial_cmp(literal),
+            Some(Ordering::Less | Ordering::Equal)
+        ),
+        PredicateOperator::GtEq => !matches!(max_value.partial_cmp(literal), 
Some(Ordering::Less)),
+        PredicateOperator::IsNull
+        | PredicateOperator::IsNotNull
+        | PredicateOperator::In
+        | PredicateOperator::NotIn => true,
+    }
+}
+
+fn missing_field_may_match(op: PredicateOperator, row_count: i64) -> bool {
+    if row_count <= 0 {
+        return false;
+    }
+
+    matches!(op, PredicateOperator::IsNull)
+}
+
+fn coerce_stats_datum_for_predicate(datum: Datum, predicate_data_type: 
&DataType) -> Option<Datum> {
+    match (datum, predicate_data_type) {
+        (datum @ Datum::Bool(_), DataType::Boolean(_))
+        | (datum @ Datum::TinyInt(_), DataType::TinyInt(_))
+        | (datum @ Datum::SmallInt(_), DataType::SmallInt(_))
+        | (datum @ Datum::Int(_), DataType::Int(_))
+        | (datum @ Datum::Long(_), DataType::BigInt(_))
+        | (datum @ Datum::Float(_), DataType::Float(_))
+        | (datum @ Datum::Double(_), DataType::Double(_))
+        | (datum @ Datum::String(_), DataType::VarChar(_))
+        | (datum @ Datum::String(_), DataType::Char(_))
+        | (datum @ Datum::Bytes(_), DataType::Binary(_))
+        | (datum @ Datum::Bytes(_), DataType::VarBinary(_))
+        | (datum @ Datum::Date(_), DataType::Date(_))
+        | (datum @ Datum::Time(_), DataType::Time(_))
+        | (datum @ Datum::Timestamp { .. }, DataType::Timestamp(_))
+        | (datum @ Datum::LocalZonedTimestamp { .. }, 
DataType::LocalZonedTimestamp(_))
+        | (datum @ Datum::Decimal { .. }, DataType::Decimal(_)) => Some(datum),
+        (Datum::TinyInt(value), DataType::SmallInt(_)) => 
Some(Datum::SmallInt(value as i16)),
+        (Datum::TinyInt(value), DataType::Int(_)) => Some(Datum::Int(value as 
i32)),
+        (Datum::TinyInt(value), DataType::BigInt(_)) => Some(Datum::Long(value 
as i64)),
+        (Datum::SmallInt(value), DataType::Int(_)) => Some(Datum::Int(value as 
i32)),
+        (Datum::SmallInt(value), DataType::BigInt(_)) => 
Some(Datum::Long(value as i64)),
+        (Datum::Int(value), DataType::BigInt(_)) => Some(Datum::Long(value as 
i64)),
+        (Datum::Float(value), DataType::Double(_)) => Some(Datum::Double(value 
as f64)),
+        _ => None,
+    }
+}
+
+fn extract_stats_datum(row: &BinaryRow, index: usize, data_type: &DataType) -> 
Option<Datum> {
+    let min_row_len = BinaryRow::cal_fix_part_size_in_bytes(row.arity()) as 
usize;
+    if index >= row.arity() as usize || row.data().len() < min_row_len {
+        return None;
+    }
+
+    match extract_datum(row, index, data_type) {
+        Ok(Some(datum)) => Some(datum),
+        Ok(None) | Err(_) => None,
+    }
+}
+
 /// TableScan for full table scan (no incremental, no predicate).
 ///
 /// Reference: 
[pypaimon.read.table_scan.TableScan](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/read/table_scan.py)
@@ -348,35 +757,23 @@ impl<'a> TableScan<'a> {
             return Ok(Plan::new(Vec::new()));
         }
 
-        // --- Partition predicate extraction ---
         let partition_keys = self.table.schema().partition_keys();
-        let partition_predicate = if !partition_keys.is_empty() {
-            self.filter.clone().and_then(|filter| {
-                let mapping =
-                    field_idx_to_partition_idx(self.table.schema().fields(), 
partition_keys);
-                let conjuncts = filter.split_and();
-                let remapped: Vec<Predicate> = conjuncts
-                    .into_iter()
-                    .filter_map(|c| c.remap_field_index(&mapping))
-                    .collect();
-                if remapped.is_empty() {
-                    None
-                } else {
-                    Some(Predicate::and(remapped))
-                }
-            })
+        let (partition_predicate, data_predicates) = if let Some(filter) = 
self.filter.clone() {
+            if partition_keys.is_empty() {
+                (None, filter.split_and())
+            } else {
+                split_partition_and_data_predicates(
+                    filter,
+                    self.table.schema().fields(),
+                    partition_keys,
+                )
+            }
         } else {
-            None
+            (None, Vec::new())
         };
 
-        // --- Partition pruning: filter manifest entries before grouping ---
-        //
-        // Note: split construction later still requires a decodable BinaryRow
-        // and will fail on corrupt partition bytes. Pruning is intentionally
-        // best-effort; split construction is mandatory.
         let entries = if let Some(ref pred) = partition_predicate {
             let mut kept = Vec::with_capacity(entries.len());
-            // Cache: partition bytes → accept/reject to avoid re-decoding.
             let mut cache: HashMap<Vec<u8>, bool> = HashMap::new();
             for e in entries {
                 let accept = match cache.get(e.partition()) {
@@ -400,6 +797,32 @@ impl<'a> TableScan<'a> {
             return Ok(Plan::new(Vec::new()));
         }
 
+        // Data-evolution tables can spread one logical row across multiple 
files with
+        // different column sets. Pruning files independently would split 
merge groups,
+        // so keep the current fail-open behavior until we support group-aware 
pruning.
+        let entries = if data_predicates.is_empty() || data_evolution_enabled {
+            entries
+        } else {
+            let mut kept = Vec::with_capacity(entries.len());
+            let mut schema_cache: HashMap<i64, 
Option<Arc<ResolvedStatsSchema>>> = HashMap::new();
+            for entry in entries {
+                if data_file_matches_predicates_for_table(
+                    self.table,
+                    entry.file(),
+                    &data_predicates,
+                    &mut schema_cache,
+                )
+                .await
+                {
+                    kept.push(entry);
+                }
+            }
+            kept
+        };
+        if entries.is_empty() {
+            return Ok(Plan::new(Vec::new()));
+        }
+
         // Group by (partition, bucket). Key = (partition_bytes, bucket).
         let mut groups: HashMap<(Vec<u8>, i32), Vec<ManifestEntry>> = 
HashMap::new();
         for e in entries {
@@ -462,21 +885,19 @@ impl<'a> TableScan<'a> {
                 .as_ref()
                 .and_then(|map| map.get(&PartitionBucket::new(partition, 
bucket)));
 
-            // Split files into groups: data evolution merges overlapping 
row_id ranges;
-            // multi-file groups need column-wise merge, single-file groups 
can be bin-packed.
+            // Data-evolution tables merge overlapping row-id groups 
column-wise during read.
+            // Keep that split boundary intact and only bin-pack single-file 
groups.
             let file_groups_with_raw: Vec<(Vec<DataFileMeta>, bool)> = if 
data_evolution_enabled {
                 let row_id_groups = group_by_overlapping_row_id(data_files);
-                let (singles, multis): (Vec<_>, Vec<_>) =
-                    row_id_groups.into_iter().partition(|g| g.len() == 1);
-
-                let mut result: Vec<(Vec<DataFileMeta>, bool)> = Vec::new();
+                let (singles, multis): (Vec<_>, Vec<_>) = row_id_groups
+                    .into_iter()
+                    .partition(|group| group.len() == 1);
 
-                // Multi-file groups: each becomes its own split, 
raw_convertible=false
+                let mut result = Vec::new();
                 for group in multis {
                     result.push((group, false));
                 }
 
-                // Single-file groups: flatten and bin-pack, 
raw_convertible=true
                 let single_files: Vec<DataFileMeta> = 
singles.into_iter().flatten().collect();
                 for file_group in split_for_batch(single_files, 
target_split_size, open_file_cost) {
                     result.push((file_group, true));
@@ -486,7 +907,7 @@ impl<'a> TableScan<'a> {
             } else {
                 split_for_batch(data_files, target_split_size, open_file_cost)
                     .into_iter()
-                    .map(|g| (g, true))
+                    .map(|group| (group, true))
                     .collect()
             };
 
@@ -522,7 +943,9 @@ impl<'a> TableScan<'a> {
 
 #[cfg(test)]
 mod tests {
-    use super::{group_by_overlapping_row_id, partition_matches_predicate};
+    use super::{
+        data_file_matches_predicates, group_by_overlapping_row_id, 
partition_matches_predicate,
+    };
     use crate::spec::{
         stats::BinaryTableStats, ArrayType, DataField, DataFileMeta, DataType, 
Datum,
         DeletionVectorMeta, FileKind, IndexFileMeta, IndexManifestEntry, 
IntType, Predicate,
@@ -530,43 +953,7 @@ mod tests {
     };
     use crate::table::source::DeletionFile;
     use crate::Error;
-    use chrono::{DateTime, Utc};
-
-    /// Helper to build a DataFileMeta with data evolution fields.
-    fn make_evo_file(
-        name: &str,
-        file_size: i64,
-        row_count: i64,
-        max_seq: i64,
-        first_row_id: Option<i64>,
-    ) -> DataFileMeta {
-        DataFileMeta {
-            file_name: name.to_string(),
-            file_size,
-            row_count,
-            min_key: Vec::new(),
-            max_key: Vec::new(),
-            key_stats: BinaryTableStats::new(Vec::new(), Vec::new(), 
Vec::new()),
-            value_stats: BinaryTableStats::new(Vec::new(), Vec::new(), 
Vec::new()),
-            min_sequence_number: 0,
-            max_sequence_number: max_seq,
-            schema_id: 0,
-            level: 0,
-            extra_files: Vec::new(),
-            creation_time: DateTime::<Utc>::from_timestamp(0, 0).unwrap(),
-            delete_row_count: None,
-            embedded_index: None,
-            first_row_id,
-            write_cols: None,
-        }
-    }
-
-    fn file_names(groups: &[Vec<DataFileMeta>]) -> Vec<Vec<&str>> {
-        groups
-            .iter()
-            .map(|g| g.iter().map(|f| f.file_name.as_str()).collect())
-            .collect()
-    }
+    use chrono::Utc;
 
     struct SerializedBinaryRowBuilder {
         arity: i32,
@@ -605,6 +992,51 @@ mod tests {
         }
     }
 
+    struct RawBinaryRowBuilder {
+        arity: i32,
+        null_bits_size: usize,
+        data: Vec<u8>,
+    }
+
+    impl RawBinaryRowBuilder {
+        fn new(arity: i32) -> Self {
+            let null_bits_size = 
crate::spec::BinaryRow::cal_bit_set_width_in_bytes(arity) as usize;
+            let fixed_part_size = null_bits_size + (arity as usize) * 8;
+            Self {
+                arity,
+                null_bits_size,
+                data: vec![0u8; fixed_part_size],
+            }
+        }
+
+        fn field_offset(&self, pos: usize) -> usize {
+            self.null_bits_size + pos * 8
+        }
+
+        fn set_null_at(&mut self, pos: usize) {
+            let bit_index = pos + crate::spec::BinaryRow::HEADER_SIZE_IN_BYTES 
as usize;
+            let byte_index = bit_index / 8;
+            let bit_offset = bit_index % 8;
+            self.data[byte_index] |= 1 << bit_offset;
+
+            let offset = self.field_offset(pos);
+            self.data[offset..offset + 8].fill(0);
+        }
+
+        fn write_int(&mut self, pos: usize, value: i32) {
+            let offset = self.field_offset(pos);
+            self.data[offset..offset + 
4].copy_from_slice(&value.to_le_bytes());
+        }
+
+        fn build(self) -> Vec<u8> {
+            debug_assert_eq!(
+                self.data.len(),
+                self.null_bits_size + (self.arity as usize) * 8
+            );
+            self.data
+        }
+    }
+
     fn partition_string_field() -> Vec<DataField> {
         vec![DataField::new(
             0,
@@ -613,6 +1045,105 @@ mod tests {
         )]
     }
 
+    fn int_field() -> Vec<DataField> {
+        vec![DataField::new(
+            0,
+            "id".to_string(),
+            DataType::Int(IntType::new()),
+        )]
+    }
+
+    fn int_stats_row(value: Option<i32>) -> Vec<u8> {
+        let mut builder = RawBinaryRowBuilder::new(1);
+        match value {
+            Some(value) => builder.write_int(0, value),
+            None => builder.set_null_at(0),
+        }
+        let raw = builder.build();
+        let mut serialized = Vec::with_capacity(4 + raw.len());
+        serialized.extend_from_slice(&(1_i32).to_be_bytes());
+        serialized.extend_from_slice(&raw);
+        serialized
+    }
+
+    fn test_data_file_meta(
+        min_values: Vec<u8>,
+        max_values: Vec<u8>,
+        null_counts: Vec<i64>,
+        row_count: i64,
+    ) -> DataFileMeta {
+        test_data_file_meta_with_schema(
+            min_values,
+            max_values,
+            null_counts,
+            row_count,
+            0, // default schema_id
+        )
+    }
+
+    fn test_data_file_meta_with_schema(
+        min_values: Vec<u8>,
+        max_values: Vec<u8>,
+        null_counts: Vec<i64>,
+        row_count: i64,
+        schema_id: i64,
+    ) -> DataFileMeta {
+        DataFileMeta {
+            file_name: "test.parquet".into(),
+            file_size: 128,
+            row_count,
+            min_key: Vec::new(),
+            max_key: Vec::new(),
+            key_stats: BinaryTableStats::new(Vec::new(), Vec::new(), 
Vec::new()),
+            value_stats: BinaryTableStats::new(min_values, max_values, 
null_counts),
+            min_sequence_number: 0,
+            max_sequence_number: 0,
+            schema_id,
+            level: 1,
+            extra_files: Vec::new(),
+            creation_time: Utc::now(),
+            delete_row_count: None,
+            embedded_index: None,
+            first_row_id: None,
+            write_cols: None,
+        }
+    }
+
+    fn make_evo_file(
+        name: &str,
+        file_size: i64,
+        row_count: i64,
+        max_seq: i64,
+        first_row_id: Option<i64>,
+    ) -> DataFileMeta {
+        DataFileMeta {
+            file_name: name.to_string(),
+            file_size,
+            row_count,
+            min_key: Vec::new(),
+            max_key: Vec::new(),
+            key_stats: BinaryTableStats::new(Vec::new(), Vec::new(), 
Vec::new()),
+            value_stats: BinaryTableStats::new(Vec::new(), Vec::new(), 
Vec::new()),
+            min_sequence_number: max_seq,
+            max_sequence_number: max_seq,
+            schema_id: 0,
+            level: 0,
+            extra_files: Vec::new(),
+            creation_time: Utc::now(),
+            delete_row_count: None,
+            embedded_index: None,
+            first_row_id,
+            write_cols: None,
+        }
+    }
+
+    fn file_names(groups: &[Vec<DataFileMeta>]) -> Vec<Vec<&str>> {
+        groups
+            .iter()
+            .map(|group| group.iter().map(|file| 
file.file_name.as_str()).collect())
+            .collect()
+    }
+
     #[test]
     fn test_partition_matches_predicate_decode_failure_fails_open() {
         let predicate = PredicateBuilder::new(&partition_string_field())
@@ -645,7 +1176,8 @@ mod tests {
         );
     }
 
-    // ==================== group_by_overlapping_row_id tests 
====================
+    const TEST_SCHEMA_ID: i64 = 0;
+    const TEST_NUM_FIELDS: usize = 1;
 
     #[test]
     fn test_group_by_overlapping_row_id_empty() {
@@ -655,8 +1187,6 @@ mod tests {
 
     #[test]
     fn test_group_by_overlapping_row_id_no_row_ids() {
-        // Files without first_row_id each become their own group.
-        // Sorted by (i64::MIN, -max_seq), so b(seq=2) before a(seq=1).
         let files = vec![
             make_evo_file("a", 10, 100, 1, None),
             make_evo_file("b", 10, 100, 2, None),
@@ -667,7 +1197,6 @@ mod tests {
 
     #[test]
     fn test_group_by_overlapping_row_id_same_range() {
-        // Two files with the same first_row_id and row_count → same range → 
one group.
         let files = vec![
             make_evo_file("a", 10, 100, 2, Some(0)),
             make_evo_file("b", 10, 100, 1, Some(0)),
@@ -679,7 +1208,6 @@ mod tests {
 
     #[test]
     fn test_group_by_overlapping_row_id_overlapping_ranges() {
-        // File a: rows [0, 99], file b: rows [50, 149] → overlapping → one 
group.
         let files = vec![
             make_evo_file("a", 10, 100, 1, Some(0)),
             make_evo_file("b", 10, 100, 2, Some(50)),
@@ -691,7 +1219,6 @@ mod tests {
 
     #[test]
     fn test_group_by_overlapping_row_id_non_overlapping() {
-        // File a: rows [0, 99], file b: rows [100, 199] → no overlap → two 
groups.
         let files = vec![
             make_evo_file("a", 10, 100, 1, Some(0)),
             make_evo_file("b", 10, 100, 2, Some(100)),
@@ -703,8 +1230,6 @@ mod tests {
 
     #[test]
     fn test_group_by_overlapping_row_id_mixed() {
-        // a: [0,99], b: [0,99] (overlap), c: None (own group), d: [200,299]
-        // After sort: c(None→MIN) comes first, then b(seq=2), a(seq=1), d.
         let files = vec![
             make_evo_file("a", 10, 100, 1, Some(0)),
             make_evo_file("b", 10, 100, 2, Some(0)),
@@ -720,7 +1245,6 @@ mod tests {
 
     #[test]
     fn test_group_by_overlapping_row_id_sorted_by_seq() {
-        // Within a group, files are sorted by (first_row_id, 
-max_sequence_number).
         let files = vec![
             make_evo_file("a", 10, 100, 1, Some(0)),
             make_evo_file("b", 10, 100, 3, Some(0)),
@@ -728,10 +1252,182 @@ mod tests {
         ];
         let groups = group_by_overlapping_row_id(files);
         assert_eq!(groups.len(), 1);
-        // Sorted by descending max_sequence_number: b(3), c(2), a(1)
         assert_eq!(file_names(&groups), vec![vec!["b", "c", "a"]]);
     }
 
+    #[test]
+    fn test_data_file_matches_eq_prunes_out_of_range() {
+        let fields = int_field();
+        let file =
+            test_data_file_meta(int_stats_row(Some(10)), 
int_stats_row(Some(20)), vec![0], 5);
+        let predicate = PredicateBuilder::new(&fields)
+            .equal("id", Datum::Int(30))
+            .unwrap();
+
+        assert!(!data_file_matches_predicates(
+            &file,
+            &[predicate],
+            TEST_SCHEMA_ID,
+            TEST_NUM_FIELDS,
+        ));
+    }
+
+    #[test]
+    fn test_data_file_matches_is_null_prunes_when_null_count_is_zero() {
+        let fields = int_field();
+        let file =
+            test_data_file_meta(int_stats_row(Some(10)), 
int_stats_row(Some(20)), vec![0], 5);
+        let predicate = PredicateBuilder::new(&fields).is_null("id").unwrap();
+
+        assert!(!data_file_matches_predicates(
+            &file,
+            &[predicate],
+            TEST_SCHEMA_ID,
+            TEST_NUM_FIELDS,
+        ));
+    }
+
+    #[test]
+    fn test_data_file_matches_is_not_null_prunes_all_null_file() {
+        let fields = int_field();
+        let file = test_data_file_meta(int_stats_row(None), 
int_stats_row(None), vec![5], 5);
+        let predicate = 
PredicateBuilder::new(&fields).is_not_null("id").unwrap();
+
+        assert!(!data_file_matches_predicates(
+            &file,
+            &[predicate],
+            TEST_SCHEMA_ID,
+            TEST_NUM_FIELDS,
+        ));
+    }
+
+    #[test]
+    fn test_data_file_matches_unsupported_predicate_fails_open() {
+        let fields = int_field();
+        let file =
+            test_data_file_meta(int_stats_row(Some(10)), 
int_stats_row(Some(20)), vec![0], 5);
+        let pb = PredicateBuilder::new(&fields);
+        let predicate = Predicate::or(vec![
+            pb.less_than("id", Datum::Int(5)).unwrap(),
+            pb.greater_than("id", Datum::Int(25)).unwrap(),
+        ]);
+
+        assert!(data_file_matches_predicates(
+            &file,
+            &[predicate],
+            TEST_SCHEMA_ID,
+            TEST_NUM_FIELDS,
+        ));
+    }
+
+    #[test]
+    fn test_data_file_matches_corrupt_stats_fails_open() {
+        let fields = int_field();
+        let file = test_data_file_meta(Vec::new(), Vec::new(), vec![0], 5);
+        let predicate = PredicateBuilder::new(&fields)
+            .equal("id", Datum::Int(30))
+            .unwrap();
+
+        assert!(data_file_matches_predicates(
+            &file,
+            &[predicate],
+            TEST_SCHEMA_ID,
+            TEST_NUM_FIELDS,
+        ));
+    }
+
+    #[test]
+    fn test_data_file_matches_schema_mismatch_fails_open() {
+        let fields = int_field();
+        let file = test_data_file_meta_with_schema(
+            int_stats_row(Some(10)),
+            int_stats_row(Some(20)),
+            vec![0],
+            5,
+            5,
+        );
+        let predicate = PredicateBuilder::new(&fields)
+            .equal("id", Datum::Int(30))
+            .unwrap();
+
+        assert!(data_file_matches_predicates(
+            &file,
+            &[predicate],
+            TEST_SCHEMA_ID,
+            TEST_NUM_FIELDS,
+        ));
+    }
+
+    #[test]
+    fn test_data_file_matches_dense_stats_arity_mismatch_fails_open() {
+        let mut builder = RawBinaryRowBuilder::new(3);
+        builder.write_int(0, 10);
+        builder.write_int(1, 100);
+        builder.write_int(2, 200);
+        let raw = builder.build();
+        let mut min_serialized = Vec::with_capacity(4 + raw.len());
+        min_serialized.extend_from_slice(&(3_i32).to_be_bytes());
+        min_serialized.extend_from_slice(&raw);
+
+        let mut builder = RawBinaryRowBuilder::new(3);
+        builder.write_int(0, 20);
+        builder.write_int(1, 200);
+        builder.write_int(2, 300);
+        let raw = builder.build();
+        let mut max_serialized = Vec::with_capacity(4 + raw.len());
+        max_serialized.extend_from_slice(&(3_i32).to_be_bytes());
+        max_serialized.extend_from_slice(&raw);
+
+        let fields = int_field();
+        let file = test_data_file_meta(min_serialized, max_serialized, vec![0, 
0, 0], 5);
+        let predicate = PredicateBuilder::new(&fields)
+            .equal("id", Datum::Int(30))
+            .unwrap();
+
+        assert!(data_file_matches_predicates(
+            &file,
+            &[predicate],
+            TEST_SCHEMA_ID,
+            TEST_NUM_FIELDS,
+        ));
+    }
+
+    #[test]
+    fn test_data_file_matches_always_false_prunes_despite_schema_mismatch() {
+        let file = test_data_file_meta_with_schema(
+            int_stats_row(Some(10)),
+            int_stats_row(Some(20)),
+            vec![0],
+            5,
+            99,
+        );
+
+        assert!(!data_file_matches_predicates(
+            &file,
+            &[Predicate::AlwaysFalse],
+            TEST_SCHEMA_ID,
+            TEST_NUM_FIELDS,
+        ));
+    }
+
+    #[test]
+    fn test_data_file_matches_always_true_keeps_file_despite_schema_mismatch() 
{
+        let file = test_data_file_meta_with_schema(
+            int_stats_row(Some(10)),
+            int_stats_row(Some(20)),
+            vec![0],
+            5,
+            99,
+        );
+
+        assert!(data_file_matches_predicates(
+            &file,
+            &[Predicate::AlwaysTrue],
+            TEST_SCHEMA_ID,
+            TEST_NUM_FIELDS,
+        ));
+    }
+
     #[test]
     fn test_build_deletion_files_map_preserves_cardinality() {
         let entries = vec![IndexManifestEntry {

Reply via email to