This is an automated email from the ASF dual-hosted git repository.

jonvex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 59d10808481 [HUDI-8183] Throw exception if record key field does not 
exist (#11934)
59d10808481 is described below

commit 59d10808481ad127c43e817608be8f528325e519
Author: Jon Vexler <[email protected]>
AuthorDate: Tue Sep 24 21:06:02 2024 -0400

    [HUDI-8183] Throw exception if record key field does not exist (#11934)
---
 .../java/org/apache/hudi/keygen/KeyGenUtils.java   |  8 ++++-
 .../apache/hudi/functional/TestCOWDataSource.scala | 38 ++++++++++++++++++++++
 2 files changed, 45 insertions(+), 1 deletion(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
index 04925213ec2..85d8835875e 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.util.PartitionPathEncodeUtils;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieKeyException;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.keygen.constant.KeyGeneratorType;
@@ -153,7 +154,12 @@ public class KeyGenUtils {
     StringBuilder recordKey = new StringBuilder();
     for (int i = 0; i < recordKeyFields.size(); i++) {
       String recordKeyField = recordKeyFields.get(i);
-      String recordKeyValue = 
HoodieAvroUtils.getNestedFieldValAsString(record, recordKeyField, true, 
consistentLogicalTimestampEnabled);
+      String recordKeyValue;
+      try {
+        recordKeyValue = HoodieAvroUtils.getNestedFieldValAsString(record, 
recordKeyField, false, consistentLogicalTimestampEnabled);
+      } catch (HoodieException e) {
+        throw new HoodieKeyException("Record key field '" + recordKeyField + 
"' does not exist in the input record");
+      }
       if (recordKeyValue == null) {
         
recordKey.append(recordKeyField).append(DEFAULT_COMPOSITE_KEY_FILED_VALUE).append(NULL_RECORDKEY_PLACEHOLDER);
       } else if (recordKeyValue.isEmpty()) {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index af98f85db6a..ef59d1009e2 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -66,6 +66,7 @@ import java.sql.{Date, Timestamp}
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.function.Consumer
 
+import scala.annotation.tailrec
 import scala.collection.JavaConverters._
 import scala.util.matching.Regex
 
@@ -713,6 +714,43 @@ class TestCOWDataSource extends HoodieSparkClientTestBase 
with ScalaAssertionSup
     }
   }
 
+  /**
+   * Checks that a Hudi write fails with the expected error text when the configured
+   * record key field(s) do not exist in the input.
+   *
+   * Each parameter packs one case into a single string of the form
+   * `<record key fields>|<expected error substring>`.
+   *
+   * @param args the encoded case; the part before '|' is passed as the record key field
+   *             option, the part after it must appear in the thrown exception chain
+   */
+  @ParameterizedTest
+  @ValueSource(strings =  Array(
+    "_row_key,non_existent_field|Record key field 'non_existent_field' does 
not exist in the input record",
+    "non_existent_field|recordKey value: \"null\" for field: 
\"non_existent_field\" cannot be null or empty.",
+    "_row_key,tip_history.non_existent_field|Record key field 
'tip_history.non_existent_field' does not exist in the input record",
+    "tip_history.non_existent_field|recordKey value: \"null\" for field: 
\"tip_history.non_existent_field\" cannot be null or empty."))
+  def testMissingRecordkeyField(args: String): Unit = {
+    // Decode the "<fields>|<message>" pair carried by the parameter.
+    val splits = args.split('|')
+    val recordKeyFields = splits(0)
+    val errorMessage = splits(1)
+    val records1 = recordsToStrings(dataGen.generateInserts("001", 
5)).asScala.toList
+    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+    try {
+      inputDF1.write.format("hudi")
+        .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(), recordKeyFields)
+        .option(HoodieWriteConfig.TBL_NAME.key, "hoodie_test")
+        .mode(SaveMode.Overwrite)
+        .save(basePath)
+      // Reaching this line means the write unexpectedly succeeded.
+      fail("should fail when the specified record key field does not exist")
+    } catch {
+      // The expected text may sit on a nested cause, so the whole cause chain is searched.
+      case e: Exception => assertTrue(containsErrorMessage(e, errorMessage))
+    }
+  }
+
+  /**
+   * Walks the cause chain starting at `e` and reports whether any throwable in it
+   * has a message containing `message`.
+   *
+   * @param e       the throwable to inspect first; `null` ends the walk
+   * @param message the substring to look for in each throwable's message
+   * @return `true` if some throwable in the chain has a non-null message containing
+   *         `message`, `false` once the chain is exhausted
+   */
+  @tailrec
+  private def containsErrorMessage(e: Throwable, message: String): Boolean = {
+    if (e == null) {
+      false
+    } else if (Option(e.getMessage).exists(_.contains(message))) {
+      // Throwable.getMessage may return null; Option(...) keeps a message-less wrapper
+      // from raising a NullPointerException and lets the walk continue to its cause.
+      true
+    } else {
+      containsErrorMessage(e.getCause, message)
+    }
+  }
+
   @Test
   def testOverWriteModeUseReplaceAction(): Unit = {
     val (writeOpts, readOpts) = getWriterReaderOpts()

Reply via email to