This is an automated email from the ASF dual-hosted git repository. yuzelin pushed a commit to branch release-1.4 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 227d631200431813743b47aa6549ad3fb92b163e Author: bryndenZh <[email protected]> AuthorDate: Thu Apr 2 22:58:38 2026 +0800 [core] Fix partial-update sequence-group delete mismatch under projected reads (#7586) --- .../compact/PartialUpdateMergeFunction.java | 9 ++++- .../compact/PartialUpdateMergeFunctionTest.java | 40 ++++++++++++++++++++++ 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java index 2f429191c4..3d8c7098a8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java @@ -496,6 +496,7 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> { RowType targetType = readType != null ? readType : rowType; Map<Integer, FieldsComparator> projectedSeqComparators = new HashMap<>(); Map<Integer, FieldAggregator> projectedAggregators = new HashMap<>(); + Set<Integer> projectedSequenceGroupPartialDelete = sequenceGroupPartialDelete; if (readType != null) { // Build index mapping from table schema to read schema @@ -547,6 +548,12 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> { projectedAggregators.put(newIndex, fieldAggregators.get(oldIndex).get()); } } + + projectedSequenceGroupPartialDelete = + sequenceGroupPartialDelete.stream() + .filter(indexMap::containsKey) + .map(indexMap::get) + .collect(Collectors.toSet()); } else { // Use original mappings this.fieldSeqComparators.forEach( @@ -563,7 +570,7 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> { projectedAggregators, !fieldSeqComparators.isEmpty(), removeRecordOnDelete, - sequenceGroupPartialDelete, + projectedSequenceGroupPartialDelete, ArrayUtils.toPrimitiveBoolean( fieldTypes.stream().map(DataType::isNullable).toArray(Boolean[]::new))); } diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java index 846734b555..0d4a5a2a4f 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java @@ -132,6 +132,46 @@ public class PartialUpdateMergeFunctionTest { validate(func, null, null, null, null, null, null, null); } + @Test + public void testSequenceGroupPartialDeleteWithProjection() { + Options options = new Options(); + options.set("fields.f3.sequence-group", "f1,f2"); + options.set("fields.f6.sequence-group", "f4,f5"); + options.set("partial-update.remove-record-on-sequence-group", "f3,f6"); + RowType rowType = + RowType.of( + DataTypes.INT(), + DataTypes.INT(), + DataTypes.INT(), + DataTypes.INT(), + DataTypes.INT(), + DataTypes.INT(), + DataTypes.INT()); + MergeFunctionFactory<KeyValue> factory = + PartialUpdateMergeFunction.factory(options, rowType, ImmutableList.of("f0")); + + // Reordered fields + RowType projectedType = + RowType.of( + new DataType[] { + DataTypes.INT(), + DataTypes.INT(), + DataTypes.INT(), + DataTypes.INT(), + DataTypes.INT(), + DataTypes.INT(), + DataTypes.INT() + }, + new String[] {"f3", "f6", "f0", "f1", "f2", "f4", "f5"}); + MergeFunction<KeyValue> func = factory.create(projectedType); + + func.reset(); + add(func, 11, 22, 100, 200, 1, 12, 21); + add(func, RowKind.DELETE, 11, 22, 100, 200, 1, 12, 21); + + validate(func, null, null, null, null, null, null, null); + } + @Test public void testMultiSequenceFields() { Options options = new Options();
