rangareddy commented on issue #12931:
URL: https://github.com/apache/hudi/issues/12931#issuecomment-2717687846

   Hi @robbik 
   
   I tried to reproduce this issue with the following code, but was not able to.
   
   ```sh
   # Spark version used to pick the matching Hudi bundle artifact.
   export SPARK_VERSION=3.5

   # Launch spark-shell with the Hudi bundle and the configuration Hudi
   # requires: Kryo serialization, the Hudi session catalog, the Hudi SQL
   # extensions, and the Hudi Kryo registrator.
   # (The original paste had these arguments hard-wrapped mid-token by the
   # mail archiver; they are rejoined here so the command runs as written.)
   spark-shell --packages org.apache.hudi:hudi-spark$SPARK_VERSION-bundle_2.12:1.0.1 \
     --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
     --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
     --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
     --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'
   ```
   
   ```scala
   // Reproduction script for the reported issue: insert-overwrite, delete,
   // then upsert against a MERGE_ON_READ Hudi table with metadata-table
   // indexes enabled.
   //
   // Changes vs. the original paste:
   //  * added the missing org.apache.spark.sql.functions.col import —
   //    col("fare") in the update step does not compile without it
   //    (spark.implicits._ only provides the $"..." interpolator);
   //  * dropped the deprecated and unused scala.collection.JavaConversions._;
   //  * rejoined lines that the mail archiver had wrapped mid-token.
   import org.apache.spark.sql.SaveMode._
   import org.apache.spark.sql.functions.col
   import org.apache.hudi.DataSourceReadOptions._
   import org.apache.hudi.DataSourceWriteOptions._
   import org.apache.hudi.common.table.HoodieTableConfig._
   import org.apache.hudi.config.HoodieWriteConfig._
   import org.apache.hudi.keygen.constant.KeyGeneratorOptions._
   import org.apache.hudi.common.model.HoodieRecord
   import org.apache.hudi.config.HoodieCleanConfig
   import org.apache.hudi.config.HoodieIndexConfig
   import org.apache.hudi.config.HoodieCompactionConfig
   import org.apache.hudi.common.config.HoodieMetadataConfig
   import org.apache.hudi.hive.HiveSyncConfig
   import org.apache.hudi.common.table.HoodieTableConfig
   import org.apache.hudi.common.config.HoodieStorageConfig
   import org.apache.hudi.index.HoodieIndex
   import org.apache.hudi.common.model.HoodieTableType
   import org.apache.hudi.sync.common.HoodieSyncConfig
   import spark.implicits._

   // Target table name and storage location.
   val tableName = "trips_table"
   val basePath = "s3a://warehouse/trips_table"

   val columns = Seq("ts","uuid","rider","driver","fare","city")

   // Sample trip records: (ts, uuid, rider, driver, fare, city).
   val data = Seq(
     (1695159649087L,"334e26e9-8355-45cc-97c6-c31daf0df330","rider-A","driver-K",19.10,"san_francisco"),
     (1695091554788L,"e96c4396-3fad-413a-a942-4cb36106d721","rider-C","driver-M",27.70,"san_francisco"),
     (1695046462179L,"9909a8b1-2d15-4d3d-8ec9-efc48c536a00","rider-D","driver-L",33.90,"san_francisco"),
     (1695516137016L,"e3cf430c-889d-4015-bc98-59bdce1e530c","rider-F","driver-P",34.15,"sao_paulo"),
     (1695115999911L,"c8abbe79-8d89-47ea-b4ce-4d224bae5bfa","rider-J","driver-T",17.85,"chennai"))

   // MERGE_ON_READ table with metadata-table indexes (column stats,
   // partition stats, bloom filter, record index) enabled, inline
   // compaction every 20 delta commits, and meta/Hive sync disabled.
   val hudiOptions = Map(
        "hoodie.write.record.merge.mode" -> "COMMIT_TIME_ORDERING",
        "hoodie.datasource.write.partitionpath.field" -> "city",
        "hoodie.table.name" -> tableName,
        "hoodie.datasource.write.table.type" -> "MERGE_ON_READ",
        "hoodie.metadata.enable" -> "true",
        "hoodie.metadata.index.column.stats.enable" -> "true",
        "hoodie.metadata.index.partition.stats.enable" -> "true",
        "hoodie.metadata.index.bloom.filter.enable" -> "true",
        "hoodie.metadata.record.index.enable" -> "true",
        "hoodie.metadata.record.index.max.filegroup.count" -> "500000000",
        "hoodie.metadata.record.index.min.filegroup.count" -> "100",
        HoodieIndexConfig.INDEX_TYPE.key() -> HoodieIndex.IndexType.RECORD_INDEX.name(),
        HoodieIndexConfig.RECORD_INDEX_UPDATE_PARTITION_PATH_ENABLE.key() -> "true",
        HoodieIndexConfig.RECORD_INDEX_USE_CACHING.key() -> "true",
        HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH_ENABLE.key() -> "true",
        HoodieIndexConfig.BLOOM_INDEX_USE_CACHING.key() -> "true",
        HoodieIndexConfig.BLOOM_INDEX_USE_METADATA.key() -> "true",
        HoodieCleanConfig.CLEANER_POLICY.key() -> "KEEP_LATEST_COMMITS",
        HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key() -> "10",
        HoodieCompactionConfig.INLINE_COMPACT.key() -> "true",
        HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "20",
        HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key() -> "0",
        HoodieMetadataConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.key() -> "true",
        HoodieSyncConfig.META_SYNC_ENABLED.key() -> "false",
        HiveSyncConfig.HIVE_SYNC_ENABLED.key() -> "false",
        HoodieTableConfig.TYPE.key() -> HoodieTableType.MERGE_ON_READ.name(),
        HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME.key() -> "snappy"
   )

   // Step 1: initial bulk load (Overwrite creates the table).
   var insertsDF = spark.createDataFrame(data).toDF(columns:_*)
   insertsDF.write.format("hudi").
     options(hudiOptions).
     mode(Overwrite).
     save(basePath)

   spark.read.format("hudi").load(basePath).show()

   // Step 2: delete the rider-F record.
   val deletesDF = spark.read.format("hudi").load(basePath).filter($"rider" === "rider-F")

   deletesDF.write.format("hudi").
     option("hoodie.datasource.write.operation", "delete").
     options(hudiOptions).
     mode(Append).
     save(basePath)

   spark.read.format("hudi").load(basePath).show()

   // Step 3: upsert rider-D with a 10x fare.
   val updatesDf = spark.read.format("hudi").load(basePath).filter($"rider" === "rider-D").withColumn("fare", col("fare") * 10)

   updatesDf.write.format("hudi").
     option("hoodie.datasource.write.operation", "upsert").
     options(hudiOptions).
     mode(Append).
     save(basePath)
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to