This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 1f4aac6 feat(scan): support data-level stats pruning in TableScan
(#196)
1f4aac6 is described below
commit 1f4aac69a2a932c2d2a0821d4d5921d100898db9
Author: Zach <[email protected]>
AuthorDate: Sat Apr 4 10:43:16 2026 +0800
feat(scan): support data-level stats pruning in TableScan (#196)
---
crates/integration_tests/tests/read_tables.rs | 207 ++++++-
crates/paimon/src/spec/mod.rs | 1 +
crates/paimon/src/spec/predicate.rs | 184 +++++-
crates/paimon/src/table/read_builder.rs | 16 +-
crates/paimon/src/table/table_scan.rs | 860 +++++++++++++++++++++++---
5 files changed, 1142 insertions(+), 126 deletions(-)
diff --git a/crates/integration_tests/tests/read_tables.rs
b/crates/integration_tests/tests/read_tables.rs
index 2775978..2411a9c 100644
--- a/crates/integration_tests/tests/read_tables.rs
+++ b/crates/integration_tests/tests/read_tables.rs
@@ -543,7 +543,6 @@ async fn test_read_partitioned_table_with_filter() {
let catalog = create_file_system_catalog();
let table = get_table_from_catalog(&catalog,
"partitioned_log_table").await;
- // Build a filter: dt = '2024-01-01'
let schema = table.schema();
let pb = PredicateBuilder::new(schema.fields());
let filter = pb
@@ -577,7 +576,6 @@ async fn test_read_multi_partitioned_table_with_filter() {
let schema = table.schema();
let pb = PredicateBuilder::new(schema.fields());
- // Filter: dt = '2024-01-01' AND hr = 10
let filter = Predicate::and(vec![
pb.equal("dt", Datum::String("2024-01-01".into())).unwrap(),
pb.equal("hr", Datum::Int(10)).unwrap(),
@@ -600,7 +598,7 @@ async fn test_read_multi_partitioned_table_with_filter() {
}
#[tokio::test]
-async fn
test_read_partitioned_table_data_only_filter_preserves_all_partitions() {
+async fn test_read_partitioned_table_data_only_filter_prunes_all_files() {
use paimon::spec::{Datum, PredicateBuilder};
let catalog = create_file_system_catalog();
@@ -608,8 +606,6 @@ async fn
test_read_partitioned_table_data_only_filter_preserves_all_partitions()
let schema = table.schema();
let pb = PredicateBuilder::new(schema.fields());
- // Data-only filter: id > 10 — should NOT prune any partitions,
- // and is still ignored at read level in Phase 2.
let filter = pb
.greater_than("id", Datum::Int(10))
.expect("Failed to build predicate");
@@ -618,24 +614,18 @@ async fn
test_read_partitioned_table_data_only_filter_preserves_all_partitions()
let seen_partitions = extract_plan_partitions(&plan);
assert_eq!(
seen_partitions,
- HashSet::from(["2024-01-01".into(), "2024-01-02".into()]),
- "Data-only filter should not prune any partitions"
+ HashSet::<String>::new(),
+ "Data-only filter should prune all files when stats prove no match"
);
let actual = extract_id_name(&batches);
assert_eq!(
actual,
- vec![
- (1, "alice".to_string()),
- (2, "bob".to_string()),
- (3, "carol".to_string()),
- ],
- "Data predicate is not applied at read level; all rows are still
returned"
+ Vec::<(i32, String)>::new(),
+ "No rows should be planned when stats prove the predicate is
unsatisfiable"
);
}
-/// Mixed AND: partition predicate prunes partitions, but data predicate is
-/// silently ignored — all rows from the matching partition are returned.
#[tokio::test]
async fn test_read_partitioned_table_mixed_and_filter() {
use paimon::spec::{Datum, Predicate, PredicateBuilder};
@@ -645,8 +635,6 @@ async fn test_read_partitioned_table_mixed_and_filter() {
let schema = table.schema();
let pb = PredicateBuilder::new(schema.fields());
- // dt = '2024-01-01' AND id > 10
- // Partition conjunct (dt) is applied; data conjunct (id) is NOT.
let filter = Predicate::and(vec![
pb.equal("dt", Datum::String("2024-01-01".into())).unwrap(),
pb.greater_than("id", Datum::Int(10)).unwrap(),
@@ -656,20 +644,92 @@ async fn test_read_partitioned_table_mixed_and_filter() {
let seen_partitions = extract_plan_partitions(&plan);
assert_eq!(
seen_partitions,
- HashSet::from(["2024-01-01".into()]),
- "Only dt=2024-01-01 should survive"
+ HashSet::<String>::new(),
+ "The matching partition should also be pruned when file stats prove no
match"
);
let actual = extract_id_name(&batches);
assert_eq!(
actual,
- vec![(1, "alice".to_string()), (2, "bob".to_string())],
- "Data predicate (id > 10) is NOT applied — all rows from matching
partition returned"
+ Vec::<(i32, String)>::new(),
+ "No rows should remain after partition pruning and data stats pruning"
+ );
+}
+
+#[tokio::test]
+async fn
test_read_partitioned_table_data_only_filter_keeps_matching_partition() {
+ use paimon::spec::{Datum, PredicateBuilder};
+
+ let catalog = create_file_system_catalog();
+ let table = get_table_from_catalog(&catalog,
"partitioned_log_table").await;
+ let schema = table.schema();
+ let pb = PredicateBuilder::new(schema.fields());
+
+ let filter = pb
+ .greater_than("id", Datum::Int(2))
+ .expect("Failed to build predicate");
+
+ let (plan, batches) = scan_and_read_with_filter(&table, filter).await;
+ let seen_partitions = extract_plan_partitions(&plan);
+ assert_eq!(
+ seen_partitions,
+ HashSet::from(["2024-01-02".into()]),
+ "Only files whose stats may satisfy the predicate should remain in the
plan"
+ );
+
+ let actual = extract_id_name(&batches);
+ assert_eq!(
+ actual,
+ vec![(3, "carol".to_string())],
+ "Only rows from files that survive stats pruning should be returned"
);
}
-/// Mixed OR: `dt = '...' OR id > 10` cannot be split into a pure partition
-/// predicate, so no partitions should be pruned.
+/// Java-style inclusive projection can still extract partition predicates from
+/// an OR of mixed AND branches.
+#[tokio::test]
+async fn
test_read_multi_partitioned_table_or_of_mixed_ands_prunes_partitions() {
+ use paimon::spec::{Datum, Predicate, PredicateBuilder};
+
+ let catalog = create_file_system_catalog();
+ let table = get_table_from_catalog(&catalog,
"multi_partitioned_log_table").await;
+ let schema = table.schema();
+ let pb = PredicateBuilder::new(schema.fields());
+
+ let filter = Predicate::or(vec![
+ Predicate::and(vec![
+ pb.equal("dt", Datum::String("2024-01-01".into())).unwrap(),
+ pb.equal("hr", Datum::Int(10)).unwrap(),
+ pb.greater_than("id", Datum::Int(10)).unwrap(),
+ ]),
+ Predicate::and(vec![
+ pb.equal("dt", Datum::String("2024-01-01".into())).unwrap(),
+ pb.equal("hr", Datum::Int(20)).unwrap(),
+ ]),
+ ]);
+
+ let (plan, batches) = scan_and_read_with_filter(&table, filter).await;
+ let seen_partitions = extract_plan_multi_partitions(&plan);
+ assert_eq!(
+ seen_partitions,
+ HashSet::from([("2024-01-01".into(), 10), ("2024-01-01".into(), 20)]),
+ "Inclusive projection should prune the dt=2024-01-02 partition"
+ );
+
+ let actual = extract_id_name(&batches);
+ assert_eq!(
+ actual,
+ vec![
+ (1, "alice".to_string()),
+ (2, "bob".to_string()),
+ (3, "carol".to_string()),
+ ],
+ "All rows from the surviving partitions should be returned"
+ );
+}
+
+/// A directly mixed OR like `dt = '...' OR id > 10` is still not safely
+/// splittable into a partition predicate, so no partitions should be pruned.
#[tokio::test]
async fn test_read_partitioned_table_mixed_or_filter_preserves_all() {
use paimon::spec::{Datum, Predicate, PredicateBuilder};
@@ -679,7 +739,6 @@ async fn
test_read_partitioned_table_mixed_or_filter_preserves_all() {
let schema = table.schema();
let pb = PredicateBuilder::new(schema.fields());
- // dt = '2024-01-01' OR id > 10 — mixed OR is not safely splittable.
let filter = Predicate::or(vec![
pb.equal("dt", Datum::String("2024-01-01".into())).unwrap(),
pb.greater_than("id", Datum::Int(10)).unwrap(),
@@ -705,7 +764,7 @@ async fn
test_read_partitioned_table_mixed_or_filter_preserves_all() {
);
}
-/// Filter that matches no existing partition — all entries pruned, 0 splits.
+/// A filter that matches no partition should produce no splits.
#[tokio::test]
async fn test_read_partitioned_table_filter_matches_no_partition() {
use paimon::spec::{Datum, PredicateBuilder};
@@ -715,7 +774,6 @@ async fn
test_read_partitioned_table_filter_matches_no_partition() {
let schema = table.schema();
let pb = PredicateBuilder::new(schema.fields());
- // dt = '9999-12-31' matches no partition.
let filter = pb
.equal("dt", Datum::String("9999-12-31".into()))
.expect("Failed to build predicate");
@@ -744,8 +802,7 @@ async fn
test_read_partitioned_table_eval_row_error_fails_plan() {
.position(|f| f.name() == "dt")
.expect("dt partition column should exist");
- // Use an unsupported DataType in a partition leaf so remapping succeeds
- // but `eval_row` fails during partition pruning.
+ // Use an unsupported partition type so remapping succeeds but `eval_row`
fails.
let filter = Predicate::Leaf {
column: "dt".into(),
index: dt_index,
@@ -1086,6 +1143,100 @@ async fn test_read_schema_evolution_type_promotion() {
);
}
+/// Stats pruning should treat a newly added column as all-NULL for old files.
+#[tokio::test]
+async fn
test_stats_pruning_schema_evolution_added_column_eq_prunes_old_files() {
+ use paimon::spec::{Datum, PredicateBuilder};
+
+ let catalog = create_file_system_catalog();
+ let table = get_table_from_catalog(&catalog,
"schema_evolution_add_column").await;
+ let pb = PredicateBuilder::new(table.schema().fields());
+ let filter = pb
+ .equal("age", Datum::Int(30))
+ .expect("Failed to build predicate");
+
+ let (plan, batches) = scan_and_read_with_filter(&table, filter).await;
+ assert_eq!(
+ plan.splits().len(),
+ 1,
+ "Only the file written after ADD COLUMN should survive stats pruning"
+ );
+
+ let actual = extract_id_name(&batches);
+ assert_eq!(
+ actual,
+ vec![(3, "carol".to_string())],
+ "Old files missing 'age' and rows with age != 30 should be pruned"
+ );
+}
+
+/// Stats pruning should keep only old files for IS NULL on a newly added
column.
+#[tokio::test]
+async fn
test_stats_pruning_schema_evolution_added_column_is_null_prunes_new_files() {
+ use paimon::spec::PredicateBuilder;
+
+ let catalog = create_file_system_catalog();
+ let table = get_table_from_catalog(&catalog,
"schema_evolution_add_column").await;
+ let pb = PredicateBuilder::new(table.schema().fields());
+ let filter = pb.is_null("age").expect("Failed to build predicate");
+
+ let (plan, batches) = scan_and_read_with_filter(&table, filter).await;
+ assert_eq!(
+ plan.splits().len(),
+ 1,
+ "Only files missing 'age' should survive stats pruning for age IS NULL"
+ );
+
+ let actual = extract_id_name(&batches);
+ assert_eq!(
+ actual,
+ vec![(1, "alice".to_string()), (2, "bob".to_string())],
+ "New files with non-null age should be pruned for age IS NULL"
+ );
+}
+
+/// Stats pruning should still work after INT -> BIGINT type promotion.
+#[tokio::test]
+async fn
test_stats_pruning_schema_evolution_type_promotion_prunes_old_int_files() {
+ use paimon::spec::{Datum, PredicateBuilder};
+
+ let catalog = create_file_system_catalog();
+ let table = get_table_from_catalog(&catalog,
"schema_evolution_type_promotion").await;
+ let pb = PredicateBuilder::new(table.schema().fields());
+ let filter = pb
+ .greater_than("value", Datum::Long(250))
+ .expect("Failed to build predicate");
+
+ let (plan, batches) = scan_and_read_with_filter(&table, filter).await;
+ assert_eq!(
+ plan.splits().len(),
+ 1,
+ "Old INT files should still be pruned using promoted BIGINT predicates"
+ );
+
+ let mut rows: Vec<(i32, i64)> = Vec::new();
+ for batch in &batches {
+ let id = batch
+ .column_by_name("id")
+ .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+ .expect("id");
+ let value = batch
+ .column_by_name("value")
+ .and_then(|c| c.as_any().downcast_ref::<Int64Array>())
+ .expect("value");
+ for i in 0..batch.num_rows() {
+ rows.push((id.value(i), value.value(i)));
+ }
+ }
+ rows.sort_by_key(|(id, _)| *id);
+
+ assert_eq!(
+ rows,
+ vec![(3, 3_000_000_000i64)],
+ "Only the BIGINT file should remain after value > 250 pruning"
+ );
+}
+
/// Test reading a data-evolution table after ALTER TABLE ADD COLUMNS.
/// Old files lack the new column; reader should fill nulls even in data
evolution mode.
#[tokio::test]
diff --git a/crates/paimon/src/spec/mod.rs b/crates/paimon/src/spec/mod.rs
index 970fb5e..bcfe5ad 100644
--- a/crates/paimon/src/spec/mod.rs
+++ b/crates/paimon/src/spec/mod.rs
@@ -57,6 +57,7 @@ mod partition_utils;
pub(crate) use partition_utils::PartitionComputer;
mod predicate;
pub(crate) use predicate::eval_row;
+pub(crate) use predicate::extract_datum;
pub use predicate::{
field_idx_to_partition_idx, Datum, Predicate, PredicateBuilder,
PredicateOperator,
};
diff --git a/crates/paimon/src/spec/predicate.rs
b/crates/paimon/src/spec/predicate.rs
index 4c89820..953b92c 100644
--- a/crates/paimon/src/spec/predicate.rs
+++ b/crates/paimon/src/spec/predicate.rs
@@ -391,7 +391,7 @@ impl Predicate {
/// a mixed predicate is never partially remapped.
///
/// `mapping` is the output of `field_idx_to_partition_idx`.
- pub(crate) fn remap_field_index(self, mapping: &[Option<usize>]) ->
Option<Predicate> {
+ pub(crate) fn remap_field_index(&self, mapping: &[Option<usize>]) ->
Option<Predicate> {
match self {
Predicate::Leaf {
column,
@@ -400,25 +400,25 @@ impl Predicate {
op,
literals,
} => {
- let new_index = (*mapping.get(index)?)?;
+ let new_index = (*mapping.get(*index)?)?;
Some(Predicate::Leaf {
- column,
+ column: column.clone(),
index: new_index,
- data_type,
- op,
- literals,
+ data_type: data_type.clone(),
+ op: *op,
+ literals: literals.clone(),
})
}
Predicate::And(children) => {
let remapped: Option<Vec<_>> = children
- .into_iter()
+ .iter()
.map(|c| c.remap_field_index(mapping))
.collect();
Some(Predicate::and(remapped?))
}
Predicate::Or(children) => {
let remapped: Option<Vec<_>> = children
- .into_iter()
+ .iter()
.map(|c| c.remap_field_index(mapping))
.collect();
Some(Predicate::or(remapped?))
@@ -431,6 +431,77 @@ impl Predicate {
Predicate::AlwaysFalse => Some(Predicate::AlwaysFalse),
}
}
+
+ /// Check whether every leaf field in this subtree is present in `mapping`.
+ ///
+ /// This is used to decide whether the original conjunct still needs to be
+ /// retained as a residual data predicate after partition projection.
+ pub(crate) fn references_only_mapped_fields(&self, mapping:
&[Option<usize>]) -> bool {
+ match self {
+ Predicate::Leaf { index, .. } =>
mapping.get(*index).is_some_and(Option::is_some),
+ Predicate::And(children) | Predicate::Or(children) => children
+ .iter()
+ .all(|child| child.references_only_mapped_fields(mapping)),
+ Predicate::Not(inner) =>
inner.references_only_mapped_fields(mapping),
+ Predicate::AlwaysTrue | Predicate::AlwaysFalse => true,
+ }
+ }
+
+ /// Project leaf field indices from table schema space into a smaller
field space.
+ ///
+ /// Unlike [`Self::remap_field_index`], mixed `AND` subtrees keep the
children
+ /// that can be projected and drop the rest. `OR` and `NOT` still require
all
+ /// children to be projectable to preserve correctness.
+ ///
+ /// This matches the partition predicate extraction semantics used by Java
+ /// `splitPartitionPredicatesAndDataPredicates`.
+ pub(crate) fn project_field_index_inclusive(
+ &self,
+ mapping: &[Option<usize>],
+ ) -> Option<Predicate> {
+ match self {
+ Predicate::Leaf {
+ column,
+ index,
+ data_type,
+ op,
+ literals,
+ } => {
+ let new_index = (*mapping.get(*index)?)?;
+ Some(Predicate::Leaf {
+ column: column.clone(),
+ index: new_index,
+ data_type: data_type.clone(),
+ op: *op,
+ literals: literals.clone(),
+ })
+ }
+ Predicate::And(children) => {
+ let projected: Vec<_> = children
+ .iter()
+ .filter_map(|c| c.project_field_index_inclusive(mapping))
+ .collect();
+ if projected.is_empty() {
+ None
+ } else {
+ Some(Predicate::and(projected))
+ }
+ }
+ Predicate::Or(children) => {
+ let projected: Option<Vec<_>> = children
+ .iter()
+ .map(|c| c.project_field_index_inclusive(mapping))
+ .collect();
+ Some(Predicate::or(projected?))
+ }
+ Predicate::Not(inner) => {
+ let projected = inner.remap_field_index(mapping)?;
+ Some(Predicate::negate(projected))
+ }
+ Predicate::AlwaysTrue => Some(Predicate::AlwaysTrue),
+ Predicate::AlwaysFalse => Some(Predicate::AlwaysFalse),
+ }
+ }
}
impl fmt::Display for Predicate {
@@ -1713,4 +1784,101 @@ mod tests {
// NOT(partition AND data) → mixed under NOT → None
assert!(negated.remap_field_index(&mapping).is_none());
}
+
+ // ================== project_field_index_inclusive ==================
+
+ #[test]
+ fn test_project_inclusive_and_keeps_partition_children() {
+ let pb = PredicateBuilder::new(&test_fields());
+ let mixed = Predicate::and(vec![
+ pb.equal("dt", Datum::Date(19723)).unwrap(),
+ pb.greater_than("id", Datum::Int(10)).unwrap(),
+ ]);
+ let mapping = vec![None, None, Some(0), Some(1)];
+
+ let projected = mixed.project_field_index_inclusive(&mapping).unwrap();
+ match projected {
+ Predicate::Leaf { column, index, .. } => {
+ assert_eq!(column, "dt");
+ assert_eq!(index, 0);
+ }
+ other => panic!("expected projected partition leaf, got
{other:?}"),
+ }
+ }
+
+ #[test]
+ fn test_project_inclusive_and_all_data_returns_none() {
+ let pb = PredicateBuilder::new(&test_fields());
+ let data_only = Predicate::and(vec![
+ pb.equal("id", Datum::Int(1)).unwrap(),
+ pb.equal("name", Datum::String("alice".into())).unwrap(),
+ ]);
+ let mapping = vec![None, None, Some(0), Some(1)];
+
+ assert!(data_only.project_field_index_inclusive(&mapping).is_none());
+ }
+
+ #[test]
+ fn test_project_inclusive_or_with_mixed_returns_none() {
+ let pb = PredicateBuilder::new(&test_fields());
+ let p_partition = pb.equal("dt", Datum::Date(19723)).unwrap();
+ let p_data = pb.equal("id", Datum::Int(1)).unwrap();
+ let combined = Predicate::or(vec![p_partition, p_data]);
+ let mapping = vec![None, None, Some(0), Some(1)];
+
+ assert!(combined.project_field_index_inclusive(&mapping).is_none());
+ }
+
+ #[test]
+ fn test_project_inclusive_or_of_mixed_ands_projects_each_branch() {
+ let pb = PredicateBuilder::new(&test_fields());
+ let left = Predicate::and(vec![
+ pb.equal("dt", Datum::Date(19723)).unwrap(),
+ pb.greater_than("id", Datum::Int(10)).unwrap(),
+ ]);
+ let right = Predicate::and(vec![
+ pb.equal("hr", Datum::Int(10)).unwrap(),
+ pb.equal("name", Datum::String("alice".into())).unwrap(),
+ ]);
+ let combined = Predicate::or(vec![left, right]);
+ let mapping = vec![None, None, Some(0), Some(1)];
+
+ let projected =
combined.project_field_index_inclusive(&mapping).unwrap();
+ match projected {
+ Predicate::Or(children) => {
+ assert_eq!(children.len(), 2);
+ assert!(matches!(
+ &children[0],
+ Predicate::Leaf {
+ column,
+ index: 0,
+ ..
+ } if column == "dt"
+ ));
+ assert!(matches!(
+ &children[1],
+ Predicate::Leaf {
+ column,
+ index: 1,
+ ..
+ } if column == "hr"
+ ));
+ }
+ other => panic!("expected projected OR, got {other:?}"),
+ }
+ }
+
+ #[test]
+ fn test_project_inclusive_not_with_mixed_returns_none() {
+ let pb = PredicateBuilder::new(&test_fields());
+ let inner = Predicate::and(vec![
+ pb.equal("dt", Datum::Date(19723)).unwrap(),
+ pb.greater_than("id", Datum::Int(10)).unwrap(),
+ ]);
+ let mapping = vec![None, None, Some(0), Some(1)];
+
+ assert!(Predicate::negate(inner)
+ .project_field_index_inclusive(&mapping)
+ .is_none());
+ }
}
diff --git a/crates/paimon/src/table/read_builder.rs
b/crates/paimon/src/table/read_builder.rs
index db5c42c..d55ecc1 100644
--- a/crates/paimon/src/table/read_builder.rs
+++ b/crates/paimon/src/table/read_builder.rs
@@ -60,15 +60,15 @@ impl<'a> ReadBuilder<'a> {
/// Set a filter predicate for scan planning.
///
/// The predicate should use table schema field indices (as produced by
- /// [`PredicateBuilder`]). During [`TableScan::plan`] the filter is
- /// decomposed at AND boundaries: partition-only conjuncts are extracted
- /// and used to prune partitions; all other conjuncts are currently
- /// **ignored** (neither `TableScan` nor `TableRead` applies data-level
- /// predicates yet).
+ /// [`PredicateBuilder`]). During [`TableScan::plan`], partition-only
+ /// conjuncts are used for partition pruning and supported data conjuncts
+ /// may be used for conservative file-stats pruning.
///
- /// This means rows returned by `TableRead` may **not** satisfy the full
- /// filter — callers must apply remaining predicates themselves until
- /// data-level pushdown is implemented.
+ /// Stats pruning is per file. Files with a different `schema_id`,
+ /// incompatible stats layout, or inconclusive stats are kept.
+ ///
+ /// [`TableRead`] does not evaluate row-level filters; callers must apply
+ /// any remaining predicates themselves.
pub fn with_filter(&mut self, filter: Predicate) -> &mut Self {
self.filter = Some(filter);
self
diff --git a/crates/paimon/src/table/table_scan.rs
b/crates/paimon/src/table/table_scan.rs
index 60d32a5..2f230ce 100644
--- a/crates/paimon/src/table/table_scan.rs
+++ b/crates/paimon/src/table/table_scan.rs
@@ -21,17 +21,21 @@
//! and
[FullStartingScanner](https://github.com/apache/paimon/blob/release-1.3/paimon-python/pypaimon/read/scanner/full_starting_scanner.py).
use super::Table;
+use crate::arrow::schema_evolution::create_index_mapping;
use crate::io::FileIO;
use crate::spec::{
- eval_row, field_idx_to_partition_idx, BinaryRow, CoreOptions,
DataFileMeta, FileKind,
- IndexManifest, ManifestEntry, PartitionComputer, Predicate, Snapshot,
+ eval_row, extract_datum, field_idx_to_partition_idx, BinaryRow,
CoreOptions, DataField,
+ DataFileMeta, DataType, Datum, FileKind, IndexManifest, ManifestEntry,
PartitionComputer,
+ Predicate, PredicateOperator, Snapshot,
};
use crate::table::bin_pack::split_for_batch;
use crate::table::source::{DataSplit, DataSplitBuilder, DeletionFile,
PartitionBucket, Plan};
use crate::table::SnapshotManager;
use crate::table::TagManager;
use crate::Error;
+use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
/// Path segment for manifest directory under table.
const MANIFEST_DIR: &str = "manifest";
@@ -219,6 +223,411 @@ pub(crate) fn group_by_overlapping_row_id(mut files:
Vec<DataFileMeta>) -> Vec<V
result
}
+#[derive(Debug, Clone)]
+struct FileStatsRows {
+ row_count: i64,
+ min_values: Option<BinaryRow>,
+ max_values: Option<BinaryRow>,
+ null_counts: Vec<i64>,
+}
+
+impl FileStatsRows {
+ /// Build file stats only when they are compatible with the expected file
schema.
+ fn try_from_data_file(file: &DataFileMeta, expected_fields: usize) ->
Option<Self> {
+ let stats = Self {
+ row_count: file.row_count,
+ min_values:
BinaryRow::from_serialized_bytes(file.value_stats.min_values()).ok(),
+ max_values:
BinaryRow::from_serialized_bytes(file.value_stats.max_values()).ok(),
+ null_counts: file.value_stats.null_counts().clone(),
+ };
+
+ stats.arity_matches(expected_fields).then_some(stats)
+ }
+
+ fn null_count(&self, index: usize) -> Option<i64> {
+ self.null_counts.get(index).copied()
+ }
+
+ /// Check whether the stats rows have the expected number of fields.
+ ///
+ /// If either min or max BinaryRow has an arity different from
+ /// `expected_fields`, the stats were likely written in dense mode or
+ /// under a different schema — making index-based access unsafe.
+ fn arity_matches(&self, expected_fields: usize) -> bool {
+ let min_ok = self
+ .min_values
+ .as_ref()
+ .is_none_or(|r| r.arity() as usize == expected_fields);
+ let max_ok = self
+ .max_values
+ .as_ref()
+ .is_none_or(|r| r.arity() as usize == expected_fields);
+ let null_ok = self.null_counts.is_empty() || self.null_counts.len() ==
expected_fields;
+ min_ok && max_ok && null_ok
+ }
+}
+
+#[derive(Debug)]
+struct ResolvedStatsSchema {
+ file_fields: Vec<DataField>,
+ field_mapping: Vec<Option<usize>>,
+}
+
+fn identity_field_mapping(num_fields: usize) -> Vec<Option<usize>> {
+ (0..num_fields).map(Some).collect()
+}
+
+fn normalize_field_mapping(mapping: Option<Vec<i32>>, num_fields: usize) ->
Vec<Option<usize>> {
+ mapping
+ .map(|field_mapping| {
+ field_mapping
+ .into_iter()
+ .map(|index| usize::try_from(index).ok())
+ .collect()
+ })
+ .unwrap_or_else(|| identity_field_mapping(num_fields))
+}
+
+fn split_partition_and_data_predicates(
+ filter: Predicate,
+ fields: &[DataField],
+ partition_keys: &[String],
+) -> (Option<Predicate>, Vec<Predicate>) {
+ let mapping = field_idx_to_partition_idx(fields, partition_keys);
+ let mut partition_predicates = Vec::new();
+ let mut data_predicates = Vec::new();
+
+ for conjunct in filter.split_and() {
+ let strict_partition_only =
conjunct.references_only_mapped_fields(&mapping);
+
+ if let Some(projected) =
conjunct.project_field_index_inclusive(&mapping) {
+ partition_predicates.push(projected);
+ }
+
+ // Keep any conjunct that is not fully partition-only for data-level
+ // stats pruning, even if part of it contributed to partition pruning.
+ if !strict_partition_only {
+ data_predicates.push(conjunct);
+ }
+ }
+
+ let partition_predicate = if partition_predicates.is_empty() {
+ None
+ } else {
+ Some(Predicate::and(partition_predicates))
+ };
+
+ (partition_predicate, data_predicates)
+}
+
+/// Check whether a data file *may* contain rows matching all `predicates`.
+///
+/// Pruning is evaluated per file and fails open when stats cannot be
+/// interpreted safely, including schema mismatches, incompatible stats arity,
+/// and missing or corrupted stats. Mixed-schema tables can still prune files
+/// written with the current schema; unsupported or inconclusive predicates are
+/// conservatively kept.
+fn data_file_matches_predicates(
+ file: &DataFileMeta,
+ predicates: &[Predicate],
+ current_schema_id: i64,
+ num_fields: usize,
+) -> bool {
+ if predicates.is_empty() {
+ return true;
+ }
+
+ // Evaluate constant predicates before consulting stats.
+ if predicates
+ .iter()
+ .any(|p| matches!(p, Predicate::AlwaysFalse))
+ {
+ return false;
+ }
+ if predicates
+ .iter()
+ .all(|p| matches!(p, Predicate::AlwaysTrue))
+ {
+ return true;
+ }
+
+ if file.schema_id != current_schema_id {
+ return true;
+ }
+
+ // Fail open if schema evolution or stats layout make index-based access
unsafe.
+ let Some(stats) = FileStatsRows::try_from_data_file(file, num_fields) else
{
+ return true;
+ };
+
+ predicates
+ .iter()
+ .all(|predicate| data_predicate_may_match(predicate, &stats))
+}
+
+async fn resolve_stats_schema(
+ table: &Table,
+ file_schema_id: i64,
+ schema_cache: &mut HashMap<i64, Option<Arc<ResolvedStatsSchema>>>,
+) -> Option<Arc<ResolvedStatsSchema>> {
+ if let Some(cached) = schema_cache.get(&file_schema_id) {
+ return cached.clone();
+ }
+
+ let table_schema = table.schema();
+ let current_fields = table_schema.fields();
+ let resolved = if file_schema_id == table_schema.id() {
+ Some(Arc::new(ResolvedStatsSchema {
+ file_fields: current_fields.to_vec(),
+ field_mapping: identity_field_mapping(current_fields.len()),
+ }))
+ } else {
+ let file_schema =
table.schema_manager().schema(file_schema_id).await.ok()?;
+ let file_fields = file_schema.fields().to_vec();
+ Some(Arc::new(ResolvedStatsSchema {
+ field_mapping: normalize_field_mapping(
+ create_index_mapping(current_fields, &file_fields),
+ current_fields.len(),
+ ),
+ file_fields,
+ }))
+ };
+
+ schema_cache.insert(file_schema_id, resolved.clone());
+ resolved
+}
+
+async fn data_file_matches_predicates_for_table(
+ table: &Table,
+ file: &DataFileMeta,
+ predicates: &[Predicate],
+ schema_cache: &mut HashMap<i64, Option<Arc<ResolvedStatsSchema>>>,
+) -> bool {
+ if predicates.is_empty() {
+ return true;
+ }
+
+ if file.schema_id == table.schema().id() {
+ return data_file_matches_predicates(
+ file,
+ predicates,
+ table.schema().id(),
+ table.schema().fields().len(),
+ );
+ }
+
+ let Some(resolved) = resolve_stats_schema(table, file.schema_id,
schema_cache).await else {
+ return true;
+ };
+
+ let Some(stats) = FileStatsRows::try_from_data_file(file,
resolved.file_fields.len()) else {
+ return true;
+ };
+
+ predicates.iter().all(|predicate| {
+ data_predicate_may_match_with_schema(
+ predicate,
+ &stats,
+ &resolved.field_mapping,
+ &resolved.file_fields,
+ )
+ })
+}
+
+fn data_predicate_may_match(predicate: &Predicate, stats: &FileStatsRows) ->
bool {
+ match predicate {
+ Predicate::AlwaysTrue => true,
+ Predicate::AlwaysFalse => false,
+ Predicate::And(children) => children
+ .iter()
+ .all(|child| data_predicate_may_match(child, stats)),
+ // Keep the first version conservative: only prune simple leaves and
conjunctions.
+ Predicate::Or(_) | Predicate::Not(_) => true,
+ Predicate::Leaf {
+ index,
+ data_type,
+ op,
+ literals,
+ ..
+ } => data_leaf_may_match(*index, data_type, data_type, *op, literals,
stats),
+ }
+}
+
+fn data_predicate_may_match_with_schema(
+ predicate: &Predicate,
+ stats: &FileStatsRows,
+ field_mapping: &[Option<usize>],
+ file_fields: &[DataField],
+) -> bool {
+ match predicate {
+ Predicate::AlwaysTrue => true,
+ Predicate::AlwaysFalse => false,
+ Predicate::And(children) => children.iter().all(|child| {
+ data_predicate_may_match_with_schema(child, stats, field_mapping,
file_fields)
+ }),
+ // Keep the first version conservative: only prune simple leaves and
conjunctions.
+ Predicate::Or(_) | Predicate::Not(_) => true,
+ Predicate::Leaf {
+ index,
+ data_type,
+ op,
+ literals,
+ ..
+ } => match field_mapping.get(*index).copied().flatten() {
+ Some(file_index) => {
+ let Some(file_field) = file_fields.get(file_index) else {
+ return true;
+ };
+ data_leaf_may_match(
+ file_index,
+ file_field.data_type(),
+ data_type,
+ *op,
+ literals,
+ stats,
+ )
+ }
+ None => missing_field_may_match(*op, stats.row_count),
+ },
+ }
+}
+
+fn data_leaf_may_match(
+ index: usize,
+ stats_data_type: &DataType,
+ predicate_data_type: &DataType,
+ op: PredicateOperator,
+ literals: &[Datum],
+ stats: &FileStatsRows,
+) -> bool {
+ let row_count = stats.row_count;
+ if row_count <= 0 {
+ return false;
+ }
+
+ let null_count = stats.null_count(index);
+ let all_null = null_count.map(|count| count == row_count);
+
+ match op {
+ PredicateOperator::IsNull => {
+ return null_count.is_none_or(|count| count > 0);
+ }
+ PredicateOperator::IsNotNull => {
+ return all_null != Some(true);
+ }
+ PredicateOperator::In | PredicateOperator::NotIn => {
+ return true;
+ }
+ PredicateOperator::Eq
+ | PredicateOperator::NotEq
+ | PredicateOperator::Lt
+ | PredicateOperator::LtEq
+ | PredicateOperator::Gt
+ | PredicateOperator::GtEq => {}
+ }
+
+ if all_null == Some(true) {
+ return false;
+ }
+
+ let literal = match literals.first() {
+ Some(literal) => literal,
+ None => return true,
+ };
+
+ let min_value = match stats
+ .min_values
+ .as_ref()
+ .and_then(|row| extract_stats_datum(row, index, stats_data_type))
+ .and_then(|datum| coerce_stats_datum_for_predicate(datum,
predicate_data_type))
+ {
+ Some(value) => value,
+ None => return true,
+ };
+ let max_value = match stats
+ .max_values
+ .as_ref()
+ .and_then(|row| extract_stats_datum(row, index, stats_data_type))
+ .and_then(|datum| coerce_stats_datum_for_predicate(datum,
predicate_data_type))
+ {
+ Some(value) => value,
+ None => return true,
+ };
+
+ match op {
+ PredicateOperator::Eq => {
+ !matches!(literal.partial_cmp(&min_value), Some(Ordering::Less))
+ && !matches!(literal.partial_cmp(&max_value),
Some(Ordering::Greater))
+ }
+ PredicateOperator::NotEq => !(min_value == *literal && max_value ==
*literal),
+ PredicateOperator::Lt => !matches!(
+ min_value.partial_cmp(literal),
+ Some(Ordering::Greater | Ordering::Equal)
+ ),
+ PredicateOperator::LtEq => {
+ !matches!(min_value.partial_cmp(literal), Some(Ordering::Greater))
+ }
+ PredicateOperator::Gt => !matches!(
+ max_value.partial_cmp(literal),
+ Some(Ordering::Less | Ordering::Equal)
+ ),
+ PredicateOperator::GtEq => !matches!(max_value.partial_cmp(literal),
Some(Ordering::Less)),
+ PredicateOperator::IsNull
+ | PredicateOperator::IsNotNull
+ | PredicateOperator::In
+ | PredicateOperator::NotIn => true,
+ }
+}
+
+fn missing_field_may_match(op: PredicateOperator, row_count: i64) -> bool {
+ if row_count <= 0 {
+ return false;
+ }
+
+ matches!(op, PredicateOperator::IsNull)
+}
+
+fn coerce_stats_datum_for_predicate(datum: Datum, predicate_data_type:
&DataType) -> Option<Datum> {
+ match (datum, predicate_data_type) {
+ (datum @ Datum::Bool(_), DataType::Boolean(_))
+ | (datum @ Datum::TinyInt(_), DataType::TinyInt(_))
+ | (datum @ Datum::SmallInt(_), DataType::SmallInt(_))
+ | (datum @ Datum::Int(_), DataType::Int(_))
+ | (datum @ Datum::Long(_), DataType::BigInt(_))
+ | (datum @ Datum::Float(_), DataType::Float(_))
+ | (datum @ Datum::Double(_), DataType::Double(_))
+ | (datum @ Datum::String(_), DataType::VarChar(_))
+ | (datum @ Datum::String(_), DataType::Char(_))
+ | (datum @ Datum::Bytes(_), DataType::Binary(_))
+ | (datum @ Datum::Bytes(_), DataType::VarBinary(_))
+ | (datum @ Datum::Date(_), DataType::Date(_))
+ | (datum @ Datum::Time(_), DataType::Time(_))
+ | (datum @ Datum::Timestamp { .. }, DataType::Timestamp(_))
+ | (datum @ Datum::LocalZonedTimestamp { .. },
DataType::LocalZonedTimestamp(_))
+ | (datum @ Datum::Decimal { .. }, DataType::Decimal(_)) => Some(datum),
+ (Datum::TinyInt(value), DataType::SmallInt(_)) =>
Some(Datum::SmallInt(value as i16)),
+ (Datum::TinyInt(value), DataType::Int(_)) => Some(Datum::Int(value as
i32)),
+ (Datum::TinyInt(value), DataType::BigInt(_)) => Some(Datum::Long(value
as i64)),
+ (Datum::SmallInt(value), DataType::Int(_)) => Some(Datum::Int(value as
i32)),
+ (Datum::SmallInt(value), DataType::BigInt(_)) =>
Some(Datum::Long(value as i64)),
+ (Datum::Int(value), DataType::BigInt(_)) => Some(Datum::Long(value as
i64)),
+ (Datum::Float(value), DataType::Double(_)) => Some(Datum::Double(value
as f64)),
+ _ => None,
+ }
+}
+
+fn extract_stats_datum(row: &BinaryRow, index: usize, data_type: &DataType) ->
Option<Datum> {
+ let min_row_len = BinaryRow::cal_fix_part_size_in_bytes(row.arity()) as
usize;
+ if index >= row.arity() as usize || row.data().len() < min_row_len {
+ return None;
+ }
+
+ match extract_datum(row, index, data_type) {
+ Ok(Some(datum)) => Some(datum),
+ Ok(None) | Err(_) => None,
+ }
+}
+
/// TableScan for full table scan (no incremental, no predicate).
///
/// Reference:
[pypaimon.read.table_scan.TableScan](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/read/table_scan.py)
@@ -348,35 +757,23 @@ impl<'a> TableScan<'a> {
return Ok(Plan::new(Vec::new()));
}
- // --- Partition predicate extraction ---
let partition_keys = self.table.schema().partition_keys();
- let partition_predicate = if !partition_keys.is_empty() {
- self.filter.clone().and_then(|filter| {
- let mapping =
- field_idx_to_partition_idx(self.table.schema().fields(),
partition_keys);
- let conjuncts = filter.split_and();
- let remapped: Vec<Predicate> = conjuncts
- .into_iter()
- .filter_map(|c| c.remap_field_index(&mapping))
- .collect();
- if remapped.is_empty() {
- None
- } else {
- Some(Predicate::and(remapped))
- }
- })
+ let (partition_predicate, data_predicates) = if let Some(filter) =
self.filter.clone() {
+ if partition_keys.is_empty() {
+ (None, filter.split_and())
+ } else {
+ split_partition_and_data_predicates(
+ filter,
+ self.table.schema().fields(),
+ partition_keys,
+ )
+ }
} else {
- None
+ (None, Vec::new())
};
- // --- Partition pruning: filter manifest entries before grouping ---
- //
- // Note: split construction later still requires a decodable BinaryRow
- // and will fail on corrupt partition bytes. Pruning is intentionally
- // best-effort; split construction is mandatory.
let entries = if let Some(ref pred) = partition_predicate {
let mut kept = Vec::with_capacity(entries.len());
- // Cache: partition bytes → accept/reject to avoid re-decoding.
let mut cache: HashMap<Vec<u8>, bool> = HashMap::new();
for e in entries {
let accept = match cache.get(e.partition()) {
@@ -400,6 +797,32 @@ impl<'a> TableScan<'a> {
return Ok(Plan::new(Vec::new()));
}
+ // Data-evolution tables can spread one logical row across multiple
files with
+ // different column sets. Pruning files independently would split
merge groups,
+ // so keep the current fail-open behavior until we support group-aware
pruning.
+ let entries = if data_predicates.is_empty() || data_evolution_enabled {
+ entries
+ } else {
+ let mut kept = Vec::with_capacity(entries.len());
+ let mut schema_cache: HashMap<i64,
Option<Arc<ResolvedStatsSchema>>> = HashMap::new();
+ for entry in entries {
+ if data_file_matches_predicates_for_table(
+ self.table,
+ entry.file(),
+ &data_predicates,
+ &mut schema_cache,
+ )
+ .await
+ {
+ kept.push(entry);
+ }
+ }
+ kept
+ };
+ if entries.is_empty() {
+ return Ok(Plan::new(Vec::new()));
+ }
+
// Group by (partition, bucket). Key = (partition_bytes, bucket).
let mut groups: HashMap<(Vec<u8>, i32), Vec<ManifestEntry>> =
HashMap::new();
for e in entries {
@@ -462,21 +885,19 @@ impl<'a> TableScan<'a> {
.as_ref()
.and_then(|map| map.get(&PartitionBucket::new(partition,
bucket)));
- // Split files into groups: data evolution merges overlapping
row_id ranges;
- // multi-file groups need column-wise merge, single-file groups
can be bin-packed.
+ // Data-evolution tables merge overlapping row-id groups
column-wise during read.
+ // Keep that split boundary intact and only bin-pack single-file
groups.
let file_groups_with_raw: Vec<(Vec<DataFileMeta>, bool)> = if
data_evolution_enabled {
let row_id_groups = group_by_overlapping_row_id(data_files);
- let (singles, multis): (Vec<_>, Vec<_>) =
- row_id_groups.into_iter().partition(|g| g.len() == 1);
-
- let mut result: Vec<(Vec<DataFileMeta>, bool)> = Vec::new();
+ let (singles, multis): (Vec<_>, Vec<_>) = row_id_groups
+ .into_iter()
+ .partition(|group| group.len() == 1);
- // Multi-file groups: each becomes its own split,
raw_convertible=false
+ let mut result = Vec::new();
for group in multis {
result.push((group, false));
}
- // Single-file groups: flatten and bin-pack,
raw_convertible=true
let single_files: Vec<DataFileMeta> =
singles.into_iter().flatten().collect();
for file_group in split_for_batch(single_files,
target_split_size, open_file_cost) {
result.push((file_group, true));
@@ -486,7 +907,7 @@ impl<'a> TableScan<'a> {
} else {
split_for_batch(data_files, target_split_size, open_file_cost)
.into_iter()
- .map(|g| (g, true))
+ .map(|group| (group, true))
.collect()
};
@@ -522,7 +943,9 @@ impl<'a> TableScan<'a> {
#[cfg(test)]
mod tests {
- use super::{group_by_overlapping_row_id, partition_matches_predicate};
+ use super::{
+ data_file_matches_predicates, group_by_overlapping_row_id,
partition_matches_predicate,
+ };
use crate::spec::{
stats::BinaryTableStats, ArrayType, DataField, DataFileMeta, DataType,
Datum,
DeletionVectorMeta, FileKind, IndexFileMeta, IndexManifestEntry,
IntType, Predicate,
@@ -530,43 +953,7 @@ mod tests {
};
use crate::table::source::DeletionFile;
use crate::Error;
- use chrono::{DateTime, Utc};
-
- /// Helper to build a DataFileMeta with data evolution fields.
- fn make_evo_file(
- name: &str,
- file_size: i64,
- row_count: i64,
- max_seq: i64,
- first_row_id: Option<i64>,
- ) -> DataFileMeta {
- DataFileMeta {
- file_name: name.to_string(),
- file_size,
- row_count,
- min_key: Vec::new(),
- max_key: Vec::new(),
- key_stats: BinaryTableStats::new(Vec::new(), Vec::new(),
Vec::new()),
- value_stats: BinaryTableStats::new(Vec::new(), Vec::new(),
Vec::new()),
- min_sequence_number: 0,
- max_sequence_number: max_seq,
- schema_id: 0,
- level: 0,
- extra_files: Vec::new(),
- creation_time: DateTime::<Utc>::from_timestamp(0, 0).unwrap(),
- delete_row_count: None,
- embedded_index: None,
- first_row_id,
- write_cols: None,
- }
- }
-
- fn file_names(groups: &[Vec<DataFileMeta>]) -> Vec<Vec<&str>> {
- groups
- .iter()
- .map(|g| g.iter().map(|f| f.file_name.as_str()).collect())
- .collect()
- }
+ use chrono::Utc;
struct SerializedBinaryRowBuilder {
arity: i32,
@@ -605,6 +992,51 @@ mod tests {
}
}
    /// Test helper that hand-assembles the fixed-length part of a `BinaryRow`
    /// byte buffer: a null bitset followed by 8 bytes per field.
    struct RawBinaryRowBuilder {
        // Number of fields in the row.
        arity: i32,
        // Byte width of the leading null bitset, as computed by `BinaryRow`.
        null_bits_size: usize,
        // Zero-initialized fixed-part buffer (bitset + 8 bytes per field).
        data: Vec<u8>,
    }

    impl RawBinaryRowBuilder {
        /// Allocates a zeroed fixed-part buffer for `arity` fields.
        fn new(arity: i32) -> Self {
            let null_bits_size = crate::spec::BinaryRow::cal_bit_set_width_in_bytes(arity) as usize;
            let fixed_part_size = null_bits_size + (arity as usize) * 8;
            Self {
                arity,
                null_bits_size,
                data: vec![0u8; fixed_part_size],
            }
        }

        /// Byte offset of field `pos` within the fixed part (8 bytes per slot).
        fn field_offset(&self, pos: usize) -> usize {
            self.null_bits_size + pos * 8
        }

        /// Marks field `pos` as null and zeroes its 8-byte slot.
        fn set_null_at(&mut self, pos: usize) {
            // Null bits start after the row header, so the bit position is
            // shifted by HEADER_SIZE_IN_BYTES — presumably a header *bit*
            // count despite the name; TODO confirm against BinaryRow.
            let bit_index = pos + crate::spec::BinaryRow::HEADER_SIZE_IN_BYTES as usize;
            let byte_index = bit_index / 8;
            let bit_offset = bit_index % 8;
            self.data[byte_index] |= 1 << bit_offset;

            let offset = self.field_offset(pos);
            self.data[offset..offset + 8].fill(0);
        }

        /// Writes an i32 value little-endian into field `pos`'s slot.
        fn write_int(&mut self, pos: usize, value: i32) {
            let offset = self.field_offset(pos);
            self.data[offset..offset + 4].copy_from_slice(&value.to_le_bytes());
        }

        /// Consumes the builder and returns the raw fixed-part bytes.
        fn build(self) -> Vec<u8> {
            // Sanity check: the buffer was never resized past the fixed part.
            debug_assert_eq!(
                self.data.len(),
                self.null_bits_size + (self.arity as usize) * 8
            );
            self.data
        }
    }
+
fn partition_string_field() -> Vec<DataField> {
vec![DataField::new(
0,
@@ -613,6 +1045,105 @@ mod tests {
)]
}
+ fn int_field() -> Vec<DataField> {
+ vec![DataField::new(
+ 0,
+ "id".to_string(),
+ DataType::Int(IntType::new()),
+ )]
+ }
+
+ fn int_stats_row(value: Option<i32>) -> Vec<u8> {
+ let mut builder = RawBinaryRowBuilder::new(1);
+ match value {
+ Some(value) => builder.write_int(0, value),
+ None => builder.set_null_at(0),
+ }
+ let raw = builder.build();
+ let mut serialized = Vec::with_capacity(4 + raw.len());
+ serialized.extend_from_slice(&(1_i32).to_be_bytes());
+ serialized.extend_from_slice(&raw);
+ serialized
+ }
+
    /// Convenience wrapper around `test_data_file_meta_with_schema` that pins
    /// the schema id to 0 (the value the stats-pruning tests scan with).
    fn test_data_file_meta(
        min_values: Vec<u8>,
        max_values: Vec<u8>,
        null_counts: Vec<i64>,
        row_count: i64,
    ) -> DataFileMeta {
        test_data_file_meta_with_schema(
            min_values,
            max_values,
            null_counts,
            row_count,
            0, // default schema_id
        )
    }
+
+ fn test_data_file_meta_with_schema(
+ min_values: Vec<u8>,
+ max_values: Vec<u8>,
+ null_counts: Vec<i64>,
+ row_count: i64,
+ schema_id: i64,
+ ) -> DataFileMeta {
+ DataFileMeta {
+ file_name: "test.parquet".into(),
+ file_size: 128,
+ row_count,
+ min_key: Vec::new(),
+ max_key: Vec::new(),
+ key_stats: BinaryTableStats::new(Vec::new(), Vec::new(),
Vec::new()),
+ value_stats: BinaryTableStats::new(min_values, max_values,
null_counts),
+ min_sequence_number: 0,
+ max_sequence_number: 0,
+ schema_id,
+ level: 1,
+ extra_files: Vec::new(),
+ creation_time: Utc::now(),
+ delete_row_count: None,
+ embedded_index: None,
+ first_row_id: None,
+ write_cols: None,
+ }
+ }
+
    /// Builds a `DataFileMeta` for the data-evolution grouping tests; only
    /// `row_count`, `max_sequence_number` and `first_row_id` drive
    /// `group_by_overlapping_row_id`.
    // NOTE(review): min_sequence_number is set equal to max_seq here, whereas
    // the helper this replaces used 0 — confirm no test relies on the minimum.
    fn make_evo_file(
        name: &str,
        file_size: i64,
        row_count: i64,
        max_seq: i64,
        first_row_id: Option<i64>,
    ) -> DataFileMeta {
        DataFileMeta {
            file_name: name.to_string(),
            file_size,
            row_count,
            min_key: Vec::new(),
            max_key: Vec::new(),
            key_stats: BinaryTableStats::new(Vec::new(), Vec::new(), Vec::new()),
            value_stats: BinaryTableStats::new(Vec::new(), Vec::new(), Vec::new()),
            min_sequence_number: max_seq,
            max_sequence_number: max_seq,
            schema_id: 0,
            level: 0,
            extra_files: Vec::new(),
            creation_time: Utc::now(),
            delete_row_count: None,
            embedded_index: None,
            first_row_id,
            write_cols: None,
        }
    }
+
+ fn file_names(groups: &[Vec<DataFileMeta>]) -> Vec<Vec<&str>> {
+ groups
+ .iter()
+ .map(|group| group.iter().map(|file|
file.file_name.as_str()).collect())
+ .collect()
+ }
+
#[test]
fn test_partition_matches_predicate_decode_failure_fails_open() {
let predicate = PredicateBuilder::new(&partition_string_field())
@@ -645,7 +1176,8 @@ mod tests {
);
}
- // ==================== group_by_overlapping_row_id tests
====================
    // Schema id the stats-pruning tests scan with; matches the default
    // schema_id baked into `test_data_file_meta`.
    const TEST_SCHEMA_ID: i64 = 0;
    // Field count of the single-column `int_field()` test schema.
    const TEST_NUM_FIELDS: usize = 1;
#[test]
fn test_group_by_overlapping_row_id_empty() {
@@ -655,8 +1187,6 @@ mod tests {
#[test]
fn test_group_by_overlapping_row_id_no_row_ids() {
- // Files without first_row_id each become their own group.
- // Sorted by (i64::MIN, -max_seq), so b(seq=2) before a(seq=1).
let files = vec![
make_evo_file("a", 10, 100, 1, None),
make_evo_file("b", 10, 100, 2, None),
@@ -667,7 +1197,6 @@ mod tests {
#[test]
fn test_group_by_overlapping_row_id_same_range() {
- // Two files with the same first_row_id and row_count → same range →
one group.
let files = vec![
make_evo_file("a", 10, 100, 2, Some(0)),
make_evo_file("b", 10, 100, 1, Some(0)),
@@ -679,7 +1208,6 @@ mod tests {
#[test]
fn test_group_by_overlapping_row_id_overlapping_ranges() {
- // File a: rows [0, 99], file b: rows [50, 149] → overlapping → one
group.
let files = vec![
make_evo_file("a", 10, 100, 1, Some(0)),
make_evo_file("b", 10, 100, 2, Some(50)),
@@ -691,7 +1219,6 @@ mod tests {
#[test]
fn test_group_by_overlapping_row_id_non_overlapping() {
- // File a: rows [0, 99], file b: rows [100, 199] → no overlap → two
groups.
let files = vec![
make_evo_file("a", 10, 100, 1, Some(0)),
make_evo_file("b", 10, 100, 2, Some(100)),
@@ -703,8 +1230,6 @@ mod tests {
#[test]
fn test_group_by_overlapping_row_id_mixed() {
- // a: [0,99], b: [0,99] (overlap), c: None (own group), d: [200,299]
- // After sort: c(None→MIN) comes first, then b(seq=2), a(seq=1), d.
let files = vec![
make_evo_file("a", 10, 100, 1, Some(0)),
make_evo_file("b", 10, 100, 2, Some(0)),
@@ -720,7 +1245,6 @@ mod tests {
#[test]
fn test_group_by_overlapping_row_id_sorted_by_seq() {
- // Within a group, files are sorted by (first_row_id,
-max_sequence_number).
let files = vec![
make_evo_file("a", 10, 100, 1, Some(0)),
make_evo_file("b", 10, 100, 3, Some(0)),
@@ -728,10 +1252,182 @@ mod tests {
];
let groups = group_by_overlapping_row_id(files);
assert_eq!(groups.len(), 1);
- // Sorted by descending max_sequence_number: b(3), c(2), a(1)
assert_eq!(file_names(&groups), vec![vec!["b", "c", "a"]]);
}
+ #[test]
+ fn test_data_file_matches_eq_prunes_out_of_range() {
+ let fields = int_field();
+ let file =
+ test_data_file_meta(int_stats_row(Some(10)),
int_stats_row(Some(20)), vec![0], 5);
+ let predicate = PredicateBuilder::new(&fields)
+ .equal("id", Datum::Int(30))
+ .unwrap();
+
+ assert!(!data_file_matches_predicates(
+ &file,
+ &[predicate],
+ TEST_SCHEMA_ID,
+ TEST_NUM_FIELDS,
+ ));
+ }
+
+ #[test]
+ fn test_data_file_matches_is_null_prunes_when_null_count_is_zero() {
+ let fields = int_field();
+ let file =
+ test_data_file_meta(int_stats_row(Some(10)),
int_stats_row(Some(20)), vec![0], 5);
+ let predicate = PredicateBuilder::new(&fields).is_null("id").unwrap();
+
+ assert!(!data_file_matches_predicates(
+ &file,
+ &[predicate],
+ TEST_SCHEMA_ID,
+ TEST_NUM_FIELDS,
+ ));
+ }
+
+ #[test]
+ fn test_data_file_matches_is_not_null_prunes_all_null_file() {
+ let fields = int_field();
+ let file = test_data_file_meta(int_stats_row(None),
int_stats_row(None), vec![5], 5);
+ let predicate =
PredicateBuilder::new(&fields).is_not_null("id").unwrap();
+
+ assert!(!data_file_matches_predicates(
+ &file,
+ &[predicate],
+ TEST_SCHEMA_ID,
+ TEST_NUM_FIELDS,
+ ));
+ }
+
+ #[test]
+ fn test_data_file_matches_unsupported_predicate_fails_open() {
+ let fields = int_field();
+ let file =
+ test_data_file_meta(int_stats_row(Some(10)),
int_stats_row(Some(20)), vec![0], 5);
+ let pb = PredicateBuilder::new(&fields);
+ let predicate = Predicate::or(vec![
+ pb.less_than("id", Datum::Int(5)).unwrap(),
+ pb.greater_than("id", Datum::Int(25)).unwrap(),
+ ]);
+
+ assert!(data_file_matches_predicates(
+ &file,
+ &[predicate],
+ TEST_SCHEMA_ID,
+ TEST_NUM_FIELDS,
+ ));
+ }
+
+ #[test]
+ fn test_data_file_matches_corrupt_stats_fails_open() {
+ let fields = int_field();
+ let file = test_data_file_meta(Vec::new(), Vec::new(), vec![0], 5);
+ let predicate = PredicateBuilder::new(&fields)
+ .equal("id", Datum::Int(30))
+ .unwrap();
+
+ assert!(data_file_matches_predicates(
+ &file,
+ &[predicate],
+ TEST_SCHEMA_ID,
+ TEST_NUM_FIELDS,
+ ));
+ }
+
+ #[test]
+ fn test_data_file_matches_schema_mismatch_fails_open() {
+ let fields = int_field();
+ let file = test_data_file_meta_with_schema(
+ int_stats_row(Some(10)),
+ int_stats_row(Some(20)),
+ vec![0],
+ 5,
+ 5,
+ );
+ let predicate = PredicateBuilder::new(&fields)
+ .equal("id", Datum::Int(30))
+ .unwrap();
+
+ assert!(data_file_matches_predicates(
+ &file,
+ &[predicate],
+ TEST_SCHEMA_ID,
+ TEST_NUM_FIELDS,
+ ));
+ }
+
+ #[test]
+ fn test_data_file_matches_dense_stats_arity_mismatch_fails_open() {
+ let mut builder = RawBinaryRowBuilder::new(3);
+ builder.write_int(0, 10);
+ builder.write_int(1, 100);
+ builder.write_int(2, 200);
+ let raw = builder.build();
+ let mut min_serialized = Vec::with_capacity(4 + raw.len());
+ min_serialized.extend_from_slice(&(3_i32).to_be_bytes());
+ min_serialized.extend_from_slice(&raw);
+
+ let mut builder = RawBinaryRowBuilder::new(3);
+ builder.write_int(0, 20);
+ builder.write_int(1, 200);
+ builder.write_int(2, 300);
+ let raw = builder.build();
+ let mut max_serialized = Vec::with_capacity(4 + raw.len());
+ max_serialized.extend_from_slice(&(3_i32).to_be_bytes());
+ max_serialized.extend_from_slice(&raw);
+
+ let fields = int_field();
+ let file = test_data_file_meta(min_serialized, max_serialized, vec![0,
0, 0], 5);
+ let predicate = PredicateBuilder::new(&fields)
+ .equal("id", Datum::Int(30))
+ .unwrap();
+
+ assert!(data_file_matches_predicates(
+ &file,
+ &[predicate],
+ TEST_SCHEMA_ID,
+ TEST_NUM_FIELDS,
+ ));
+ }
+
+ #[test]
+ fn test_data_file_matches_always_false_prunes_despite_schema_mismatch() {
+ let file = test_data_file_meta_with_schema(
+ int_stats_row(Some(10)),
+ int_stats_row(Some(20)),
+ vec![0],
+ 5,
+ 99,
+ );
+
+ assert!(!data_file_matches_predicates(
+ &file,
+ &[Predicate::AlwaysFalse],
+ TEST_SCHEMA_ID,
+ TEST_NUM_FIELDS,
+ ));
+ }
+
+ #[test]
+ fn test_data_file_matches_always_true_keeps_file_despite_schema_mismatch()
{
+ let file = test_data_file_meta_with_schema(
+ int_stats_row(Some(10)),
+ int_stats_row(Some(20)),
+ vec![0],
+ 5,
+ 99,
+ );
+
+ assert!(data_file_matches_predicates(
+ &file,
+ &[Predicate::AlwaysTrue],
+ TEST_SCHEMA_ID,
+ TEST_NUM_FIELDS,
+ ));
+ }
+
#[test]
fn test_build_deletion_files_map_preserves_cardinality() {
let entries = vec![IndexManifestEntry {