dyang108 opened a new issue, #12975:
URL: https://github.com/apache/hudi/issues/12975

   **Describe the problem you faced**
   
   Running Deltastreamer with async compaction on a schedule takes progressively longer as the dataset grows, and compactions appear to be failing. We can't keep up with our input Kafka topic, even though the dataset is small.
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   I have a Kafka Avro topic that Deltastreamer reads and writes to an S3 table. The data size is not large, but the files created appear to be too small: after running the job for 3 months, we have accumulated 112,889 files totaling 1.4 TB in our S3 prefix.
   
   Deltastreamer:
   ```
   /opt/spark/bin/spark-submit \
       --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
       --master k8s://https://172.20.0.1:443 \
       --deploy-mode cluster \
       --conf spark.kubernetes.namespace=deltastreamer \
       --conf spark.app.name=photo-inferences-deltastreamer-1741891516380541751 \
       --conf spark.kubernetes.driver.pod.name=photo-inferences-deltastreamer-1741891516380541751-driver \
       --conf spark.kubernetes.container.image.pullPolicy=Always \
       --conf spark.kubernetes.submission.waitAppCompletion=false \
       --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.readOnly=false \
       --conf spark.executor.memory=40g \
       --conf spark.kubernetes.container.image.pullPolicy=Always \
       --conf spark.ui.prometheus.enabled=true \
       --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.claimName=OnDemand \
       --conf spark.driver.cores=2 \
       --conf spark.eventLog.rolling.maxFileSize=128m \
       --conf spark.kubernetes.submission.waitAppCompletion=false \
       --conf spark.cores.max=10 \
       --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.storageClass=gp3-fastest \
       --conf spark.ui.port=4040 \
       --conf spark.sql.shuffle.partitions=4000 \
       --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path=/tmp/dir1 \
       --conf spark.cassandra.output.consistency.level=QUORUM \
       --conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.WebIdentityTokenCredentialsProvider \
       --conf spark.eventLog.enabled=true \
       --conf spark.cassandra.input.consistency.level=QUORUM \
       --conf spark.master=k8s://https://foo.us-east-1.eks.amazonaws.com:443 \
       --conf spark.eventLog.rolling.enabled=true \
       --conf spark.sql.parquet.int96RebaseModeInRead=CORRECTED \
       --conf spark.sql.parquet.datetimeRebaseModeInRead=CORRECTED \
       --conf spark.executor.instances=4 \
       --conf spark.executor.cores=10 \
       --conf spark.kubernetes.executor.request.cores=6 \
       --conf spark.snowflake.sfUser=spark \
       --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
       --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension \
       --conf spark.jars.ivy=/tmp/.ivy2 \
       --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog \
       --conf spark.driver.memory=4g \
       --conf spark.hadoop.fs.s3a.path.style.access=true \
       --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.sizeLimit=200Gi \
       --conf spark.kubernetes.executor.label.version=3.2.1 \
       --conf spark.sql.parquet.datetimeRebaseModeInWrite=CORRECTED \
       --conf spark.sql.parquet.int96RebaseModeInWrite=CORRECTED \
       --conf spark.hudi.timeline.server.port=26754 \
       --conf spark.metrics.namespace=unknown \
       --conf spark.ui.showConsoleProgress=true \
       --conf spark.default.parallelism=4000 \
       --conf spark.kubernetes.resource.type=java \
       --conf spark.hadoop.fs.s3a.connection.ssl.enabled=false \
       --conf spark.hadoop.fs.s3a.connection.ssl.enabled=false \
       --conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.WebIdentityTokenCredentialsProvider \
       --conf spark.kubernetes.driver.label.sparkoperator.k8s.io/app-name=photo-inferences-deltastreamer-1741891516380541751 \
       --conf spark.kubernetes.driver.label.sparkoperator.k8s.io/launched-by-spark-operator=true \
       --conf spark.kubernetes.driver.label.sparkoperator.k8s.io/submission-id=7c1bc0fd-d164-4bc0-9ec7-9e013a2ca300 \
       --conf spark.kubernetes.driver.container.image=<image> \
       --conf spark.driver.cores=2 \
       --conf spark.kubernetes.driver.request.cores=2 \
       --conf spark.kubernetes.authenticate.driver.serviceAccountName=photo-inferences-deltastreamer \
       --conf "spark.driver.extraJavaOptions=-Dconfig.override_with_env_vars=true -Dconfig.file=/etc/spark/work-dir/conf/photo-inferences-deltastreamer.json" \
       --conf spark.kubernetes.driver.label.version=3.3.0 \
       --conf spark.kubernetes.driver.label.app.kubernetes.io/name=photo-inferences-deltastreamer \
       --conf spark.kubernetes.driver.label.app.kubernetes.io/managed-by=Helm \
       --conf spark.kubernetes.driver.label.export-metrics-to-prometheus=true \
       --conf spark.kubernetes.driver.label.namespace=deltastreamer \
       --conf spark.kubernetes.driver.label.sparkoperator.k8s.io/scheduled-app-name=photo-inferences-deltastreamer \
       --conf spark.kubernetes.driver.annotation.fluentbit.io/exclude=true \
       --conf spark.kubernetes.driver.annotation.karpenter.sh/do-not-disrupt=true \
       --conf spark.kubernetes.driver.annotation.sidecar.istio.io/inject=false \
       --conf spark.kubernetes.executor.label.sparkoperator.k8s.io/app-name=photo-inferences-deltastreamer-1741891516380541751 \
       --conf spark.kubernetes.executor.label.sparkoperator.k8s.io/launched-by-spark-operator=true \
       --conf spark.kubernetes.executor.label.sparkoperator.k8s.io/submission-id=7c1bc0fd-d164-4bc0-9ec7-9e013a2ca300 \
       --conf spark.kubernetes.executor.container.image=<image> \
       --conf spark.executor.cores=10 \
       --conf spark.kubernetes.executor.request.cores=6 \
       --conf spark.kubernetes.authenticate.executor.serviceAccountName=photo-inferences-deltastreamer \
       --conf spark.kubernetes.executor.label.app.kubernetes.io/managed-by=Helm \
       --conf spark.kubernetes.executor.label.export-metrics-to-prometheus=true \
       --conf spark.kubernetes.executor.label.namespace=deltastreamer \
       --conf spark.kubernetes.executor.label.sparkoperator.k8s.io/scheduled-app-name=photo-inferences-deltastreamer \
       --conf spark.kubernetes.executor.label.version=3.3.0 \
       --conf spark.kubernetes.executor.annotation.sidecar.istio.io/inject=false \
       --conf spark.kubernetes.executor.annotation.fluentbit.io/exclude=true \
       --conf spark.kubernetes.executor.annotation.karpenter.sh/do-not-disrupt=true \
       --conf "spark.executor.extraJavaOptions=-Dconfig.override_with_env_vars=true -Dconfig.file=/configmap-data/photo-inferences-deltastreamer.json -XX:+UnlockExperimentalVMOptions -XX:+UseG1GC -XX:MaxGCPauseMillis=500" \
       --conf spark.kubernetes.node.selector.karpenter.sh/nodepool=spark \
       local:///etc/spark/jars/deltastreamer-server_2.12-0.0.1-SNAPSHOT.jar \
       --props /kafka-source.properties \
       --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
       --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
       --target-base-path s3a://my-bucket/photo-data/hudi \
       --target-table photo_inf \
       --op UPSERT \
       --source-ordering-field ts \
       --table-type MERGE_ON_READ \
       --source-limit 1000000 \
       --max-pending-compactions 2 \
       --hoodie-conf hoodie.datasource.compaction.async.enable=true \
       --hoodie-conf auto.offset.reset=earliest \
       --hoodie-conf hoodie.filesystem.view.remote.port=26754 \
       --hoodie-conf hoodie.datasource.compaction.async.enable=true
   ```
   
   Compactor:
   ```
   /opt/spark/bin/spark-submit \
       --class org.apache.hudi.utilities.HoodieCompactor \
       --master k8s://https://172.20.0.1:443 \
       --deploy-mode cluster \
       --conf spark.kubernetes.namespace=deltastreamer \
       --conf spark.app.name=photo-inferences-compactor-1741645510295959759 \
       --conf spark.kubernetes.driver.pod.name=photo-inferences-compactor-1741645510295959759-driver \
       --conf spark.kubernetes.container.image.pullPolicy=Always \
       --conf spark.kubernetes.submission.waitAppCompletion=false \
       --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
       --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog \
       --conf spark.hadoop.fs.s3a.connection.ssl.enabled=false \
       --conf spark.sql.parquet.datetimeRebaseModeInWrite=CORRECTED \
       --conf spark.cassandra.input.consistency.level=QUORUM \
       --conf spark.hadoop.fs.s3a.path.style.access=true \
       --conf spark.kubernetes.resource.type=java \
       --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.claimName=OnDemand \
       --conf spark.ui.showConsoleProgress=true \
       --conf spark.executor.instances=4 \
       --conf spark.jars.ivy=/tmp/.ivy2 \
       --conf spark.driver.memory=24g \
       --conf spark.executor.cores=4 \
       --conf spark.sql.parquet.int96RebaseModeInRead=CORRECTED \
       --conf spark.sql.parquet.int96RebaseModeInWrite=CORRECTED \
       --conf spark.kubernetes.container.image.pullPolicy=Always \
       --conf spark.kubernetes.submission.waitAppCompletion=false \
       --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension \
       --conf spark.kubernetes.executor.label.version=3.2.1 \
       --conf spark.ui.port=4040 \
       --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.storageClass=gp3-fastest \
       --conf spark.eventLog.rolling.maxFileSize=128m \
       --conf spark.ui.prometheus.enabled=true \
       --conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.WebIdentityTokenCredentialsProvider \
       --conf spark.cores.max=16 \
       --conf spark.executor.memory=24g \
       --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.readOnly=false \
       --conf spark.driver.cores=2 \
       --conf spark.hudi.timeline.server.port=26754 \
       --conf spark.sql.shuffle.partitions=4000 \
       --conf spark.master=k8s://https://foo.us-east-1.eks.amazonaws.com:443 \
       --conf spark.snowflake.sfUser=spark \
       --conf spark.default.parallelism=4000 \
       --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.sizeLimit=200Gi \
       --conf spark.sql.parquet.datetimeRebaseModeInRead=CORRECTED \
       --conf spark.cassandra.output.consistency.level=QUORUM \
       --conf spark.eventLog.rolling.enabled=true \
       --conf spark.eventLog.enabled=true \
       --conf spark.metrics.namespace=unknown \
       --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path=/tmp/dir1 \
       --conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.WebIdentityTokenCredentialsProvider \
       --conf spark.hadoop.fs.s3a.connection.ssl.enabled=false \
       --conf spark.kubernetes.driver.label.sparkoperator.k8s.io/app-name=photo-inferences-compactor-1741645510295959759 \
       --conf spark.kubernetes.driver.label.sparkoperator.k8s.io/launched-by-spark-operator=true \
       --conf spark.kubernetes.driver.label.sparkoperator.k8s.io/submission-id=477b19e0-cd38-4711-bb08-7b28db176c23 \
       --conf spark.kubernetes.driver.container.image=<image> \
       --conf spark.driver.cores=2 \
       --conf spark.kubernetes.driver.request.cores=2 \
       --conf spark.kubernetes.authenticate.driver.serviceAccountName=photo-inferences-compactor \
       --conf "spark.driver.extraJavaOptions=-Dconfig.override_with_env_vars=true -Dconfig.file=/etc/spark/work-dir/conf/photo-inferences-compactor.json" \
       --conf spark.kubernetes.driver.label.version=3.3.0 \
       --conf spark.kubernetes.driver.label.app.kubernetes.io/name=photo-inferences-compactor \
       --conf spark.kubernetes.driver.label.app.kubernetes.io/managed-by=Helm \
       --conf spark.kubernetes.driver.label.export-metrics-to-prometheus=true \
       --conf spark.kubernetes.driver.label.namespace=deltastreamer \
       --conf spark.kubernetes.driver.label.sparkoperator.k8s.io/scheduled-app-name=photo-inferences-compactor \
       --conf spark.kubernetes.driver.annotation.fluentbit.io/exclude=true \
       --conf spark.kubernetes.driver.annotation.karpenter.sh/do-not-disrupt=true \
       --conf spark.kubernetes.driver.annotation.sidecar.istio.io/inject=false \
       --conf spark.kubernetes.executor.label.sparkoperator.k8s.io/app-name=photo-inferences-compactor-1741645510295959759 \
       --conf spark.kubernetes.executor.label.sparkoperator.k8s.io/launched-by-spark-operator=true \
       --conf spark.kubernetes.executor.label.sparkoperator.k8s.io/submission-id=477b19e0-cd38-4711-bb08-7b28db176c23 \
       --conf spark.kubernetes.executor.container.image=<image> \
       --conf spark.executor.cores=4 \
       --conf spark.kubernetes.executor.request.cores=4 \
       --conf spark.kubernetes.authenticate.executor.serviceAccountName=photo-inferences-compactor \
       --conf spark.kubernetes.executor.label.namespace=deltastreamer \
       --conf spark.kubernetes.executor.label.sparkoperator.k8s.io/scheduled-app-name=photo-inferences-compactor \
       --conf spark.kubernetes.executor.label.version=3.3.0 \
       --conf spark.kubernetes.executor.label.app.kubernetes.io/managed-by=Helm \
       --conf spark.kubernetes.executor.label.export-metrics-to-prometheus=true \
       --conf spark.kubernetes.executor.annotation.fluentbit.io/exclude=true \
       --conf spark.kubernetes.executor.annotation.karpenter.sh/do-not-disrupt=true \
       --conf spark.kubernetes.executor.annotation.sidecar.istio.io/inject=false \
       --conf "spark.executor.extraJavaOptions=-Dconfig.override_with_env_vars=true -Dconfig.file=/configmap-data/photo-inferences-compactor.json -XX:+UnlockExperimentalVMOptions -XX:+UseG1GC -XX:MaxGCPauseMillis=500" \
       --conf spark.kubernetes.node.selector.karpenter.sh/nodepool=spark \
       local:///etc/spark/jars/deltastreamer-server_2.12-0.0.1-SNAPSHOT.jar \
       --base-path s3a://my-bucket/photo-data/hudi \
       --table-name photo_inf
   ```
   kafka-source.properties:
   ```
   hoodie.upsert.shuffle.parallelism=4
   hoodie.insert.shuffle.parallelism=4
   hoodie.delete.shuffle.parallelism=4
   hoodie.bulkinsert.shuffle.parallelism=4
   hoodie.schema.cache.enable=true
   hoodie.datasource.write.recordkey.field=mediaId
   hoodie.datasource.write.partitionpath.field=partition
   hoodie.deltastreamer.schemaprovider.registry.url=<my-schema-registry>
   hoodie.deltastreamer.source.kafka.topic=<input-topic>
   hoodie.deltastreamer.source.kafka.group.id=<input-consumer-group>
   
   bootstrap.servers=<my-kafka-bootstrap-servers>
   auto.offset.reset=latest
   schema.registry.url=<my-schema-registry>
   ```
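   For context, none of Hudi's file-sizing settings are set explicitly here, so the defaults apply. A sketch of the relevant knobs with their stock default values (these are not values we run with, just the defaults for reference):
   ```properties
   # Target max base-file size (~120 MB)
   hoodie.parquet.max.file.size=125829120
   # Base files below this (~100 MB) are candidates for bin-packing on subsequent writes
   hoodie.parquet.small.file.limit=104857600
   # Max log-file size for MOR tables (~1 GB)
   hoodie.logfile.max.size=1073741824
   # Number of delta commits before a scheduled/inline compaction triggers
   hoodie.compact.inline.max.delta.commits=5
   ```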
   
   The `partition` column is computed as an auto-incrementing userId % 10000, to spread records evenly across partitions.
   The `ts` field is the processing time on the producer side of the input topic.
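   For concreteness, the partition derivation can be sketched as follows (a minimal sketch based on the description above; the helper name is hypothetical):

   ```python
   # Hypothetical sketch of the partition derivation: auto-incrementing userId % 10000.
   def partition_for(user_id: int) -> int:
       """Map a user id to one of 10,000 partition values."""
       return user_id % 10000

   # Because the ids are sequential, consecutive records land in consecutive
   # partitions, so a single ingest batch can touch up to 10,000 distinct
   # partitions, each receiving only a handful of records.
   batch = [partition_for(uid) for uid in range(25_000)]
   print(len(set(batch)))  # 10000 distinct partitions touched by one batch
   ```

   With upserts spread over that many partitions, each batch writes a few records into thousands of partitions, which seems consistent with the small-file counts above.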
   
   Compaction runs every 2 hours, and Deltastreamer runs every 6 hours without continuous mode (if a Deltastreamer instance is already running, we don't replace it).
   
   I've tried a number of combinations of inline-compaction settings (e.g. [hoodie.compact.schedule.inline](https://hudi.apache.org/docs/configurations/#hoodiecompactscheduleinline), [hoodie.compact.inline.max.delta.commits](https://hudi.apache.org/docs/configurations/#hoodiecompactinlinemaxdeltacommits), hoodie.compact.inline, hoodie.compaction.strategy, etc.), but they all eventually run into trouble a few weeks into the pipeline's life.
   
   The most recently requested compaction is `20241207093706000.compaction.requested`, with a matching inflight file `20241207093706000.compaction.inflight`.
   
   The last successful Deltastreamer run, a few days ago, took 56 hours; the job's runtime has been increasing steadily since we deployed this pipeline.
   
   I'm also seeing Deltastreamer runs fail after that last success, with `Caused by: java.io.FileNotFoundException: No such file or directory: s3a://my-bucket/photo-data/hudi/9868/ab48667d-ac44-460a-850a-bc413f42e540-0_50-96-94348_20250312191529445.parquet`.
   
   
   **Expected behavior**
   
   Deltastreamer should not take this long, especially given the mostly-append nature of the incoming data, and compaction should complete successfully.
   
   **Environment Description**
   
   * Hudi version : 0.15.0
   
   * Spark version : 3.3.0
   
   * Hive version : 2.3.9
   
   * Hadoop version : 3.3.6
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : yes
   
   
   **Additional context**
   
   Chatted with @vinothchandar at Onehouse about this issue.
   
   There's likely something faulty in my configuration, but I would appreciate some help with this issue.
   
   
   
   
   
   

