This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new ecd088df954 [HUDI-6922] Fix inconsistency between base file format and
catalog input format (#9830)
ecd088df954 is described below
commit ecd088df9543bd1986e66eae81791bda4f24be27
Author: Wechar Yu <[email protected]>
AuthorDate: Sun Oct 8 09:11:37 2023 +0800
[HUDI-6922] Fix inconsistency between base file format and catalog input
format (#9830)
---
.../hudi/command/CreateHoodieTableCommand.scala | 14 +++---
.../apache/spark/sql/hudi/TestCreateTable.scala | 52 ++++++++++++++++++++++
2 files changed, 57 insertions(+), 9 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
index d6e4a70b39f..038ae141c51 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
@@ -118,15 +118,11 @@ object CreateHoodieTableCommand {
val properties = tableConfig.getProps.asScala.toMap
val tableType = tableConfig.getTableType.name()
- val inputFormat = tableType match {
- case DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL =>
- classOf[HoodieParquetInputFormat].getCanonicalName
- case DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL =>
- classOf[HoodieParquetRealtimeInputFormat].getCanonicalName
- case _=> throw new IllegalArgumentException(s"UnKnow table
type:$tableType")
- }
- val outputFormat =
HoodieInputFormatUtils.getOutputFormatClassName(HoodieFileFormat.PARQUET)
- val serdeFormat =
HoodieInputFormatUtils.getSerDeClassName(HoodieFileFormat.PARQUET)
+
+ val fileFormat = tableConfig.getBaseFileFormat
+ val inputFormat =
HoodieInputFormatUtils.getInputFormatClassName(fileFormat, tableType ==
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+ val outputFormat =
HoodieInputFormatUtils.getOutputFormatClassName(fileFormat)
+ val serdeFormat = HoodieInputFormatUtils.getSerDeClassName(fileFormat)
// only parameters irrelevant to hudi can be set to storage.properties
val storageProperties = HoodieOptionConfig.deleteHoodieOptions(properties)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
index 642592a6c9f..f0fd5159450 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
@@ -1409,4 +1409,56 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
assertResult(tableSchemaAfterCreate1.get)(tableSchemaAfterCreate2.get)
}
}
+
+ test("Test Create Hoodie Table with base file format") {
+ // Parquet
+ Seq("cow", "mor").foreach { tableType =>
+ withTable(generateTableName) { tableName =>
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | tblproperties (
+ | primaryKey ='id',
+ | type = '$tableType',
+ | preCombineField = 'ts',
+ | hoodie.table.base.file.format = 'PARQUET'
+ | )
+ """.stripMargin)
+ val table =
spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
+
assertResult(table.storage.serde.get)("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")
+ assertResult(table.storage.inputFormat.get)(
+ if (tableType.equals("mor"))
"org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat"
+ else "org.apache.hudi.hadoop.HoodieParquetInputFormat")
+
assertResult(table.storage.outputFormat.get)("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat")
+ }
+ }
+
+ // Orc
+ withTable(generateTableName) { tableName =>
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | tblproperties (
+ | primaryKey ='id',
+ | type = 'cow',
+ | preCombineField = 'ts',
+ | hoodie.table.base.file.format = 'ORC'
+ | )
+ """.stripMargin)
+ val table =
spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
+
assertResult(table.storage.serde.get)("org.apache.hadoop.hive.ql.io.orc.OrcSerde")
+
assertResult(table.storage.inputFormat.get)("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat")
+
assertResult(table.storage.outputFormat.get)("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat")
+ }
+ }
}