This is an automated email from the ASF dual-hosted git repository.

hope pushed a commit to branch release-1.4
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit bfee09f7d22206920a2e96beac8f02aa80709671
Author: XiaoHongbo <[email protected]>
AuthorDate: Fri Mar 27 14:40:25 2026 +0800

      [python] Fix data evolution merge read with disordered projection (#7544)
    
    In the data-evolution merge-read path, when the projection order differs
    from the table schema order, an `ArrowInvalid: Failed to parse string as
    a scalar of type int32` error is raised. `upsert_by_key` is one path that
    can trigger this bug. This PR fixes the above bug.
---
 paimon-python/pypaimon/read/split_read.py           | 8 +++++++-
 paimon-python/pypaimon/tests/data_evolution_test.py | 8 ++++++++
 2 files changed, 15 insertions(+), 1 deletion(-)

diff --git a/paimon-python/pypaimon/read/split_read.py 
b/paimon-python/pypaimon/read/split_read.py
index 498e53e9c0..f9adee4068 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -597,6 +597,7 @@ class DataEvolutionSplitRead(SplitRead):
         # Initialize offsets
         row_offsets = [-1] * len(all_read_fields)
         field_offsets = [-1] * len(all_read_fields)
+        schema_pos = {f.id: p for p, f in enumerate(self.table.fields)}
 
         for i, bunch in enumerate(fields_files):
             first_file = bunch.files()[0]
@@ -617,13 +618,18 @@ class DataEvolutionSplitRead(SplitRead):
                     if read_field_id == field_id:
                         if row_offsets[j] == -1:
                             row_offsets[j] = i
-                            field_offsets[j] = len(read_fields)
                             read_fields.append(all_read_fields[j])
                         break
 
             if not read_fields:
                 file_record_readers[i] = None
             else:
+                read_fields.sort(key=lambda f: schema_pos.get(f.id, 
float('inf')))
+                id_to_pos = {f.id: p for p, f in enumerate(read_fields)}
+                for j in range(len(read_field_index)):
+                    if row_offsets[j] == i:
+                        field_offsets[j] = id_to_pos[read_field_index[j]]
+
                 read_field_names = self._remove_partition_fields(read_fields)
                 table_fields = self.read_fields
                 self.read_fields = read_fields  # create reader based on 
read_fields
diff --git a/paimon-python/pypaimon/tests/data_evolution_test.py 
b/paimon-python/pypaimon/tests/data_evolution_test.py
index afb12cd948..796625f6a4 100644
--- a/paimon-python/pypaimon/tests/data_evolution_test.py
+++ b/paimon-python/pypaimon/tests/data_evolution_test.py
@@ -558,6 +558,14 @@ class DataEvolutionTest(unittest.TestCase):
         }, schema=simple_pa_schema)
         self.assertEqual(actual, expect)
 
+        read_builder2 = table.new_read_builder()
+        read_builder2.with_projection(['f2', 'f0', 'f1'])
+        actual2 = read_builder2.new_read().to_arrow(
+            read_builder2.new_scan().plan().splits())
+        self.assertEqual(actual2.column('f0').to_pylist(), [2] * num_rows)
+        self.assertEqual(actual2.column('f1').to_pylist(), ['x'] * num_rows)
+        self.assertEqual(actual2.column('f2').to_pylist(), ['y'] * num_rows)
+
     def test_only_some_columns(self):
         simple_pa_schema = pa.schema([
             ('f0', pa.int32()),

Reply via email to