This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ecd088df954 [HUDI-6922] Fix inconsistency between base file format and catalog input format (#9830)
ecd088df954 is described below

commit ecd088df9543bd1986e66eae81791bda4f24be27
Author: Wechar Yu <[email protected]>
AuthorDate: Sun Oct 8 09:11:37 2023 +0800

    [HUDI-6922] Fix inconsistency between base file format and catalog input format (#9830)
---
 .../hudi/command/CreateHoodieTableCommand.scala    | 14 +++---
 .../apache/spark/sql/hudi/TestCreateTable.scala    | 52 ++++++++++++++++++++++
 2 files changed, 57 insertions(+), 9 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
index d6e4a70b39f..038ae141c51 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
@@ -118,15 +118,11 @@ object CreateHoodieTableCommand {
     val properties = tableConfig.getProps.asScala.toMap
 
     val tableType = tableConfig.getTableType.name()
-    val inputFormat = tableType match {
-      case DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL =>
-        classOf[HoodieParquetInputFormat].getCanonicalName
-      case DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL =>
-        classOf[HoodieParquetRealtimeInputFormat].getCanonicalName
-      case _=> throw new IllegalArgumentException(s"UnKnow table 
type:$tableType")
-    }
-    val outputFormat = 
HoodieInputFormatUtils.getOutputFormatClassName(HoodieFileFormat.PARQUET)
-    val serdeFormat = 
HoodieInputFormatUtils.getSerDeClassName(HoodieFileFormat.PARQUET)
+
+    val fileFormat = tableConfig.getBaseFileFormat
+    val inputFormat = 
HoodieInputFormatUtils.getInputFormatClassName(fileFormat, tableType == 
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+    val outputFormat = 
HoodieInputFormatUtils.getOutputFormatClassName(fileFormat)
+    val serdeFormat = HoodieInputFormatUtils.getSerDeClassName(fileFormat)
 
     // only parameters irrelevant to hudi can be set to storage.properties
     val storageProperties = HoodieOptionConfig.deleteHoodieOptions(properties)
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
index 642592a6c9f..f0fd5159450 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
@@ -1409,4 +1409,56 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
       assertResult(tableSchemaAfterCreate1.get)(tableSchemaAfterCreate2.get)
     }
   }
+
+  test("Test Create Hoodie Table with base file format") {
+    // Parquet
+    Seq("cow", "mor").foreach { tableType =>
+      withTable(generateTableName) { tableName =>
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  ts long
+             |) using hudi
+             | tblproperties (
+             |  primaryKey ='id',
+             |  type = '$tableType',
+             |  preCombineField = 'ts',
+             |  hoodie.table.base.file.format = 'PARQUET'
+             | )
+       """.stripMargin)
+        val table = 
spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
+        
assertResult(table.storage.serde.get)("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")
+        assertResult(table.storage.inputFormat.get)(
+          if (tableType.equals("mor")) 
"org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat"
+          else "org.apache.hudi.hadoop.HoodieParquetInputFormat")
+        
assertResult(table.storage.outputFormat.get)("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat")
+      }
+    }
+
+    // Orc
+    withTable(generateTableName) { tableName =>
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | tblproperties (
+           |  primaryKey ='id',
+           |  type = 'cow',
+           |  preCombineField = 'ts',
+           |  hoodie.table.base.file.format = 'ORC'
+           | )
+       """.stripMargin)
+      val table = 
spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
+      
assertResult(table.storage.serde.get)("org.apache.hadoop.hive.ql.io.orc.OrcSerde")
+      
assertResult(table.storage.inputFormat.get)("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat")
+      
assertResult(table.storage.outputFormat.get)("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat")
+    }
+  }
 }

Reply via email to