[ https://issues.apache.org/jira/browse/HUDI-3065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Raymond Xu updated HUDI-3065:
-----------------------------
Description: 

With 0.8.0, if the partition path is of the format "/partitionKey=partitionValue", Spark auto partition discovery kicks in and the partition columns show up as explicit fields in Hudi's table schema. With 0.9.0, this no longer happens.

// launch spark-shell with 0.8.0
{code:scala}
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.spark.sql.functions._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._

val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val dataGen = new DataGenerator
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
val newDf = df.withColumn("partitionpath", regexp_replace($"partitionpath", "(.*)(\\/){1}(.*)(\\/){1}", "continent=$1$2country=$3$4city="))

newDf.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Overwrite).
  save(basePath)

val tripsSnapshotDF = spark.
  read.
  format("hudi").
  load(basePath)
tripsSnapshotDF.printSchema
{code}
// output: note continent, country, city at the end.
{code}
root
 |-- _hoodie_commit_time: string (nullable = true)
 |-- _hoodie_commit_seqno: string (nullable = true)
 |-- _hoodie_record_key: string (nullable = true)
 |-- _hoodie_partition_path: string (nullable = true)
 |-- _hoodie_file_name: string (nullable = true)
 |-- begin_lat: double (nullable = true)
 |-- begin_lon: double (nullable = true)
 |-- driver: string (nullable = true)
 |-- end_lat: double (nullable = true)
 |-- end_lon: double (nullable = true)
 |-- fare: double (nullable = true)
 |-- partitionpath: string (nullable = true)
 |-- rider: string (nullable = true)
 |-- ts: long (nullable = true)
 |-- uuid: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)
{code}
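As an aside (not part of the original report), the regexp_replace above is what creates the hive-style layout: it rewrites the quickstart generator's slash-separated partition path into "key=value" segments. A standalone sketch, run in the same spark-shell session, using a sample value assumed to match the quickstart DataGenerator's default partition paths:

{code:scala}
// Illustration only: show what the regexp_replace produces for one sample partition path.
// The sample value is an assumption based on the quickstart DataGenerator's defaults.
import org.apache.spark.sql.functions.regexp_replace
import spark.implicits._

val sample = Seq("americas/united_states/san_francisco").toDF("partitionpath")
sample.select(
  regexp_replace($"partitionpath", "(.*)(\\/){1}(.*)(\\/){1}", "continent=$1$2country=$3$4city=").as("hive_style")
).show(false)
// -> continent=americas/country=united_states/city=san_francisco
{code}

Because this value is used as the Hudi partition path, the data files land under directories of that "key=value" shape, which is what makes partition discovery possible in the first place.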
Let's run the same snippet with 0.9.0.
{code:scala}
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.spark.sql.functions._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._

val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val dataGen = new DataGenerator
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
val newDf = df.withColumn("partitionpath", regexp_replace($"partitionpath", "(.*)(\\/){1}(.*)(\\/){1}", "continent=$1$2country=$3$4city="))

newDf.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Overwrite).
  save(basePath)

val tripsSnapshotDF = spark.
  read.
  format("hudi").
  load(basePath)
tripsSnapshotDF.printSchema
{code}
// output: continent, country, city are missing.
{code}
root
 |-- _hoodie_commit_time: string (nullable = true)
 |-- _hoodie_commit_seqno: string (nullable = true)
 |-- _hoodie_record_key: string (nullable = true)
 |-- _hoodie_partition_path: string (nullable = true)
 |-- _hoodie_file_name: string (nullable = true)
 |-- begin_lat: double (nullable = true)
 |-- begin_lon: double (nullable = true)
 |-- driver: string (nullable = true)
 |-- end_lat: double (nullable = true)
 |-- end_lon: double (nullable = true)
 |-- fare: double (nullable = true)
 |-- rider: string (nullable = true)
 |-- ts: long (nullable = true)
 |-- uuid: string (nullable = true)
 |-- partitionpath: string (nullable = true)
{code}

Ref issue: [https://github.com/apache/hudi/issues/3984]
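For context (again, not part of the original report), the behavior the 0.8.0 schema relies on is plain Spark auto partition discovery, independent of Hudi. A minimal sketch, with a made-up path and made-up values, pasteable into any spark-shell:

{code:scala}
// Hudi-independent illustration of Spark auto partition discovery.
// The path, column names and values below are made up for illustration only.
import spark.implicits._

// Write plain parquet files under hive-style "key=value" directories.
Seq(("id-1", 10.5), ("id-2", 7.25)).toDF("uuid", "fare").
  write.mode("overwrite").
  parquet("/tmp/partition_discovery_demo/continent=asia/country=india/city=chennai")

// Reading from the base path makes Spark parse the directory names and append
// continent, country and city to the schema; this is the same mechanism that
// surfaced those columns in the 0.8.0 output above and that no longer applies
// to the Hudi datasource read path on 0.9.0.
spark.read.parquet("/tmp/partition_discovery_demo").printSchema
{code}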
> spark auto partition discovery does not work from 0.9.0
> --------------------------------------------------------
>
>                 Key: HUDI-3065
>                 URL: https://issues.apache.org/jira/browse/HUDI-3065
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: Spark Integration
>            Reporter: sivabalan narayanan
>            Assignee: Yann Byron
>            Priority: Major
>              Labels: core-flow-ds, sev:critical, spark
>             Fix For: 0.10.1
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)