This is an automated email from the ASF dual-hosted git repository.
hope pushed a commit to branch release-1.4
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/release-1.4 by this push:
new 1230bf4379 [python] Deduplicate input to keep last one in
TableUpsertByKey (#7526)
1230bf4379 is described below
commit 1230bf4379101d07c7d860c7eb55a5f3e2dc5123
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Mar 25 16:23:37 2026 +0800
[python] Deduplicate input to keep last one in TableUpsertByKey (#7526)
Change the handling of duplicate keys in TableUpsertByKey from throwing
exceptions to automatic deduplication (keeping the last one).
---
docs/content/pypaimon/data-evolution.md | 2 +-
.../pypaimon/tests/table_upsert_by_key_test.py | 92 +++++++++++++++++++---
.../pypaimon/write/table_upsert_by_key.py | 21 +++--
3 files changed, 98 insertions(+), 17 deletions(-)
diff --git a/docs/content/pypaimon/data-evolution.md
b/docs/content/pypaimon/data-evolution.md
index b5daafae57..068ad9b99a 100644
--- a/docs/content/pypaimon/data-evolution.md
+++ b/docs/content/pypaimon/data-evolution.md
@@ -231,7 +231,7 @@ table_commit.close()
**Notes**
- Execution is driven **partition-by-partition**: only one partition's key set
is loaded into memory at a time.
-- Duplicate keys in the input data will raise an error.
+- Duplicate keys in the input data are automatically deduplicated — the **last
occurrence** is kept.
- The upsert is atomic per commit — all matched updates and new appends are
included in the same commit.
## Update Columns By Shards
diff --git a/paimon-python/pypaimon/tests/table_upsert_by_key_test.py
b/paimon-python/pypaimon/tests/table_upsert_by_key_test.py
index 37511afd54..0fd93a3daa 100644
--- a/paimon-python/pypaimon/tests/table_upsert_by_key_test.py
+++ b/paimon-python/pypaimon/tests/table_upsert_by_key_test.py
@@ -493,21 +493,93 @@ class TableUpsertByKeyTest(unittest.TestCase):
tu.upsert_by_arrow_with_key(data, upsert_keys=['id'])
self.assertIn('empty', str(ctx.exception))
- def test_duplicate_keys_in_input_raises(self):
- """Duplicate composite keys in input data should raise ValueError."""
+ def test_duplicate_keys_in_input_keeps_last(self):
+ """Duplicate keys in input data should keep the last occurrence."""
table = self._create_table()
- data = pa.Table.from_pydict({
- 'id': [1, 1],
- 'name': ['A', 'B'],
+ self._write(table, pa.Table.from_pydict({
+ 'id': [1, 2],
+ 'name': ['Alice', 'Bob'],
'age': [25, 30],
'city': ['NYC', 'LA'],
+ }, schema=self.pa_schema))
+
+ # id=1 appears twice; the second row (name='A_last') should win
+ data = pa.Table.from_pydict({
+ 'id': [1, 1],
+ 'name': ['A_first', 'A_last'],
+ 'age': [90, 91],
+ 'city': ['X', 'Y'],
}, schema=self.pa_schema)
+ self._upsert(table, data, upsert_keys=['id'])
- with self.assertRaises(ValueError) as ctx:
- wb = table.new_batch_write_builder()
- tu = wb.new_update()
- tu.upsert_by_arrow_with_key(data, upsert_keys=['id'])
- self.assertIn('duplicate', str(ctx.exception).lower())
+ result = self._read_all(table)
+ rows = {r: (n, a, c) for r, n, a, c in zip(
+ result['id'].to_pylist(),
+ result['name'].to_pylist(),
+ result['age'].to_pylist(),
+ result['city'].to_pylist(),
+ )}
+ # id=1 updated with last duplicate row
+ self.assertEqual(rows[1], ('A_last', 91, 'Y'))
+ # id=2 unchanged
+ self.assertEqual(rows[2], ('Bob', 30, 'LA'))
+
+ def test_duplicate_keys_all_new_keeps_last(self):
+ """Duplicate keys in input on an empty table keep the last occurrence."""
+ table = self._create_table()
+
+ # id=1 appears three times; last row should win
+ data = pa.Table.from_pydict({
+ 'id': [1, 1, 1, 2],
+ 'name': ['A1', 'A2', 'A3', 'B'],
+ 'age': [10, 20, 30, 40],
+ 'city': ['X1', 'X2', 'X3', 'Y'],
+ }, schema=self.pa_schema)
+ self._upsert(table, data, upsert_keys=['id'])
+
+ result = self._read_all(table)
+ self.assertEqual(result.num_rows, 2)
+ rows = {r: (n, a, c) for r, n, a, c in zip(
+ result['id'].to_pylist(),
+ result['name'].to_pylist(),
+ result['age'].to_pylist(),
+ result['city'].to_pylist(),
+ )}
+ self.assertEqual(rows[1], ('A3', 30, 'X3'))
+ self.assertEqual(rows[2], ('B', 40, 'Y'))
+
+ def test_duplicate_keys_partitioned_keeps_last(self):
+ """Duplicate keys in a partitioned table keep the last per
partition."""
+ table = self._create_table(
+ pa_schema=self.partitioned_pa_schema,
+ partition_keys=['region'],
+ )
+ self._write(table, pa.Table.from_pydict({
+ 'id': [1, 2],
+ 'name': ['Alice', 'Bob'],
+ 'age': [25, 30],
+ 'region': ['US', 'EU'],
+ }, schema=self.partitioned_pa_schema))
+
+ # id=1 duplicated in US partition; id=2 duplicated in EU partition
+ data = pa.Table.from_pydict({
+ 'id': [1, 1, 2, 2],
+ 'name': ['A_first', 'A_last', 'B_first', 'B_last'],
+ 'age': [50, 51, 60, 61],
+ 'region': ['US', 'US', 'EU', 'EU'],
+ }, schema=self.partitioned_pa_schema)
+ self._upsert(table, data, upsert_keys=['id'])
+
+ result = self._read_all(table)
+ self.assertEqual(result.num_rows, 2)
+ rows = {(r, reg): (n, a) for r, n, a, reg in zip(
+ result['id'].to_pylist(),
+ result['name'].to_pylist(),
+ result['age'].to_pylist(),
+ result['region'].to_pylist(),
+ )}
+ self.assertEqual(rows[(1, 'US')], ('A_last', 51))
+ self.assertEqual(rows[(2, 'EU')], ('B_last', 61))
def test_partitioned_table_missing_partition_col_in_data_raises(self):
"""Input data missing partition column should raise ValueError."""
diff --git a/paimon-python/pypaimon/write/table_upsert_by_key.py
b/paimon-python/pypaimon/write/table_upsert_by_key.py
index 3c895e4904..d8bad25fab 100644
--- a/paimon-python/pypaimon/write/table_upsert_by_key.py
+++ b/paimon-python/pypaimon/write/table_upsert_by_key.py
@@ -164,16 +164,25 @@ class TableUpsertByKey:
for i in range(partition_data.num_rows)
]
- # 2. Per-partition duplicate check
- input_key_set = set(input_key_tuples)
- if len(input_key_tuples) != len(input_key_set):
- raise ValueError(
- f"Input data contains duplicate values in upsert_keys columns "
- f"{match_keys} within partition {partition_spec}."
+ # 2. Deduplicate: keep last occurrence of each key
+ key_to_last_idx: Dict[_KeyTuple, int] = {}
+ for i, key_tuple in enumerate(input_key_tuples):
+ key_to_last_idx[key_tuple] = i # last write wins
+
+ if len(input_key_tuples) != len(key_to_last_idx):
+ original_count = len(input_key_tuples)
+ dedup_indices = sorted(key_to_last_idx.values())
+ partition_data = partition_data.take(dedup_indices)
+ input_key_tuples = [input_key_tuples[i] for i in dedup_indices]
+ logger.warning(
+ "Deduplicated input from %d to %d rows in partition %s "
+ "(kept last occurrence).",
+ original_count, len(input_key_tuples), partition_spec,
)
# 3. Scan partition in batches, build key → _ROW_ID only for
# keys present in the input (avoids full-partition materialisation).
+ input_key_set = set(key_to_last_idx.keys())
key_to_row_id = self._build_key_to_row_id_map(
match_keys, partition_spec, input_key_set
)