This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 03394f996f [core][python] Fix commitKind mislabeled as OVERWRITE for data-evolution merge into (#7639)
03394f996f is described below
commit 03394f996fb692afbc3ccb438ff4334124546901
Author: littlecoder04 <[email protected]>
AuthorDate: Thu Apr 16 13:20:01 2026 +0800
[core][python] Fix commitKind mislabeled as OVERWRITE for data-evolution merge into (#7639)
---
.../paimon/operation/FileStoreCommitImpl.java | 4 ++
.../paimon/operation/commit/ConflictDetection.java | 6 ++-
.../operation/commit/ConflictDetectionTest.java | 49 ++++++++++++++++++
.../pypaimon/tests/shard_table_updator_test.py | 60 ++++++++++++++++++++++
.../pypaimon/write/commit/conflict_detection.py | 9 +---
paimon-python/pypaimon/write/file_store_commit.py | 3 ++
6 files changed, 123 insertions(+), 8 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 74a27760fc..d2e20ebaef 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -318,6 +318,10 @@ public class FileStoreCommitImpl implements FileStoreCommit {
checkAppendFiles = true;
allowRollback = true;
}
+ if (conflictDetection.hasRowIdCheckFromSnapshot()) {
+ checkAppendFiles = true;
+ allowRollback = true;
+ }
attempts +=
tryCommit(
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
index 1e84f58e51..db15db027d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
@@ -119,6 +119,10 @@ public class ConflictDetection {
this.rowIdCheckFromSnapshot = rowIdCheckFromSnapshot;
}
+ public boolean hasRowIdCheckFromSnapshot() {
+ return rowIdCheckFromSnapshot != null;
+ }
+
@Nullable
public Comparator<InternalRow> keyComparator() {
return keyComparator;
@@ -140,7 +144,7 @@ public class ConflictDetection {
return true;
}
}
- return rowIdCheckFromSnapshot != null;
+ return false;
}
public Optional<RuntimeException> checkConflicts(
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java
index 522cb73e72..c3e0258da2 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java
@@ -25,6 +25,8 @@ import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.SimpleFileEntry;
import org.apache.paimon.manifest.SimpleFileEntryWithDV;
+import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.types.RowType;
import org.junit.jupiter.api.Test;
@@ -340,4 +342,51 @@ class ConflictDetectionTest {
}
assert (deleteCount > 0);
}
+
+ @Test
+ void testShouldBeOverwriteCommit() {
+ ConflictDetection detection = createConflictDetection();
+
+ List<SimpleFileEntry> addOnlyEntries = new ArrayList<>();
+ addOnlyEntries.add(createFileEntry("f1", ADD));
+ addOnlyEntries.add(createFileEntry("f2", ADD));
+ assertThat(detection.shouldBeOverwriteCommit(addOnlyEntries,
Collections.emptyList()))
+ .isFalse();
+
+ assertThat(
+ detection.shouldBeOverwriteCommit(
+ Collections.emptyList(),
Collections.emptyList()))
+ .isFalse();
+
+ List<SimpleFileEntry> deleteEntries = new ArrayList<>();
+ deleteEntries.add(createFileEntry("f1", DELETE));
+ deleteEntries.add(createFileEntry("f2", ADD));
+ assertThat(detection.shouldBeOverwriteCommit(deleteEntries,
Collections.emptyList()))
+ .isTrue();
+
+ List<IndexManifestEntry> dvIndexFiles = new ArrayList<>();
+ dvIndexFiles.add(createDvIndexEntry("dv1", ADD, Arrays.asList("f1")));
+ assertThat(detection.shouldBeOverwriteCommit(Collections.emptyList(),
dvIndexFiles))
+ .isTrue();
+
+ detection.setRowIdCheckFromSnapshot(1L);
+ assertThat(detection.shouldBeOverwriteCommit(addOnlyEntries,
Collections.emptyList()))
+ .isFalse();
+ }
+
+ private ConflictDetection createConflictDetection() {
+ return new ConflictDetection(
+ "test-table",
+ "test-user",
+ RowType.of(),
+ null,
+ null,
+ BucketMode.HASH_FIXED,
+ false,
+ true,
+ false,
+ null,
+ null,
+ null);
+ }
}
diff --git a/paimon-python/pypaimon/tests/shard_table_updator_test.py b/paimon-python/pypaimon/tests/shard_table_updator_test.py
index 1ff658c609..92436c584d 100644
--- a/paimon-python/pypaimon/tests/shard_table_updator_test.py
+++ b/paimon-python/pypaimon/tests/shard_table_updator_test.py
@@ -19,6 +19,7 @@ import os
import shutil
import tempfile
import unittest
+from unittest.mock import patch
import pyarrow as pa
@@ -591,6 +592,65 @@ class ShardTableUpdatorTest(unittest.TestCase):
% actual_columns
)
+ def test_shard_update_passes_allow_rollback_true(self):
+ table_schema = pa.schema([
+ ('a', pa.int32()),
+ ('b', pa.int32()),
+ ])
+ schema = Schema.from_pyarrow_schema(
+ table_schema,
+ options={'row-tracking.enabled': 'true', 'data-evolution.enabled':
'true'}
+ )
+ name = self._create_unique_table_name('rollback')
+ self.catalog.create_table(name, schema, False)
+ table = self.catalog.get_table(name)
+
+ write_builder = table.new_batch_write_builder()
+ tw = write_builder.new_write().with_write_type(['a', 'b'])
+ tc = write_builder.new_commit()
+ tw.write_arrow(pa.Table.from_pydict(
+ {'a': [1, 2], 'b': [10, 20]},
+ schema=table_schema,
+ ))
+ tc.commit(tw.prepare_commit())
+ tw.close()
+ tc.close()
+
+ upd = write_builder.new_update()
+ upd.with_read_projection(['a'])
+ upd.with_update_type(['b'])
+ shard = upd.new_shard_updator(0, 1)
+ reader = shard.arrow_reader()
+ for batch in iter(reader.read_next_batch, None):
+ shard.update_by_arrow_batch(pa.RecordBatch.from_pydict(
+ {'b': [99] * batch.num_rows},
+ schema=pa.schema([('b', pa.int32())]),
+ ))
+ commit_messages = shard.prepare_commit()
+
+ from pypaimon.write.file_store_commit import FileStoreCommit
+ original_try_commit = FileStoreCommit._try_commit
+ captured_args = {}
+
+ def spy_try_commit(self_inner, **kwargs):
+ captured_args.update(kwargs)
+ return original_try_commit(self_inner, **kwargs)
+
+ with patch.object(FileStoreCommit, '_try_commit', spy_try_commit):
+ tc2 = write_builder.new_commit()
+ tc2.commit(commit_messages)
+ tc2.close()
+
+ self.assertTrue(
+ captured_args.get('allow_rollback', False),
+ "Row-id-check commits must pass allow_rollback=True so that "
+ "concurrent COMPACT snapshots can be rolled back on conflict."
+ )
+ self.assertTrue(
+ captured_args.get('detect_conflicts', False),
+ "Row-id-check commits must enable conflict detection."
+ )
+
if __name__ == '__main__':
unittest.main()
diff --git a/paimon-python/pypaimon/write/commit/conflict_detection.py b/paimon-python/pypaimon/write/commit/conflict_detection.py
index 8e62c946e0..cfbdfd4e13 100644
--- a/paimon-python/pypaimon/write/commit/conflict_detection.py
+++ b/paimon-python/pypaimon/write/commit/conflict_detection.py
@@ -19,7 +19,6 @@
Conflict detection for commit operations.
"""
-
from pypaimon.manifest.manifest_list_manager import ManifestListManager
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
from pypaimon.manifest.schema.file_entry import FileEntry
@@ -53,13 +52,9 @@ class ConflictDetection:
self.commit_scanner = commit_scanner
def should_be_overwrite_commit(self):
- """Check if the commit should be treated as an overwrite commit.
-
- returns True if rowIdCheckFromSnapshot is set.
+ return False
- Returns:
- True if the commit should be treated as OVERWRITE.
- """
+ def has_row_id_check_from_snapshot(self):
return self._row_id_check_from_snapshot is not None
def check_conflicts(self, latest_snapshot, base_entries, delta_entries,
commit_kind):
diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py
index 8937842680..22d0fea7ac 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -147,6 +147,9 @@ class FileStoreCommit:
commit_kind = "OVERWRITE"
detect_conflicts = True
allow_rollback = True
+ if self.conflict_detection.has_row_id_check_from_snapshot():
+ detect_conflicts = True
+ allow_rollback = True
self._try_commit(commit_kind=commit_kind,
commit_identifier=commit_identifier,