This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new dd0b809c36b [HUDI-8630] Fix convert spark key generator issue (#12402)
dd0b809c36b is described below

commit dd0b809c36ba867cc994def57b5631db1c5111c9
Author: fhan <[email protected]>
AuthorDate: Thu Dec 12 12:19:24 2024 +0800

    [HUDI-8630] Fix convert spark key generator issue (#12402)
    
    * [HUDI-8630]fix convert spark key generator issue
    
    * update comments in UT
    
    ---------
    
    Co-authored-by: fhan <[email protected]>
---
 .../hudi/HoodieDatasetBulkInsertHelper.scala       |  6 ++--
 .../spark/sql/hudi/dml/TestInsertTable.scala       | 40 ++++++++++++++++++++++
 2 files changed, 43 insertions(+), 3 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
index 8f01143506b..e5828143cb5 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
@@ -30,11 +30,11 @@ import org.apache.hudi.data.HoodieJavaRDD
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.index.HoodieIndex.BucketIndexEngineType
 import org.apache.hudi.index.{HoodieIndex, SparkHoodieIndexFactory}
+import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
 import org.apache.hudi.keygen.{AutoRecordGenWrapperKeyGenerator, 
BuiltinKeyGenerator, KeyGenUtils}
 import 
org.apache.hudi.table.action.commit.{BulkInsertDataInternalWriterHelper, 
ConsistentBucketBulkInsertDataInternalWriterHelper, ParallelismHelper}
 import org.apache.hudi.table.{BulkInsertPartitioner, HoodieTable}
 import org.apache.hudi.util.JFunction.toJavaSerializableFunctionUnchecked
-
 import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
@@ -96,7 +96,7 @@ object HoodieDatasetBulkInsertHelper
             
typedProps.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG, 
instantTime)
           }
           val sparkKeyGenerator =
-            ReflectionUtils.loadClass(keyGeneratorClassName, typedProps)
+            
ReflectionUtils.loadClass(HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(keyGeneratorClassName),
 typedProps)
               .asInstanceOf[BuiltinKeyGenerator]
               val keyGenerator: BuiltinKeyGenerator = if 
(autoGenerateRecordKeys) {
                 new AutoRecordGenWrapperKeyGenerator(typedProps, 
sparkKeyGenerator).asInstanceOf[BuiltinKeyGenerator]
@@ -243,7 +243,7 @@ object HoodieDatasetBulkInsertHelper
 
   private def getPartitionPathFields(config: HoodieWriteConfig): 
mutable.Seq[String] = {
     val keyGeneratorClassName = 
config.getString(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME)
-    val keyGenerator = ReflectionUtils.loadClass(keyGeneratorClassName, new 
TypedProperties(config.getProps)).asInstanceOf[BuiltinKeyGenerator]
+    val keyGenerator = 
ReflectionUtils.loadClass(HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(keyGeneratorClassName),
 new TypedProperties(config.getProps)).asInstanceOf[BuiltinKeyGenerator]
     keyGenerator.getPartitionPathFields.asScala
   }
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
index b8912d58c5c..3b3252cac31 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
@@ -2997,4 +2997,44 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
       }
     }
   }
+
+  test("Test SparkKeyGenerator When Bulk Insert") {
+    withSQLConf("hoodie.sql.bulk.insert.enable" -> "true", 
"hoodie.sql.insert.mode" -> "non-strict") {
+      withRecordType()(withTempDir { tmp =>
+        val tableName = generateTableName
+        // Create a multi-level partitioned table
+        // Specify wrong keygenarator by setting 
hoodie.datasource.write.keygenerator.class = 
'org.apache.hudi.keygen.ComplexAvroKeyGenerator'
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  ts long,
+             |  dt string,
+             |  pt string
+             |) using hudi
+             |tblproperties (
+             |  type = 'mor',
+             |  primaryKey = 'id',
+             |  preCombineField = 'ts',
+             |  hoodie.table.keygenerator.class = 
'org.apache.hudi.keygen.ComplexAvroKeyGenerator',
+             |  hoodie.datasource.write.keygenerator.class = 
'org.apache.hudi.keygen.ComplexAvroKeyGenerator'
+             |)
+             | partitioned by (dt, pt)
+             | location '${tmp.getCanonicalPath}/$tableName'
+       """.stripMargin)
+        // Insert data and verify the query result
+        spark.sql(
+          s"""insert into $tableName  values
+             |(1, 'a', 31, 1000, '2021-01-05', 'A'),
+             |(2, 'b', 18, 1000, '2021-01-05', 'A')
+             |""".stripMargin)
+        checkAnswer(s"select id, name, price, ts, dt, pt from $tableName order 
by dt")(
+          Seq(1, "a", 31, 1000, "2021-01-05", "A"),
+          Seq(2, "b", 18, 1000, "2021-01-05", "A")
+        )
+      })
+    }
+  }
 }

Reply via email to