This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 22baf60059 [python] Fix duplicate _ROW_ID when file exceeds read batch
size (#7626)
22baf60059 is described below
commit 22baf60059a09ff4f09afe4c5ac2b962c68fe975
Author: littlecoder04 <[email protected]>
AuthorDate: Fri Apr 10 22:11:03 2026 +0800
[python] Fix duplicate _ROW_ID when file exceeds read batch size (#7626)
When a data file contains more than 1024 rows, upsert_by_arrow_with_key
fails with: `ValueError: Input data contains duplicate _ROW_ID values`.
This PR fixes the above issue by advancing first_row_id in
DataFileBatchReader._assign_row_tracking after each batch.
---
.../pypaimon/read/reader/data_file_batch_reader.py | 1 +
.../pypaimon/tests/data_evolution_test.py | 40 ++++++++++++++++++++++
2 files changed, 41 insertions(+)
diff --git a/paimon-python/pypaimon/read/reader/data_file_batch_reader.py
b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py
index 7f2e1c61e1..126d57f625 100644
--- a/paimon-python/pypaimon/read/reader/data_file_batch_reader.py
+++ b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py
@@ -211,6 +211,7 @@ class DataFileBatchReader(RecordBatchReader):
idx = self.system_fields[SpecialFields.ROW_ID.name]
# Create a new array that fills with computed row IDs
arrays[idx] = pa.array(range(self.first_row_id, self.first_row_id
+ record_batch.num_rows), type=pa.int64())
+ self.first_row_id += record_batch.num_rows
# Handle _SEQUENCE_NUMBER field
if SpecialFields.SEQUENCE_NUMBER.name in self.system_fields.keys():
diff --git a/paimon-python/pypaimon/tests/data_evolution_test.py
b/paimon-python/pypaimon/tests/data_evolution_test.py
index 6da142fb75..b9ca4c7acc 100644
--- a/paimon-python/pypaimon/tests/data_evolution_test.py
+++ b/paimon-python/pypaimon/tests/data_evolution_test.py
@@ -1593,3 +1593,43 @@ class DataEvolutionTest(unittest.TestCase):
sliced = rb.new_read().to_arrow(scan.plan().splits())
self.assertEqual(sliced.num_rows, 3)
self.assertEqual(sorted(sliced.column('id').to_pylist()), [2, 3, 4])
+
+ def test_large_file_read(self):
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('name', pa.string()),
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema, options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ })
+ self.catalog.create_table('default.test_large_file_row_id', schema,
False)
+ table = self.catalog.get_table('default.test_large_file_row_id')
+
+ # Write >1024 rows in a single file
+ num_rows = 2000
+ data = pa.Table.from_pydict({
+ 'id': list(range(num_rows)),
+ 'name': [f'name_{i}' for i in range(num_rows)],
+ }, schema=pa_schema)
+
+ wb = table.new_batch_write_builder()
+ tw = wb.new_write()
+ tc = wb.new_commit()
+ tw.write_arrow(data)
+ tc.commit(tw.prepare_commit())
+ tw.close()
+ tc.close()
+
+ update_ids = list(range(0, 1500))
+ upsert_data = pa.Table.from_pydict({
+ 'id': update_ids,
+ 'name': [f'upsert_{i}' for i in update_ids],
+ }, schema=pa_schema)
+
+ wb = table.new_batch_write_builder()
+ tu = wb.new_update()
+ msgs = tu.upsert_by_arrow_with_key(upsert_data, ['id'])
+ tc = wb.new_commit()
+ tc.commit(msgs)
+ tc.close()