vinothchandar commented on code in PR #13498:
URL: https://github.com/apache/hudi/pull/13498#discussion_r2224106623

##########
hudi-common/src/main/java/org/apache/hudi/common/engine/ReaderContextTypeConverter.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.engine;
+
+/**
+ * Helper class that handle cell level operation.

Review Comment:
   Fix the Javadocs. We shouldn't call something a helper class unless it is full of util methods; in particular, helper classes are not expected to be extended.



##########
hudi-common/src/main/java/org/apache/hudi/avro/AvroReaderContextTypeConverter.java:
##########
@@ -0,0 +1,39 @@
+package org.apache.hudi.avro;
+
+import org.apache.hudi.common.engine.ReaderContextTypeConverter;
+import org.apache.hudi.common.util.StringUtils;
+
+import java.nio.ByteBuffer;
+
+public class AvroReaderContextTypeConverter extends ReaderContextTypeConverter {

Review Comment:
   UTs for all these classes?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -1078,6 +1095,15 @@ public Set<String> getMetadataPartitions() {
         CONFIG_VALUES_DELIMITER));
   }
 
+  public PartialUpdateMode getPartialUpdateMode() {
+    if (getTableVersion().greaterThanOrEquals(HoodieTableVersion.NINE)) {
+      return PartialUpdateMode.valueOf(getStringOrDefault(PARTIAL_UPDATE_MODE));
+    } else {
+      // For table version <= 8, partial update is not supported.
+      return PartialUpdateMode.NONE;

Review Comment:
   Are we saying partial update won't work for table version 8? That should not happen, right? I understand the table property is newly introduced. What I am questioning is whether the default should be `PartialUpdateMode.NONE` or one of the actual choices. If this table config is absent, that should not mean partial update is disabled.
   Instead, it should fall back to a default such as `KEEP_VALUES`.



##########
hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java:
##########
@@ -410,6 +403,18 @@ public Comparable getOrderingValue(T record,
    */
   public abstract HoodieRecord<T> constructHoodieRecord(BufferedRecord<T> bufferedRecord);
 
+  /**
+   * Constructs a new Engine based record based on a given schema, base record and update values.
+   *
+   * @param schema       The schema of the new record.
+   * @param updateValues The map recording field index and its corresponding update value.
+   * @param baseRecord   The record based on which the engine record is built.
+   * @return A new instance of engine record type {@link T}.
+   */
+  public abstract T constructEngineRecord(Schema schema,

Review Comment:
   Rename to `mergeWithEngineRecord`.



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkClientTestHarness.java:
##########
@@ -238,11 +238,14 @@ protected void initQueryIndexConf() {
    * Cleanups Spark contexts ({@link JavaSparkContext} and {@link SQLContext}).
    */
   protected void cleanupSparkContexts() {
+    if (sparkSession != null) {

Review Comment:
   Is this solving an issue that exists today?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/PartialUpdateMode.java:
##########
@@ -0,0 +1,49 @@
+package org.apache.hudi.common.table;
+
+import org.apache.hudi.common.config.EnumFieldDescription;
+
+public enum PartialUpdateMode {
+  @EnumFieldDescription(
+      "No partial update logic should be employed.")
+  NONE,
+
+  @EnumFieldDescription(
+      "For any column values missing in current record, pick value from previous version of the record.")

Review Comment:
   For any columns that are present in the current record, the value from the current record overwrites the previous value. This is the default overwrite behavior.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/PartialUpdateMode.java:
##########
@@ -0,0 +1,49 @@
+package org.apache.hudi.common.table;
+
+import org.apache.hudi.common.config.EnumFieldDescription;
+
+public enum PartialUpdateMode {
+  @EnumFieldDescription(
+      "No partial update logic should be employed.")

Review Comment:
   If a MIT statement still works on top of a table with `hoodie.table.partial.update.mode=NONE`, then this is confusing. Options:
   1) Make `KEEP_VALUES` the default. A full-record overwrite is simply a special case of it, in which no column values are missing from the record.
   2) Change the table config to `hoodie.table.update.mode=AUTO`, and the other modes to `PARTIAL_KEEP_VALUES`, `PARTIAL_FILL_DEFAULTS`, and so on. But that begs the question of whether we need `*_KEEP_VALUES` at all.

   I vote for 1.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/PartialUpdateMode.java:
##########
@@ -0,0 +1,49 @@
+package org.apache.hudi.common.table;
+
+import org.apache.hudi.common.config.EnumFieldDescription;
+
+public enum PartialUpdateMode {
+  @EnumFieldDescription(
+      "No partial update logic should be employed.")
+  NONE,
+
+  @EnumFieldDescription(
+      "For any column values missing in current record, pick value from previous version of the record.")
+  KEEP_VALUES,
+
+  @EnumFieldDescription(
+      "For column values missing in current record, pick the default value from the schema.")
+  FILL_DEFAULTS,
+
+  @EnumFieldDescription(
+      "For columns having default values set in current record, pick the value from previous version of the record."
+      + "Only top level data type default is checked, which means this mode does not check leaf level data type default"
+      + "value for nested data types.")
+  IGNORE_DEFAULTS,
+
+  @EnumFieldDescription(
+      "For columns having marker in the current record, pick value from previous version of the record during write."

Review Comment:
   What's a marker? It's not obvious from context. Better naming?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/PartialUpdateMode.java:
##########
@@ -0,0 +1,49 @@
+package org.apache.hudi.common.table;
+
+import org.apache.hudi.common.config.EnumFieldDescription;
+
+public enum PartialUpdateMode {
+  @EnumFieldDescription(
+      "No partial update logic should be employed.")
+  NONE,
+
+  @EnumFieldDescription(
+      "For any column values missing in current record, pick value from previous version of the record.")
+  KEEP_VALUES,
+
+  @EnumFieldDescription(
+      "For column values missing in current record, pick the default value from the schema.")
+  FILL_DEFAULTS,
+
+  @EnumFieldDescription(
+      "For columns having default values set in current record, pick the value from previous version of the record."
+      + "Only top level data type default is checked, which means this mode does not check leaf level data type default"
+      + "value for nested data types.")
+  IGNORE_DEFAULTS,
+
+  @EnumFieldDescription(
+      "For columns having marker in the current record, pick value from previous version of the record during write."
+      + "Marker value can be defined using `hoodie.write.partial.update.custom.marker`, which should be added to"
+      + "the value of table config `hoodie.write.partial.update.properties`. During upgrade, it could be"
+      + "added automatically for some payload classes, e.g., `PostgresDebeziumAvroPayload`.")

Review Comment:
   Let's not talk about payload classes here.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/PartialUpdateMode.java:
##########
@@ -0,0 +1,49 @@
+package org.apache.hudi.common.table;
+
+import org.apache.hudi.common.config.EnumFieldDescription;
+
+public enum PartialUpdateMode {
+  @EnumFieldDescription(
+      "No partial update logic should be employed.")
+  NONE,
+
+  @EnumFieldDescription(
+      "For any column values missing in current record, pick value from previous version of the record.")
+  KEEP_VALUES,
+
+  @EnumFieldDescription(
+      "For column values missing in current record, pick the default value from the schema.")
+  FILL_DEFAULTS,
+
+  @EnumFieldDescription(
+      "For columns having default values set in current record, pick the value from previous version of the record."
+      + "Only top level data type default is checked, which means this mode does not check leaf level data type default"
+      + "value for nested data types.")
+  IGNORE_DEFAULTS,
+
+  @EnumFieldDescription(
+      "For columns having marker in the current record, pick value from previous version of the record during write."
+      + "Marker value can be defined using `hoodie.write.partial.update.custom.marker`, which should be added to"
+      + "the value of table config `hoodie.write.partial.update.properties`. During upgrade, it could be"
+      + "added automatically for some payload classes, e.g., `PostgresDebeziumAvroPayload`.")
+  IGNORE_MARKERS

Review Comment:
   Rename to `FILL_UNAVAILABLE`.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -45,27 +46,45 @@
  * Factory to create a {@link BufferedRecordMerger}.
  */
 public class BufferedRecordMergerFactory {
+
+  private BufferedRecordMergerFactory() {
+  }
+
   public static <T> BufferedRecordMerger<T> create(HoodieReaderContext<T> readerContext,
                                                    RecordMergeMode recordMergeMode,
                                                    boolean enablePartialMerging,
                                                    Option<HoodieRecordMerger> recordMerger,
                                                    Option<String> orderingFieldName,
                                                    Option<String> payloadClass,
                                                    Schema readerSchema,
-                                                   TypedProperties props) {
+                                                   TypedProperties props,
+                                                   PartialUpdateMode partialUpdateMode) {
+    /**
+     * This part implements KEEP_VALUES partial update mode, which merges two records that do not have all columns.
+     * Other Partial update modes, like IGNORE_DEFAULTS assume all columns exists in the record,
+     * but some columns contain specific values that should be replaced by that from older version of the record.
+     */
     if (enablePartialMerging) {
       BufferedRecordMerger<T> deleteRecordMerger = create(
-          readerContext, recordMergeMode, false, recordMerger, orderingFieldName, payloadClass, readerSchema, props);
+          readerContext, recordMergeMode, false, recordMerger, orderingFieldName, payloadClass, readerSchema, props, partialUpdateMode);
       return new PartialUpdateBufferedRecordMerger<>(readerContext, recordMerger, deleteRecordMerger, readerSchema, props);
     }
+
     switch (recordMergeMode) {
       case COMMIT_TIME_ORDERING:
-        return new CommitTimeBufferedRecordMerger<>();
+        if (partialUpdateMode == PartialUpdateMode.NONE) {

Review Comment:
   This needs to change based on the feedback above.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -93,6 +112,49 @@ public Pair<Boolean, T> finalMerge(BufferedRecord<T> olderRecord, BufferedRecord
     }
   }
 
+  /**
+   * An implementation of {@link BufferedRecordMerger} which merges {@link BufferedRecord}s
+   * based on {@code COMMIT_TIME_ORDERING} merge mode and partial update mode.
+   */
+  private static class CommitTimeBufferedRecordPartialUpdateMerger<T> extends CommitTimeBufferedRecordMerger<T> {

Review Comment:
   Too long a name.
   Rename it to `CommitTimePartialRecordMerger`, and rename the base class to `CommitTimeRecordMerger`.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/PartialUpdateMode.java:
##########
@@ -0,0 +1,49 @@
+package org.apache.hudi.common.table;
+
+import org.apache.hudi.common.config.EnumFieldDescription;
+
+public enum PartialUpdateMode {
+  @EnumFieldDescription(
+      "No partial update logic should be employed.")
+  NONE,
+
+  @EnumFieldDescription(
+      "For any column values missing in current record, pick value from previous version of the record.")
+  KEEP_VALUES,
+
+  @EnumFieldDescription(
+      "For column values missing in current record, pick the default value from the schema.")
+  FILL_DEFAULTS,
+
+  @EnumFieldDescription(
+      "For columns having default values set in current record, pick the value from previous version of the record."
+      + "Only top level data type default is checked, which means this mode does not check leaf level data type default"
+      + "value for nested data types.")
+  IGNORE_DEFAULTS,
+
+  @EnumFieldDescription(
+      "For columns having marker in the current record, pick value from previous version of the record during write."

Review Comment:
   We are basically talking about unavailable columns here. Let's just call them unavailable columns, not markers.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/PartialUpdateMode.java:
##########
@@ -0,0 +1,49 @@
+package org.apache.hudi.common.table;
+
+import org.apache.hudi.common.config.EnumFieldDescription;
+
+public enum PartialUpdateMode {
+  @EnumFieldDescription(
+      "No partial update logic should be employed.")
+  NONE,
+
+  @EnumFieldDescription(
+      "For any column values missing in current record, pick value from previous version of the record.")
+  KEEP_VALUES,
+
+  @EnumFieldDescription(
+      "For column values missing in current record, pick the default value from the schema.")
+  FILL_DEFAULTS,

Review Comment:
   To confirm my understanding: will all the writer paths here enforce this behavior?



##########
hudi-common/src/main/java/org/apache/hudi/common/engine/ReaderContextTypeConverter.java:
##########
@@ -0,0 +1,48 @@
+package org.apache.hudi.common.engine;
+
+/**
+ * Helper class that handle cell level operation.
+ */
+public class ReaderContextTypeConverter {

Review Comment:
   Should this be an interface with default methods?



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java:
##########
@@ -110,6 +112,23 @@ public HoodieRecord<InternalRow> constructHoodieRecord(BufferedRecord<InternalRo
     return new HoodieSparkRecord(hoodieKey, row, HoodieInternalRowUtils.getCachedSchema(schema), false);
   }
 
+  @Override
+  public InternalRow constructEngineRecord(Schema schema,

Review Comment:
   Does this add extra overhead for the MIT scenario with partial update, compared with the current baseline implementation? If so, this needs to be addressed.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -126,6 +188,75 @@ public Pair<Boolean, T> finalMerge(BufferedRecord<T> olderRecord, BufferedRecord
     }
   }
 
+  /**
+   * An implementation of {@link EventTimeBufferedRecordMerger} which merges {@link BufferedRecord}s
+   * based on {@code EVENT_TIME_ORDERING} merge mode and partial update mode.
+   */
+  private static class EventTimeBufferedRecordPartialUpdateMerger<T> extends EventTimeBufferedRecordMerger<T> {

Review Comment:
   Rename.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/PartialUpdateMode.java:
##########
@@ -0,0 +1,49 @@
+package org.apache.hudi.common.table;
+
+import org.apache.hudi.common.config.EnumFieldDescription;
+
+public enum PartialUpdateMode {
+  @EnumFieldDescription(
+      "No partial update logic should be employed.")
+  NONE,
+
+  @EnumFieldDescription(
+      "For any column values missing in current record, pick value from previous version of the record.")
+  KEEP_VALUES,
+
+  @EnumFieldDescription(
+      "For column values missing in current record, pick the default value from the schema.")
+  FILL_DEFAULTS,
+
+  @EnumFieldDescription(
+      "For columns having default values set in current record, pick the value from previous version of the record."
+      + "Only top level data type default is checked, which means this mode does not check leaf level data type default"
+      + "value for nested data types.")
+  IGNORE_DEFAULTS,
+
+  @EnumFieldDescription(
+      "For columns having marker in the current record, pick value from previous version of the record during write."
+      + "Marker value can be defined using `hoodie.write.partial.update.custom.marker`, which should be added to"

Review Comment:
   Why `.update.*` vs `.merge.*`? Let's move all of these configs, such as `hoodie.write.partial.update.custom.marker`, into the prefix/namespace defined above, e.g. `unavailable.column.field.value` => the value that indicates the column was unavailable.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -312,6 +314,20 @@ public class HoodieTableConfig extends HoodieConfig {
       .sinceVersion("1.0.0")
       .withDocumentation("When set to true, the table can support reading and writing multiple base file formats.");
 
+  public static final ConfigProperty<PartialUpdateMode> PARTIAL_UPDATE_MODE = ConfigProperty
+      .key("hoodie.table.partial.update.mode")
+      .defaultValue(PartialUpdateMode.NONE)
+      .sinceVersion("1.1.0")
+      .withDocumentation("This property when set, will define how two versions of the record will be "
+          + "merged together where the later contains only partial set of values and not entire record.");
+
+  public static final ConfigProperty<String> MERGE_PROPERTIES = ConfigProperty
+      .key("hoodie.table.merge.properties")
+      .noDefaultValue()
+      .sinceVersion("1.1.0")
+      .withDocumentation("The value of this property is in the format of 'K1=V1,K2=V2,...,Ki=Vi,...'."

Review Comment:
   Is it common practice to encode key-value pairs into a single property value? Instead, can we create a new prefix/namespace here, e.g. `hoodie.table.merge.properties.k1`:
   ```
   hoodie.table.merge.properties.k1=V1
   hoodie.table.merge.properties.k2=V2
   ```
   and then pass the following into the merger or other code paths as needed:
   ```
   Map(
     k1=v1
     k2=v2
   )
   ```
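Below is a minimal sketch of the prefix-namespace idea. It assumes the table config is available as a plain `java.util.Properties`. Every key under `hoodie.table.merge.properties.` is collected into a map keyed by its suffix, and that map is what would be handed to the merger. `MergePropertiesSketch` and `extractMergeProperties` are hypothetical names, not existing Hudi APIs.

```java
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

public class MergePropertiesSketch {

  // Prefix taken from the namespace proposed in the review comment (illustrative only).
  static final String MERGE_PROPERTIES_PREFIX = "hoodie.table.merge.properties.";

  /**
   * Collects every property under the prefix into a map keyed by the suffix,
   * e.g. "hoodie.table.merge.properties.k1=V1" becomes {k1=V1}.
   * Keys outside the prefix, or equal to the bare prefix, are ignored.
   */
  static Map<String, String> extractMergeProperties(Properties props) {
    Map<String, String> result = new TreeMap<>(); // sorted for deterministic iteration and printing
    for (String key : props.stringPropertyNames()) {
      if (key.startsWith(MERGE_PROPERTIES_PREFIX) && key.length() > MERGE_PROPERTIES_PREFIX.length()) {
        result.put(key.substring(MERGE_PROPERTIES_PREFIX.length()), props.getProperty(key));
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("hoodie.table.merge.properties.k1", "V1");
    props.setProperty("hoodie.table.merge.properties.k2", "V2");
    props.setProperty("hoodie.table.name", "trips"); // unrelated key, not collected
    System.out.println(extractMergeProperties(props)); // prints {k1=V1, k2=V2}
  }
}
```

One design point follows from storing each entry under its own key: a value may contain `,` or `=` without any escaping. The single-value `'K1=V1,K2=V2,...'` encoding would need escaping for those characters.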

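The `PartialUpdateMode` comments above argue for option 1, where a full-record overwrite is a special case of `KEEP_VALUES`. The minimal sketch below makes that concrete over `Map`-based records. It assumes a fixed column list and models only `KEEP_VALUES` and the proposed `FILL_UNAVAILABLE`. The `Mode` enum, the `merge` method, and the `"__unavailable__"` value are hypothetical stand-ins. They are not Hudi's `PartialUpdateMode`, its merger classes, or a real config value.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class PartialUpdateSketch {

  // Illustrative subset of the modes discussed in the review; not Hudi's enum.
  enum Mode { KEEP_VALUES, FILL_UNAVAILABLE }

  /**
   * Merges a newer record into an older one, column by column.
   * KEEP_VALUES: a column absent from the newer record keeps the older value.
   * FILL_UNAVAILABLE: a column whose newer value equals unavailableValue keeps the older value.
   */
  static Map<String, Object> merge(List<String> columns,
                                   Map<String, Object> older,
                                   Map<String, Object> newer,
                                   Mode mode,
                                   Object unavailableValue) {
    Map<String, Object> merged = new LinkedHashMap<>(); // preserves column order
    for (String col : columns) {
      boolean takeOlder;
      switch (mode) {
        case KEEP_VALUES:
          takeOlder = !newer.containsKey(col);
          break;
        case FILL_UNAVAILABLE:
          takeOlder = Objects.equals(newer.get(col), unavailableValue);
          break;
        default:
          throw new IllegalArgumentException("Unsupported mode: " + mode);
      }
      merged.put(col, takeOlder ? older.get(col) : newer.get(col));
    }
    return merged;
  }

  public static void main(String[] args) {
    List<String> cols = List.of("id", "name", "price");
    Map<String, Object> older = Map.of("id", 1, "name", "a", "price", 10);

    // Full record: KEEP_VALUES reduces to a plain overwrite.
    Map<String, Object> full = Map.of("id", 1, "name", "b", "price", 20);
    System.out.println(merge(cols, older, full, Mode.KEEP_VALUES, null)); // {id=1, name=b, price=20}

    // Partial record: the missing "price" column keeps the older value.
    Map<String, Object> partial = Map.of("id", 1, "name", "b");
    System.out.println(merge(cols, older, partial, Mode.KEEP_VALUES, null)); // {id=1, name=b, price=10}

    // Hypothetical unavailable-column value in "name": the older value is kept.
    Map<String, Object> marked = Map.of("id", 1, "name", "__unavailable__", "price", 30);
    System.out.println(merge(cols, older, marked, Mode.FILL_UNAVAILABLE, "__unavailable__")); // {id=1, name=a, price=30}
  }
}
```

In this sketch, `KEEP_VALUES` with a complete record gives the same result as plain overwrite. That is the reviewer's point: a separate `NONE` mode adds no behavior that a `KEEP_VALUES` default would not already cover.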