This is an automated email from the ASF dual-hosted git repository.
lsomeyeah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 7275b0a60a [python] Introduce file pruning for dv pk table (#7557)
7275b0a60a is described below
commit 7275b0a60ae1ebd4397ed1106fc85981a2ec18d3
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Mar 30 21:42:26 2026 +0800
[python] Introduce file pruning for dv pk table (#7557)
### Purpose
In the primary key table scan, when DV is enabled, the logic of
filtering files by value stats has been added.
### Tests
in java_py_read_write_test.py,
test_pk_dv_read_multi_batch_with_value_filter.
---
.../pypaimon/read/scanner/file_scanner.py | 30 ++++++++++++----
.../pypaimon/tests/e2e/java_py_read_write_test.py | 42 ++++++++++++++++++++++
2 files changed, 66 insertions(+), 6 deletions(-)
diff --git a/paimon-python/pypaimon/read/scanner/file_scanner.py b/paimon-python/pypaimon/read/scanner/file_scanner.py
index a5e8576802..aea709669e 100755
--- a/paimon-python/pypaimon/read/scanner/file_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/file_scanner.py
@@ -381,12 +381,30 @@ class FileScanner:
if self.table.is_primary_key_table:
if self.deletion_vectors_enabled and entry.file.level == 0:  # do not read level 0 file
return False
- if not self.primary_key_predicate:
- return True
- return self.primary_key_predicate.test_by_simple_stats(
- entry.file.key_stats,
- entry.file.row_count
- )
+ if self.primary_key_predicate:
+ if not self.primary_key_predicate.test_by_simple_stats(
+ entry.file.key_stats,
+ entry.file.row_count
+ ):
+ return False
+ # In DV mode, files within a bucket don't overlap (level 0 excluded above),
+ # so we can safely filter by value stats per file.
+ if self.deletion_vectors_enabled and self.predicate_for_stats:
+ if entry.file.value_stats_cols is None and entry.file.write_cols is not None:
+ stats_fields = entry.file.write_cols
+ else:
+ stats_fields = entry.file.value_stats_cols
+ evolved_stats = evolution.evolution(
+ entry.file.value_stats,
+ entry.file.row_count,
+ stats_fields
+ )
+ if not self.predicate_for_stats.test_by_simple_stats(
+ evolved_stats,
+ entry.file.row_count
+ ):
+ return False
+ return True
else:
if not self.predicate:
return True
diff --git a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
index 0732925bae..4d888c3cb4 100644
--- a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
+++ b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
@@ -322,6 +322,48 @@ class JavaPyReadWriteTest(unittest.TestCase):
}, schema=pa_schema)
self.assertEqual(expected, actual)
+ def test_pk_dv_read_multi_batch_with_value_filter(self):
+ """Test that DV-enabled PK table filters files by value stats during
scan."""
+ pa_schema = pa.schema([
+ pa.field('pt', pa.int32(), nullable=False),
+ pa.field('a', pa.int32(), nullable=False),
+ ('b', pa.int64())
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema,
+ partition_keys=['pt'],
+ primary_keys=['pt', 'a'],
+ options={'bucket': '1'})
+ self.catalog.create_table('default.test_pk_dv_multi_batch', schema, True)
+ table = self.catalog.get_table('default.test_pk_dv_multi_batch')
+
+ # Unfiltered scan: count total files
+ rb_all = table.new_read_builder()
+ all_splits = rb_all.new_scan().plan().splits()
+ all_files_count = sum(len(s.files) for s in all_splits)
+
+ # Filtered scan: b < 500 should only match a few rows (b in {100,200,300,400})
+ # and prune files whose value stats don't overlap
+ predicate_builder = table.new_read_builder().new_predicate_builder()
+ pred = predicate_builder.less_than('b', 500)
+ rb_filtered = table.new_read_builder().with_filter(pred)
+ filtered_splits = rb_filtered.new_scan().plan().splits()
+ filtered_files_count = sum(len(s.files) for s in filtered_splits)
+ filtered_result = table_sort_by(
+ rb_filtered.new_read().to_arrow(filtered_splits), 'a')
+
+ # Verify correctness: rows with b < 500 are a=10,b=100 / a=20,b=200 / a=30,b=300 / a=40,b=400
+ expected = pa.Table.from_pydict({
+ 'pt': [1, 1, 1, 1],
+ 'a': [10, 20, 30, 40],
+ 'b': [100, 200, 300, 400]
+ }, schema=pa_schema)
+ self.assertEqual(expected, filtered_result)
+
+ # Verify file pruning: filtered scan should read fewer files
+ self.assertLess(
+ filtered_files_count, all_files_count,
+ f"DV value filter should prune files:
filtered={filtered_files_count}, all={all_files_count}")
+
def test_pk_dv_read_multi_batch_raw_convertable(self):
pa_schema = pa.schema([
pa.field('pt', pa.int32(), nullable=False),