This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch release-1.4
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit c1974391d25755e3771da8c574df24bd550b1c1e
Author: XiaoHongbo <[email protected]>
AuthorDate: Wed Apr 1 17:29:51 2026 +0800

    [python] Fix update_by_row_id conflict when file rolling splits (#7574)
    
    When calling `update_by_arrow_with_row_id` or `upsert_by_arrow_with_key`
    on large files, a `RuntimeError` is raised: `For Data Evolution table,
    multiple 'MERGE INTO' and 'COMPACT' operations have encountered
    conflicts`. This PR fixes the issue by disabling file rolling during
    `update_by_arrow_with_row_id`, so that each first_row_id group produces
    exactly one output file.
---
 paimon-python/pypaimon/tests/table_update_test.py  | 67 ++++++++++++++++++++++
 paimon-python/pypaimon/write/file_store_write.py   |  5 ++
 .../pypaimon/write/table_update_by_row_id.py       |  3 +-
 3 files changed, 74 insertions(+), 1 deletion(-)

diff --git a/paimon-python/pypaimon/tests/table_update_test.py 
b/paimon-python/pypaimon/tests/table_update_test.py
index bf46f7bbac..204e31ff0f 100644
--- a/paimon-python/pypaimon/tests/table_update_test.py
+++ b/paimon-python/pypaimon/tests/table_update_test.py
@@ -1052,6 +1052,73 @@ class TableUpdateTest(unittest.TestCase):
         self.assertEqual(40, ages[3], "Row 3 should remain unchanged")
         self.assertEqual(45, ages[4], "Row 4 should remain unchanged")
 
+    def test_update_with_large_file(self):
+        import uuid
+        import random
+        import string
+
+        table_name = f'test_row_id_split_{uuid.uuid4().hex[:8]}'
+        schema = Schema.from_pyarrow_schema(
+            pa.schema([('id', pa.int64()), ('name', pa.string())]),
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true',
+                'write-only': 'true',
+            }
+        )
+        self.catalog.create_table(
+            f'default.{table_name}', schema, False)
+        table = self.catalog.get_table(f'default.{table_name}')
+
+        N = 5000
+        data = pa.table({
+            'id': list(range(N)),
+            'name': [
+                ''.join(random.choices(
+                    string.ascii_letters, k=200))
+                for _ in range(N)],
+        })
+        wb = table.new_batch_write_builder()
+        tw = wb.new_write()
+        tc = wb.new_commit()
+        tw.write_arrow(data)
+        tc.commit(tw.prepare_commit())
+        tw.close()
+        tc.close()
+
+        from pypaimon.schema.schema_change import SetOption
+        self.catalog.alter_table(
+            f'default.{table_name}',
+            [SetOption('target-file-size', '10kb')])
+        table = self.catalog.get_table(f'default.{table_name}')
+
+        wb = table.new_batch_write_builder()
+        updator = wb.new_update()
+        updator.with_update_type(['name'])
+        update_data = pa.table({
+            '_ROW_ID': pa.array(
+                list(range(N)), type=pa.int64()),
+            'name': [
+                ''.join(random.choices(
+                    string.ascii_letters, k=200))
+                for _ in range(N)],
+        })
+        msgs = updator.update_by_arrow_with_row_id(update_data)
+
+        all_files = []
+        for msg in msgs:
+            all_files.extend(msg.new_files)
+
+        self.assertEqual(
+            len(all_files), 1,
+            "Update should produce exactly one file per group")
+        self.assertEqual(all_files[0].first_row_id, 0)
+        self.assertEqual(all_files[0].row_count, N)
+
+        tc = wb.new_commit()
+        tc.commit(msgs)
+        tc.close()
+
 
 if __name__ == '__main__':
     unittest.main()
diff --git a/paimon-python/pypaimon/write/file_store_write.py 
b/paimon-python/pypaimon/write/file_store_write.py
index 95113a0689..75b1d3a7d7 100644
--- a/paimon-python/pypaimon/write/file_store_write.py
+++ b/paimon-python/pypaimon/write/file_store_write.py
@@ -46,6 +46,11 @@ class FileStoreWrite:
                              
(f"{self.options.data_file_prefix()}-u-{commit_user}"
                               f"-s-{random.randint(0, 2 ** 31 - 2)}-w-"))
 
+    def disable_rolling(self):
+        """Disable file rolling by setting target_file_size to max."""
+        self.options.set(
+            CoreOptions.TARGET_FILE_SIZE, str(2 ** 63 - 1))
+
     def write(self, partition: Tuple, bucket: int, data: pa.RecordBatch):
         key = (partition, bucket)
         if key not in self.data_writers:
diff --git a/paimon-python/pypaimon/write/table_update_by_row_id.py 
b/paimon-python/pypaimon/write/table_update_by_row_id.py
index 48f61751ec..089fde047c 100644
--- a/paimon-python/pypaimon/write/table_update_by_row_id.py
+++ b/paimon-python/pypaimon/write/table_update_by_row_id.py
@@ -293,7 +293,9 @@ class TableUpdateByRowId:
         merged_data = self._merge_update_with_original(original_data, data, 
column_names, first_row_id)
 
         # Create a file store write for this partition
+        # Disable rolling to ensure one output file per first_row_id group,
         file_store_write = FileStoreWrite(self.table, self.commit_user)
+        file_store_write.disable_rolling()
 
         # Set write columns to only update specific columns
         write_cols = column_names
@@ -313,7 +315,6 @@ class TableUpdateByRowId:
         for msg in commit_messages:
             msg.check_from_snapshot = self.snapshot_id
             for file in msg.new_files:
-                # Assign the same first_row_id as the original file
                 file.first_row_id = first_row_id
                 file.write_cols = write_cols
 

Reply via email to