This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a45be9f0d2 [core] Fix initRow in AggregateMergeFunction and PartialUpdateMergeFunction to set all fields including nullable ones (#7645)
a45be9f0d2 is described below

commit a45be9f0d2775790245b8b6008f8c7d3f34265ab
Author: lxy <[email protected]>
AuthorDate: Wed Apr 15 12:03:11 2026 +0800

    [core] Fix initRow in AggregateMergeFunction and PartialUpdateMergeFunction to set all fields including nullable ones (#7645)
---
 .../compact/PartialUpdateMergeFunction.java        |  9 ++----
 .../compact/aggregate/AggregateMergeFunction.java  |  9 ++----
 .../mergetree/compact/MergeFunctionTestUtils.java  |  8 ++---
 .../compact/PartialUpdateMergeFunctionTest.java    | 29 ++++++++++++++++--
 .../aggregate/AggregateMergeFunctionTest.java      | 35 ++++++++++++++++++++++
 5 files changed, 71 insertions(+), 19 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
index 3d8c7098a8..96c3371692 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
@@ -344,13 +344,10 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
     private void initRow(GenericRow row, InternalRow value) {
         for (int i = 0; i < getters.length; i++) {
             Object field = getters[i].getFieldOrNull(value);
-            if (!nullables[i]) {
-                if (field != null) {
-                    row.setField(i, field);
-                } else {
-                    throw new IllegalArgumentException("Field " + i + " can not be null");
-                }
+            if (!nullables[i] && field == null) {
+                throw new IllegalArgumentException("Field " + i + " can not be null");
             }
+            row.setField(i, field);
         }
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java
index eea3c7e499..3e8aaa1b8d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java
@@ -104,13 +104,10 @@ public class AggregateMergeFunction implements MergeFunction<KeyValue> {
     private void initRow(GenericRow row, InternalRow value) {
         for (int i = 0; i < getters.length; i++) {
             Object field = getters[i].getFieldOrNull(value);
-            if (!nullables[i]) {
-                if (field != null) {
-                    row.setField(i, field);
-                } else {
-                    throw new IllegalArgumentException("Field " + i + " can not be null");
-                }
+            if (!nullables[i] && field == null) {
+                throw new IllegalArgumentException("Field " + i + " can not be null");
             }
+            row.setField(i, field);
         }
     }
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeFunctionTestUtils.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeFunctionTestUtils.java
index 4acea88b2d..8035985969 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeFunctionTestUtils.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeFunctionTestUtils.java
@@ -107,12 +107,12 @@ public class MergeFunctionTestUtils {
                         new ReusingTestData(last.key, last.sequenceNumber, RowKind.INSERT, sum));
             } else {
                 if (group.stream().noneMatch(data -> data.valueKind == RowKind.INSERT)) {
-                    // No insert: fill the pk and left nullable fields to null; sequenceNumber =
-                    // latest
+                    // No insert: initRow now sets all fields including nullable ones,
+                    // so the value is from the last DELETE record
                     ReusingTestData last = group.get(group.size() - 1);
                     expected.add(
                             new ReusingTestData(
-                                    last.key, last.sequenceNumber, RowKind.DELETE, null));
+                                    last.key, last.sequenceNumber, RowKind.DELETE, last.value));
                 } else {
                     RowKind rowKind = null;
                     Long sum = null;
@@ -122,7 +122,7 @@ public class MergeFunctionTestUtils {
                             sum = sum == null ? data.value : sum + data.value;
                         } else {
                             rowKind = RowKind.DELETE;
-                            sum = null;
+                            sum = data.value;
                         }
                     }
                     ReusingTestData last = group.get(group.size() - 1);
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java
index 0d4a5a2a4f..f4f2d28f75 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java
@@ -125,11 +125,11 @@ public class PartialUpdateMergeFunctionTest {
         add(func, RowKind.DELETE, 1, 1, 1, 3, 1, 1, null);
         validate(func, 1, null, null, 3, 3, 3, 3);
         add(func, RowKind.DELETE, 1, 1, 1, 3, 1, 1, 4);
-        validate(func, null, null, null, null, null, null, null);
+        validate(func, 1, 1, 1, 3, 1, 1, 4);
         add(func, 1, 4, 4, 4, 5, 5, 5);
         validate(func, 1, 4, 4, 4, 5, 5, 5);
         add(func, RowKind.DELETE, 1, 1, 1, 6, 1, 1, 6);
-        validate(func, null, null, null, null, null, null, null);
+        validate(func, 1, 1, 1, 6, 1, 1, 6);
     }
 
     @Test
@@ -169,7 +169,7 @@ public class PartialUpdateMergeFunctionTest {
         add(func, 11, 22, 100, 200, 1, 12, 21);
         add(func, RowKind.DELETE, 11, 22, 100, 200, 1, 12, 21);
 
-        validate(func, null, null, null, null, null, null, null);
+        validate(func, 11, 22, 100, 200, 1, 12, 21);
     }
 
     @Test
@@ -912,6 +912,29 @@ public class PartialUpdateMergeFunctionTest {
         assertThat(func.getResult().sequenceNumber()).isEqualTo(1);
     }
 
+    @Test
+    public void testInitRowWithNullableFieldOnDelete() {
+        Options options = new Options();
+        options.set("partial-update.remove-record-on-delete", "true");
+        RowType rowType =
+                RowType.of(
+                        DataTypes.INT().notNull(),
+                        DataTypes.INT().notNull(),
+                        DataTypes.INT(),
+                        DataTypes.INT());
+        MergeFunction<KeyValue> func =
+                PartialUpdateMergeFunction.factory(options, rowType, ImmutableList.of("f0"))
+                        .create();
+        func.reset();
+
+        // insert some data first
+        add(func, 1, 3, 5, 7);
+        // send a DELETE with nullable field as null, triggers initRow
+        add(func, RowKind.DELETE, 1, 2, 2, null);
+        // after delete with removeRecordOnDelete, row is re-initialized via initRow
+        validate(func, 1, 2, 2, null);
+    }
+
     private void add(MergeFunction<KeyValue> function, Integer... f) {
         add(function, RowKind.INSERT, f);
     }
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunctionTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunctionTest.java
index c1b2e9dcd8..f1ae731511 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunctionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunctionTest.java
@@ -141,11 +141,46 @@ class AggregateMergeFunctionTest {
                                 BinaryString.fromString("1/2/3/4/5")));
     }
 
+    @Test
+    void testInitRowWithNullableFieldOnDelete() {
+        Options options = new Options();
+        options.set(FIELDS_DEFAULT_AGG_FUNC, "sum");
+        options.set("aggregation.remove-record-on-delete", "true");
+        MergeFunction<KeyValue> aggregateFunction =
+                AggregateMergeFunction.factory(
+                                options,
+                                RowType.builder()
+                                        .fields(
+                                                new DataType[] {
+                                                    DataTypes.INT().notNull(),
+                                                    DataTypes.INT().notNull(),
+                                                    DataTypes.INT(),
+                                                    DataTypes.INT()
+                                                },
+                                                new String[] {"k", "a", "b", "c"})
+                                        .build(),
+                                Collections.singletonList("k"))
+                        .create();
+        aggregateFunction.reset();
+
+        // insert some data first
+        aggregateFunction.add(value(1, 0, 0, 5));
+        // send a DELETE with nullable field as null, triggers initRow
+        aggregateFunction.add(deleteValue(1, 2, 2, null));
+        // after delete with removeRecordOnDelete, row is re-initialized via initRow
+        assertThat(aggregateFunction.getResult().value()).isEqualTo(GenericRow.of(1, 2, 2, null));
+    }
+
     private KeyValue value(Integer... values) {
         return new KeyValue()
                 .replace(GenericRow.of(values[0]), RowKind.INSERT, GenericRow.of(values));
     }
 
+    private KeyValue deleteValue(Integer... values) {
+        return new KeyValue()
+                .replace(GenericRow.of(values[0]), RowKind.DELETE, GenericRow.of(values));
+    }
+
     private KeyValue value(
             Integer value1,
             BinaryString value2,

Reply via email to