This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 0cb1a83  feat(table): support partition predicate pruning in TableScan 
(#167)
0cb1a83 is described below

commit 0cb1a8390131f95ded9519bdc7083f1432d4fed1
Author: Zach <[email protected]>
AuthorDate: Wed Apr 1 22:57:20 2026 +0800

    feat(table): support partition predicate pruning in TableScan (#167)
---
 crates/integration_tests/tests/read_tables.rs | 309 +++++++++-
 crates/paimon/src/spec/mod.rs                 |   1 +
 crates/paimon/src/spec/predicate.rs           | 822 +++++++++++++++++++++++---
 crates/paimon/src/table/read_builder.rs       |  23 +-
 crates/paimon/src/table/table_scan.rs         | 162 ++++-
 5 files changed, 1228 insertions(+), 89 deletions(-)

diff --git a/crates/integration_tests/tests/read_tables.rs 
b/crates/integration_tests/tests/read_tables.rs
index 1b0032a..ec96653 100644
--- a/crates/integration_tests/tests/read_tables.rs
+++ b/crates/integration_tests/tests/read_tables.rs
@@ -22,7 +22,7 @@ use futures::TryStreamExt;
 use paimon::api::ConfigResponse;
 use paimon::catalog::{Identifier, RESTCatalog};
 use paimon::common::Options;
-use paimon::spec::{DataType, IntType, Schema, VarCharType};
+use paimon::spec::{DataType, IntType, Predicate, Schema, VarCharType};
 use paimon::{Catalog, Error, FileSystemCatalog, Plan};
 use std::collections::{HashMap, HashSet};
 
@@ -88,6 +88,27 @@ async fn scan_and_read_with_fs_catalog(
     scan_and_read(&catalog, table_name, projection).await
 }
 
+async fn scan_and_read_with_filter(
+    table: &paimon::Table,
+    filter: Predicate,
+) -> (Plan, Vec<RecordBatch>) {
+    let mut read_builder = table.new_read_builder();
+    read_builder.with_filter(filter);
+    let scan = read_builder.new_scan();
+    let plan = scan.plan().await.expect("Failed to plan scan");
+
+    let read = read_builder.new_read().expect("Failed to create read");
+    let stream = read
+        .to_arrow(plan.splits())
+        .expect("Failed to create arrow stream");
+    let batches: Vec<_> = stream
+        .try_collect()
+        .await
+        .expect("Failed to collect batches");
+
+    (plan, batches)
+}
+
 fn extract_id_name(batches: &[RecordBatch]) -> Vec<(i32, String)> {
     let mut rows = Vec::new();
     for batch in batches {
@@ -107,6 +128,55 @@ fn extract_id_name(batches: &[RecordBatch]) -> Vec<(i32, 
String)> {
     rows
 }
 
+fn extract_id_name_dt(batches: &[RecordBatch]) -> Vec<(i32, String, String)> {
+    let mut rows = Vec::new();
+    for batch in batches {
+        let id = batch
+            .column_by_name("id")
+            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+            .expect("id");
+        let name = batch
+            .column_by_name("name")
+            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
+            .expect("name");
+        let dt = batch
+            .column_by_name("dt")
+            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
+            .expect("dt");
+        for i in 0..batch.num_rows() {
+            rows.push((id.value(i), name.value(i).into(), dt.value(i).into()));
+        }
+    }
+    rows.sort_by_key(|(id, _, _)| *id);
+    rows
+}
+
+fn extract_plan_partitions(plan: &Plan) -> HashSet<String> {
+    plan.splits()
+        .iter()
+        .map(|split| {
+            split
+                .partition()
+                .get_string(0)
+                .expect("Failed to decode dt")
+                .to_string()
+        })
+        .collect()
+}
+
+fn extract_plan_multi_partitions(plan: &Plan) -> HashSet<(String, i32)> {
+    plan.splits()
+        .iter()
+        .map(|split| {
+            let partition = split.partition();
+            (
+                partition.get_string(0).expect("dt").to_string(),
+                partition.get_int(1).expect("hr"),
+            )
+        })
+        .collect()
+}
+
 #[tokio::test]
 async fn test_read_log_table() {
     let (plan, batches) = scan_and_read_with_fs_catalog("simple_log_table", 
None).await;
@@ -397,6 +467,7 @@ async fn test_read_projection_empty() {
         );
     }
 }
+
 #[tokio::test]
 async fn test_read_projection_unknown_column() {
     let catalog = create_file_system_catalog();
@@ -460,6 +531,242 @@ async fn test_read_projection_duplicate_column() {
     );
 }
 
+// ---------------------------------------------------------------------------
+// Partition filter integration tests
+// ---------------------------------------------------------------------------
+
+#[tokio::test]
+async fn test_read_partitioned_table_with_filter() {
+    use paimon::spec::{Datum, PredicateBuilder};
+
+    let catalog = create_file_system_catalog();
+    let table = get_table_from_catalog(&catalog, 
"partitioned_log_table").await;
+    // Build a filter: dt = '2024-01-01'
+    let schema = table.schema();
+    let pb = PredicateBuilder::new(schema.fields());
+    let filter = pb
+        .equal("dt", Datum::String("2024-01-01".into()))
+        .expect("Failed to build predicate");
+
+    let (plan, batches) = scan_and_read_with_filter(&table, filter).await;
+    let seen_partitions = extract_plan_partitions(&plan);
+    assert_eq!(
+        seen_partitions,
+        HashSet::from(["2024-01-01".into()]),
+        "Only the filtered partition should be in the plan"
+    );
+
+    let rows = extract_id_name_dt(&batches);
+    assert_eq!(
+        rows,
+        vec![
+            (1, "alice".into(), "2024-01-01".into()),
+            (2, "bob".into(), "2024-01-01".into()),
+        ]
+    );
+}
+
+#[tokio::test]
+async fn test_read_multi_partitioned_table_with_filter() {
+    use paimon::spec::{Datum, Predicate, PredicateBuilder};
+
+    let catalog = create_file_system_catalog();
+    let table = get_table_from_catalog(&catalog, 
"multi_partitioned_log_table").await;
+    let schema = table.schema();
+    let pb = PredicateBuilder::new(schema.fields());
+
+    // Filter: dt = '2024-01-01' AND hr = 10
+    let filter = Predicate::and(vec![
+        pb.equal("dt", Datum::String("2024-01-01".into())).unwrap(),
+        pb.equal("hr", Datum::Int(10)).unwrap(),
+    ]);
+
+    let (plan, batches) = scan_and_read_with_filter(&table, filter).await;
+    let partitions = extract_plan_multi_partitions(&plan);
+    assert_eq!(
+        partitions,
+        HashSet::from([("2024-01-01".into(), 10)]),
+        "Only dt=2024-01-01, hr=10 should survive"
+    );
+
+    let actual = extract_id_name(&batches);
+    assert_eq!(
+        actual,
+        vec![(1, "alice".to_string()), (2, "bob".to_string()),],
+        "Only rows from dt=2024-01-01, hr=10 should be returned"
+    );
+}
+
+#[tokio::test]
+async fn 
test_read_partitioned_table_data_only_filter_preserves_all_partitions() {
+    use paimon::spec::{Datum, PredicateBuilder};
+
+    let catalog = create_file_system_catalog();
+    let table = get_table_from_catalog(&catalog, 
"partitioned_log_table").await;
+    let schema = table.schema();
+    let pb = PredicateBuilder::new(schema.fields());
+
+    // Data-only filter: id > 10 — should NOT prune any partitions,
+    // and is still ignored at read level in Phase 2.
+    let filter = pb
+        .greater_than("id", Datum::Int(10))
+        .expect("Failed to build predicate");
+
+    let (plan, batches) = scan_and_read_with_filter(&table, filter).await;
+    let seen_partitions = extract_plan_partitions(&plan);
+    assert_eq!(
+        seen_partitions,
+        HashSet::from(["2024-01-01".into(), "2024-01-02".into()]),
+        "Data-only filter should not prune any partitions"
+    );
+
+    let actual = extract_id_name(&batches);
+    assert_eq!(
+        actual,
+        vec![
+            (1, "alice".to_string()),
+            (2, "bob".to_string()),
+            (3, "carol".to_string()),
+        ],
+        "Data predicate is not applied at read level; all rows are still 
returned"
+    );
+}
+
+/// Mixed AND: partition predicate prunes partitions, but data predicate is
+/// silently ignored — all rows from the matching partition are returned.
+#[tokio::test]
+async fn test_read_partitioned_table_mixed_and_filter() {
+    use paimon::spec::{Datum, Predicate, PredicateBuilder};
+
+    let catalog = create_file_system_catalog();
+    let table = get_table_from_catalog(&catalog, 
"partitioned_log_table").await;
+    let schema = table.schema();
+    let pb = PredicateBuilder::new(schema.fields());
+
+    // dt = '2024-01-01' AND id > 10
+    // Partition conjunct (dt) is applied; data conjunct (id) is NOT.
+    let filter = Predicate::and(vec![
+        pb.equal("dt", Datum::String("2024-01-01".into())).unwrap(),
+        pb.greater_than("id", Datum::Int(10)).unwrap(),
+    ]);
+
+    let (plan, batches) = scan_and_read_with_filter(&table, filter).await;
+    let seen_partitions = extract_plan_partitions(&plan);
+    assert_eq!(
+        seen_partitions,
+        HashSet::from(["2024-01-01".into()]),
+        "Only dt=2024-01-01 should survive"
+    );
+
+    let actual = extract_id_name(&batches);
+    assert_eq!(
+        actual,
+        vec![(1, "alice".to_string()), (2, "bob".to_string())],
+        "Data predicate (id > 10) is NOT applied — all rows from matching 
partition returned"
+    );
+}
+
+/// Mixed OR: `dt = '...' OR id > 10` cannot be split into a pure partition
+/// predicate, so no partitions should be pruned.
+#[tokio::test]
+async fn test_read_partitioned_table_mixed_or_filter_preserves_all() {
+    use paimon::spec::{Datum, Predicate, PredicateBuilder};
+
+    let catalog = create_file_system_catalog();
+    let table = get_table_from_catalog(&catalog, 
"partitioned_log_table").await;
+    let schema = table.schema();
+    let pb = PredicateBuilder::new(schema.fields());
+
+    // dt = '2024-01-01' OR id > 10 — mixed OR is not safely splittable.
+    let filter = Predicate::or(vec![
+        pb.equal("dt", Datum::String("2024-01-01".into())).unwrap(),
+        pb.greater_than("id", Datum::Int(10)).unwrap(),
+    ]);
+
+    let (plan, batches) = scan_and_read_with_filter(&table, filter).await;
+    let seen_partitions = extract_plan_partitions(&plan);
+    assert_eq!(
+        seen_partitions,
+        HashSet::from(["2024-01-01".into(), "2024-01-02".into()]),
+        "Mixed OR should not prune any partitions"
+    );
+
+    let actual = extract_id_name(&batches);
+    assert_eq!(
+        actual,
+        vec![
+            (1, "alice".to_string()),
+            (2, "bob".to_string()),
+            (3, "carol".to_string()),
+        ],
+        "All rows should be returned when pruning is not possible"
+    );
+}
+
+/// Filter that matches no existing partition — all entries pruned, 0 splits.
+#[tokio::test]
+async fn test_read_partitioned_table_filter_matches_no_partition() {
+    use paimon::spec::{Datum, PredicateBuilder};
+
+    let catalog = create_file_system_catalog();
+    let table = get_table_from_catalog(&catalog, 
"partitioned_log_table").await;
+    let schema = table.schema();
+    let pb = PredicateBuilder::new(schema.fields());
+
+    // dt = '9999-12-31' matches no partition.
+    let filter = pb
+        .equal("dt", Datum::String("9999-12-31".into()))
+        .expect("Failed to build predicate");
+
+    let mut read_builder = table.new_read_builder();
+    read_builder.with_filter(filter);
+    let scan = read_builder.new_scan();
+    let plan = scan.plan().await.expect("Failed to plan scan");
+
+    assert!(
+        plan.splits().is_empty(),
+        "No splits should survive when filter matches no partition"
+    );
+}
+
+#[tokio::test]
+async fn test_read_partitioned_table_eval_row_error_fails_plan() {
+    use paimon::spec::{ArrayType, DataType, Datum, IntType, PredicateOperator};
+
+    let catalog = create_file_system_catalog();
+    let table = get_table_from_catalog(&catalog, 
"partitioned_log_table").await;
+    let dt_index = table
+        .schema()
+        .fields()
+        .iter()
+        .position(|f| f.name() == "dt")
+        .expect("dt partition column should exist");
+
+    // Use an unsupported DataType in a partition leaf so remapping succeeds
+    // but `eval_row` fails during partition pruning.
+    let filter = Predicate::Leaf {
+        column: "dt".into(),
+        index: dt_index,
+        data_type: 
DataType::Array(ArrayType::new(DataType::Int(IntType::new()))),
+        op: PredicateOperator::Eq,
+        literals: vec![Datum::Int(42)],
+    };
+
+    let mut read_builder = table.new_read_builder();
+    read_builder.with_filter(filter);
+
+    let err = read_builder
+        .new_scan()
+        .plan()
+        .await
+        .expect_err("eval_row error should fail-fast during planning");
+
+    assert!(
+        matches!(&err, Error::Unsupported { message } if 
message.contains("extract_datum")),
+        "Expected extract_datum unsupported error, got: {err:?}"
+    );
+}
+
 // ======================= REST Catalog read tests 
===============================
 
 /// Build a simple test schema matching the Spark-provisioned tables (id INT, 
name VARCHAR).
diff --git a/crates/paimon/src/spec/mod.rs b/crates/paimon/src/spec/mod.rs
index 3e82002..970fb5e 100644
--- a/crates/paimon/src/spec/mod.rs
+++ b/crates/paimon/src/spec/mod.rs
@@ -56,6 +56,7 @@ pub use types::*;
 mod partition_utils;
 pub(crate) use partition_utils::PartitionComputer;
 mod predicate;
+pub(crate) use predicate::eval_row;
 pub use predicate::{
     field_idx_to_partition_idx, Datum, Predicate, PredicateBuilder, 
PredicateOperator,
 };
diff --git a/crates/paimon/src/spec/predicate.rs 
b/crates/paimon/src/spec/predicate.rs
index 67954a1..4c89820 100644
--- a/crates/paimon/src/spec/predicate.rs
+++ b/crates/paimon/src/spec/predicate.rs
@@ -24,8 +24,10 @@
 //! - Java `PredicateBuilder` / `LeafPredicate` / `CompoundPredicate`
 
 use crate::error::*;
+use crate::spec::data_file::BinaryRow;
 use crate::spec::types::DataType;
 use crate::spec::DataField;
+use std::cmp::Ordering;
 use std::fmt;
 
 // ---------------------------------------------------------------------------
@@ -43,6 +45,7 @@ use std::fmt;
 /// rather than `BigDecimal.equals` which is scale-sensitive),
 /// e.g. `Decimal(10, scale=1)` == `Decimal(100, scale=2)`
 /// because both represent `1.0`.
+///
 #[derive(Debug, Clone)]
 pub enum Datum {
     Bool(bool),
@@ -100,55 +103,66 @@ impl fmt::Display for Datum {
 
 impl PartialEq for Datum {
     fn eq(&self, other: &Self) -> bool {
-        match (self, other) {
-            (Self::Bool(a), Self::Bool(b)) => a == b,
-            (Self::TinyInt(a), Self::TinyInt(b)) => a == b,
-            (Self::SmallInt(a), Self::SmallInt(b)) => a == b,
-            (Self::Int(a), Self::Int(b)) => a == b,
-            (Self::Long(a), Self::Long(b)) => a == b,
-            (Self::Float(a), Self::Float(b)) => a == b,
-            (Self::Double(a), Self::Double(b)) => a == b,
-            (Self::String(a), Self::String(b)) => a == b,
-            (Self::Date(a), Self::Date(b)) => a == b,
-            (Self::Time(a), Self::Time(b)) => a == b,
-            (
-                Self::Timestamp {
-                    millis: ma,
-                    nanos: na,
-                },
-                Self::Timestamp {
-                    millis: mb,
-                    nanos: nb,
-                },
-            ) => ma == mb && na == nb,
-            (
-                Self::LocalZonedTimestamp {
-                    millis: ma,
-                    nanos: na,
-                },
-                Self::LocalZonedTimestamp {
-                    millis: mb,
-                    nanos: nb,
-                },
-            ) => ma == mb && na == nb,
-            // Decimal: mathematical equivalence — normalize to common scale
-            // before comparing.  Matches Java Paimon's Decimal which uses
-            // compareTo() == 0 (not BigDecimal.equals which is 
scale-sensitive).
-            (
-                Self::Decimal {
-                    unscaled: ua,
-                    scale: sa,
-                    ..
-                },
-                Self::Decimal {
-                    unscaled: ub,
-                    scale: sb,
-                    ..
-                },
-            ) => decimal_eq(*ua, *sa, *ub, *sb),
-            (Self::Bytes(a), Self::Bytes(b)) => a == b,
-            _ => false,
-        }
+        datum_eq(self, other)
+    }
+}
+
+impl PartialOrd for Datum {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        datum_cmp(self, other)
+    }
+}
+
+fn datum_eq(lhs: &Datum, rhs: &Datum) -> bool {
+    datum_cmp(lhs, rhs) == Some(Ordering::Equal)
+}
+
+fn datum_cmp(lhs: &Datum, rhs: &Datum) -> Option<Ordering> {
+    match (lhs, rhs) {
+        (Datum::Bool(a), Datum::Bool(b)) => a.partial_cmp(b),
+        (Datum::TinyInt(a), Datum::TinyInt(b)) => a.partial_cmp(b),
+        (Datum::SmallInt(a), Datum::SmallInt(b)) => a.partial_cmp(b),
+        (Datum::Int(a), Datum::Int(b)) => a.partial_cmp(b),
+        (Datum::Long(a), Datum::Long(b)) => a.partial_cmp(b),
+        (Datum::Float(a), Datum::Float(b)) => a.partial_cmp(b),
+        (Datum::Double(a), Datum::Double(b)) => a.partial_cmp(b),
+        (Datum::String(a), Datum::String(b)) => a.partial_cmp(b),
+        (Datum::Date(a), Datum::Date(b)) => a.partial_cmp(b),
+        (Datum::Time(a), Datum::Time(b)) => a.partial_cmp(b),
+        (
+            Datum::Timestamp {
+                millis: ma,
+                nanos: na,
+            },
+            Datum::Timestamp {
+                millis: mb,
+                nanos: nb,
+            },
+        ) => (ma, na).partial_cmp(&(mb, nb)),
+        (
+            Datum::LocalZonedTimestamp {
+                millis: ma,
+                nanos: na,
+            },
+            Datum::LocalZonedTimestamp {
+                millis: mb,
+                nanos: nb,
+            },
+        ) => (ma, na).partial_cmp(&(mb, nb)),
+        (
+            Datum::Decimal {
+                unscaled: ua,
+                scale: sa,
+                ..
+            },
+            Datum::Decimal {
+                unscaled: ub,
+                scale: sb,
+                ..
+            },
+        ) => decimal_cmp(*ua, *sa, *ub, *sb),
+        (Datum::Bytes(a), Datum::Bytes(b)) => Some(java_bytes_cmp(a, b)),
+        _ => None,
     }
 }
 
@@ -156,23 +170,28 @@ impl PartialEq for Datum {
 ///
 /// Normalizes both to the larger scale, then compares unscaled values.
 /// E.g. `(10, scale=1)` vs `(100, scale=2)` → both represent 1.0 → equal.
-fn decimal_eq(ua: i128, sa: u32, ub: i128, sb: u32) -> bool {
+fn decimal_cmp(ua: i128, sa: u32, ub: i128, sb: u32) -> Option<Ordering> {
     if sa == sb {
-        return ua == ub;
+        return ua.partial_cmp(&ub);
     }
-    // Scale up the side with the smaller scale.
     let (na, nb) = if sa < sb {
-        match ua.checked_mul(pow10_i128(sb - sa)) {
-            Some(scaled) => (scaled, ub),
-            None => return false,
-        }
+        (ua.checked_mul(pow10_i128(sb - sa))?, ub)
     } else {
-        match ub.checked_mul(pow10_i128(sa - sb)) {
-            Some(scaled) => (ua, scaled),
-            None => return false,
-        }
+        (ua, ub.checked_mul(pow10_i128(sa - sb))?)
     };
-    na == nb
+    na.partial_cmp(&nb)
+}
+
+/// Match Java `CompareUtils.compare(byte[], byte[])`, which compares signed
+/// bytes lexicographically.
+fn java_bytes_cmp(a: &[u8], b: &[u8]) -> Ordering {
+    for (&lhs, &rhs) in a.iter().zip(b.iter()) {
+        let cmp = (lhs as i8).cmp(&(rhs as i8));
+        if cmp != Ordering::Equal {
+            return cmp;
+        }
+    }
+    a.len().cmp(&b.len())
 }
 
 /// 10^exp as i128.  Returns i128::MAX for exponents that would overflow.
@@ -262,23 +281,16 @@ pub enum Predicate {
 }
 
 impl Predicate {
-    /// Combine predicates with AND, with flattening and constant absorption.
+    /// Combine predicates with AND, with recursive flattening and constant 
absorption.
     ///
     /// - `AND(p, AlwaysTrue)` → `p` (identity element filtered out)
     /// - `AND(p, AlwaysFalse)` → `AlwaysFalse` (annihilator short-circuits)
-    /// - Nested `And` nodes are flattened
+    /// - Nested `And` nodes are recursively flattened
     /// - Empty input → `AlwaysTrue`
     /// - Single element → unwrapped
     pub fn and(predicates: Vec<Predicate>) -> Predicate {
         let mut flat = Vec::with_capacity(predicates.len());
-        for p in predicates {
-            match p {
-                Predicate::AlwaysTrue => {}
-                Predicate::AlwaysFalse => return Predicate::AlwaysFalse,
-                Predicate::And(children) => flat.extend(children),
-                other => flat.push(other),
-            }
-        }
+        Self::flatten_and(predicates, &mut flat);
         match flat.len() {
             0 => Predicate::AlwaysTrue,
             1 => flat.into_iter().next().unwrap(),
@@ -286,23 +298,36 @@ impl Predicate {
         }
     }
 
-    /// Combine predicates with OR, with flattening and constant absorption.
+    /// Recursively collect non-And children, absorbing constants.
+    fn flatten_and(predicates: Vec<Predicate>, out: &mut Vec<Predicate>) {
+        for p in predicates {
+            match p {
+                Predicate::AlwaysTrue => {}
+                Predicate::AlwaysFalse => {
+                    out.clear();
+                    out.push(Predicate::AlwaysFalse);
+                    return;
+                }
+                Predicate::And(children) => Self::flatten_and(children, out),
+                other => out.push(other),
+            }
+            // Check if a nested flatten hit AlwaysFalse
+            if out.first() == Some(&Predicate::AlwaysFalse) {
+                return;
+            }
+        }
+    }
+
+    /// Combine predicates with OR, with recursive flattening and constant 
absorption.
     ///
     /// - `OR(p, AlwaysFalse)` → `p` (identity element filtered out)
     /// - `OR(p, AlwaysTrue)` → `AlwaysTrue` (annihilator short-circuits)
-    /// - Nested `Or` nodes are flattened
+    /// - Nested `Or` nodes are recursively flattened
     /// - Empty input → `AlwaysFalse`
     /// - Single element → unwrapped
     pub fn or(predicates: Vec<Predicate>) -> Predicate {
         let mut flat = Vec::with_capacity(predicates.len());
-        for p in predicates {
-            match p {
-                Predicate::AlwaysFalse => {}
-                Predicate::AlwaysTrue => return Predicate::AlwaysTrue,
-                Predicate::Or(children) => flat.extend(children),
-                other => flat.push(other),
-            }
-        }
+        Self::flatten_or(predicates, &mut flat);
         match flat.len() {
             0 => Predicate::AlwaysFalse,
             1 => flat.into_iter().next().unwrap(),
@@ -310,6 +335,25 @@ impl Predicate {
         }
     }
 
+    /// Recursively collect non-Or children, absorbing constants.
+    fn flatten_or(predicates: Vec<Predicate>, out: &mut Vec<Predicate>) {
+        for p in predicates {
+            match p {
+                Predicate::AlwaysFalse => {}
+                Predicate::AlwaysTrue => {
+                    out.clear();
+                    out.push(Predicate::AlwaysTrue);
+                    return;
+                }
+                Predicate::Or(children) => Self::flatten_or(children, out),
+                other => out.push(other),
+            }
+            if out.first() == Some(&Predicate::AlwaysTrue) {
+                return;
+            }
+        }
+    }
+
     /// Negate a predicate with simplification.
     ///
     /// - `NOT(NOT(p))` → `p` (double negation elimination)
@@ -323,6 +367,70 @@ impl Predicate {
             other => Predicate::Not(Box::new(other)),
         }
     }
+
+    /// Split a predicate at AND boundaries into conjuncts (recursive).
+    ///
+    /// Unlike a simple one-level unwrap, this recursively flattens nested
+    /// `And` nodes — necessary because `Predicate` is a public enum and
+    /// callers may construct `And(vec![And(...), ...])` directly without
+    /// going through `Predicate::and()` which auto-flattens.
+    ///
+    /// Reference: Java `PredicateBuilder.splitAnd` which recursively
+    /// splits `CompoundPredicate(And, children)`.
+    pub(crate) fn split_and(self) -> Vec<Predicate> {
+        match self {
+            Predicate::And(children) => children.into_iter().flat_map(|c| 
c.split_and()).collect(),
+            other => vec![other],
+        }
+    }
+
+    /// Remap leaf field indices from table schema space to partition row 
space.
+    ///
+    /// Returns `Some(remapped)` if *all* leaf nodes in this subtree reference
+    /// partition columns; `None` otherwise. This guarantees safety under 
NOT/OR:
+    /// a mixed predicate is never partially remapped.
+    ///
+    /// `mapping` is the output of `field_idx_to_partition_idx`.
+    pub(crate) fn remap_field_index(self, mapping: &[Option<usize>]) -> 
Option<Predicate> {
+        match self {
+            Predicate::Leaf {
+                column,
+                index,
+                data_type,
+                op,
+                literals,
+            } => {
+                let new_index = (*mapping.get(index)?)?;
+                Some(Predicate::Leaf {
+                    column,
+                    index: new_index,
+                    data_type,
+                    op,
+                    literals,
+                })
+            }
+            Predicate::And(children) => {
+                let remapped: Option<Vec<_>> = children
+                    .into_iter()
+                    .map(|c| c.remap_field_index(mapping))
+                    .collect();
+                Some(Predicate::and(remapped?))
+            }
+            Predicate::Or(children) => {
+                let remapped: Option<Vec<_>> = children
+                    .into_iter()
+                    .map(|c| c.remap_field_index(mapping))
+                    .collect();
+                Some(Predicate::or(remapped?))
+            }
+            Predicate::Not(inner) => {
+                let remapped = inner.remap_field_index(mapping)?;
+                Some(Predicate::negate(remapped))
+            }
+            Predicate::AlwaysTrue => Some(Predicate::AlwaysTrue),
+            Predicate::AlwaysFalse => Some(Predicate::AlwaysFalse),
+        }
+    }
 }
 
 impl fmt::Display for Predicate {
@@ -591,6 +699,145 @@ pub fn field_idx_to_partition_idx(
         .collect()
 }
 
+// ---------------------------------------------------------------------------
+// extract_datum
+// ---------------------------------------------------------------------------
+
+/// Extract a typed `Datum` from a `BinaryRow` field based on `DataType`.
+///
+/// Returns `Ok(None)` if the field is null, `Ok(Some(datum))` on success,
+/// or `Err` if the binary data is malformed.
+pub(crate) fn extract_datum(
+    row: &BinaryRow,
+    pos: usize,
+    data_type: &DataType,
+) -> Result<Option<Datum>> {
+    if row.is_null_at(pos) {
+        return Ok(None);
+    }
+    let datum = match data_type {
+        DataType::Boolean(_) => Datum::Bool(row.get_boolean(pos)?),
+        DataType::TinyInt(_) => Datum::TinyInt(row.get_byte(pos)?),
+        DataType::SmallInt(_) => Datum::SmallInt(row.get_short(pos)?),
+        DataType::Int(_) => Datum::Int(row.get_int(pos)?),
+        DataType::BigInt(_) => Datum::Long(row.get_long(pos)?),
+        DataType::Float(_) => Datum::Float(row.get_float(pos)?),
+        DataType::Double(_) => Datum::Double(row.get_double(pos)?),
+        DataType::Char(_) | DataType::VarChar(_) => 
Datum::String(row.get_string(pos)?.to_string()),
+        DataType::Date(_) => Datum::Date(row.get_int(pos)?),
+        DataType::Time(_) => Datum::Time(row.get_int(pos)?),
+        DataType::Timestamp(ts) => {
+            let (millis, nanos) = row.get_timestamp_raw(pos, ts.precision())?;
+            Datum::Timestamp { millis, nanos }
+        }
+        DataType::LocalZonedTimestamp(ts) => {
+            let (millis, nanos) = row.get_timestamp_raw(pos, ts.precision())?;
+            Datum::LocalZonedTimestamp { millis, nanos }
+        }
+        DataType::Decimal(dec) => {
+            let precision = dec.precision();
+            let scale = dec.scale();
+            let unscaled = row.get_decimal_unscaled(pos, precision)?;
+            Datum::Decimal {
+                unscaled,
+                precision,
+                scale,
+            }
+        }
+        DataType::Binary(_) | DataType::VarBinary(_) => 
Datum::Bytes(row.get_binary(pos)?.to_vec()),
+        other => {
+            return Err(Error::Unsupported {
+                message: format!("extract_datum: unsupported DataType 
{other:?}"),
+            });
+        }
+    };
+    Ok(Some(datum))
+}
+
+// ---------------------------------------------------------------------------
+// eval_row
+// ---------------------------------------------------------------------------
+
+/// Evaluate a predicate tree against a `BinaryRow`.
+///
+/// Each `Leaf` carries its own `data_type` (preserved through 
`remap_field_index`),
+/// so no external type list is needed.
+///
+/// SQL null semantics: null compared to any value yields `false`.
+pub(crate) fn eval_row(predicate: &Predicate, row: &BinaryRow) -> Result<bool> 
{
+    match predicate {
+        Predicate::AlwaysTrue => Ok(true),
+        Predicate::AlwaysFalse => Ok(false),
+        Predicate::And(children) => {
+            for child in children {
+                if !eval_row(child, row)? {
+                    return Ok(false);
+                }
+            }
+            Ok(true)
+        }
+        Predicate::Or(children) => {
+            for child in children {
+                if eval_row(child, row)? {
+                    return Ok(true);
+                }
+            }
+            Ok(false)
+        }
+        Predicate::Not(inner) => Ok(!eval_row(inner, row)?),
+        Predicate::Leaf {
+            index,
+            data_type,
+            op,
+            literals,
+            ..
+        } => {
+            let datum = extract_datum(row, *index, data_type)?;
+            Ok(eval_leaf(*op, datum.as_ref(), literals))
+        }
+    }
+}
+
+/// Evaluate a single leaf predicate.
+///
+/// This function is infallible: all type decoding happens in `extract_datum`
+/// before this point, and the operator match is exhaustive.
+fn eval_leaf(op: PredicateOperator, datum: Option<&Datum>, literals: &[Datum]) 
-> bool {
+    match op {
+        PredicateOperator::IsNull => datum.is_none(),
+        PredicateOperator::IsNotNull => datum.is_some(),
+        _ => {
+            // SQL null semantics: NULL op value → false
+            let val = match datum {
+                Some(v) => v,
+                None => return false,
+            };
+            match op {
+                PredicateOperator::Eq => literals.first().is_some_and(|lit| 
datum_eq(val, lit)),
+                PredicateOperator::NotEq => literals.first().is_some_and(|lit| 
!datum_eq(val, lit)),
+                PredicateOperator::Lt => {
+                    literals.first().and_then(|lit| datum_cmp(val, lit)) == 
Some(Ordering::Less)
+                }
+                PredicateOperator::LtEq => matches!(
+                    literals.first().and_then(|lit| datum_cmp(val, lit)),
+                    Some(Ordering::Less | Ordering::Equal)
+                ),
+                PredicateOperator::Gt => {
+                    literals.first().and_then(|lit| datum_cmp(val, lit)) == 
Some(Ordering::Greater)
+                }
+                PredicateOperator::GtEq => matches!(
+                    literals.first().and_then(|lit| datum_cmp(val, lit)),
+                    Some(Ordering::Greater | Ordering::Equal)
+                ),
+                PredicateOperator::In => literals.iter().any(|lit| 
datum_eq(val, lit)),
+                PredicateOperator::NotIn => !literals.iter().any(|lit| 
datum_eq(val, lit)),
+                // IsNull/IsNotNull are handled in the outer match above.
+                PredicateOperator::IsNull | PredicateOperator::IsNotNull => 
unreachable!(),
+            }
+        }
+    }
+}
+
 // ---------------------------------------------------------------------------
 // Tests
 // ---------------------------------------------------------------------------
@@ -775,6 +1022,34 @@ mod tests {
         }
     }
 
+    #[test]
+    fn test_and_flattens_deep_nesting() {
+        let pb = PredicateBuilder::new(&test_fields());
+        let p1 = pb.equal("id", Datum::Int(1)).unwrap();
+        let p2 = pb.equal("id", Datum::Int(2)).unwrap();
+        let p3 = pb.equal("id", Datum::Int(3)).unwrap();
+        let p4 = pb.equal("id", Datum::Int(4)).unwrap();
+
+        // Directly construct nested And via enum (bypassing Predicate::and 
flatten).
+        let deep = Predicate::And(vec![Predicate::And(vec![
+            Predicate::And(vec![p1.clone(), p2.clone()]),
+            p3.clone(),
+        ])]);
+        // Now flatten through Predicate::and.
+        let flat = Predicate::and(vec![deep, p4.clone()]);
+
+        match &flat {
+            Predicate::And(children) => {
+                assert_eq!(children.len(), 4);
+                assert_eq!(children[0], p1);
+                assert_eq!(children[1], p2);
+                assert_eq!(children[2], p3);
+                assert_eq!(children[3], p4);
+            }
+            other => panic!("expected And with 4 children, got {other:?}"),
+        }
+    }
+
     #[test]
     fn test_or_empty() {
         assert_eq!(Predicate::or(vec![]), Predicate::AlwaysFalse);
@@ -806,6 +1081,29 @@ mod tests {
         }
     }
 
+    #[test]
+    fn test_or_flattens_deep_nesting() {
+        let pb = PredicateBuilder::new(&test_fields());
+        let p1 = pb.equal("id", Datum::Int(1)).unwrap();
+        let p2 = pb.equal("id", Datum::Int(2)).unwrap();
+        let p3 = pb.equal("id", Datum::Int(3)).unwrap();
+        let p4 = pb.equal("id", Datum::Int(4)).unwrap();
+
+        // Directly construct nested Or via enum (bypassing Predicate::or 
flatten).
+        let deep = Predicate::Or(vec![Predicate::Or(vec![
+            Predicate::Or(vec![p1.clone(), p2.clone()]),
+            p3.clone(),
+        ])]);
+        let flat = Predicate::or(vec![deep, p4.clone()]);
+
+        match &flat {
+            Predicate::Or(children) => {
+                assert_eq!(children.len(), 4);
+            }
+            other => panic!("expected Or with 4 children, got {other:?}"),
+        }
+    }
+
     #[test]
     fn test_not() {
         let pb = PredicateBuilder::new(&test_fields());
@@ -917,7 +1215,7 @@ mod tests {
         assert!(result.is_err());
     }
 
-    // ======================== Empty IN handling ========================
+    // ======================== Empty IN / NOT IN handling 
========================
 
     #[test]
     fn test_in_empty_returns_always_false() {
@@ -1055,4 +1353,364 @@ mod tests {
         };
         assert_eq!(a, b);
     }
+
+    // ======================== PartialOrd ========================
+
+    #[test]
+    fn test_datum_partial_ord_int() {
+        assert!(Datum::Int(1) < Datum::Int(2));
+        assert!(Datum::Int(2) > Datum::Int(1));
+        assert!(Datum::Int(1) <= Datum::Int(1));
+        assert!(Datum::Int(1) >= Datum::Int(1));
+    }
+
+    #[test]
+    fn test_datum_partial_ord_string() {
+        assert!(Datum::String("a".into()) < Datum::String("b".into()));
+        assert!(Datum::String("b".into()) > Datum::String("a".into()));
+    }
+
+    #[test]
+    fn test_datum_partial_ord_decimal_cross_scale() {
+        // 10 / 10^1 = 1.0 < 200 / 10^2 = 2.0
+        let a = Datum::Decimal {
+            unscaled: 10,
+            precision: 10,
+            scale: 1,
+        };
+        let b = Datum::Decimal {
+            unscaled: 200,
+            precision: 10,
+            scale: 2,
+        };
+        assert!(a < b);
+    }
+
+    #[test]
+    fn test_datum_partial_ord_bytes_matches_java_signed_byte_order() {
+        assert!(Datum::Bytes(vec![0xFF]) < Datum::Bytes(vec![0x00]));
+    }
+
+    #[test]
+    fn test_datum_partial_ord_cross_variant_is_none() {
+        assert_eq!(Datum::Int(1).partial_cmp(&Datum::Long(1)), None);
+    }
+
+    // ======================== eval_row ========================
+
+    /// Minimal BinaryRow builder for predicate evaluation tests.
+    struct TestBinaryRowBuilder {
+        arity: i32,
+        null_bits_size: usize,
+        data: Vec<u8>,
+    }
+
+    impl TestBinaryRowBuilder {
+        fn new(arity: i32) -> Self {
+            let null_bits_size = BinaryRow::cal_bit_set_width_in_bytes(arity) 
as usize;
+            let fixed_part_size = null_bits_size + (arity as usize) * 8;
+            Self {
+                arity,
+                null_bits_size,
+                data: vec![0u8; fixed_part_size],
+            }
+        }
+
+        fn field_offset(&self, pos: usize) -> usize {
+            self.null_bits_size + pos * 8
+        }
+
+        fn set_null_at(&mut self, pos: usize) {
+            let bit_index = pos + BinaryRow::HEADER_SIZE_IN_BYTES as usize;
+            let byte_index = bit_index / 8;
+            let bit_offset = bit_index % 8;
+            self.data[byte_index] |= 1 << bit_offset;
+            let offset = self.field_offset(pos);
+            self.data[offset..offset + 8].fill(0);
+        }
+
+        fn write_int(&mut self, pos: usize, value: i32) {
+            let offset = self.field_offset(pos);
+            self.data[offset..offset + 
4].copy_from_slice(&value.to_le_bytes());
+        }
+
+        fn build(self) -> BinaryRow {
+            BinaryRow::from_bytes(self.arity, self.data)
+        }
+    }
+
+    fn make_leaf(col: &str, idx: usize, op: PredicateOperator, literals: 
Vec<Datum>) -> Predicate {
+        Predicate::Leaf {
+            column: col.into(),
+            index: idx,
+            data_type: DataType::Int(IntType::new()),
+            op,
+            literals,
+        }
+    }
+
+    #[test]
+    fn test_eval_leaf_operators() {
+        // row: [x=10]
+        let mut b = TestBinaryRowBuilder::new(1);
+        b.write_int(0, 10);
+        let row = b.build();
+
+        // Eq
+        assert!(eval_row(
+            &make_leaf("x", 0, PredicateOperator::Eq, vec![Datum::Int(10)]),
+            &row
+        )
+        .unwrap());
+        assert!(!eval_row(
+            &make_leaf("x", 0, PredicateOperator::Eq, vec![Datum::Int(99)]),
+            &row
+        )
+        .unwrap());
+        // NotEq
+        assert!(eval_row(
+            &make_leaf("x", 0, PredicateOperator::NotEq, vec![Datum::Int(99)]),
+            &row
+        )
+        .unwrap());
+        // Lt / LtEq / Gt / GtEq
+        assert!(eval_row(
+            &make_leaf("x", 0, PredicateOperator::Lt, vec![Datum::Int(20)]),
+            &row
+        )
+        .unwrap());
+        assert!(!eval_row(
+            &make_leaf("x", 0, PredicateOperator::Gt, vec![Datum::Int(20)]),
+            &row
+        )
+        .unwrap());
+        assert!(eval_row(
+            &make_leaf("x", 0, PredicateOperator::LtEq, vec![Datum::Int(10)]),
+            &row
+        )
+        .unwrap());
+        assert!(eval_row(
+            &make_leaf("x", 0, PredicateOperator::GtEq, vec![Datum::Int(10)]),
+            &row
+        )
+        .unwrap());
+        // In / NotIn
+        assert!(eval_row(
+            &make_leaf(
+                "x",
+                0,
+                PredicateOperator::In,
+                vec![Datum::Int(1), Datum::Int(10)]
+            ),
+            &row
+        )
+        .unwrap());
+        assert!(!eval_row(
+            &make_leaf(
+                "x",
+                0,
+                PredicateOperator::In,
+                vec![Datum::Int(1), Datum::Int(2)]
+            ),
+            &row
+        )
+        .unwrap());
+        // NotIn: 10 not in {1, 2} → true; 10 not in {10, 20} → false
+        assert!(eval_row(
+            &make_leaf(
+                "x",
+                0,
+                PredicateOperator::NotIn,
+                vec![Datum::Int(1), Datum::Int(2)]
+            ),
+            &row
+        )
+        .unwrap());
+        assert!(!eval_row(
+            &make_leaf(
+                "x",
+                0,
+                PredicateOperator::NotIn,
+                vec![Datum::Int(10), Datum::Int(20)]
+            ),
+            &row
+        )
+        .unwrap());
+    }
+
+    #[test]
+    fn test_eval_null_semantics() {
+        let mut b = TestBinaryRowBuilder::new(1);
+        b.set_null_at(0);
+        let row = b.build();
+
+        // NULL compared to any value → false (SQL null semantics)
+        assert!(!eval_row(
+            &make_leaf("x", 0, PredicateOperator::Eq, vec![Datum::Int(42)]),
+            &row
+        )
+        .unwrap());
+        // IsNull / IsNotNull
+        assert!(eval_row(&make_leaf("x", 0, PredicateOperator::IsNull, 
vec![]), &row).unwrap());
+        assert!(!eval_row(
+            &make_leaf("x", 0, PredicateOperator::IsNotNull, vec![]),
+            &row
+        )
+        .unwrap());
+    }
+
+    #[test]
+    fn test_eval_compound_and_constants() {
+        let mut b = TestBinaryRowBuilder::new(2);
+        b.write_int(0, 10);
+        b.write_int(1, 20);
+        let row = b.build();
+
+        let p_true = make_leaf("a", 0, PredicateOperator::Eq, 
vec![Datum::Int(10)]);
+        let p_false = make_leaf("b", 1, PredicateOperator::Eq, 
vec![Datum::Int(99)]);
+
+        assert!(!eval_row(&Predicate::and(vec![p_true.clone(), 
p_false.clone()]), &row).unwrap());
+        assert!(eval_row(&Predicate::or(vec![p_true.clone(), 
p_false.clone()]), &row).unwrap());
+        assert!(!eval_row(&Predicate::negate(p_true), &row).unwrap());
+
+        // Constants
+        let empty_row = TestBinaryRowBuilder::new(0).build();
+        assert!(eval_row(&Predicate::AlwaysTrue, &empty_row).unwrap());
+        assert!(!eval_row(&Predicate::AlwaysFalse, &empty_row).unwrap());
+    }
+
+    // ======================== split_and ========================
+
+    #[test]
+    fn test_split_and() {
+        let pb = PredicateBuilder::new(&test_fields());
+        let p1 = pb.equal("id", Datum::Int(1)).unwrap();
+        let p2 = pb.equal("dt", Datum::Date(19723)).unwrap();
+
+        // AND → children
+        let parts = Predicate::and(vec![p1.clone(), p2.clone()]).split_and();
+        assert_eq!(parts, vec![p1.clone(), p2]);
+        // Non-AND → single-element vec
+        assert_eq!(p1.clone().split_and(), vec![p1]);
+    }
+
+    #[test]
+    fn test_split_and_recursive_nested() {
+        let pb = PredicateBuilder::new(&test_fields());
+        let p1 = pb.equal("id", Datum::Int(1)).unwrap();
+        let p2 = pb.equal("dt", Datum::Date(19723)).unwrap();
+        let p3 = pb.equal("hr", Datum::Int(10)).unwrap();
+
+        // Manually construct nested And (bypassing Predicate::and which 
flattens).
+        // And(And(p1, p2), p3) should still flatten to [p1, p2, p3].
+        let inner = Predicate::And(vec![p1.clone(), p2.clone()]);
+        let outer = Predicate::And(vec![inner, p3.clone()]);
+        let parts = outer.split_and();
+        assert_eq!(parts, vec![p1, p2, p3]);
+    }
+
+    // ======================== remap_field_index ========================
+
+    #[test]
+    fn test_remap_pure_partition_leaf() {
+        let pb = PredicateBuilder::new(&test_fields()); // [id(0), name(1), 
dt(2), hr(3)]
+        let p = pb.equal("dt", Datum::Date(19723)).unwrap(); // index=2
+        let mapping = vec![None, None, Some(0), Some(1)]; // dt→0, hr→1
+
+        let remapped = p.remap_field_index(&mapping).unwrap();
+        match &remapped {
+            Predicate::Leaf { index, column, .. } => {
+                assert_eq!(column, "dt");
+                assert_eq!(*index, 0); // remapped to partition index
+            }
+            other => panic!("expected Leaf, got {other:?}"),
+        }
+    }
+
+    #[test]
+    fn test_remap_non_partition_leaf_returns_none() {
+        let pb = PredicateBuilder::new(&test_fields());
+        let p = pb.equal("id", Datum::Int(1)).unwrap(); // index=0, not a 
partition key
+        let mapping = vec![None, None, Some(0), Some(1)];
+
+        assert!(p.remap_field_index(&mapping).is_none());
+    }
+
+    #[test]
+    fn test_remap_and_all_partition() {
+        let pb = PredicateBuilder::new(&test_fields());
+        let p1 = pb.equal("dt", Datum::Date(19723)).unwrap();
+        let p2 = pb.equal("hr", Datum::Int(10)).unwrap();
+        let combined = Predicate::and(vec![p1, p2]);
+        let mapping = vec![None, None, Some(0), Some(1)];
+
+        let remapped = combined.remap_field_index(&mapping).unwrap();
+        match &remapped {
+            Predicate::And(children) => {
+                assert_eq!(children.len(), 2);
+            }
+            other => panic!("expected And, got {other:?}"),
+        }
+    }
+
+    #[test]
+    fn test_remap_or_with_mixed_returns_none() {
+        let pb = PredicateBuilder::new(&test_fields());
+        let p_partition = pb.equal("dt", Datum::Date(19723)).unwrap();
+        let p_data = pb.equal("id", Datum::Int(1)).unwrap();
+        let combined = Predicate::or(vec![p_partition, p_data]);
+        let mapping = vec![None, None, Some(0), Some(1)];
+
+        // OR with mixed columns → cannot safely extract → None
+        assert!(combined.remap_field_index(&mapping).is_none());
+    }
+
+    /// Regression test: `eval_row` must propagate errors from `extract_datum`
+    /// as `Err` (fail-fast), not swallow them into `Ok(true)` (fail-open).
+    ///
+    /// This guards the invariant at `table_scan.rs` partition pruning where
+    /// `eval_row(pred, &row)?` was intentionally changed from fail-open to
+    /// fail-fast.  An unsupported DataType in a leaf triggers `Err` from
+    /// `extract_datum`; we verify it surfaces through `eval_row`.
+    #[test]
+    fn test_eval_row_propagates_extract_error() {
+        let mut b = TestBinaryRowBuilder::new(1);
+        b.write_int(0, 42);
+        let row = b.build();
+
+        // Leaf with unsupported DataType → extract_datum returns Err.
+        let unsupported_leaf = Predicate::Leaf {
+            column: "arr".into(),
+            index: 0,
+            data_type: 
DataType::Array(ArrayType::new(DataType::Int(IntType::new()))),
+            op: PredicateOperator::Eq,
+            literals: vec![Datum::Int(42)],
+        };
+
+        // Must be Err, not Ok(true).
+        assert!(eval_row(&unsupported_leaf, &row).is_err());
+
+        // Also verify error propagates through compound predicates 
(And/Or/Not).
+        let and_pred = Predicate::And(vec![Predicate::AlwaysTrue, 
unsupported_leaf.clone()]);
+        assert!(eval_row(&and_pred, &row).is_err());
+
+        let or_pred = Predicate::Or(vec![Predicate::AlwaysFalse, 
unsupported_leaf.clone()]);
+        assert!(eval_row(&or_pred, &row).is_err());
+
+        let not_pred = Predicate::Not(Box::new(unsupported_leaf));
+        assert!(eval_row(&not_pred, &row).is_err());
+    }
+
+    #[test]
+    fn test_remap_not_with_mixed_returns_none() {
+        let pb = PredicateBuilder::new(&test_fields());
+        let p_partition = pb.equal("dt", Datum::Date(19723)).unwrap();
+        let p_data = pb.greater_than("id", Datum::Int(10)).unwrap();
+        let inner = Predicate::and(vec![p_partition, p_data]);
+        let negated = Predicate::negate(inner);
+        let mapping = vec![None, None, Some(0), Some(1)];
+
+        // NOT(partition AND data) → mixed under NOT → None
+        assert!(negated.remap_field_index(&mapping).is_none());
+    }
 }
diff --git a/crates/paimon/src/table/read_builder.rs 
b/crates/paimon/src/table/read_builder.rs
index 69e1674..e0aae28 100644
--- a/crates/paimon/src/table/read_builder.rs
+++ b/crates/paimon/src/table/read_builder.rs
@@ -22,7 +22,7 @@
 
 use super::{ArrowRecordBatchStream, Table, TableScan};
 use crate::arrow::ArrowReaderBuilder;
-use crate::spec::{CoreOptions, DataField};
+use crate::spec::{CoreOptions, DataField, Predicate};
 use crate::Result;
 use crate::{DataSplit, Error};
 use std::collections::{HashMap, HashSet};
@@ -35,6 +35,7 @@ use std::collections::{HashMap, HashSet};
pub struct ReadBuilder<'a> {
    // Table the read is built against; borrowed for the builder's lifetime.
    table: &'a Table,
    // Optional projection by field name; None means all columns.
    projected_fields: Option<Vec<String>>,
    // Optional scan filter; per with_filter, currently only partition-only
    // conjuncts are applied (for pruning) during TableScan::plan.
    filter: Option<Predicate>,
}
 
 impl<'a> ReadBuilder<'a> {
@@ -42,6 +43,7 @@ impl<'a> ReadBuilder<'a> {
         Self {
             table,
             projected_fields: None,
+            filter: None,
         }
     }
 
@@ -53,9 +55,26 @@ impl<'a> ReadBuilder<'a> {
         self
     }
 
+    /// Set a filter predicate for scan planning.
+    ///
+    /// The predicate should use table schema field indices (as produced by
+    /// [`PredicateBuilder`]).  During [`TableScan::plan`] the filter is
+    /// decomposed at AND boundaries: partition-only conjuncts are extracted
+    /// and used to prune partitions; all other conjuncts are currently
+    /// **ignored** (neither `TableScan` nor `TableRead` applies data-level
+    /// predicates yet).
+    ///
+    /// This means rows returned by `TableRead` may **not** satisfy the full
+    /// filter — callers must apply remaining predicates themselves until
+    /// data-level pushdown is implemented.
+    pub fn with_filter(&mut self, filter: Predicate) -> &mut Self {
+        self.filter = Some(filter);
+        self
+    }
+
     /// Create a table scan. Call [TableScan::plan] to get splits.
     pub fn new_scan(&self) -> TableScan<'a> {
-        TableScan::new(self.table)
+        TableScan::new(self.table, self.filter.clone())
     }
 
     /// Create a table read for consuming splits (e.g. from a scan plan).
diff --git a/crates/paimon/src/table/table_scan.rs 
b/crates/paimon/src/table/table_scan.rs
index 1c3454a..031d15f 100644
--- a/crates/paimon/src/table/table_scan.rs
+++ b/crates/paimon/src/table/table_scan.rs
@@ -23,7 +23,8 @@
 use super::Table;
 use crate::io::FileIO;
 use crate::spec::{
-    BinaryRow, CoreOptions, FileKind, IndexManifest, ManifestEntry, 
PartitionComputer, Snapshot,
+    eval_row, field_idx_to_partition_idx, BinaryRow, CoreOptions, FileKind, 
IndexManifest,
+    ManifestEntry, PartitionComputer, Predicate, Snapshot,
 };
 use crate::table::bin_pack::split_for_batch;
 use crate::table::source::{DataSplitBuilder, DeletionFile, PartitionBucket, 
Plan};
@@ -151,17 +152,32 @@ fn merge_manifest_entries(entries: Vec<ManifestEntry>) -> 
Vec<ManifestEntry> {
         .collect()
 }
 
+/// Evaluate a partition predicate against serialized manifest partition bytes.
+///
+/// - `BinaryRow::from_serialized_bytes` failure → fail-open (`Ok(true)`)
+/// - `eval_row` failure → fail-fast (`Err(_)`)
+fn partition_matches_predicate(
+    serialized_partition: &[u8],
+    predicate: &Predicate,
+) -> crate::Result<bool> {
+    match BinaryRow::from_serialized_bytes(serialized_partition) {
+        Ok(row) => eval_row(predicate, &row),
+        Err(_) => Ok(true),
+    }
+}
+
/// TableScan for a full table scan with optional partition predicate pruning
/// (no incremental scan; data-level predicates are not applied).
///
/// Reference: [pypaimon.read.table_scan.TableScan](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/read/table_scan.py)
#[derive(Debug, Clone)]
pub struct TableScan<'a> {
    table: &'a Table,
    // Optional filter from ReadBuilder::with_filter; plan() extracts its
    // partition-only conjuncts to prune manifest entries.
    filter: Option<Predicate>,
}
 
 impl<'a> TableScan<'a> {
    /// Create a scan over `table` with an optional filter; partition-only
    /// conjuncts of the filter are used for pruning during `plan`.
    pub fn new(table: &'a Table, filter: Option<Predicate>) -> Self {
        Self { table, filter }
    }
 
     /// Plan the full scan: read latest snapshot, manifest list, manifest 
entries, then build DataSplits using bin packing.
@@ -191,6 +207,58 @@ impl<'a> TableScan<'a> {
             return Ok(Plan::new(Vec::new()));
         }
 
+        // --- Partition predicate extraction ---
+        let partition_keys = self.table.schema().partition_keys();
+        let partition_predicate = if !partition_keys.is_empty() {
+            self.filter.clone().and_then(|filter| {
+                let mapping =
+                    field_idx_to_partition_idx(self.table.schema().fields(), 
partition_keys);
+                let conjuncts = filter.split_and();
+                let remapped: Vec<Predicate> = conjuncts
+                    .into_iter()
+                    .filter_map(|c| c.remap_field_index(&mapping))
+                    .collect();
+                if remapped.is_empty() {
+                    None
+                } else {
+                    Some(Predicate::and(remapped))
+                }
+            })
+        } else {
+            None
+        };
+
+        // --- Partition pruning: filter manifest entries before grouping ---
+        //
+        // Note: split construction later still requires a decodable BinaryRow
+        // and will fail on corrupt partition bytes. Pruning is intentionally
+        // best-effort; split construction is mandatory.
+        let entries = if let Some(ref pred) = partition_predicate {
+            let mut kept = Vec::with_capacity(entries.len());
+            // Cache: partition bytes → accept/reject to avoid re-decoding.
+            let mut cache: HashMap<Vec<u8>, bool> = HashMap::new();
+            for e in entries {
+                let accept = match cache.get(e.partition()) {
+                    Some(&cached) => cached,
+                    None => {
+                        let partition_bytes = e.partition();
+                        let accept = 
partition_matches_predicate(partition_bytes, pred)?;
+                        cache.insert(partition_bytes.to_vec(), accept);
+                        accept
+                    }
+                };
+                if accept {
+                    kept.push(e);
+                }
+            }
+            kept
+        } else {
+            entries
+        };
+        if entries.is_empty() {
+            return Ok(Plan::new(Vec::new()));
+        }
+
         // Group by (partition, bucket). Key = (partition_bytes, bucket).
         let mut groups: HashMap<(Vec<u8>, i32), Vec<ManifestEntry>> = 
HashMap::new();
         for e in entries {
@@ -202,7 +270,6 @@ impl<'a> TableScan<'a> {
         let base_path = table_path.trim_end_matches('/');
         let mut splits = Vec::new();
 
-        let partition_keys = self.table.schema().partition_keys();
         let partition_computer = if !partition_keys.is_empty() {
             Some(PartitionComputer::new(
                 partition_keys,
@@ -279,3 +346,90 @@ impl<'a> TableScan<'a> {
         Ok(Plan::new(splits))
     }
 }
+
#[cfg(test)]
mod tests {
    use super::partition_matches_predicate;
    use crate::spec::{
        ArrayType, DataField, DataType, Datum, IntType, Predicate, PredicateBuilder,
        PredicateOperator, VarCharType,
    };
    use crate::Error;

    /// Minimal builder producing the serialized (arity-prefixed) BinaryRow
    /// layout used for manifest partition bytes in these tests.
    struct SerializedBinaryRowBuilder {
        arity: i32,
        null_bits_size: usize,
        data: Vec<u8>,
    }

    impl SerializedBinaryRowBuilder {
        fn new(arity: i32) -> Self {
            let null_bits_size =
                crate::spec::BinaryRow::cal_bit_set_width_in_bytes(arity) as usize;
            Self {
                arity,
                null_bits_size,
                // Fixed part: null bitset plus one 8-byte slot per field.
                data: vec![0u8; null_bits_size + arity as usize * 8],
            }
        }

        /// Byte offset of the 8-byte fixed slot for field `pos`.
        fn field_offset(&self, pos: usize) -> usize {
            self.null_bits_size + pos * 8
        }

        /// Append the string bytes to the variable-length section and store
        /// an (offset << 32 | length) pointer in the field's fixed slot.
        fn write_string(&mut self, pos: usize, value: &str) {
            let var_offset = self.data.len();
            self.data.extend_from_slice(value.as_bytes());
            let pointer = ((var_offset as u64) << 32) | (value.len() as u64);
            let slot = self.field_offset(pos);
            self.data[slot..slot + 8].copy_from_slice(&pointer.to_le_bytes());
        }

        /// Prefix the row bytes with the big-endian arity.
        fn build_serialized(self) -> Vec<u8> {
            let mut serialized = Vec::with_capacity(4 + self.data.len());
            serialized.extend_from_slice(&self.arity.to_be_bytes());
            serialized.extend_from_slice(&self.data);
            serialized
        }
    }

    /// Single-field partition schema: dt VARCHAR.
    fn partition_string_field() -> Vec<DataField> {
        vec![DataField::new(
            0,
            "dt".to_string(),
            DataType::VarChar(VarCharType::default()),
        )]
    }

    #[test]
    fn test_partition_matches_predicate_decode_failure_fails_open() {
        let predicate = PredicateBuilder::new(&partition_string_field())
            .equal("dt", Datum::String("2024-01-01".into()))
            .unwrap();

        // Garbage bytes cannot be decoded → fail-open keeps the entry.
        assert!(partition_matches_predicate(&[0xFF, 0x00], &predicate).unwrap());
    }

    #[test]
    fn test_partition_matches_predicate_eval_error_fails_fast() {
        let mut builder = SerializedBinaryRowBuilder::new(1);
        builder.write_string(0, "2024-01-01");
        let serialized = builder.build_serialized();

        // A leaf typed as ARRAY is unsupported by extract_datum.
        let predicate = Predicate::Leaf {
            column: "dt".into(),
            index: 0,
            data_type: DataType::Array(ArrayType::new(DataType::Int(IntType::new()))),
            op: PredicateOperator::Eq,
            literals: vec![Datum::Int(42)],
        };

        let err = partition_matches_predicate(&serialized, &predicate)
            .expect_err("eval_row error should propagate");

        assert!(
            matches!(&err, Error::Unsupported { message } if message.contains("extract_datum")),
            "Expected extract_datum unsupported error, got: {err:?}"
        );
    }
}

Reply via email to