dmenin opened a new issue #3975: URL: https://github.com/apache/hudi/issues/3975
I have a situation where the data is partitioned by "year", "month", and "day", and I need to enforce uniqueness across all partitions (a key's data can move from one day to another). My first attempt was to use a global index, which prevents data duplication but is not scalable: as the amount of data grows, the load time also increases.

So I am using a SIMPLE index and doing the "de-duplication" myself, by identifying the rows in older partitions that are present in the insert dataset and deleting them. In other words, if I have key 123 in partition 10 and I receive key 123 again for partition 11, I delete the record from partition 10 and insert the one for partition 11. The delete/insert steps are made with two calls to the `df.write.format(HUDI_FORMAT)…` Hudi command, the only difference being that for the insert I set "hoodie.datasource.write.operation" to "upsert", while for the delete the operation is "delete".

PS: I have also tried an alternative approach where I use the "upsert" write operation together with "org.apache.hudi.common.model.EmptyHoodieRecordPayload" as the "hoodie.datasource.write.payload.class" – the result is exactly the same.

I am looking for some guidance on why the delete step is taking so long. The insert (a few hundred thousand rows) completes in about 30 seconds, while the delete, which covers fewer than 1,000 rows, takes more than 3 minutes.
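To make the two-step de-duplication concrete, here is a minimal sketch in plain Python, with dicts standing in for Spark DataFrames. `find_stale_records` is a hypothetical helper name; only the `hoodie.datasource.write.operation` option key and its "delete"/"upsert" values come from the description above.

```python
def find_stale_records(existing, incoming):
    """Return records whose key reappears in `incoming` under a different
    (newer) partition; these must be deleted from their old partition
    before the new rows are upserted."""
    incoming_parts = {rec["key"]: rec["partition"] for rec in incoming}
    return [
        rec for rec in existing
        if rec["key"] in incoming_parts
        and rec["partition"] != incoming_parts[rec["key"]]
    ]

# The two writes use the same df.write call, differing only in this option
# (sketch only -- in the real job these are passed to the Hudi writer):
DELETE_OPTS = {"hoodie.datasource.write.operation": "delete"}
UPSERT_OPTS = {"hoodie.datasource.write.operation": "upsert"}

existing = [
    {"key": "123", "partition": "day=10", "value": "old"},
    {"key": "456", "partition": "day=10", "value": "keep"},
]
incoming = [{"key": "123", "partition": "day=11", "value": "new"}]

stale = find_stale_records(existing, incoming)
# key 123 in day=10 is stale: delete it, then upsert key 123 into day=11
print(stale)
```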
Looking at the Spark logs, I see 3 jobs named "getting small files from partitions" (these appear only for the delete operation), and their traces are basically the same:

```
org.apache.spark.rdd.RDD.isEmpty(RDD.scala:1557)
org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:609)
org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:274)
org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:164)
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
```

The only difference is the number of stages and tasks.

Any idea why that is happening and what I can do to speed up the process? Thanks

**Environment Description**

* Hudi version : 0.9
* Spark version : 3
* Storage (HDFS/S3/GCS..) : AWS S3
* Running on Docker? (yes/no) : no

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]
