This is an automated email from the ASF dual-hosted git repository. yuzelin pushed a commit to branch release-1.4 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit f975975992099f9ff82a6cfface2d1e024b766d1 Author: littlecoder04 <[email protected]> AuthorDate: Fri Apr 10 22:11:03 2026 +0800 [python] Fix duplicate _ROW_ID when file exceeds read batch size (#7626) When a data file contains more than 1024 rows, upsert_by_arrow_with_key fails with: `ValueError: Input data contains duplicate _ROW_ID values`. This PR fixes the above issue by advancing first_row_id in DataFileBatchReader._assign_row_tracking after each batch. --- .../pypaimon/read/reader/data_file_batch_reader.py | 1 + .../pypaimon/tests/data_evolution_test.py | 40 ++++++++++++++++++++++ 2 files changed, 41 insertions(+) diff --git a/paimon-python/pypaimon/read/reader/data_file_batch_reader.py b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py index 7f2e1c61e1..126d57f625 100644 --- a/paimon-python/pypaimon/read/reader/data_file_batch_reader.py +++ b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py @@ -211,6 +211,7 @@ class DataFileBatchReader(RecordBatchReader): idx = self.system_fields[SpecialFields.ROW_ID.name] # Create a new array that fills with computed row IDs arrays[idx] = pa.array(range(self.first_row_id, self.first_row_id + record_batch.num_rows), type=pa.int64()) + self.first_row_id += record_batch.num_rows # Handle _SEQUENCE_NUMBER field if SpecialFields.SEQUENCE_NUMBER.name in self.system_fields.keys(): diff --git a/paimon-python/pypaimon/tests/data_evolution_test.py b/paimon-python/pypaimon/tests/data_evolution_test.py index 266041eec5..5124c337cb 100644 --- a/paimon-python/pypaimon/tests/data_evolution_test.py +++ b/paimon-python/pypaimon/tests/data_evolution_test.py @@ -1437,3 +1437,43 @@ class DataEvolutionTest(unittest.TestCase): ])) self.assertEqual(actual.num_rows, 2) self.assertEqual(actual, expect) + + def test_large_file_read(self): + pa_schema = pa.schema([ + ('id', pa.int32()), + ('name', pa.string()), + ]) + schema = Schema.from_pyarrow_schema(pa_schema, options={ + 'row-tracking.enabled': 'true', + 
'data-evolution.enabled': 'true', + }) + self.catalog.create_table('default.test_large_file_row_id', schema, False) + table = self.catalog.get_table('default.test_large_file_row_id') + + # Write >1024 rows in a single file + num_rows = 2000 + data = pa.Table.from_pydict({ + 'id': list(range(num_rows)), + 'name': [f'name_{i}' for i in range(num_rows)], + }, schema=pa_schema) + + wb = table.new_batch_write_builder() + tw = wb.new_write() + tc = wb.new_commit() + tw.write_arrow(data) + tc.commit(tw.prepare_commit()) + tw.close() + tc.close() + + update_ids = list(range(0, 1500)) + upsert_data = pa.Table.from_pydict({ + 'id': update_ids, + 'name': [f'upsert_{i}' for i in update_ids], + }, schema=pa_schema) + + wb = table.new_batch_write_builder() + tu = wb.new_update() + msgs = tu.upsert_by_arrow_with_key(upsert_data, ['id']) + tc = wb.new_commit() + tc.commit(msgs) + tc.close()
