This is an automated email from the ASF dual-hosted git repository.

hope pushed a commit to branch release-1.4
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit cb59e904aec313fe02b465ddd6285689abcf5866
Author: XiaoHongbo <[email protected]>
AuthorDate: Mon Mar 30 20:40:57 2026 +0800

    [Python] Fix data evolution read IndexError when file has no write_cols (#7556)
    
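    When a data file has no write_cols (i.e. it was written with all columns
    before schema evolution), `_create_union_reader` wrongly assumes it
    contains all current table fields, including columns added later, which
    causes an IndexError on read. This PR fixes it by deriving the field IDs
    from the schema version the file was written with (looked up by the
    file's schema_id) instead of the current table schema, and then appending
    the ROW_ID and SEQUENCE_NUMBER special fields.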
---
 paimon-python/pypaimon/read/split_read.py          |  9 ++-
 .../pypaimon/tests/data_evolution_test.py          | 81 ++++++++++++++++++++++
 2 files changed, 88 insertions(+), 2 deletions(-)

diff --git a/paimon-python/pypaimon/read/split_read.py 
b/paimon-python/pypaimon/read/split_read.py
index f9adee4068..25324bdcbb 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -609,8 +609,13 @@ class DataEvolutionSplitRead(SplitRead):
             elif first_file.write_cols:
                 field_ids = 
self._get_field_ids_from_write_cols(first_file.write_cols)
             else:
-                # For regular files, get all field IDs from the schema
-                field_ids = [field.id for field in self.table.fields]
+                # For regular files without write_cols, derive field IDs from
+                # the file's schema version, not the current table schema.
+                # The file only contains columns from when it was written.
+                file_schema = 
self.table.schema_manager.get_schema(first_file.schema_id)
+                field_ids = [field.id for field in file_schema.fields]
+                field_ids.append(SpecialFields.ROW_ID.id)
+                field_ids.append(SpecialFields.SEQUENCE_NUMBER.id)
 
             read_fields = []
             for j, read_field_id in enumerate(read_field_index):
diff --git a/paimon-python/pypaimon/tests/data_evolution_test.py 
b/paimon-python/pypaimon/tests/data_evolution_test.py
index 796625f6a4..266041eec5 100644
--- a/paimon-python/pypaimon/tests/data_evolution_test.py
+++ b/paimon-python/pypaimon/tests/data_evolution_test.py
@@ -1356,3 +1356,84 @@ class DataEvolutionTest(unittest.TestCase):
         rebuilt = pa.RecordBatch.from_arrays(arrays, names=batch.schema.names)
         self.assertTrue(rebuilt.schema.field('_ROW_ID').nullable)
         self.assertTrue(rebuilt.schema.field('_SEQUENCE_NUMBER').nullable)
+
+    def test_read_full_schema_on_write_before_evolution(self):
+        from pypaimon.schema.schema_change import SchemaChange
+        from pypaimon.schema.data_types import AtomicType
+
+        # Step 1: Create table with [f0, f1]
+        initial_schema = pa.schema([
+            ('f0', pa.int32()),
+            ('f1', pa.string()),
+        ])
+        schema = Schema.from_pyarrow_schema(
+            initial_schema,
+            options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 
'true'},
+        )
+        table_name = 'default.test_no_write_cols_schema_evo'
+        self.catalog.create_table(table_name, schema, False)
+        table = self.catalog.get_table(table_name)
+
+        # Step 2: Write data with ALL columns of old schema → write_cols = None
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        table_write.write_arrow(pa.Table.from_pydict(
+            {'f0': [1, 2], 'f1': ['a', 'b']},
+            schema=initial_schema,
+        ))
+        cmts = table_write.prepare_commit()
+        for c in cmts:
+            for nf in c.new_files:
+                self.assertIsNone(nf.write_cols)
+        table_commit.commit(cmts)
+        table_write.close()
+        table_commit.close()
+
+        # Step 3: Schema evolution - add column f2
+        self.catalog.alter_table(
+            table_name,
+            [SchemaChange.add_column('f2', AtomicType('STRING'))],
+        )
+        table = self.catalog.get_table(table_name)
+
+        # Step 4: Write f2 only for same rows (first_row_id = 0)
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write().with_write_type(['f2'])
+        table_commit = write_builder.new_commit()
+        table_write.write_arrow(pa.Table.from_pydict(
+            {'f2': ['x', 'y']},
+            schema=pa.schema([('f2', pa.string())]),
+        ))
+        cmts_f2 = table_write.prepare_commit()
+        for c in cmts_f2:
+            for nf in c.new_files:
+                nf.first_row_id = 0
+        table_commit.commit(cmts_f2)
+        table_write.close()
+        table_commit.close()
+
+        # Step 5: Read all columns
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        splits = table_scan.plan().splits()
+
+        for split in splits:
+            for f in split.files:
+                if f.write_cols is None:
+                    f.max_sequence_number = 999999
+
+        actual = table_read.to_arrow(splits)
+
+        expect = pa.Table.from_pydict({
+            'f0': [1, 2],
+            'f1': ['a', 'b'],
+            'f2': ['x', 'y'],
+        }, schema=pa.schema([
+            ('f0', pa.int32()),
+            ('f1', pa.string()),
+            ('f2', pa.string()),
+        ]))
+        self.assertEqual(actual.num_rows, 2)
+        self.assertEqual(actual, expect)

Reply via email to