This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 03394f996f [core][python] Fix commitKind mislabeled as OVERWRITE for data-evolution merge into (#7639)
03394f996f is described below

commit 03394f996fb692afbc3ccb438ff4334124546901
Author: littlecoder04 <[email protected]>
AuthorDate: Thu Apr 16 13:20:01 2026 +0800

    [core][python] Fix commitKind mislabeled as OVERWRITE for data-evolution merge into (#7639)
---
 .../paimon/operation/FileStoreCommitImpl.java      |  4 ++
 .../paimon/operation/commit/ConflictDetection.java |  6 ++-
 .../operation/commit/ConflictDetectionTest.java    | 49 ++++++++++++++++++
 .../pypaimon/tests/shard_table_updator_test.py     | 60 ++++++++++++++++++++++
 .../pypaimon/write/commit/conflict_detection.py    |  9 +---
 paimon-python/pypaimon/write/file_store_commit.py  |  3 ++
 6 files changed, 123 insertions(+), 8 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 74a27760fc..d2e20ebaef 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -318,6 +318,10 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                     checkAppendFiles = true;
                     allowRollback = true;
                 }
+                if (conflictDetection.hasRowIdCheckFromSnapshot()) {
+                    checkAppendFiles = true;
+                    allowRollback = true;
+                }
 
                 attempts +=
                         tryCommit(
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
index 1e84f58e51..db15db027d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
@@ -119,6 +119,10 @@ public class ConflictDetection {
         this.rowIdCheckFromSnapshot = rowIdCheckFromSnapshot;
     }
 
+    public boolean hasRowIdCheckFromSnapshot() {
+        return rowIdCheckFromSnapshot != null;
+    }
+
     @Nullable
     public Comparator<InternalRow> keyComparator() {
         return keyComparator;
@@ -140,7 +144,7 @@ public class ConflictDetection {
                 return true;
             }
         }
-        return rowIdCheckFromSnapshot != null;
+        return false;
     }
 
     public Optional<RuntimeException> checkConflicts(
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java
index 522cb73e72..c3e0258da2 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java
@@ -25,6 +25,8 @@ import org.apache.paimon.manifest.FileKind;
 import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.manifest.SimpleFileEntry;
 import org.apache.paimon.manifest.SimpleFileEntryWithDV;
+import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.types.RowType;
 
 import org.junit.jupiter.api.Test;
 
@@ -340,4 +342,51 @@ class ConflictDetectionTest {
         }
         assert (deleteCount > 0);
     }
+
+    @Test
+    void testShouldBeOverwriteCommit() {
+        ConflictDetection detection = createConflictDetection();
+
+        List<SimpleFileEntry> addOnlyEntries = new ArrayList<>();
+        addOnlyEntries.add(createFileEntry("f1", ADD));
+        addOnlyEntries.add(createFileEntry("f2", ADD));
+        assertThat(detection.shouldBeOverwriteCommit(addOnlyEntries, Collections.emptyList()))
+                .isFalse();
+
+        assertThat(
+                        detection.shouldBeOverwriteCommit(
+                                Collections.emptyList(), Collections.emptyList()))
+                .isFalse();
+
+        List<SimpleFileEntry> deleteEntries = new ArrayList<>();
+        deleteEntries.add(createFileEntry("f1", DELETE));
+        deleteEntries.add(createFileEntry("f2", ADD));
+        assertThat(detection.shouldBeOverwriteCommit(deleteEntries, Collections.emptyList()))
+                .isTrue();
+
+        List<IndexManifestEntry> dvIndexFiles = new ArrayList<>();
+        dvIndexFiles.add(createDvIndexEntry("dv1", ADD, Arrays.asList("f1")));
+        assertThat(detection.shouldBeOverwriteCommit(Collections.emptyList(), dvIndexFiles))
+                .isTrue();
+
+        detection.setRowIdCheckFromSnapshot(1L);
+        assertThat(detection.shouldBeOverwriteCommit(addOnlyEntries, Collections.emptyList()))
+                .isFalse();
+    }
+
+    private ConflictDetection createConflictDetection() {
+        return new ConflictDetection(
+                "test-table",
+                "test-user",
+                RowType.of(),
+                null,
+                null,
+                BucketMode.HASH_FIXED,
+                false,
+                true,
+                false,
+                null,
+                null,
+                null);
+    }
 }
diff --git a/paimon-python/pypaimon/tests/shard_table_updator_test.py b/paimon-python/pypaimon/tests/shard_table_updator_test.py
index 1ff658c609..92436c584d 100644
--- a/paimon-python/pypaimon/tests/shard_table_updator_test.py
+++ b/paimon-python/pypaimon/tests/shard_table_updator_test.py
@@ -19,6 +19,7 @@ import os
 import shutil
 import tempfile
 import unittest
+from unittest.mock import patch
 
 import pyarrow as pa
 
@@ -591,6 +592,65 @@ class ShardTableUpdatorTest(unittest.TestCase):
             % actual_columns
         )
 
+    def test_shard_update_passes_allow_rollback_true(self):
+        table_schema = pa.schema([
+            ('a', pa.int32()),
+            ('b', pa.int32()),
+        ])
+        schema = Schema.from_pyarrow_schema(
+            table_schema,
+            options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'}
+        )
+        name = self._create_unique_table_name('rollback')
+        self.catalog.create_table(name, schema, False)
+        table = self.catalog.get_table(name)
+
+        write_builder = table.new_batch_write_builder()
+        tw = write_builder.new_write().with_write_type(['a', 'b'])
+        tc = write_builder.new_commit()
+        tw.write_arrow(pa.Table.from_pydict(
+            {'a': [1, 2], 'b': [10, 20]},
+            schema=table_schema,
+        ))
+        tc.commit(tw.prepare_commit())
+        tw.close()
+        tc.close()
+
+        upd = write_builder.new_update()
+        upd.with_read_projection(['a'])
+        upd.with_update_type(['b'])
+        shard = upd.new_shard_updator(0, 1)
+        reader = shard.arrow_reader()
+        for batch in iter(reader.read_next_batch, None):
+            shard.update_by_arrow_batch(pa.RecordBatch.from_pydict(
+                {'b': [99] * batch.num_rows},
+                schema=pa.schema([('b', pa.int32())]),
+            ))
+        commit_messages = shard.prepare_commit()
+
+        from pypaimon.write.file_store_commit import FileStoreCommit
+        original_try_commit = FileStoreCommit._try_commit
+        captured_args = {}
+
+        def spy_try_commit(self_inner, **kwargs):
+            captured_args.update(kwargs)
+            return original_try_commit(self_inner, **kwargs)
+
+        with patch.object(FileStoreCommit, '_try_commit', spy_try_commit):
+            tc2 = write_builder.new_commit()
+            tc2.commit(commit_messages)
+            tc2.close()
+
+        self.assertTrue(
+            captured_args.get('allow_rollback', False),
+            "Row-id-check commits must pass allow_rollback=True so that "
+            "concurrent COMPACT snapshots can be rolled back on conflict."
+        )
+        self.assertTrue(
+            captured_args.get('detect_conflicts', False),
+            "Row-id-check commits must enable conflict detection."
+        )
+
 
 if __name__ == '__main__':
     unittest.main()
diff --git a/paimon-python/pypaimon/write/commit/conflict_detection.py b/paimon-python/pypaimon/write/commit/conflict_detection.py
index 8e62c946e0..cfbdfd4e13 100644
--- a/paimon-python/pypaimon/write/commit/conflict_detection.py
+++ b/paimon-python/pypaimon/write/commit/conflict_detection.py
@@ -19,7 +19,6 @@
 Conflict detection for commit operations.
 """
 
-
 from pypaimon.manifest.manifest_list_manager import ManifestListManager
 from pypaimon.manifest.schema.data_file_meta import DataFileMeta
 from pypaimon.manifest.schema.file_entry import FileEntry
@@ -53,13 +52,9 @@ class ConflictDetection:
         self.commit_scanner = commit_scanner
 
     def should_be_overwrite_commit(self):
-        """Check if the commit should be treated as an overwrite commit.
-
-        returns True if rowIdCheckFromSnapshot is set.
+        return False
 
-        Returns:
-            True if the commit should be treated as OVERWRITE.
-        """
+    def has_row_id_check_from_snapshot(self):
         return self._row_id_check_from_snapshot is not None
 
     def check_conflicts(self, latest_snapshot, base_entries, delta_entries, commit_kind):
diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py
index 8937842680..22d0fea7ac 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -147,6 +147,9 @@ class FileStoreCommit:
             commit_kind = "OVERWRITE"
             detect_conflicts = True
             allow_rollback = True
+        if self.conflict_detection.has_row_id_check_from_snapshot():
+            detect_conflicts = True
+            allow_rollback = True
 
         self._try_commit(commit_kind=commit_kind,
                          commit_identifier=commit_identifier,

Reply via email to