This is an automated email from the ASF dual-hosted git repository. hope pushed a commit to branch release-1.4 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 7cdbd23aff10b4902a9edf18d8ffc386921c0557 Author: Jingsong Lee <[email protected]> AuthorDate: Wed Mar 25 16:23:37 2026 +0800 [python] Deduplicate input to keep last one in TableUpsertByKey (#7526) Change the handling of duplicate keys in TableUpsertByKey from throwing exceptions to automatic deduplication (keeping the last one). --- docs/content/pypaimon/data-evolution.md | 2 +- .../pypaimon/tests/table_upsert_by_key_test.py | 92 +++++++++++++++++++--- .../pypaimon/write/table_upsert_by_key.py | 21 +++-- 3 files changed, 98 insertions(+), 17 deletions(-) diff --git a/docs/content/pypaimon/data-evolution.md b/docs/content/pypaimon/data-evolution.md index b5daafae57..068ad9b99a 100644 --- a/docs/content/pypaimon/data-evolution.md +++ b/docs/content/pypaimon/data-evolution.md @@ -231,7 +231,7 @@ table_commit.close() **Notes** - Execution is driven **partition-by-partition**: only one partition's key set is loaded into memory at a time. -- Duplicate keys in the input data will raise an error. +- Duplicate keys in the input data are automatically deduplicated — the **last occurrence** is kept. - The upsert is atomic per commit — all matched updates and new appends are included in the same commit. ## Update Columns By Shards diff --git a/paimon-python/pypaimon/tests/table_upsert_by_key_test.py b/paimon-python/pypaimon/tests/table_upsert_by_key_test.py index 37511afd54..0fd93a3daa 100644 --- a/paimon-python/pypaimon/tests/table_upsert_by_key_test.py +++ b/paimon-python/pypaimon/tests/table_upsert_by_key_test.py @@ -493,21 +493,93 @@ class TableUpsertByKeyTest(unittest.TestCase): tu.upsert_by_arrow_with_key(data, upsert_keys=['id']) self.assertIn('empty', str(ctx.exception)) - def test_duplicate_keys_in_input_raises(self): - """Duplicate composite keys in input data should raise ValueError.""" + def test_duplicate_keys_in_input_keeps_last(self): + """Duplicate keys in input data should keep the last occurrence.""" table = self._create_table() - data = pa.Table.from_pydict({ - 'id': [1, 1], - 'name': ['A', 'B'], + self._write(table, pa.Table.from_pydict({ + 'id': [1, 2], + 'name': ['Alice', 'Bob'], 'age': [25, 30], 'city': ['NYC', 'LA'], + }, schema=self.pa_schema)) + + # id=1 appears twice; the second row (name='A_last') should win + data = pa.Table.from_pydict({ + 'id': [1, 1], + 'name': ['A_first', 'A_last'], + 'age': [90, 91], + 'city': ['X', 'Y'], }, schema=self.pa_schema) + self._upsert(table, data, upsert_keys=['id']) - with self.assertRaises(ValueError) as ctx: - wb = table.new_batch_write_builder() - tu = wb.new_update() - tu.upsert_by_arrow_with_key(data, upsert_keys=['id']) - self.assertIn('duplicate', str(ctx.exception).lower()) + result = self._read_all(table) + rows = {r: (n, a, c) for r, n, a, c in zip( + result['id'].to_pylist(), + result['name'].to_pylist(), + result['age'].to_pylist(), + result['city'].to_pylist(), + )} + # id=1 updated with last duplicate row + self.assertEqual(rows[1], ('A_last', 91, 'Y')) + # id=2 unchanged + self.assertEqual(rows[2], ('Bob', 30, 'LA')) + + def test_duplicate_keys_all_new_keeps_last(self): + """Duplicate keys in input on empty table keeps the last occurrence.""" + table = self._create_table() + + # id=1 appears three times; last row should win + data = pa.Table.from_pydict({ + 'id': [1, 1, 1, 2], + 'name': ['A1', 'A2', 'A3', 'B'], + 'age': [10, 20, 30, 40], + 'city': ['X1', 'X2', 'X3', 'Y'], + }, schema=self.pa_schema) + self._upsert(table, data, upsert_keys=['id']) + + result = self._read_all(table) + self.assertEqual(result.num_rows, 2) + rows = {r: (n, a, c) for r, n, a, c in zip( + result['id'].to_pylist(), + result['name'].to_pylist(), + result['age'].to_pylist(), + result['city'].to_pylist(), + )} + self.assertEqual(rows[1], ('A3', 30, 'X3')) + self.assertEqual(rows[2], ('B', 40, 'Y')) + + def test_duplicate_keys_partitioned_keeps_last(self): + """Duplicate keys in a partitioned table keep the last per partition.""" + table = self._create_table( + pa_schema=self.partitioned_pa_schema, + partition_keys=['region'], + ) + self._write(table, pa.Table.from_pydict({ + 'id': [1, 2], + 'name': ['Alice', 'Bob'], + 'age': [25, 30], + 'region': ['US', 'EU'], + }, schema=self.partitioned_pa_schema)) + + # id=1 duplicated in US partition; id=2 duplicated in EU partition + data = pa.Table.from_pydict({ + 'id': [1, 1, 2, 2], + 'name': ['A_first', 'A_last', 'B_first', 'B_last'], + 'age': [50, 51, 60, 61], + 'region': ['US', 'US', 'EU', 'EU'], + }, schema=self.partitioned_pa_schema) + self._upsert(table, data, upsert_keys=['id']) + + result = self._read_all(table) + self.assertEqual(result.num_rows, 2) + rows = {(r, reg): (n, a) for r, n, a, reg in zip( + result['id'].to_pylist(), + result['name'].to_pylist(), + result['age'].to_pylist(), + result['region'].to_pylist(), + )} + self.assertEqual(rows[(1, 'US')], ('A_last', 51)) + self.assertEqual(rows[(2, 'EU')], ('B_last', 61)) def test_partitioned_table_missing_partition_col_in_data_raises(self): """Input data missing partition column should raise ValueError.""" diff --git a/paimon-python/pypaimon/write/table_upsert_by_key.py b/paimon-python/pypaimon/write/table_upsert_by_key.py index 3c895e4904..d8bad25fab 100644 --- a/paimon-python/pypaimon/write/table_upsert_by_key.py +++ b/paimon-python/pypaimon/write/table_upsert_by_key.py @@ -164,16 +164,25 @@ class TableUpsertByKey: for i in range(partition_data.num_rows) ] - # 2. Per-partition duplicate check - input_key_set = set(input_key_tuples) - if len(input_key_tuples) != len(input_key_set): - raise ValueError( - f"Input data contains duplicate values in upsert_keys columns " - f"{match_keys} within partition {partition_spec}." + # 2. Deduplicate: keep last occurrence of each key + key_to_last_idx: Dict[_KeyTuple, int] = {} + for i, key_tuple in enumerate(input_key_tuples): + key_to_last_idx[key_tuple] = i # last write wins + + if len(input_key_tuples) != len(key_to_last_idx): + original_count = len(input_key_tuples) + dedup_indices = sorted(key_to_last_idx.values()) + partition_data = partition_data.take(dedup_indices) + input_key_tuples = [input_key_tuples[i] for i in dedup_indices] + logger.warning( + "Deduplicated input from %d to %d rows in partition %s " + "(kept last occurrence).", + original_count, len(input_key_tuples), partition_spec, ) # 3. Scan partition in batches, build key → _ROW_ID only for # keys present in the input (avoids full-partition materialisation). + input_key_set = set(key_to_last_idx.keys()) key_to_row_id = self._build_key_to_row_id_map( match_keys, partition_spec, input_key_set )
