alexeykudinkin commented on a change in pull request #4724:
URL: https://github.com/apache/hudi/pull/4724#discussion_r813359945
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
##########
@@ -58,6 +58,31 @@ default T preCombine(T oldValue, Properties properties) {
return preCombine(oldValue);
}
+ /**
+   * When more than one HoodieRecord has the same HoodieKey in the incoming batch,
+   * this function combines them before attempting to insert/upsert, taking in a property map.
+   *
+   * @param oldValue   instance of the old {@link HoodieRecordPayload} to be combined with.
+   * @param properties payload-related properties, e.g. the name of the ordering field(s) to extract from the value in storage.
+   * @param schema     schema used for the record
+   * @return the combined value
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
+  default T preCombine(T oldValue, Properties properties, Schema schema) {
Review comment:
I think the currently established semantic for `preCombine` is: you select either A or B, but you don't produce a new record based on those two, since it's mostly used to de-dupe records in the incoming batch. I can hardly imagine a case where 2 incoming records are fused into a third one. Can you help me understand what use-case you have in mind here?
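For context, the established `preCombine` contract can be sketched roughly as follows. This is a minimal standalone sketch, not the actual Hudi class; `SimplePayload` and its fields are hypothetical stand-ins for a `HoodieRecordPayload` implementation:

```java
// Sketch of the established preCombine contract: pick one of the two
// payloads wholesale by ordering value, never fuse them into a third record.
public class SimplePayload {
    final String value;
    final long orderingVal;

    SimplePayload(String value, long orderingVal) {
        this.value = value;
        this.orderingVal = orderingVal;
    }

    // Select either `this` or `oldValue` -- the classic de-dupe semantic.
    SimplePayload preCombine(SimplePayload oldValue) {
        return this.orderingVal >= oldValue.orderingVal ? this : oldValue;
    }

    public static void main(String[] args) {
        SimplePayload a = new SimplePayload("A", 1L);
        SimplePayload b = new SimplePayload("B", 2L);
        // The winner is one of the inputs, not a merged third record.
        System.out.println(b.preCombine(a).value); // prints "B"
    }
}
```

The proposed `PartialOverwriteWithLatestAvroPayload` departs from this by building a merged record out of both inputs, which is what the question above is probing.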
##########
File path:
hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
##########
@@ -268,6 +268,12 @@ private FlinkOptions() {
       .withDescription("Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting.\n"
           + "This will render any value set for the option in-effective");
+  public static final ConfigOption<Boolean> PARTIAL_OVERWRITE_ENABLED = ConfigOptions
Review comment:
What's the idea behind this additional configuration (besides the record payload class)?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/model/PartialOverwriteWithLatestAvroPayload.java
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.common.util.Option;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+
+import static org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro;
+
+/**
+ * The only difference from {@link OverwriteNonDefaultsWithLatestAvroPayload} is that it supports
+ * merging the latest non-null partial fields into the old record instead of replacing the whole record,
+ * and merging the non-null fields during #preCombine of multiple records with the same record key
+ * instead of choosing the latest record based on the ordering field.
+ *
+ * <p>Regarding #combineAndGetUpdateValue, assume a {@link GenericRecord} with row schema: (f0 int, f1 int, f2 int).
+ * The first record value is (1, 2, 3); the second record value is (4, 5, null), with the field f2 value as null.
+ * Calling the #combineAndGetUpdateValue method on the two records returns the record (4, 5, 3).
+ * Note that the field f2 value is ignored because it is null.</p>
+ *
+ * <p>Regarding #preCombine, assume a {@link GenericRecord} with row schema: (f0 int, f1 int, f2 int, o1 int),
+ * and two initial {@link PartialOverwriteWithLatestAvroPayload} with different ordering values.
+ * The first record value is (1, null, 1, 1), with the field f1 value as null; the second value is (2, 2, null, 2), with the f2 value as null.
+ * Calling the #preCombine method on the two records returns the record (2, 2, 1, 2).
+ * Note:
+ * <ol>
+ *   <li>the field f0 value is 2 because the ordering value of the second record is bigger.</li>
+ *   <li>the field f1 value is 2 because the f1 value of the first record is null.</li>
+ *   <li>the field f2 value is 1 because the f2 value of the second record is null.</li>
+ *   <li>the field o1 value is 2 because the ordering value of the second record is bigger.</li>
+ * </ol>
+ * </p>
+ */
+public class PartialOverwriteWithLatestAvroPayload extends OverwriteWithLatestAvroPayload {
+
+  public PartialOverwriteWithLatestAvroPayload(GenericRecord record, Comparable orderingVal) {
+ super(record, orderingVal);
+ }
+
+ public PartialOverwriteWithLatestAvroPayload(Option<GenericRecord> record) {
+ super(record); // natural order
+ }
+
+ @Override
+  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException {
+ if (recordBytes.length == 0) {
+ return Option.empty();
+ }
+
+ GenericRecord incomingRecord = bytesToAvro(recordBytes, schema);
+ if (isDeleteRecord(incomingRecord)) {
+ return Option.empty();
+ }
+
+ GenericRecord currentRecord = (GenericRecord) currentValue;
+ List<Schema.Field> fields = schema.getFields();
+ fields.forEach(field -> {
+ Object value = incomingRecord.get(field.name());
+ if (Objects.nonNull(value)) {
+ currentRecord.put(field.name(), value);
+ }
+ });
+
+ return Option.of(currentRecord);
+ }
+
+ @Override
+ public int compareTo(OverwriteWithLatestAvroPayload oldValue) {
+ return orderingVal.compareTo(oldValue.orderingVal);
+ }
+
+ @Override
+  public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload oldValue, Properties properties, Schema schema) {
+ if (null == schema) {
+ return super.preCombine(oldValue);
+ }
+
+ try {
+ Option<IndexedRecord> incomingOption = getInsertValue(schema);
+ Option<IndexedRecord> oldRecordOption = oldValue.getInsertValue(schema);
+
+ if (incomingOption.isPresent() && oldRecordOption.isPresent()) {
Review comment:
In general it's better to express common functionality in a way that allows it to be re-used and adopted in other places: here, for example, we could reuse the same routine of combining 2 records into one across both methods if we properly abstract it.
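One possible shape for that abstraction is sketched below. This is a standalone illustration, not a patch against the PR: plain maps stand in for Avro `GenericRecord`s so the example is self-contained, and the helper name `mergeNonNullFields` is hypothetical:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the suggested refactoring: a single routine that overlays the
// non-null fields of an incoming record onto a base record. Both
// combineAndGetUpdateValue and preCombine could delegate to it.
public class PartialMergeSketch {

    // Returns a copy of `base` where every non-null field of `incoming`
    // overwrites the corresponding field of `base`.
    static Map<String, Object> mergeNonNullFields(Map<String, Object> base,
                                                  Map<String, Object> incoming) {
        Map<String, Object> merged = new LinkedHashMap<>(base);
        incoming.forEach((field, value) -> {
            if (value != null) {
                merged.put(field, value);
            }
        });
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> current = new LinkedHashMap<>();
        current.put("f0", 1); current.put("f1", 2); current.put("f2", 3);

        Map<String, Object> incoming = new LinkedHashMap<>();
        incoming.put("f0", 4); incoming.put("f1", 5); incoming.put("f2", null);

        // Mirrors the javadoc example: (1, 2, 3) merged with (4, 5, null) -> (4, 5, 3).
        System.out.println(mergeNonNullFields(current, incoming));
    }
}
```

With such a helper, `combineAndGetUpdateValue` would merge the incoming record onto the stored one, and `preCombine` would merge the higher-ordering payload onto the lower-ordering one, without duplicating the field-iteration loop.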
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]