the-other-tim-brown commented on code in PR #13623:
URL: https://github.com/apache/hudi/pull/13623#discussion_r2231485624
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/TestBaseSparkInternalRowReaderContext.java:
##########
@@ -124,12 +124,126 @@ void testConstructEngineRecordWithNullValueFromBase() {
BufferedRecord<InternalRow> record = new BufferedRecord<>(
"record_key", 1, base, 1, false);
- InternalRow result = readerContext.constructEngineRecord(SCHEMA, updates,
record);
+ InternalRow result = readerContext.mergeEngineRecord(SCHEMA, updates,
record);
assertTrue(result.isNullAt(0));
assertEquals("Dan", result.getUTF8String(1).toString());
assertTrue(result.getBoolean(2));
}
+ @Test
+ void testConstructEngineRecordWithListOfValues() {
+ List<Object> values = Arrays.asList(1, UTF8String.fromString("Alice"),
true);
+ InternalRow result = readerContext.createEngineRecord(SCHEMA, values);
+
+ assertEquals(1, result.getInt(0));
+ assertEquals("Alice", result.getString(1));
+ assertTrue(result.getBoolean(2));
+ }
+
+ @Test
+ void testConstructEngineRecordWithNullValues() {
+ List<Object> values = Arrays.asList(null, UTF8String.fromString("Bob"),
null);
+ InternalRow result = readerContext.createEngineRecord(SCHEMA, values);
+
+ assertTrue(result.isNullAt(0));
+ assertEquals("Bob", result.getString(1));
+ assertTrue(result.isNullAt(2));
+ }
+
+ @Test
+ void testConstructEngineRecordWithMixedTypes() {
+ List<Object> values = Arrays.asList(42, UTF8String.fromString("Carol"),
false);
+ InternalRow result = readerContext.createEngineRecord(SCHEMA, values);
+
+ assertEquals(42, result.getInt(0));
+ assertEquals("Carol", result.getString(1));
+ assertFalse(result.getBoolean(2));
+ }
+
+ @Test
+ void testConstructEngineRecordWithEmptyValues() {
+ List<Object> values = Arrays.asList(0, UTF8String.fromString(""), false);
+ InternalRow result = readerContext.createEngineRecord(SCHEMA, values);
+
+ assertEquals(0, result.getInt(0));
+ assertEquals("", result.getString(1));
+ assertFalse(result.getBoolean(2));
+ }
+
+ @Test
+ void testConstructEngineRecordWithValueCountMismatch() {
+ List<Object> values = Arrays.asList(1, UTF8String.fromString("Alice")); //
Missing boolean value
+
+ try {
+ readerContext.createEngineRecord(SCHEMA, values);
+ // Should not reach here
+ assertTrue(false, "Expected IllegalArgumentException");
+ } catch (IllegalArgumentException e) {
+ assertTrue(e.getMessage().contains("Value count (2) does not match field
count (3)"));
+ }
+ }
+
+ @Test
+ void testConstructEngineRecordWithExtraValues() {
+ List<Object> values = Arrays.asList(1, UTF8String.fromString("Alice"),
true, "extra");
+
+ try {
+ readerContext.createEngineRecord(SCHEMA, values);
+ // Should not reach here
+ assertTrue(false, "Expected IllegalArgumentException");
+ } catch (IllegalArgumentException e) {
+ assertTrue(e.getMessage().contains("Value count (4) does not match field
count (3)"));
+ }
+ }
+
+ @Test
+ void testConstructEngineRecordWithComplexSchema() {
+ // Create a more complex schema with nested fields
+ Schema complexSchema = SchemaBuilder.record("ComplexRecord").fields()
Review Comment:
We need at least one test with nested fields as a sanity check.
##########
hudi-common/src/test/java/org/apache/hudi/common/table/read/TestKeepValuesPartialMergingUtils.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class TestKeepValuesPartialMergingUtils {
+ private KeepValuesPartialMergingUtils<IndexedRecord>
keepValuesPartialMergingUtils;
+ private HoodieReaderContext<IndexedRecord> mockReaderContext;
+ private Schema fullSchema;
+ private Schema partialSchema;
+ private Schema readerSchema;
+
+ @BeforeEach
+ void setUp() {
+ keepValuesPartialMergingUtils = new KeepValuesPartialMergingUtils<>();
+ mockReaderContext = mock(HoodieReaderContext.class);
+
+ // Create test schemas
Review Comment:
Let's also create some schemas with nested fields here, ideally including
lists and maps, for robustness.
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java:
##########
@@ -129,6 +129,16 @@ public InternalRow constructEngineRecord(Schema schema,
return new GenericInternalRow(values);
}
+ @Override
+ public InternalRow createEngineRecord(Schema schema, List<Object> values) {
+ List<Schema.Field> fields = schema.getFields();
+ if (fields.size() != values.size()) {
+ throw new IllegalArgumentException(
+ "Value count (" + values.size() + ") does not match field count (" +
fields.size() + ")");
Review Comment:
This only validates the top level of the schema; it does not account for
nested fields. Will `toBinaryRow` handle validation of nested fields?
##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java:
##########
@@ -230,16 +234,31 @@ public HoodieRecord<ArrayWritable>
constructHoodieRecord(BufferedRecord<ArrayWri
}
@Override
- public ArrayWritable constructEngineRecord(Schema schema,
- Map<Integer, Object> updateValues,
- BufferedRecord<ArrayWritable>
baseRecord) {
+ public ArrayWritable mergeEngineRecord(Schema schema,
+ Map<Integer, Object> updateValues,
+ BufferedRecord<ArrayWritable>
baseRecord) {
Writable[] engineRecord = baseRecord.getRecord().get();
for (Map.Entry<Integer, Object> value : updateValues.entrySet()) {
engineRecord[value.getKey()] = (Writable) value.getValue();
}
return baseRecord.getRecord();
}
+ @Override
+ public ArrayWritable createEngineRecord(Schema schema, List<Object> values) {
+ List<Schema.Field> fields = schema.getFields();
+ if (fields.size() != values.size()) {
+ throw new IllegalArgumentException("Mismatch between schema fields and
values");
+ }
+
+ Writable[] writables = new Writable[fields.size()];
+ for (int i = 0; i < fields.size(); i++) {
+ Schema fieldSchema = resolveUnion(fields.get(i).schema());
+ writables[i] = convertToWritable(fieldSchema, values.get(i));
Review Comment:
This is the only engine that converts the values into an engine-specific
format. Do the other engines need to do this as well?
##########
hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java:
##########
@@ -413,9 +413,18 @@ public Comparable getOrderingValue(T record,
* @param baseRecord The record based on which the engine record is
built.
* @return A new instance of engine record type {@link T}.
*/
- public abstract T constructEngineRecord(Schema schema,
- Map<Integer, Object> updateValues,
- BufferedRecord<T> baseRecord);
+ public abstract T mergeEngineRecord(Schema schema,
+ Map<Integer, Object> updateValues,
+ BufferedRecord<T> baseRecord);
+
+ /**
+ * Creates a new Engine based record based on a given schema and its field
values.
+ *
+ * @param schema The schema of the new record.
+ * @param values The list of values.
+ * @return A new instance of engine record type {@link T}.
+ */
+ public abstract T createEngineRecord(Schema schema, List<Object> values);
Review Comment:
Let's consider taking an array of values instead of a `List<Object>`; in some
cases this would let us simplify the code a bit.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]