This is an automated email from the ASF dual-hosted git repository. hope pushed a commit to branch release-1.4 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 699e453d67958cd24942298793b2911ca32b4504 Author: XiaoHongbo <[email protected]> AuthorDate: Mon Mar 30 20:40:57 2026 +0800 [Python] Fix data evolution read IndexError when file has no write_cols (#7556) When a data file has no write_cols (written with all columns before schema evolution), `_create_union_reader` wrongly assumes it contains all current table fields including columns added later, causing IndexError on read. This PR fixes it. --- paimon-python/pypaimon/read/split_read.py | 9 ++- .../pypaimon/tests/data_evolution_test.py | 81 ++++++++++++++++++++++ 2 files changed, 88 insertions(+), 2 deletions(-) diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py index 41904267f7..ea53c3cf96 100644 --- a/paimon-python/pypaimon/read/split_read.py +++ b/paimon-python/pypaimon/read/split_read.py @@ -613,8 +613,13 @@ class DataEvolutionSplitRead(SplitRead): elif first_file.write_cols: field_ids = self._get_field_ids_from_write_cols(first_file.write_cols) else: - # For regular files, get all field IDs from the schema - field_ids = [field.id for field in self.table.fields] + # For regular files without write_cols, derive field IDs from + # the file's schema version, not the current table schema. + # The file only contains columns from when it was written. + file_schema = self.table.schema_manager.get_schema(first_file.schema_id) + field_ids = [field.id for field in file_schema.fields] + field_ids.append(SpecialFields.ROW_ID.id) + field_ids.append(SpecialFields.SEQUENCE_NUMBER.id) read_fields = [] for j, read_field_id in enumerate(read_field_index): diff --git a/paimon-python/pypaimon/tests/data_evolution_test.py b/paimon-python/pypaimon/tests/data_evolution_test.py index 796625f6a4..266041eec5 100644 --- a/paimon-python/pypaimon/tests/data_evolution_test.py +++ b/paimon-python/pypaimon/tests/data_evolution_test.py @@ -1356,3 +1356,84 @@ class DataEvolutionTest(unittest.TestCase): rebuilt = pa.RecordBatch.from_arrays(arrays, names=batch.schema.names) self.assertTrue(rebuilt.schema.field('_ROW_ID').nullable) self.assertTrue(rebuilt.schema.field('_SEQUENCE_NUMBER').nullable) + + def test_read_full_schema_on_write_before_evolution(self): + from pypaimon.schema.schema_change import SchemaChange + from pypaimon.schema.data_types import AtomicType + + # Step 1: Create table with [f0, f1] + initial_schema = pa.schema([ + ('f0', pa.int32()), + ('f1', pa.string()), + ]) + schema = Schema.from_pyarrow_schema( + initial_schema, + options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'}, + ) + table_name = 'default.test_no_write_cols_schema_evo' + self.catalog.create_table(table_name, schema, False) + table = self.catalog.get_table(table_name) + + # Step 2: Write data with ALL columns of old schema → write_cols = None + write_builder = table.new_batch_write_builder() + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + table_write.write_arrow(pa.Table.from_pydict( + {'f0': [1, 2], 'f1': ['a', 'b']}, + schema=initial_schema, + )) + cmts = table_write.prepare_commit() + for c in cmts: + for nf in c.new_files: + self.assertIsNone(nf.write_cols) + table_commit.commit(cmts) + table_write.close() + table_commit.close() + + # Step 3: Schema evolution - add column f2 + self.catalog.alter_table( + table_name, + [SchemaChange.add_column('f2', AtomicType('STRING'))], + ) + table = self.catalog.get_table(table_name) + + # Step 4: Write f2 only for same rows (first_row_id = 0) + write_builder = table.new_batch_write_builder() + table_write = write_builder.new_write().with_write_type(['f2']) + table_commit = write_builder.new_commit() + table_write.write_arrow(pa.Table.from_pydict( + {'f2': ['x', 'y']}, + schema=pa.schema([('f2', pa.string())]), + )) + cmts_f2 = table_write.prepare_commit() + for c in cmts_f2: + for nf in c.new_files: + nf.first_row_id = 0 + table_commit.commit(cmts_f2) + table_write.close() + table_commit.close() + + # Step 5: Read all columns + read_builder = table.new_read_builder() + table_scan = read_builder.new_scan() + table_read = read_builder.new_read() + splits = table_scan.plan().splits() + + for split in splits: + for f in split.files: + if f.write_cols is None: + f.max_sequence_number = 999999 + + actual = table_read.to_arrow(splits) + + expect = pa.Table.from_pydict({ + 'f0': [1, 2], + 'f1': ['a', 'b'], + 'f2': ['x', 'y'], + }, schema=pa.schema([ + ('f0', pa.int32()), + ('f1', pa.string()), + ('f2', pa.string()), + ])) + self.assertEqual(actual.num_rows, 2) + self.assertEqual(actual, expect)
