This is an automated email from the ASF dual-hosted git repository.
jonvex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 59d10808481 [HUDI-8183] Throw exception if record key field does not exist (#11934)
59d10808481 is described below
commit 59d10808481ad127c43e817608be8f528325e519
Author: Jon Vexler <[email protected]>
AuthorDate: Tue Sep 24 21:06:02 2024 -0400
[HUDI-8183] Throw exception if record key field does not exist (#11934)
---
.../java/org/apache/hudi/keygen/KeyGenUtils.java | 8 ++++-
.../apache/hudi/functional/TestCOWDataSource.scala | 38 ++++++++++++++++++++++
2 files changed, 45 insertions(+), 1 deletion(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
index 04925213ec2..85d8835875e 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.util.PartitionPathEncodeUtils;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieKeyException;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.keygen.constant.KeyGeneratorType;
@@ -153,7 +154,12 @@ public class KeyGenUtils {
StringBuilder recordKey = new StringBuilder();
for (int i = 0; i < recordKeyFields.size(); i++) {
String recordKeyField = recordKeyFields.get(i);
- String recordKeyValue =
HoodieAvroUtils.getNestedFieldValAsString(record, recordKeyField, true,
consistentLogicalTimestampEnabled);
+ String recordKeyValue;
+ try {
+ recordKeyValue = HoodieAvroUtils.getNestedFieldValAsString(record,
recordKeyField, false, consistentLogicalTimestampEnabled);
+ } catch (HoodieException e) {
+ throw new HoodieKeyException("Record key field '" + recordKeyField +
"' does not exist in the input record");
+ }
if (recordKeyValue == null) {
recordKey.append(recordKeyField).append(DEFAULT_COMPOSITE_KEY_FILED_VALUE).append(NULL_RECORDKEY_PLACEHOLDER);
} else if (recordKeyValue.isEmpty()) {
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index af98f85db6a..ef59d1009e2 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -66,6 +66,7 @@ import java.sql.{Date, Timestamp}
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.function.Consumer
+import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.util.matching.Regex
@@ -713,6 +714,43 @@ class TestCOWDataSource extends HoodieSparkClientTestBase
with ScalaAssertionSup
}
}
+  /**
+   * Writing with a record key field that is absent from the input data must fail with a
+   * descriptive error.
+   *
+   * Each argument has the form `recordKeyFields|expectedErrorMessage`. In the parameter set
+   * below, composite keys (comma-separated) that include a missing field expect the
+   * "does not exist in the input record" error. A missing field used as the sole key expects
+   * the null/empty record key error. The expected text is searched for anywhere in the
+   * exception's cause chain.
+   */
+  @ParameterizedTest
+  @ValueSource(strings = Array(
+    "_row_key,non_existent_field|Record key field 'non_existent_field' does not exist in the input record",
+    "non_existent_field|recordKey value: \"null\" for field: \"non_existent_field\" cannot be null or empty.",
+    "_row_key,tip_history.non_existent_field|Record key field 'tip_history.non_existent_field' does not exist in the input record",
+    "tip_history.non_existent_field|recordKey value: \"null\" for field: \"tip_history.non_existent_field\" cannot be null or empty."))
+  def testMissingRecordkeyField(args: String): Unit = {
+    val splits = args.split('|')
+    val recordKeyFields = splits(0)
+    val errorMessage = splits(1)
+    val records1 = recordsToStrings(dataGen.generateInserts("001", 5)).asScala.toList
+    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+    try {
+      inputDF1.write.format("hudi")
+        .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(), recordKeyFields)
+        .option(HoodieWriteConfig.TBL_NAME.key, "hoodie_test")
+        .mode(SaveMode.Overwrite)
+        .save(basePath)
+      // NOTE(review): assumes JUnit Jupiter's `fail`, which throws AssertionFailedError (an Error,
+      // not an Exception), so it escapes the `case e: Exception` handler below.
+      fail("should fail when the specified record key field does not exist")
+    } catch {
+      case e: Exception =>
+        // The expected message may be wrapped by outer exceptions, so search the whole cause
+        // chain, and report the actual exception when the message is not found.
+        assertTrue(containsErrorMessage(e, errorMessage),
+          s"Expected '$errorMessage' in the cause chain of: $e")
+    }
+  }
+
+  /**
+   * Returns true if `e` or any throwable in its cause chain has a message containing `message`.
+   *
+   * `Throwable.getMessage` may return null, so such entries are skipped rather than
+   * dereferenced. A null `e` (the end of the chain) yields false.
+   *
+   * @param e       the throwable to inspect; may be null
+   * @param message the substring to look for
+   * @return whether any message in the cause chain contains `message`
+   */
+  @tailrec
+  private def containsErrorMessage(e: Throwable, message: String): Boolean = {
+    if (e == null) {
+      false
+    } else if (Option(e.getMessage).exists(_.contains(message))) {
+      true
+    } else {
+      containsErrorMessage(e.getCause, message)
+    }
+  }
+
@Test
def testOverWriteModeUseReplaceAction(): Unit = {
val (writeOpts, readOpts) = getWriterReaderOpts()