hudi-bot opened a new issue, #17054:
URL: https://github.com/apache/hudi/issues/17054

   Using custom key generator throws an error when upsert is being performed:
   
   *Spark-SQL entrypoint:*
   {code:java}
   test("Create MOR table with custom keygen partition field") {
     withTempDir { tmp =>
       val tableName = "hudi_custom_keygen_pt_v8_mor"
   
       spark.sql(
         s"""
            |CREATE TABLE $tableName (
            |  id INT,
            |  name STRING,
            |  price DOUBLE,
            |  ts LONG,
            |  -- Partition Source Fields --
            |  part_country STRING,
            |  part_date BIGINT
            |) USING hudi
            | LOCATION '${tmp.getCanonicalPath}'
            | TBLPROPERTIES (
            |  primaryKey = 'id',
            |  type = 'mor',
            |  preCombineField = 'ts',
            |  -- Hive style partitioning needs to be disabled for timestamp 
keygen to work --
            |  hoodie.datasource.write.hive_style_partitioning = 'false',
            |  -- Timestamp Keygen and Partition Configs --
            |  hoodie.table.keygenerator.class = 
'org.apache.hudi.keygen.CustomKeyGenerator',
            |  hoodie.datasource.write.partitionpath.field = 
'part_country:simple,part_date:timestamp',
            |  hoodie.keygen.timebased.timestamp.type = 'EPOCHMILLISECONDS',
            |  hoodie.keygen.timebased.output.dateformat = 'yyyy-MM-dd',
            |  hoodie.keygen.timebased.timezone = 'UTC'
            | ) PARTITIONED BY (part_country, part_date)
      """.stripMargin)
   
       // RecordKey + partition
   
       // Configure Hudi properties
       spark.sql(s"SET hoodie.parquet.small.file.limit=0") // Write to a new 
parquet file for each commit
       spark.sql(s"SET hoodie.metadata.compact.max.delta.commits=1")
       spark.sql(s"SET hoodie.metadata.enable=true")
       spark.sql(s"SET hoodie.metadata.index.column.stats.enable=true")
   
       // Insert data with new partition values
       spark.sql(s"INSERT INTO $tableName VALUES(1, 'a1', 100.0, 1000, 'SG', 
1749284360000)")
       spark.sql(s"INSERT INTO $tableName VALUES(2, 'a2', 200.0, 1000, 'SG', 
1749204000000)")
       spark.sql(s"INSERT INTO $tableName VALUES(3, 'a3', 101.0, 1001, 'US', 
1749202000000)")
       spark.sql(s"INSERT INTO $tableName VALUES(4, 'a4', 201.0, 1001, 'CN', 
1749102000000)")
       spark.sql(s"INSERT INTO $tableName VALUES(5, 'a5', 300.0, 1002, 'MY', 
1747102000000)")
   
       // Generate logs through updates
       spark.sql(s"UPDATE $tableName SET price = ROUND(price * 1.02, 2)")
   
       spark.sql(s"SELECT * FROM $tableName").show(false)
     }
   }
   {code}
    
   
   Datasource-write entrypoint:
   {code:java}
   test("Create MOR table with custom keygen partition field") {
       withTempDir { tmp =>
         val tablePath = tmp.getCanonicalPath
         val tableName = "hudi_custom_keygen_pt_v8_mor"
   
         val hudiOptions = Map[String, String](
           HoodieWriteConfig.TBL_NAME.key -> tableName,
           DataSourceWriteOptions.TABLE_TYPE.key -> "MERGE_ON_READ",
           DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
           DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "ts",
   
           // Custom KeyGenerator and Partitioning Config
           DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> 
"org.apache.hudi.keygen.CustomKeyGenerator",
           DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> 
"part_country:SIMPLE,part_date:TIMESTAMP",
   
           // Timestamp-based Partitioning Configs
           "hoodie.keygen.timebased.timestamp.type" -> "EPOCHMILLISECONDS",
           "hoodie.keygen.timebased.output.dateformat" -> "yyyy-MM-dd",
           "hoodie.keygen.timebased.timezone" -> "UTC",
   
           // Disable Hive-style partitioning for CustomKeyGenerator to format 
the date partition
           DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "false",
   
           // Other configs from the SQL SET commands
           "hoodie.parquet.small.file.limit" -> "0",
           "hoodie.metadata.compact.max.delta.commits" -> "1",
           "hoodie.metadata.enable" -> "true",
           "hoodie.metadata.index.column.stats.enable" -> "true"
         )
   
         spark.createDataFrame(Seq(
           (1, "a1", 100.0, 1000L, "SG", 1749284360000L)
         )).toDF("id", "name", "price", "ts", "part_country", "part_date").write
           .format("hudi")
           .options(hudiOptions)
           .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
           .mode(SaveMode.Append)
           .save(tablePath)
   
         spark.createDataFrame(Seq(
             (2, "a2", 200.0, 1000L, "SG", 1749204000000L)
           )).toDF("id", "name", "price", "ts", "part_country", 
"part_date").write
           .format("hudi")
           .options(hudiOptions)
           .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
           .mode(SaveMode.Append)
           .save(tablePath)
   
         spark.createDataFrame(Seq(
             (3, "a3", 101.0, 1001L, "US", 1749202000000L)
           )).toDF("id", "name", "price", "ts", "part_country", 
"part_date").write
           .format("hudi")
           .options(hudiOptions)
           .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
           .mode(SaveMode.Append)
           .save(tablePath)
   
         spark.createDataFrame(Seq(
             (4, "a4", 201.0, 1001L, "CN", 1749102000000L)
           )).toDF("id", "name", "price", "ts", "part_country", 
"part_date").write
           .format("hudi")
           .options(hudiOptions)
           .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
           .mode(SaveMode.Append)
           .save(tablePath)
   
         spark.createDataFrame(Seq(
             (5, "a5", 300.0, 1002L, "MY", 1747102000000L)
           )).toDF("id", "name", "price", "ts", "part_country", 
"part_date").write
           .format("hudi")
           .options(hudiOptions)
           .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
           .mode(SaveMode.Append)
           .save(tablePath)
   
         // Read the current state of the table
         val hudiTableDF = spark.read.format("hudi").load(tablePath)
   
         // Apply the update logic
         val updatedDF = hudiTableDF.withColumn("price", 
functions.round(col("price") * 1.02, 2))
   
         // Write the updated data back
         updatedDF.write
           .format("hudi")
           .options(hudiOptions)
           .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
           .mode(SaveMode.Append)
           .save(tablePath)
   
         val finalDF = spark.read.format("hudi").load(tablePath)
   
         // Select relevant Hudi meta fields and all original columns to verify 
the update
         finalDF.select("_hoodie_commit_time", "id", "name", "price", "ts", 
"part_country", "part_date")
           .orderBy("id")
           .show(false)
   
       }
     } {code}
    
   
   Error:
   {code:java}
   Failed to cast value `2025-06-06` to `LongType` for partition column 
`part_date`
   java.lang.RuntimeException: Failed to cast value `2025-06-06` to `LongType` 
for partition column `part_date`
        at 
org.apache.spark.sql.execution.datasources.Spark3ParsePartitionUtil$.$anonfun$parsePartition$3(Spark3ParsePartitionUtil.scala:78)
        at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
        at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at scala.collection.TraversableLike.map(TraversableLike.scala:238)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
        at scala.collection.AbstractTraversable.map(Traversable.scala:108) 
{code}
    
   
    
   
   Same code has been tested on 0.12.x, 0.14.x, 0.15.x, 1.0.2. All of them have 
the same issue. 
   
   ## JIRA info
   
   - Link: https://issues.apache.org/jira/browse/HUDI-9519
   - Type: Bug


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to