This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 22baf60059 [python] Fix duplicate _ROW_ID when file exceeds read batch size (#7626)
22baf60059 is described below

commit 22baf60059a09ff4f09afe4c5ac2b962c68fe975
Author: littlecoder04 <[email protected]>
AuthorDate: Fri Apr 10 22:11:03 2026 +0800

    [python] Fix duplicate _ROW_ID when file exceeds read batch size (#7626)
    
    When a data file contains more than 1024 rows, upsert_by_arrow_with_key
    fails with: `ValueError: Input data contains duplicate _ROW_ID values`,
    because every batch read from the file is numbered starting from the
    same first_row_id.
    This PR fixes the issue by advancing first_row_id in
    DataFileBatchReader._assign_row_tracking after each batch.
---
 .../pypaimon/read/reader/data_file_batch_reader.py |  1 +
 .../pypaimon/tests/data_evolution_test.py          | 40 ++++++++++++++++++++++
 2 files changed, 41 insertions(+)

diff --git a/paimon-python/pypaimon/read/reader/data_file_batch_reader.py b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py
index 7f2e1c61e1..126d57f625 100644
--- a/paimon-python/pypaimon/read/reader/data_file_batch_reader.py
+++ b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py
@@ -211,6 +211,7 @@ class DataFileBatchReader(RecordBatchReader):
             idx = self.system_fields[SpecialFields.ROW_ID.name]
             # Create a new array that fills with computed row IDs
             arrays[idx] = pa.array(range(self.first_row_id, self.first_row_id + record_batch.num_rows), type=pa.int64())
+            self.first_row_id += record_batch.num_rows
 
         # Handle _SEQUENCE_NUMBER field
         if SpecialFields.SEQUENCE_NUMBER.name in self.system_fields.keys():
diff --git a/paimon-python/pypaimon/tests/data_evolution_test.py b/paimon-python/pypaimon/tests/data_evolution_test.py
index 6da142fb75..b9ca4c7acc 100644
--- a/paimon-python/pypaimon/tests/data_evolution_test.py
+++ b/paimon-python/pypaimon/tests/data_evolution_test.py
@@ -1593,3 +1593,43 @@ class DataEvolutionTest(unittest.TestCase):
         sliced = rb.new_read().to_arrow(scan.plan().splits())
         self.assertEqual(sliced.num_rows, 3)
         self.assertEqual(sorted(sliced.column('id').to_pylist()), [2, 3, 4])
+
+    def test_large_file_read(self):
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema, options={
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+        })
+        self.catalog.create_table('default.test_large_file_row_id', schema, False)
+        table = self.catalog.get_table('default.test_large_file_row_id')
+
+        # Write >1024 rows in a single file
+        num_rows = 2000
+        data = pa.Table.from_pydict({
+            'id': list(range(num_rows)),
+            'name': [f'name_{i}' for i in range(num_rows)],
+        }, schema=pa_schema)
+
+        wb = table.new_batch_write_builder()
+        tw = wb.new_write()
+        tc = wb.new_commit()
+        tw.write_arrow(data)
+        tc.commit(tw.prepare_commit())
+        tw.close()
+        tc.close()
+
+        update_ids = list(range(0, 1500))
+        upsert_data = pa.Table.from_pydict({
+            'id': update_ids,
+            'name': [f'upsert_{i}' for i in update_ids],
+        }, schema=pa_schema)
+
+        wb = table.new_batch_write_builder()
+        tu = wb.new_update()
+        msgs = tu.upsert_by_arrow_with_key(upsert_data, ['id'])
+        tc = wb.new_commit()
+        tc.commit(msgs)
+        tc.close()

Reply via email to