This is an automated email from the ASF dual-hosted git repository. yuzelin pushed a commit to branch release-1.4 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit c1974391d25755e3771da8c574df24bd550b1c1e Author: XiaoHongbo <[email protected]> AuthorDate: Wed Apr 1 17:29:51 2026 +0800 [python] Fix update_by_row_id conflict when file rolling splits (#7574) Calling `update_by_arrow_with_row_id` or `upsert_by_arrow_with_key` on large files raises `RuntimeError: For Data Evolution table, multiple 'MERGE INTO' and 'COMPACT' operations have encountered conflicts`. This PR fixes the issue by disabling file rolling in `update_by_arrow_with_row_id`, so that each update group is written to exactly one file. --- paimon-python/pypaimon/tests/table_update_test.py | 67 ++++++++++++++++++++++ paimon-python/pypaimon/write/file_store_write.py | 5 ++ .../pypaimon/write/table_update_by_row_id.py | 3 +- 3 files changed, 74 insertions(+), 1 deletion(-) diff --git a/paimon-python/pypaimon/tests/table_update_test.py b/paimon-python/pypaimon/tests/table_update_test.py index bf46f7bbac..204e31ff0f 100644 --- a/paimon-python/pypaimon/tests/table_update_test.py +++ b/paimon-python/pypaimon/tests/table_update_test.py @@ -1052,6 +1052,73 @@ class TableUpdateTest(unittest.TestCase): self.assertEqual(40, ages[3], "Row 3 should remain unchanged") self.assertEqual(45, ages[4], "Row 4 should remain unchanged") + def test_update_with_large_file(self): + import uuid + import random + import string + + table_name = f'test_row_id_split_{uuid.uuid4().hex[:8]}' + schema = Schema.from_pyarrow_schema( + pa.schema([('id', pa.int64()), ('name', pa.string())]), + options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true', + 'write-only': 'true', + } + ) + self.catalog.create_table( + f'default.{table_name}', schema, False) + table = self.catalog.get_table(f'default.{table_name}') + + N = 5000 + data = pa.table({ + 'id': list(range(N)), + 'name': [ + ''.join(random.choices( + string.ascii_letters, k=200)) + for _ in range(N)], + }) + wb = table.new_batch_write_builder() + tw = wb.new_write() + tc = wb.new_commit() + tw.write_arrow(data) + tc.commit(tw.prepare_commit()) + 
tw.close() + tc.close() + + from pypaimon.schema.schema_change import SetOption + self.catalog.alter_table( + f'default.{table_name}', + [SetOption('target-file-size', '10kb')]) + table = self.catalog.get_table(f'default.{table_name}') + + wb = table.new_batch_write_builder() + updator = wb.new_update() + updator.with_update_type(['name']) + update_data = pa.table({ + '_ROW_ID': pa.array( + list(range(N)), type=pa.int64()), + 'name': [ + ''.join(random.choices( + string.ascii_letters, k=200)) + for _ in range(N)], + }) + msgs = updator.update_by_arrow_with_row_id(update_data) + + all_files = [] + for msg in msgs: + all_files.extend(msg.new_files) + + self.assertEqual( + len(all_files), 1, + "Update should produce exactly one file per group") + self.assertEqual(all_files[0].first_row_id, 0) + self.assertEqual(all_files[0].row_count, N) + + tc = wb.new_commit() + tc.commit(msgs) + tc.close() + if __name__ == '__main__': unittest.main() diff --git a/paimon-python/pypaimon/write/file_store_write.py b/paimon-python/pypaimon/write/file_store_write.py index 95113a0689..75b1d3a7d7 100644 --- a/paimon-python/pypaimon/write/file_store_write.py +++ b/paimon-python/pypaimon/write/file_store_write.py @@ -46,6 +46,11 @@ class FileStoreWrite: (f"{self.options.data_file_prefix()}-u-{commit_user}" f"-s-{random.randint(0, 2 ** 31 - 2)}-w-")) + def disable_rolling(self): + """Disable file rolling by setting target_file_size to max.""" + self.options.set( + CoreOptions.TARGET_FILE_SIZE, str(2 ** 63 - 1)) + def write(self, partition: Tuple, bucket: int, data: pa.RecordBatch): key = (partition, bucket) if key not in self.data_writers: diff --git a/paimon-python/pypaimon/write/table_update_by_row_id.py b/paimon-python/pypaimon/write/table_update_by_row_id.py index 48f61751ec..089fde047c 100644 --- a/paimon-python/pypaimon/write/table_update_by_row_id.py +++ b/paimon-python/pypaimon/write/table_update_by_row_id.py @@ -293,7 +293,9 @@ class TableUpdateByRowId: merged_data = 
self._merge_update_with_original(original_data, data, column_names, first_row_id) # Create a file store write for this partition + # Disable rolling to ensure one output file per first_row_id group, file_store_write = FileStoreWrite(self.table, self.commit_user) + file_store_write.disable_rolling() # Set write columns to only update specific columns write_cols = column_names @@ -313,7 +315,6 @@ class TableUpdateByRowId: for msg in commit_messages: msg.check_from_snapshot = self.snapshot_id for file in msg.new_files: - # Assign the same first_row_id as the original file file.first_row_id = first_row_id file.write_cols = write_cols
