This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a45be9f0d2 [core] Fix initRow in AggregateMergeFunction and PartialUpdateMergeFunction to set all fields including nullable ones (#7645)
a45be9f0d2 is described below
commit a45be9f0d2775790245b8b6008f8c7d3f34265ab
Author: lxy <[email protected]>
AuthorDate: Wed Apr 15 12:03:11 2026 +0800
[core] Fix initRow in AggregateMergeFunction and PartialUpdateMergeFunction to set all fields including nullable ones (#7645)
---
.../compact/PartialUpdateMergeFunction.java | 9 ++----
.../compact/aggregate/AggregateMergeFunction.java | 9 ++----
.../mergetree/compact/MergeFunctionTestUtils.java | 8 ++---
.../compact/PartialUpdateMergeFunctionTest.java | 29 ++++++++++++++++--
.../aggregate/AggregateMergeFunctionTest.java | 35 ++++++++++++++++++++++
5 files changed, 71 insertions(+), 19 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
index 3d8c7098a8..96c3371692 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
@@ -344,13 +344,10 @@ public class PartialUpdateMergeFunction implements
MergeFunction<KeyValue> {
private void initRow(GenericRow row, InternalRow value) {
for (int i = 0; i < getters.length; i++) {
Object field = getters[i].getFieldOrNull(value);
- if (!nullables[i]) {
- if (field != null) {
- row.setField(i, field);
- } else {
- throw new IllegalArgumentException("Field " + i + " can
not be null");
- }
+ if (!nullables[i] && field == null) {
+ throw new IllegalArgumentException("Field " + i + " can not be
null");
}
+ row.setField(i, field);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java
index eea3c7e499..3e8aaa1b8d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java
@@ -104,13 +104,10 @@ public class AggregateMergeFunction implements
MergeFunction<KeyValue> {
private void initRow(GenericRow row, InternalRow value) {
for (int i = 0; i < getters.length; i++) {
Object field = getters[i].getFieldOrNull(value);
- if (!nullables[i]) {
- if (field != null) {
- row.setField(i, field);
- } else {
- throw new IllegalArgumentException("Field " + i + " can
not be null");
- }
+ if (!nullables[i] && field == null) {
+ throw new IllegalArgumentException("Field " + i + " can not be
null");
}
+ row.setField(i, field);
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeFunctionTestUtils.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeFunctionTestUtils.java
index 4acea88b2d..8035985969 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeFunctionTestUtils.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeFunctionTestUtils.java
@@ -107,12 +107,12 @@ public class MergeFunctionTestUtils {
new ReusingTestData(last.key, last.sequenceNumber,
RowKind.INSERT, sum));
} else {
if (group.stream().noneMatch(data -> data.valueKind ==
RowKind.INSERT)) {
- // No insert: fill the pk and left nullable fields to
null; sequenceNumber =
- // latest
+ // No insert: initRow now sets all fields including
nullable ones,
+ // so the value is from the last DELETE record
ReusingTestData last = group.get(group.size() - 1);
expected.add(
new ReusingTestData(
- last.key, last.sequenceNumber,
RowKind.DELETE, null));
+ last.key, last.sequenceNumber,
RowKind.DELETE, last.value));
} else {
RowKind rowKind = null;
Long sum = null;
@@ -122,7 +122,7 @@ public class MergeFunctionTestUtils {
sum = sum == null ? data.value : sum + data.value;
} else {
rowKind = RowKind.DELETE;
- sum = null;
+ sum = data.value;
}
}
ReusingTestData last = group.get(group.size() - 1);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java
index 0d4a5a2a4f..f4f2d28f75 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java
@@ -125,11 +125,11 @@ public class PartialUpdateMergeFunctionTest {
add(func, RowKind.DELETE, 1, 1, 1, 3, 1, 1, null);
validate(func, 1, null, null, 3, 3, 3, 3);
add(func, RowKind.DELETE, 1, 1, 1, 3, 1, 1, 4);
- validate(func, null, null, null, null, null, null, null);
+ validate(func, 1, 1, 1, 3, 1, 1, 4);
add(func, 1, 4, 4, 4, 5, 5, 5);
validate(func, 1, 4, 4, 4, 5, 5, 5);
add(func, RowKind.DELETE, 1, 1, 1, 6, 1, 1, 6);
- validate(func, null, null, null, null, null, null, null);
+ validate(func, 1, 1, 1, 6, 1, 1, 6);
}
@Test
@@ -169,7 +169,7 @@ public class PartialUpdateMergeFunctionTest {
add(func, 11, 22, 100, 200, 1, 12, 21);
add(func, RowKind.DELETE, 11, 22, 100, 200, 1, 12, 21);
- validate(func, null, null, null, null, null, null, null);
+ validate(func, 11, 22, 100, 200, 1, 12, 21);
}
@Test
@@ -912,6 +912,29 @@ public class PartialUpdateMergeFunctionTest {
assertThat(func.getResult().sequenceNumber()).isEqualTo(1);
}
+ @Test
+ public void testInitRowWithNullableFieldOnDelete() {
+ Options options = new Options();
+ options.set("partial-update.remove-record-on-delete", "true");
+ RowType rowType =
+ RowType.of(
+ DataTypes.INT().notNull(),
+ DataTypes.INT().notNull(),
+ DataTypes.INT(),
+ DataTypes.INT());
+ MergeFunction<KeyValue> func =
+ PartialUpdateMergeFunction.factory(options, rowType,
ImmutableList.of("f0"))
+ .create();
+ func.reset();
+
+ // insert some data first
+ add(func, 1, 3, 5, 7);
+ // send a DELETE with nullable field as null, triggers initRow
+ add(func, RowKind.DELETE, 1, 2, 2, null);
+ // after delete with removeRecordOnDelete, row is re-initialized via
initRow
+ validate(func, 1, 2, 2, null);
+ }
+
private void add(MergeFunction<KeyValue> function, Integer... f) {
add(function, RowKind.INSERT, f);
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunctionTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunctionTest.java
index c1b2e9dcd8..f1ae731511 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunctionTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunctionTest.java
@@ -141,11 +141,46 @@ class AggregateMergeFunctionTest {
BinaryString.fromString("1/2/3/4/5")));
}
+ @Test
+ void testInitRowWithNullableFieldOnDelete() {
+ Options options = new Options();
+ options.set(FIELDS_DEFAULT_AGG_FUNC, "sum");
+ options.set("aggregation.remove-record-on-delete", "true");
+ MergeFunction<KeyValue> aggregateFunction =
+ AggregateMergeFunction.factory(
+ options,
+ RowType.builder()
+ .fields(
+ new DataType[] {
+ DataTypes.INT().notNull(),
+ DataTypes.INT().notNull(),
+ DataTypes.INT(),
+ DataTypes.INT()
+ },
+ new String[] {"k", "a", "b",
"c"})
+ .build(),
+ Collections.singletonList("k"))
+ .create();
+ aggregateFunction.reset();
+
+ // insert some data first
+ aggregateFunction.add(value(1, 0, 0, 5));
+ // send a DELETE with nullable field as null, triggers initRow
+ aggregateFunction.add(deleteValue(1, 2, 2, null));
+ // after delete with removeRecordOnDelete, row is re-initialized via
initRow
+
assertThat(aggregateFunction.getResult().value()).isEqualTo(GenericRow.of(1, 2,
2, null));
+ }
+
private KeyValue value(Integer... values) {
return new KeyValue()
.replace(GenericRow.of(values[0]), RowKind.INSERT,
GenericRow.of(values));
}
+ private KeyValue deleteValue(Integer... values) {
+ return new KeyValue()
+ .replace(GenericRow.of(values[0]), RowKind.DELETE,
GenericRow.of(values));
+ }
+
private KeyValue value(
Integer value1,
BinaryString value2,