Lujun-WC commented on issue #8391:
URL: https://github.com/apache/hudi/issues/8391#issuecomment-1499897104

   
   /**
    * Structured Streaming job that reads records from a Kafka topic and, for every
    * micro-batch, upserts the processed rows into a Hudi table stored at `basePath`.
    *
    * The snippet is abridged: `topic`, `queryName`, `checkpointLocation`, `hudiTable`
    * and `basePath` are referenced but not defined here, and the per-batch
    * transformation is elided (`val df = ...`), so it does not compile as posted.
    */
   object DwdFoo6Order {
     /**
      * Entry point: builds the Spark session, starts the Kafka -> Hudi streaming
      * query and blocks until that query terminates.
      *
      * @param args command-line arguments; not read anywhere in the visible code
      */
     def main(args: Array[String]): Unit = {
       // Hive-enabled session with dynamic partition overwrite / non-strict dynamic partitioning.
       val spark = SparkSession
         .builder()
         .config("spark.debug.maxToStringFields", "500")
         .config("spark.sql.debug.maxToStringFields", "500")
         .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
         .config("hive.exec.dynamic.partition", true)
         .config("hive.exec.dynamic.partition.mode", "nonstrict")
         .enableHiveSupport()
         .getOrCreate()
   
   
       import spark.implicits._
       // Kafka source: starts from the latest offsets, caps each trigger at 1,500,000
       // offsets and does not fail the query when data loss is detected.
       val foo6KafkaDF: Dataset[Row] = spark
         .readStream
         .format("kafka")
         .option("kafka.bootstrap.servers", "10.10.10.1:9092")
         .option("subscribe", topic)
         .option("startingOffsets", "latest")
         .option("maxOffsetsPerTrigger", 1500000L)
         .option("failOnDataLoss", value = false)
         .load()
   
   
       // One micro-batch every 300 seconds; each batch is handed to the function below.
       val query = foo6KafkaDF
         .writeStream
         .queryName(queryName)
         .option("checkpointLocation", checkpointLocation)
         .trigger(Trigger.ProcessingTime(s"300 seconds"))
         .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
   
           // Per-batch transformation of batchDF into the rows to write.
           // The actual logic was elided in the original post; `...` is a placeholder, not valid Scala.
           val df = ...
   
         // Upsert into the Hudi table: record key `order_id`, precombine field `sort_key`,
         // partition path built from `cdt` and `data_source` (hive-style partition directories).
         // NOTE(review): HudiTable.COW / HudiWriteOpts.UPSERT are defined outside this snippet;
         // presumably copy-on-write table type and the upsert operation - confirm.
         df.write.format("org.apache.hudi")
           .option(TBL_NAME.key(), hudiTable)
           .option(TABLE_TYPE.key(), HudiTable.COW)
           .option(OPERATION.key(), HudiWriteOpts.UPSERT)
           .option(RECORDKEY_FIELD.key(), "order_id")
           .option(PRECOMBINE_FIELD.key(), "sort_key")
           .option(PARTITIONPATH_FIELD.key(), "cdt,data_source")
           .option(WRITE_PAYLOAD_CLASS_NAME.key(), 
classOf[DefaultHoodieRecordPayload].getName)
           .option("hoodie.write.markers.type", "direct")
           .option("hoodie.index.type", "BLOOM")
           .option("hoodie.datasource.write.hive_style_partitioning", "true")
           // Shuffle parallelism is fixed at 150 for insert, bulk insert, upsert and delete.
           .option("hoodie.insert.shuffle.parallelism", "150")
           .option("hoodie.bulkinsert.shuffle.parallelism", "150")
           .option("hoodie.upsert.shuffle.parallelism", "150")
           .option("hoodie.delete.shuffle.parallelism", "150")
           .mode(SaveMode.Append).save(basePath)
   
           // NOTE(review): no matching persist()/cache() of batchDF is visible in this snippet;
           // presumably it happens in the elided processing step - confirm.
           batchDF.unpersist()
           // Explicit Unit result for the batch function.
           ()
         }.start()
   
       // Block the driver until the streaming query stops or fails.
       query.awaitTermination()
     }
   
   }
   
   
   spark submit:
   # Submit the streaming job to YARN in cluster mode: 4 executors, 2 cores and 8G each.
   # The --packages value must sit on the same line as the option and end with a
   # backslash; a bare line break after --packages would end the command there.
   spark-submit --class com.example.DwdFoo6Order \
   --master yarn --deploy-mode cluster \
   --num-executors 4 --executor-cores 2 --executor-memory 8G \
   --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1,org.apache.kafka:kafka-clients:2.8.0,com.alibaba:druid:1.2.6,mysql:mysql-connector-java:5.1.34 \
   ./hudi-test.jar
   
   
   
   existing data size:
   cdt                   data_source  count(1)
   2023-03-16      foo6_standard   2
   2023-03-17      foo6_standard   2
   2023-03-18      foo6_standard   3
   2023-03-19      foo6_standard   3
   2023-03-20      foo6_standard   1
   2023-03-23      foo6_standard   3
   2023-03-24      foo6_standard   2
   2023-03-25      foo6_standard   1
   2023-03-26      foo6_standard   12
   2023-03-27      foo6_standard   12
   2023-03-28      foo6_standard   41
   2023-03-29      foo6_standard   70
   2023-03-30      foo6_standard   88
   2023-03-31      foo6_standard   301
   2023-04-01      foo6_standard   1613
   2023-04-02      foo6_standard   1828818
   2023-04-03      foo6_standard   1567815
   2023-04-04      foo6_standard   2269541
   2023-04-05      foo6_standard   2884449
   2023-04-06      foo6_standard   1933243
   2023-04-07      foo6_standard   304502
   
   The RECORDKEY_FIELD is order_id, an auto-incrementing ID of type long. 
   The cdt column represents the creation time of the order, and it stays 
constant for a given order_id. 
   In each data batch (containing fewer than 100,000 records), 90% of the 
records belong to the current day's partition, while the remaining 10% update 
older partitions.
   
   The same data takes just 0.1 minutes to write to Hive, but writing to Hudi 
is unexpectedly slow. What could be the reason for this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to