This is an automated email from the ASF dual-hosted git repository. hope pushed a commit to branch release-1.4 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 2f62a4f9981827a4d89dcd86a6518829aa6ed31b Author: Jingsong Lee <[email protected]> AuthorDate: Mon Mar 30 21:42:26 2026 +0800 [python] Introduce file pruning for dv pk table (#7557) ### Purpose In the primary key table scan, when DV is enabled, the logic of filtering files by value stats has been added. ### Tests in java_py_read_write_test.py, test_pk_dv_read_multi_batch_with_value_filter. --- .../pypaimon/read/scanner/file_scanner.py | 30 ++++++++++++---- .../pypaimon/tests/e2e/java_py_read_write_test.py | 42 ++++++++++++++++++++++ 2 files changed, 66 insertions(+), 6 deletions(-) diff --git a/paimon-python/pypaimon/read/scanner/file_scanner.py b/paimon-python/pypaimon/read/scanner/file_scanner.py index a5e8576802..aea709669e 100755 --- a/paimon-python/pypaimon/read/scanner/file_scanner.py +++ b/paimon-python/pypaimon/read/scanner/file_scanner.py @@ -381,12 +381,30 @@ class FileScanner: if self.table.is_primary_key_table: if self.deletion_vectors_enabled and entry.file.level == 0: # do not read level 0 file return False - if not self.primary_key_predicate: - return True - return self.primary_key_predicate.test_by_simple_stats( - entry.file.key_stats, - entry.file.row_count - ) + if self.primary_key_predicate: + if not self.primary_key_predicate.test_by_simple_stats( + entry.file.key_stats, + entry.file.row_count + ): + return False + # In DV mode, files within a bucket don't overlap (level 0 excluded above), + # so we can safely filter by value stats per file. + if self.deletion_vectors_enabled and self.predicate_for_stats: + if entry.file.value_stats_cols is None and entry.file.write_cols is not None: + stats_fields = entry.file.write_cols + else: + stats_fields = entry.file.value_stats_cols + evolved_stats = evolution.evolution( + entry.file.value_stats, + entry.file.row_count, + stats_fields + ) + if not self.predicate_for_stats.test_by_simple_stats( + evolved_stats, + entry.file.row_count + ): + return False + return True else: if not self.predicate: return True diff --git a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py index 0732925bae..4d888c3cb4 100644 --- a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py +++ b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py @@ -322,6 +322,48 @@ class JavaPyReadWriteTest(unittest.TestCase): }, schema=pa_schema) self.assertEqual(expected, actual) + def test_pk_dv_read_multi_batch_with_value_filter(self): + """Test that DV-enabled PK table filters files by value stats during scan.""" + pa_schema = pa.schema([ + pa.field('pt', pa.int32(), nullable=False), + pa.field('a', pa.int32(), nullable=False), + ('b', pa.int64()) + ]) + schema = Schema.from_pyarrow_schema(pa_schema, + partition_keys=['pt'], + primary_keys=['pt', 'a'], + options={'bucket': '1'}) + self.catalog.create_table('default.test_pk_dv_multi_batch', schema, True) + table = self.catalog.get_table('default.test_pk_dv_multi_batch') + + # Unfiltered scan: count total files + rb_all = table.new_read_builder() + all_splits = rb_all.new_scan().plan().splits() + all_files_count = sum(len(s.files) for s in all_splits) + + # Filtered scan: b < 500 should only match a few rows (b in {100,200,300,400}) + # and prune files whose value stats don't overlap + predicate_builder = table.new_read_builder().new_predicate_builder() + pred = predicate_builder.less_than('b', 500) + rb_filtered = table.new_read_builder().with_filter(pred) + filtered_splits = rb_filtered.new_scan().plan().splits() + filtered_files_count = sum(len(s.files) for s in filtered_splits) + filtered_result = table_sort_by( + rb_filtered.new_read().to_arrow(filtered_splits), 'a') + + # Verify correctness: rows with b < 500 are a=10,b=100 / a=20,b=200 / a=30,b=300 / a=40,b=400 + expected = pa.Table.from_pydict({ + 'pt': [1, 1, 1, 1], + 'a': [10, 20, 30, 40], + 'b': [100, 200, 300, 400] + }, schema=pa_schema) + self.assertEqual(expected, filtered_result) + + # Verify file pruning: filtered scan should read fewer files + self.assertLess( + filtered_files_count, all_files_count, + f"DV value filter should prune files: filtered={filtered_files_count}, all={all_files_count}") + def test_pk_dv_read_multi_batch_raw_convertable(self): pa_schema = pa.schema([ pa.field('pt', pa.int32(), nullable=False),
