nsivabalan commented on code in PR #13526:
URL: https://github.com/apache/hudi/pull/13526#discussion_r2220609107
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDataSource.java:
##########
@@ -41,6 +41,7 @@ public class TestDataSource extends AbstractBaseTestSource {
private static final Logger LOG =
LoggerFactory.getLogger(TestDataSource.class);
public static boolean returnEmptyBatch = false;
+ public static Option<String> recordInstantTime = Option.empty();
Review Comment:
we should reset this to `Option.empty()` in either a `@BeforeEach` method or
the constructor of `TestDataSource`, so the static state does not leak between tests
##########
hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java:
##########
@@ -604,13 +651,30 @@ protected void
validateOutputFromFileGroupReader(StorageConfiguration<?> storage
List<HoodieRecord>
expectedHoodieUnmergedRecords) throws Exception {
HoodieTableMetaClient metaClient =
HoodieTestUtils.createMetaClient(storageConf, tablePath);
Schema avroSchema = new
TableSchemaResolver(metaClient).getTableAvroSchema();
+ expectedHoodieRecords =
getExpectedHoodieRecordsWithOrderingValue(expectedHoodieRecords, metaClient,
avroSchema);
+ expectedHoodieUnmergedRecords =
getExpectedHoodieRecordsWithOrderingValue(expectedHoodieUnmergedRecords,
metaClient, avroSchema);
List<HoodieTestDataGenerator.RecordIdentifier> expectedRecords =
convertHoodieRecords(expectedHoodieRecords, avroSchema);
List<HoodieTestDataGenerator.RecordIdentifier> expectedUnmergedRecords =
convertHoodieRecords(expectedHoodieUnmergedRecords, avroSchema);
validateOutputFromFileGroupReaderWithExistingRecords(
storageConf, tablePath, containsBaseFile, expectedLogFileNum,
recordMergeMode,
expectedRecords, expectedUnmergedRecords);
}
+ private static List<HoodieRecord>
getExpectedHoodieRecordsWithOrderingValue(List<HoodieRecord>
expectedHoodieRecords, HoodieTableMetaClient metaClient, Schema avroSchema) {
+ return expectedHoodieRecords.stream().map(rec -> {
+ RawTripTestPayload oldPayload = (RawTripTestPayload) rec.getData();
+ try {
+ List<String> orderingFields =
metaClient.getTableConfig().getPreCombineFieldList().get();
+ HoodieAvroRecord avroRecord = ((HoodieAvroRecord) rec);
+ Comparable orderingValue = Comparables.create(orderingFields, field ->
(Comparable) avroRecord.getColumnValueAsJava(avroSchema, field, new
TypedProperties()));
+ RawTripTestPayload newPayload = new
RawTripTestPayload(Option.ofNullable(oldPayload.getJsonData()),
oldPayload.getRowKey(), oldPayload.getPartitionPath(), null, false,
orderingValue);
Review Comment:
Have you thought about, or attempted, fixing `HoodieTestDataGenerator` itself so
that it instantiates `RawTripTestPayload` with the right ordering value?
That way the fix would apply to all test data generation, not just
`TestHoodieFileGroupReaderBase.java`.
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java:
##########
@@ -1955,6 +1957,62 @@ public void testFilterDupesWithPrecombine(
UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
}
+ @ParameterizedTest
+ @EnumSource(HoodieTableType.class)
+ public void testDeltaStreamerWithMultipleOrderingFields(HoodieTableType
tableType) throws Exception {
+ String tableBasePath = basePath + "/test_with_multiple_ordering_fields";
+ HoodieDeltaStreamer.Config cfg =
+ TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
+ cfg.tableType = tableType.name();
+ cfg.filterDupes = true;
+ cfg.sourceOrderingFields = "timestamp,rider";
+ cfg.recordMergeMode = RecordMergeMode.EVENT_TIME_ORDERING;
+ cfg.payloadClassName = DefaultHoodieRecordPayload.class.getName();
+ cfg.recordMergeStrategyId =
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID;
+ // Set record merge mode to event time ordering
+ cfg.configs.add(String.format("%s=%s",
HoodieWriteConfig.RECORD_MERGE_MODE.key(),
RecordMergeMode.EVENT_TIME_ORDERING.name()));
+ //addRecordMerger(HoodieRecordType.AVRO, cfg.configs);
Review Comment:
Why is this line commented out?
If it is not required, can we remove it?
##########
hudi-common/src/main/java/org/apache/hudi/Comparables.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi;
+
+import org.apache.hudi.common.util.ValidationUtils;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.hudi.common.model.HoodieRecord.DEFAULT_ORDERING_VALUE;
+
+public class Comparables implements Comparable, Serializable {
+ protected static final long serialVersionUID = 1L;
+ private static final Comparables DEFAULT_VALUE = new
Comparables(DEFAULT_ORDERING_VALUE);
+
+ private final List<Comparable> comparables;
+
+ public Comparables(List<Comparable> comparables) {
+ this.comparables = comparables;
+ }
+
+ public Comparables(Comparable comparable) {
+ this.comparables = Collections.singletonList(comparable);
+ }
+
+ public static Comparables getDefault() {
+ return DEFAULT_VALUE;
+ }
+
+ public static boolean isDefault(Comparable orderingVal) {
+ if (orderingVal instanceof Comparables) {
+ return ((Comparables) orderingVal).comparables.size() == 1
+ && ((Comparables)
orderingVal).comparables.get(0).equals(DEFAULT_ORDERING_VALUE);
+ } else {
+ return orderingVal.equals(DEFAULT_ORDERING_VALUE);
+ }
+ }
+
+ /**
+ * Returns whether the given two comparable values come from the same
runtime class.
+ */
+ public static boolean isSameClass(Comparable<?> v, Comparable<?> o) {
+ if (v.getClass() != o.getClass()) {
+ // If class is not same return false
+ return false;
+ } else if (v instanceof Comparables) {
+ if (((Comparables) v).comparables.size() != ((Comparables)
o).comparables.size()) {
+ // if comparables size is not same return false
+ return false;
+ } else {
+ // compare class of comparable list of both arguments
+ return IntStream.range(0, ((Comparables) v).comparables.size())
+ .mapToObj(i -> ((Comparables) v).comparables.get(i).getClass() ==
((Comparables) o).comparables.get(i).getClass())
+ .reduce(Boolean::logicalAnd)
+ .orElse(true);
+ }
+ }
+ // return true if class is same and input objects are not instance of
Comparables class
+ return true;
+ }
+
+ public static Comparable getDefaultOrderingValue() {
+ return DEFAULT_ORDERING_VALUE;
+ }
+
+ @Override
+ public int compareTo(Object o) {
+ ValidationUtils.checkArgument(o instanceof Comparables, "Comparables can
only be compared with another Comparables");
+ Comparables otherComparables = (Comparables) o;
+ ValidationUtils.checkArgument(comparables.size() ==
otherComparables.comparables.size(), "Comparables should be of same size");
+ for (int i = 0; i < comparables.size(); i++) {
+ int comparingValue =
comparables.get(i).compareTo(otherComparables.comparables.get(i));
Review Comment:
Yes, let's ensure the `ConfigProperty` documentation calls this out.
##########
hudi-common/src/main/java/org/apache/hudi/Comparables.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi;
+
+import org.apache.hudi.common.util.ValidationUtils;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.model.HoodieRecord.DEFAULT_ORDERING_VALUE;
+
+public class Comparables implements Comparable, Serializable {
+ protected static final long serialVersionUID = 1L;
+
+ private final List<Comparable> comparables;
Review Comment:
Hey @the-other-tim-brown: What's your take on this? Do you have any
suggestions here?
@lokeshj1703: Can we do a micro-benchmark comparing master (single
ordering field) vs this branch?
We can create a MOR file slice with N log files so that we trigger repeated
compare calls, and then do a snapshot read to see if we can spot any perf
difference.
##########
hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java:
##########
@@ -185,6 +189,49 @@ public void
testReadFileGroupInMergeOnReadTable(RecordMergeMode recordMergeMode,
}
}
+ @ParameterizedTest
+ @ValueSource(strings = {"avro", "parquet"})
+ public void testReadFileGroupWithMultipleOrderingFields(String
logDataBlockFormat) throws Exception {
+ RecordMergeMode recordMergeMode = RecordMergeMode.EVENT_TIME_ORDERING;
+ Map<String, String> writeConfigs = new
HashMap<>(getCommonConfigs(recordMergeMode, true));
+ writeConfigs.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(),
logDataBlockFormat);
+ writeConfigs.put("hoodie.datasource.write.table.type",
HoodieTableType.MERGE_ON_READ.name());
+ // Use two precombine values - combination of timestamp and rider
+ String orderingValues = "timestamp,rider";
+ writeConfigs.put("hoodie.datasource.write.precombine.field",
orderingValues);
+ writeConfigs.put("hoodie.payload.ordering.field", orderingValues);
+
+ try (HoodieTestDataGenerator dataGen = new
HoodieTestDataGenerator(0xDEEF)) {
+ // Initial commit. rider column gets value of rider-002
+ List<HoodieRecord> initialRecords = dataGen.generateInserts("002", 100);
+ commitToTable(initialRecords, INSERT.value(), true, writeConfigs);
+ validateOutputFromFileGroupReader(
+ getStorageConf(), getBasePath(), true, 0, recordMergeMode,
+ initialRecords, initialRecords);
+
+ // The updates have rider values as rider-001 and the existing records
have rider values as rider-001
Review Comment:
I think there is a typo in this comment: the existing records have rider values of rider-002 (from the initial commit above), not rider-001.
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java:
##########
@@ -1955,6 +1957,62 @@ public void testFilterDupesWithPrecombine(
UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
}
+ @ParameterizedTest
+ @EnumSource(HoodieTableType.class)
+ public void testDeltaStreamerWithMultipleOrderingFields(HoodieTableType
tableType) throws Exception {
+ String tableBasePath = basePath + "/test_with_multiple_ordering_fields";
+ HoodieDeltaStreamer.Config cfg =
+ TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
+ cfg.tableType = tableType.name();
+ cfg.filterDupes = true;
+ cfg.sourceOrderingFields = "timestamp,rider";
+ cfg.recordMergeMode = RecordMergeMode.EVENT_TIME_ORDERING;
+ cfg.payloadClassName = DefaultHoodieRecordPayload.class.getName();
+ cfg.recordMergeStrategyId =
HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID;
+ // Set record merge mode to event time ordering
+ cfg.configs.add(String.format("%s=%s",
HoodieWriteConfig.RECORD_MERGE_MODE.key(),
RecordMergeMode.EVENT_TIME_ORDERING.name()));
+ //addRecordMerger(HoodieRecordType.AVRO, cfg.configs);
+ TestDataSource.recordInstantTime = Option.of("002");
+ new HoodieStreamer(cfg, jsc).sync();
+
+ assertRecordCount(1000, tableBasePath, sqlContext);
+ TestHelpers.assertCommitMetadata("00000", tableBasePath, 1);
+
+ // Generate new updates with lower recordInstantTime so that updates are
rejected
+ TestDataSource.recordInstantTime = Option.of("001");
+ runStreamSync(cfg, false, 50, WriteOperationType.UPSERT);
+ int numInserts = 25;
+ // TestDataSource generates 500 inserts, 450 updates and 50 deletes
+ assertRecordCount(1025, tableBasePath, sqlContext); // if filter dupes is
not enabled, we should be expecting 3000 records here.
+ TestHelpers.assertCommitMetadata("00001", tableBasePath, 2);
+ // Filter records with rider-001 value and deduct the number of inserts
+ long numUpdates =
sparkSession.read().format("hudi").load(tableBasePath).filter("rider =
'rider-001'").count()
+ - numInserts;
+ // There should be no updates since ordering value rider-001 is lower than
existing record ordering value rider-002
+ assertEquals(0, numUpdates);
+
+ // Generate new updates with lower recordInstantTime so that updates are
rejected
Review Comment:
Again, let's fix this comment — it is duplicated from the earlier update and no longer describes this step.
##########
hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java:
##########
@@ -185,6 +189,49 @@ public void
testReadFileGroupInMergeOnReadTable(RecordMergeMode recordMergeMode,
}
}
+ @ParameterizedTest
+ @ValueSource(strings = {"avro", "parquet"})
+ public void testReadFileGroupWithMultipleOrderingFields(String
logDataBlockFormat) throws Exception {
Review Comment:
We may not need to parameterize over different log block formats here.
But let's add a test at the Spark datasource layer for a COW table (I assume it
does not make sense to add a COW case here with the file group reader).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]