[ 
https://issues.apache.org/jira/browse/HUDI-3065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raymond Xu updated HUDI-3065:
-----------------------------
    Description: 
With 0.8.0, if the partition path is of the format "/partitionKey=partitionValue", 
Spark auto partition discovery will kick in, and we can see the partition keys 
as explicit fields in Hudi's table schema. 

But with 0.9.0, it does not happen. 

// launch spark shell with 0.8.0 
{code:scala}
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val dataGen = new DataGenerator

val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
val newDf = df.withColumn("partitionpath", regexp_replace($"partitionpath", 
"(.*)(\\/){1}(.*)(\\/){1}", "continent=$1$2country=$3$4city="))

newDf.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Overwrite).save(basePath)

val tripsSnapshotDF = spark.
        read.
        format("hudi").
        load(basePath)

tripsSnapshotDF.printSchema
{code}

// output : check for continent, country, city in the end.

{code}
 |-- _hoodie_commit_time: string (nullable = true)

 |-- _hoodie_commit_seqno: string (nullable = true)

 |-- _hoodie_record_key: string (nullable = true)

 |-- _hoodie_partition_path: string (nullable = true)

 |-- _hoodie_file_name: string (nullable = true)

 |-- begin_lat: double (nullable = true)

 |-- begin_lon: double (nullable = true)

 |-- driver: string (nullable = true)

 |-- end_lat: double (nullable = true)

 |-- end_lon: double (nullable = true)

 |-- fare: double (nullable = true)

 |-- partitionpath: string (nullable = true)

 |-- rider: string (nullable = true)

 |-- ts: long (nullable = true)

 |-- uuid: string (nullable = true)

 |-- continent: string (nullable = true)

 |-- country: string (nullable = true)

 |-- city: string (nullable = true)

 {code}

 

Let's run this with 0.9.0.

{code:scala}
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val dataGen = new DataGenerator

val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
val newDf = df.withColumn("partitionpath", regexp_replace($"partitionpath", 
"(.*)(\\/){1}(.*)(\\/){1}", "continent=$1$2country=$3$4city="))

newDf.write.format("hudi").  
options(getQuickstartWriteConfigs).  
option(PRECOMBINE_FIELD_OPT_KEY, "ts").  
option(RECORDKEY_FIELD_OPT_KEY, "uuid").  
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").  
option(TABLE_NAME, tableName).  
mode(Overwrite).  save(basePath)

val tripsSnapshotDF = spark.
     |   read.
     |   format("hudi").
     |   load(basePath )

tripsSnapshotDF.printSchema

{code}

// output: continent, country, city are missing.

{code}
root

 |-- _hoodie_commit_time: string (nullable = true)

 |-- _hoodie_commit_seqno: string (nullable = true)

 |-- _hoodie_record_key: string (nullable = true)

 |-- _hoodie_partition_path: string (nullable = true)

 |-- _hoodie_file_name: string (nullable = true)

 |-- begin_lat: double (nullable = true)

 |-- begin_lon: double (nullable = true)

 |-- driver: string (nullable = true)

 |-- end_lat: double (nullable = true)

 |-- end_lon: double (nullable = true)

 |-- fare: double (nullable = true)

 |-- rider: string (nullable = true)

 |-- ts: long (nullable = true)

 |-- uuid: string (nullable = true)

 |-- partitionpath: string (nullable = true)

 {code}

Ref issue: [https://github.com/apache/hudi/issues/3984]

 

 

 

 

  was:
with 0.8.0, if partition is of the format  "/partitionKey=partitionValue", 
Spark auto partition discovery will kick in. we can see explicit fields in 
hudi's table schema. 

But with 0.9.0, it does not happen. 

// launch spark shell with 0.8.0 
{code:java}
import org.apache.hudi.QuickstartUtils._import 
scala.collection.JavaConversions._import org.apache.spark.sql.SaveMode._import 
org.apache.hudi.DataSourceReadOptions._import 
org.apache.hudi.DataSourceWriteOptions._import 
org.apache.hudi.config.HoodieWriteConfig._
val tableName = "hudi_trips_cow"val basePath = "file:///tmp/hudi_trips_cow"val 
dataGen = new DataGenerator

val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
val newDf = df.withColumn("partitionpath", regexp_replace($"partitionpath", 
"(.*)(\\/){1}(.*)(\\/){1}", "continent=$1$2country=$3$4city="))

newDf.write.format("hudi").  options(getQuickstartWriteConfigs).  
option(PRECOMBINE_FIELD_OPT_KEY, "ts").  option(RECORDKEY_FIELD_OPT_KEY, 
"uuid").  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").  
option(TABLE_NAME, tableName).  mode(Overwrite).  save(basePath)

val tripsSnapshotDF = spark.
        read.
        format("hudi").
        load(basePath)

tripsSnapshotDF.printSchema {code}
//output : check for continent, country, city in the end. 
|– _hoodie_commit_time: string (nullable = true)|

 |-- _hoodie_commit_seqno: string (nullable = true)

 |-- _hoodie_record_key: string (nullable = true)

 |-- _hoodie_partition_path: string (nullable = true)

 |-- _hoodie_file_name: string (nullable = true)

 |-- begin_lat: double (nullable = true)

 |-- begin_lon: double (nullable = true)

 |-- driver: string (nullable = true)

 |-- end_lat: double (nullable = true)

 |-- end_lon: double (nullable = true)

 |-- fare: double (nullable = true)

 |-- partitionpath: string (nullable = true)

 |-- rider: string (nullable = true)

 |-- ts: long (nullable = true)

 |-- uuid: string (nullable = true)

 |-- continent: string (nullable = true)

 |-- country: string (nullable = true)

 |-- city: string (nullable = true)

 

 

Lets run this with 0.9.0.
{code:java}
import org.apache.hudi.QuickstartUtils._import 
scala.collection.JavaConversions._import org.apache.spark.sql.SaveMode._import 
org.apache.hudi.DataSourceReadOptions._import 
org.apache.hudi.DataSourceWriteOptions._import 
org.apache.hudi.config.HoodieWriteConfig._
val tableName = "hudi_trips_cow"val basePath = "file:///tmp/hudi_trips_cow"val 
dataGen = new DataGenerator

val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
val newDf = df.withColumn("partitionpath", regexp_replace($"partitionpath", 
"(.*)(\\/){1}(.*)(\\/){1}", "continent=$1$2country=$3$4city="))

newDf.write.format("hudi").  options(getQuickstartWriteConfigs).  
option(PRECOMBINE_FIELD_OPT_KEY, "ts").  option(RECORDKEY_FIELD_OPT_KEY, 
"uuid").  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").  
option(TABLE_NAME, tableName).  mode(Overwrite).  save(basePath)

val tripsSnapshotDF = spark.
     |   read.
     |   format("hudi").
     |   load(basePath )

tripsSnapshotDF.printSchema {code}
/output: continent, country, city is missing. 

root

 |-- _hoodie_commit_time: string (nullable = true)

 |-- _hoodie_commit_seqno: string (nullable = true)

 |-- _hoodie_record_key: string (nullable = true)

 |-- _hoodie_partition_path: string (nullable = true)

 |-- _hoodie_file_name: string (nullable = true)

 |-- begin_lat: double (nullable = true)

 |-- begin_lon: double (nullable = true)

 |-- driver: string (nullable = true)

 |-- end_lat: double (nullable = true)

 |-- end_lon: double (nullable = true)

 |-- fare: double (nullable = true)

 |-- rider: string (nullable = true)

 |-- ts: long (nullable = true)

 |-- uuid: string (nullable = true)

 |-- partitionpath: string (nullable = true)

 

Ref issue: [https://github.com/apache/hudi/issues/3984]

 

 

 

 


> spark auto partition discovery does not work from 0.9.0
> -------------------------------------------------------
>
>                 Key: HUDI-3065
>                 URL: https://issues.apache.org/jira/browse/HUDI-3065
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: Spark Integration
>            Reporter: sivabalan narayanan
>            Assignee: Yann Byron
>            Priority: Major
>              Labels: core-flow-ds, sev:critical, spark
>             Fix For: 0.10.1
>
>
> With 0.8.0, if the partition path is of the format "/partitionKey=partitionValue", 
> Spark auto partition discovery will kick in, and we can see the partition keys 
> as explicit fields in Hudi's table schema. 
> But with 0.9.0, it does not happen. 
> // launch spark shell with 0.8.0 
> {code:scala}
> import org.apache.hudi.QuickstartUtils._
> import scala.collection.JavaConversions._
> import org.apache.spark.sql.SaveMode._
> import org.apache.hudi.DataSourceReadOptions._
> import org.apache.hudi.DataSourceWriteOptions._
> import org.apache.hudi.config.HoodieWriteConfig._
> val tableName = "hudi_trips_cow"
> val basePath = "file:///tmp/hudi_trips_cow"
> val dataGen = new DataGenerator
> val inserts = convertToStringList(dataGen.generateInserts(10))
> val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
> val newDf = df.withColumn("partitionpath", regexp_replace($"partitionpath", 
> "(.*)(\\/){1}(.*)(\\/){1}", "continent=$1$2country=$3$4city="))
> newDf.write.format("hudi").
> options(getQuickstartWriteConfigs).
> option(PRECOMBINE_FIELD_OPT_KEY, "ts").
> option(RECORDKEY_FIELD_OPT_KEY, "uuid").
> option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
> option(TABLE_NAME, tableName).
> mode(Overwrite).save(basePath)
> val tripsSnapshotDF = spark.
>         read.
>         format("hudi").
>         load(basePath)
> tripsSnapshotDF.printSchema
> {code}
> // output : check for continent, country, city in the end.
> {code}
>  |-- _hoodie_commit_time: string (nullable = true)
>  |-- _hoodie_commit_seqno: string (nullable = true)
>  |-- _hoodie_record_key: string (nullable = true)
>  |-- _hoodie_partition_path: string (nullable = true)
>  |-- _hoodie_file_name: string (nullable = true)
>  |-- begin_lat: double (nullable = true)
>  |-- begin_lon: double (nullable = true)
>  |-- driver: string (nullable = true)
>  |-- end_lat: double (nullable = true)
>  |-- end_lon: double (nullable = true)
>  |-- fare: double (nullable = true)
>  |-- partitionpath: string (nullable = true)
>  |-- rider: string (nullable = true)
>  |-- ts: long (nullable = true)
>  |-- uuid: string (nullable = true)
>  |-- continent: string (nullable = true)
>  |-- country: string (nullable = true)
>  |-- city: string (nullable = true)
>  {code}
>  
> Let's run this with 0.9.0.
> {code:scala}
> import org.apache.hudi.QuickstartUtils._
> import scala.collection.JavaConversions._
> import org.apache.spark.sql.SaveMode._
> import org.apache.hudi.DataSourceReadOptions._
> import org.apache.hudi.DataSourceWriteOptions._
> import org.apache.hudi.config.HoodieWriteConfig._
> val tableName = "hudi_trips_cow"
> val basePath = "file:///tmp/hudi_trips_cow"
> val dataGen = new DataGenerator
> val inserts = convertToStringList(dataGen.generateInserts(10))
> val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
> val newDf = df.withColumn("partitionpath", regexp_replace($"partitionpath", 
> "(.*)(\\/){1}(.*)(\\/){1}", "continent=$1$2country=$3$4city="))
> newDf.write.format("hudi").  
> options(getQuickstartWriteConfigs).  
> option(PRECOMBINE_FIELD_OPT_KEY, "ts").  
> option(RECORDKEY_FIELD_OPT_KEY, "uuid").  
> option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").  
> option(TABLE_NAME, tableName).  
> mode(Overwrite).  save(basePath)
> val tripsSnapshotDF = spark.
>      |   read.
>      |   format("hudi").
>      |   load(basePath )
> tripsSnapshotDF.printSchema
> {code}
> // output: continent, country, city are missing.
> {code}
> root
>  |-- _hoodie_commit_time: string (nullable = true)
>  |-- _hoodie_commit_seqno: string (nullable = true)
>  |-- _hoodie_record_key: string (nullable = true)
>  |-- _hoodie_partition_path: string (nullable = true)
>  |-- _hoodie_file_name: string (nullable = true)
>  |-- begin_lat: double (nullable = true)
>  |-- begin_lon: double (nullable = true)
>  |-- driver: string (nullable = true)
>  |-- end_lat: double (nullable = true)
>  |-- end_lon: double (nullable = true)
>  |-- fare: double (nullable = true)
>  |-- rider: string (nullable = true)
>  |-- ts: long (nullable = true)
>  |-- uuid: string (nullable = true)
>  |-- partitionpath: string (nullable = true)
>  {code}
> Ref issue: [https://github.com/apache/hudi/issues/3984]
>  
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to