This is an automated email from the ASF dual-hosted git repository.

hope pushed a commit to branch release-1.4
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 7cdbd23aff10b4902a9edf18d8ffc386921c0557
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Mar 25 16:23:37 2026 +0800

    [python] Deduplicate input to keep last one in TableUpsertByKey (#7526)
    
    Change the handling of duplicate keys in TableUpsertByKey from throwing
    exceptions to automatic deduplication (keeping the last one).
---
 docs/content/pypaimon/data-evolution.md            |  2 +-
 .../pypaimon/tests/table_upsert_by_key_test.py     | 92 +++++++++++++++++++---
 .../pypaimon/write/table_upsert_by_key.py          | 21 +++--
 3 files changed, 98 insertions(+), 17 deletions(-)

diff --git a/docs/content/pypaimon/data-evolution.md 
b/docs/content/pypaimon/data-evolution.md
index b5daafae57..068ad9b99a 100644
--- a/docs/content/pypaimon/data-evolution.md
+++ b/docs/content/pypaimon/data-evolution.md
@@ -231,7 +231,7 @@ table_commit.close()
 **Notes**
 
 - Execution is driven **partition-by-partition**: only one partition's key set 
is loaded into memory at a time.
-- Duplicate keys in the input data will raise an error.
+- Duplicate keys in the input data are automatically deduplicated — the **last 
occurrence** is kept.
 - The upsert is atomic per commit — all matched updates and new appends are 
included in the same commit.
 
 ## Update Columns By Shards
diff --git a/paimon-python/pypaimon/tests/table_upsert_by_key_test.py 
b/paimon-python/pypaimon/tests/table_upsert_by_key_test.py
index 37511afd54..0fd93a3daa 100644
--- a/paimon-python/pypaimon/tests/table_upsert_by_key_test.py
+++ b/paimon-python/pypaimon/tests/table_upsert_by_key_test.py
@@ -493,21 +493,93 @@ class TableUpsertByKeyTest(unittest.TestCase):
             tu.upsert_by_arrow_with_key(data, upsert_keys=['id'])
         self.assertIn('empty', str(ctx.exception))
 
-    def test_duplicate_keys_in_input_raises(self):
-        """Duplicate composite keys in input data should raise ValueError."""
+    def test_duplicate_keys_in_input_keeps_last(self):
+        """Duplicate keys in input data should keep the last occurrence."""
         table = self._create_table()
-        data = pa.Table.from_pydict({
-            'id': [1, 1],
-            'name': ['A', 'B'],
+        self._write(table, pa.Table.from_pydict({
+            'id': [1, 2],
+            'name': ['Alice', 'Bob'],
             'age': [25, 30],
             'city': ['NYC', 'LA'],
+        }, schema=self.pa_schema))
+
+        # id=1 appears twice; the second row (name='A_last') should win
+        data = pa.Table.from_pydict({
+            'id': [1, 1],
+            'name': ['A_first', 'A_last'],
+            'age': [90, 91],
+            'city': ['X', 'Y'],
         }, schema=self.pa_schema)
+        self._upsert(table, data, upsert_keys=['id'])
 
-        with self.assertRaises(ValueError) as ctx:
-            wb = table.new_batch_write_builder()
-            tu = wb.new_update()
-            tu.upsert_by_arrow_with_key(data, upsert_keys=['id'])
-        self.assertIn('duplicate', str(ctx.exception).lower())
+        result = self._read_all(table)
+        rows = {r: (n, a, c) for r, n, a, c in zip(
+            result['id'].to_pylist(),
+            result['name'].to_pylist(),
+            result['age'].to_pylist(),
+            result['city'].to_pylist(),
+        )}
+        # id=1 updated with last duplicate row
+        self.assertEqual(rows[1], ('A_last', 91, 'Y'))
+        # id=2 unchanged
+        self.assertEqual(rows[2], ('Bob', 30, 'LA'))
+
+    def test_duplicate_keys_all_new_keeps_last(self):
+        """Duplicate keys in input on an empty table keep the last occurrence."""
+        table = self._create_table()
+
+        # id=1 appears three times; last row should win
+        data = pa.Table.from_pydict({
+            'id': [1, 1, 1, 2],
+            'name': ['A1', 'A2', 'A3', 'B'],
+            'age': [10, 20, 30, 40],
+            'city': ['X1', 'X2', 'X3', 'Y'],
+        }, schema=self.pa_schema)
+        self._upsert(table, data, upsert_keys=['id'])
+
+        result = self._read_all(table)
+        self.assertEqual(result.num_rows, 2)
+        rows = {r: (n, a, c) for r, n, a, c in zip(
+            result['id'].to_pylist(),
+            result['name'].to_pylist(),
+            result['age'].to_pylist(),
+            result['city'].to_pylist(),
+        )}
+        self.assertEqual(rows[1], ('A3', 30, 'X3'))
+        self.assertEqual(rows[2], ('B', 40, 'Y'))
+
+    def test_duplicate_keys_partitioned_keeps_last(self):
+        """Duplicate keys in a partitioned table keep the last occurrence per partition."""
+        table = self._create_table(
+            pa_schema=self.partitioned_pa_schema,
+            partition_keys=['region'],
+        )
+        self._write(table, pa.Table.from_pydict({
+            'id': [1, 2],
+            'name': ['Alice', 'Bob'],
+            'age': [25, 30],
+            'region': ['US', 'EU'],
+        }, schema=self.partitioned_pa_schema))
+
+        # id=1 duplicated in US partition; id=2 duplicated in EU partition
+        data = pa.Table.from_pydict({
+            'id': [1, 1, 2, 2],
+            'name': ['A_first', 'A_last', 'B_first', 'B_last'],
+            'age': [50, 51, 60, 61],
+            'region': ['US', 'US', 'EU', 'EU'],
+        }, schema=self.partitioned_pa_schema)
+        self._upsert(table, data, upsert_keys=['id'])
+
+        result = self._read_all(table)
+        self.assertEqual(result.num_rows, 2)
+        rows = {(r, reg): (n, a) for r, n, a, reg in zip(
+            result['id'].to_pylist(),
+            result['name'].to_pylist(),
+            result['age'].to_pylist(),
+            result['region'].to_pylist(),
+        )}
+        self.assertEqual(rows[(1, 'US')], ('A_last', 51))
+        self.assertEqual(rows[(2, 'EU')], ('B_last', 61))
 
     def test_partitioned_table_missing_partition_col_in_data_raises(self):
         """Input data missing partition column should raise ValueError."""
diff --git a/paimon-python/pypaimon/write/table_upsert_by_key.py 
b/paimon-python/pypaimon/write/table_upsert_by_key.py
index 3c895e4904..d8bad25fab 100644
--- a/paimon-python/pypaimon/write/table_upsert_by_key.py
+++ b/paimon-python/pypaimon/write/table_upsert_by_key.py
@@ -164,16 +164,25 @@ class TableUpsertByKey:
             for i in range(partition_data.num_rows)
         ]
 
-        # 2. Per-partition duplicate check
-        input_key_set = set(input_key_tuples)
-        if len(input_key_tuples) != len(input_key_set):
-            raise ValueError(
-                f"Input data contains duplicate values in upsert_keys columns "
-                f"{match_keys} within partition {partition_spec}."
+        # 2. Deduplicate: keep last occurrence of each key
+        key_to_last_idx: Dict[_KeyTuple, int] = {}
+        for i, key_tuple in enumerate(input_key_tuples):
+            key_to_last_idx[key_tuple] = i  # last write wins
+
+        if len(input_key_tuples) != len(key_to_last_idx):
+            original_count = len(input_key_tuples)
+            dedup_indices = sorted(key_to_last_idx.values())
+            partition_data = partition_data.take(dedup_indices)
+            input_key_tuples = [input_key_tuples[i] for i in dedup_indices]
+            logger.warning(
+                "Deduplicated input from %d to %d rows in partition %s "
+                "(kept last occurrence).",
+                original_count, len(input_key_tuples), partition_spec,
             )
 
         # 3. Scan partition in batches, build key → _ROW_ID only for
         #    keys present in the input (avoids full-partition materialisation).
+        input_key_set = set(key_to_last_idx.keys())
         key_to_row_id = self._build_key_to_row_id_map(
             match_keys, partition_spec, input_key_set
         )

Reply via email to