dyang108 opened a new issue, #12975: URL: https://github.com/apache/hudi/issues/12975
**Describe the problem you faced**

Running Deltastreamer with async compaction on a schedule takes continuously longer as the dataset grows, and compactions seem to start failing. We can't keep up with the write rate of our input Kafka topic, even on a small dataset.

**To Reproduce**

Steps to reproduce the behavior:

I have a Kafka Avro topic that Deltastreamer reads from and writes to an S3 table. The data size is not crazy, but the files created seem to be too small: we have accumulated 112,889 files totaling 1.4 TB in our S3 prefix after running the job for 3 months.

Deltastreamer:

```
/opt/spark/bin/spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer --master k8s://https://172.20.0.1:443 --deploy-mode cluster --conf spark.kubernetes.namespace=deltastreamer --conf spark.app.name=photo-inferences-deltastreamer-1741891516380541751 --conf spark.kubernetes.driver.pod.name=photo-inferences-deltastreamer-1741891516380541751-driver --conf spark.kubernetes.container.image.pullPolicy=Always --conf spark.kubernetes.submission.waitAppCompletion=false --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.readOnly=false --conf spark.executor.memory=40g --conf spark.kubernetes.container.image.pullPolicy=Always --conf spark.ui.prometheus.enabled=true --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.claimName=OnDemand --conf spark.driver.cores=2 --conf spark.eventLog.rolling.maxFileSize=128m --conf spark.kubernetes.submission.waitAppCompletion=false --conf spark.cores.max=10 --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.storageClass=gp3-fastest --conf spark.ui.port=4040 --conf spark.sql.shuffle.partitions=4000 --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path=/tmp/dir1 --conf spark.cassandra.output.consistency.level=QUORUM --conf
spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.WebIdentityTokenCredentialsProvider --conf spark.eventLog.enabled=true --conf spark.cassandra.input.consistency.level=QUORUM --conf spark.master=k8s://https://foo.us-east-1.eks.amazonaws.com:443 --conf spark.eventLog.rolling.enabled=true --conf spark.sql.parquet.int96RebaseModeInRead=CORRECTED --conf spark.sql.parquet.datetimeRebaseModeInRead=CORRECTED --conf spark.executor.instances=4 --conf spark.executor.cores=10 --conf spark.kubernetes.executor.request.cores=6 --conf spark.snowflake.sfUser=spark --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension --conf spark.jars.ivy=/tmp/.ivy2 --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog --conf spark.driver.memory=4g --conf spark.hadoop.fs.s3a.path.style.access=true --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.sizeLimit=200Gi --conf spark.kubernetes.executor.label.version=3.2.1 --conf spark.sql.parquet.datetimeRebaseModeInWrite=CORRECTED --conf spark.sql.parquet.int96RebaseModeInWrite=CORRECTED --conf spark.hudi.timeline.server.port=26754 --conf spark.metrics.namespace=unknown --conf spark.ui.showConsoleProgress=true --conf spark.default.parallelism=4000 --conf spark.kubernetes.resource.type=java --conf spark.hadoop.fs.s3a.connection.ssl.enabled=false --conf spark.hadoop.fs.s3a.connection.ssl.enabled=false --conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.WebIdentityTokenCredentialsProvider --conf spark.kubernetes.driver.label.sparkoperator.k8s.io/app-name=photo-inferences-deltastreamer-1741891516380541751 --conf spark.kubernetes.driver.label.sparkoperator.k8s.io/launched-by-spark-operator=true --conf spark.kubernetes.driver.label.sparkoperator.k8s.io/submission-id=7c1bc0fd-d164-4bc0-9ec7-9e013a2ca300 --conf spark.kubernetes.driver.container.image=<image> --conf 
spark.driver.cores=2 --conf spark.kubernetes.driver.request.cores=2 --conf spark.kubernetes.authenticate.driver.serviceAccountName=photo-inferences-deltastreamer --conf spark.driver.extraJavaOptions=-Dconfig.override_with_env_vars=true -Dconfig.file=/etc/spark/work-dir/conf/photo-inferences-deltastreamer.json --conf spark.kubernetes.driver.label.version=3.3.0 --conf spark.kubernetes.driver.label.app.kubernetes.io/name=photo-inferences-deltastreamer --conf spark.kubernetes.driver.label.app.kubernetes.io/managed-by=Helm --conf spark.kubernetes.driver.label.export-metrics-to-prometheus=true --conf spark.kubernetes.driver.label.namespace=deltastreamer --conf spark.kubernetes.driver.label.sparkoperator.k8s.io/scheduled-app-name=photo-inferences-deltastreamer --conf spark.kubernetes.driver.annotation.fluentbit.io/exclude=true --conf spark.kubernetes.driver.annotation.karpenter.sh/do-not-disrupt=true --conf spark.kubernetes.driver.annotation.sidecar.istio.io/inject=false --conf spark.kubernetes.executor.label.sparkoperator.k8s.io/app-name=photo-inferences-deltastreamer-1741891516380541751 --conf spark.kubernetes.executor.label.sparkoperator.k8s.io/launched-by-spark-operator=true --conf spark.kubernetes.executor.label.sparkoperator.k8s.io/submission-id=7c1bc0fd-d164-4bc0-9ec7-9e013a2ca300 --conf spark.kubernetes.executor.container.image=<image> --conf spark.executor.cores=10 --conf spark.kubernetes.executor.request.cores=6 --conf spark.kubernetes.authenticate.executor.serviceAccountName=photo-inferences-deltastreamer --conf spark.kubernetes.executor.label.app.kubernetes.io/managed-by=Helm --conf spark.kubernetes.executor.label.export-metrics-to-prometheus=true --conf spark.kubernetes.executor.label.namespace=deltastreamer --conf spark.kubernetes.executor.label.sparkoperator.k8s.io/scheduled-app-name=photo-inferences-deltastreamer --conf spark.kubernetes.executor.label.version=3.3.0 --conf spark.kubernetes.executor.annotation.sidecar.istio.io/inject=false --conf 
spark.kubernetes.executor.annotation.fluentbit.io/exclude=true --conf spark.kubernetes.executor.annotation.karpenter.sh/do-not-disrupt=true --conf spark.executor.extraJavaOptions=-Dconfig.override_with_env_vars=true -Dconfig.file=/configmap-data/photo-inferences-deltastreamer.json -XX:+UnlockExperimentalVMOptions -XX:+UseG1GC -XX:MaxGCPauseMillis=500 --conf spark.kubernetes.node.selector.karpenter.sh/nodepool=spark local:///etc/spark/jars/deltastreamer-server_2.12-0.0.1-SNAPSHOT.jar --props /kafka-source.properties --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider --source-class org.apache.hudi.utilities.sources.AvroKafkaSource --target-base-path s3a://my-bucket/photo-data/hudi --target-table photo_inf --op UPSERT --source-ordering-field ts --table-type MERGE_ON_READ --source-limit 1000000 --max-pending-compactions 2 --hoodie-conf hoodie.datasource.compaction.async.enable=true --hoodie-conf auto.offset.reset=earliest --hoodie-conf hoodie.filesystem.view.remote.port=26754 --hoodie-conf hoodie.datasource.compaction.async.enable=true
```

Compactor:

```
/opt/spark/bin/spark-submit --class org.apache.hudi.utilities.HoodieCompactor --master k8s://https://172.20.0.1:443 --deploy-mode cluster --conf spark.kubernetes.namespace=deltastreamer --conf spark.app.name=photo-inferences-compactor-1741645510295959759 --conf spark.kubernetes.driver.pod.name=photo-inferences-compactor-1741645510295959759-driver --conf spark.kubernetes.container.image.pullPolicy=Always --conf spark.kubernetes.submission.waitAppCompletion=false --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog --conf spark.hadoop.fs.s3a.connection.ssl.enabled=false --conf spark.sql.parquet.datetimeRebaseModeInWrite=CORRECTED --conf spark.cassandra.input.consistency.level=QUORUM --conf spark.hadoop.fs.s3a.path.style.access=true --conf spark.kubernetes.resource.type=java --conf
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.claimName=OnDemand --conf spark.ui.showConsoleProgress=true --conf spark.executor.instances=4 --conf spark.jars.ivy=/tmp/.ivy2 --conf spark.driver.memory=24g --conf spark.executor.cores=4 --conf spark.sql.parquet.int96RebaseModeInRead=CORRECTED --conf spark.sql.parquet.int96RebaseModeInWrite=CORRECTED --conf spark.kubernetes.container.image.pullPolicy=Always --conf spark.kubernetes.submission.waitAppCompletion=false --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension --conf spark.kubernetes.executor.label.version=3.2.1 --conf spark.ui.port=4040 --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.storageClass=gp3-fastest --conf spark.eventLog.rolling.maxFileSize=128m --conf spark.ui.prometheus.enabled=true --conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.WebIdentityTokenCredentialsProvider --conf spark.cores.max=16 --conf spark.executor.memory=24g --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.readOnly=false --conf spark.driver.cores=2 --conf spark.hudi.timeline.server.port=26754 --conf spark.sql.shuffle.partitions=4000 --conf spark.master=k8s://https://foo.us-east-1.eks.amazonaws.com:443 --conf spark.snowflake.sfUser=spark --conf spark.default.parallelism=4000 --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.sizeLimit=200Gi --conf spark.sql.parquet.datetimeRebaseModeInRead=CORRECTED --conf spark.cassandra.output.consistency.level=QUORUM --conf spark.eventLog.rolling.enabled=true --conf spark.eventLog.enabled=true --conf spark.metrics.namespace=unknown --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path=/tmp/dir1 --conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.WebIdentityTokenCredentialsProvider --conf spark.hadoop.fs.s3a.connection.ssl.enabled=false --conf 
spark.kubernetes.driver.label.sparkoperator.k8s.io/app-name=photo-inferences-compactor-1741645510295959759 --conf spark.kubernetes.driver.label.sparkoperator.k8s.io/launched-by-spark-operator=true --conf spark.kubernetes.driver.label.sparkoperator.k8s.io/submission-id=477b19e0-cd38-4711-bb08-7b28db176c23 --conf spark.kubernetes.driver.container.image=<image> --conf spark.driver.cores=2 --conf spark.kubernetes.driver.request.cores=2 --conf spark.kubernetes.authenticate.driver.serviceAccountName=photo-inferences-compactor --conf spark.driver.extraJavaOptions=-Dconfig.override_with_env_vars=true -Dconfig.file=/etc/spark/work-dir/conf/photo-inferences-compactor.json --conf spark.kubernetes.driver.label.version=3.3.0 --conf spark.kubernetes.driver.label.app.kubernetes.io/name=photo-inferences-compactor --conf spark.kubernetes.driver.label.app.kubernetes.io/managed-by=Helm --conf spark.kubernetes.driver.label.export-metrics-to-prometheus=true --conf spark.kubernetes.driver.label.namespace=deltastreamer --conf spark.kubernetes.driver.label.sparkoperator.k8s.io/scheduled-app-name=photo-inferences-compactor --conf spark.kubernetes.driver.annotation.fluentbit.io/exclude=true --conf spark.kubernetes.driver.annotation.karpenter.sh/do-not-disrupt=true --conf spark.kubernetes.driver.annotation.sidecar.istio.io/inject=false --conf spark.kubernetes.executor.label.sparkoperator.k8s.io/app-name=photo-inferences-compactor-1741645510295959759 --conf spark.kubernetes.executor.label.sparkoperator.k8s.io/launched-by-spark-operator=true --conf spark.kubernetes.executor.label.sparkoperator.k8s.io/submission-id=477b19e0-cd38-4711-bb08-7b28db176c23 --conf spark.kubernetes.executor.container.image=<image> --conf spark.executor.cores=4 --conf spark.kubernetes.executor.request.cores=4 --conf spark.kubernetes.authenticate.executor.serviceAccountName=photo-inferences-compactor --conf spark.kubernetes.executor.label.namespace=deltastreamer --conf 
spark.kubernetes.executor.label.sparkoperator.k8s.io/scheduled-app-name=photo-inferences-compactor --conf spark.kubernetes.executor.label.version=3.3.0 --conf spark.kubernetes.executor.label.app.kubernetes.io/managed-by=Helm --conf spark.kubernetes.executor.label.export-metrics-to-prometheus=true --conf spark.kubernetes.executor.annotation.fluentbit.io/exclude=true --conf spark.kubernetes.executor.annotation.karpenter.sh/do-not-disrupt=true --conf spark.kubernetes.executor.annotation.sidecar.istio.io/inject=false --conf spark.executor.extraJavaOptions=-Dconfig.override_with_env_vars=true -Dconfig.file=/configmap-data/photo-inferences-compactor.json -XX:+UnlockExperimentalVMOptions -XX:+UseG1GC -XX:MaxGCPauseMillis=500 --conf spark.kubernetes.node.selector.karpenter.sh/nodepool=spark local:///etc/spark/jars/deltastreamer-server_2.12-0.0.1-SNAPSHOT.jar --base-path s3a://my-bucket/photo-data/hudi --table-name photo_inf
```

kafka-source.properties:

```
hoodie.upsert.shuffle.parallelism=4
hoodie.insert.shuffle.parallelism=4
hoodie.delete.shuffle.parallelism=4
hoodie.bulkinsert.shuffle.parallelism=4
hoodie.schema.cache.enable=true
hoodie.datasource.write.recordkey.field=mediaId
hoodie.datasource.write.partitionpath.field=partition
hoodie.deltastreamer.schemaprovider.registry.url=<my-schema-registry>
hoodie.deltastreamer.source.kafka.topic=<input-topic>
hoodie.deltastreamer.source.kafka.group.id=<input-consumer-group>
bootstrap.servers=<my-kafka-bootstrap-servers>
auto.offset.reset=latest
schema.registry.url=<my-schema-registry>
```

The `partition` column is an auto-incrementing userId modulo 10000, chosen to create an even distribution (so the table has up to 10,000 partitions). The `ts` field is the processing time on the producer side of the input topic. I'm running compaction every 2 hours, and Deltastreamer (without continuous mode) every 6 hours; if a Deltastreamer instance is already running, we don't start a new one over it. I've tried a healthy number of combinations of inline compaction settings
(e.g. [hoodie.compact.schedule.inline](https://hudi.apache.org/docs/configurations/#hoodiecompactscheduleinline), [hoodie.compact.inline.max.delta.commits](https://hudi.apache.org/docs/configurations/#hoodiecompactinlinemaxdeltacommits), hoodie.compact.inline, hoodie.compaction.strategy, etc.); they all seem to eventually run into trouble a few weeks into the pipeline's life.

The last compaction requested was `20241207093706000.compaction.requested`, and the corresponding inflight is `20241207093706000.compaction.inflight`. The last successful Deltastreamer run, a few days ago, took 56 hours, and the job's runtime has been increasing steadily since we deployed this pipeline. I'm also seeing Deltastreamer runs after that last success fail with:

```
Caused by: java.io.FileNotFoundException: No such file or directory: s3a://my-bucket/photo-data/hudi/9868/ab48667d-ac44-460a-850a-bc413f42e540-0_50-96-94348_20250312191529445.parquet
```

**Expected behavior**

Deltastreamer should not take this long, especially given the mostly-append nature of the incoming data, and compaction should complete successfully.

**Environment Description**

* Hudi version : 0.15.0
* Spark version : 3.3.0
* Hive version : 2.3.9
* Hadoop version : 3.3.6
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker? (yes/no) : yes

**Additional context**

Chatted with @vinothchandar at Onehouse about this issue. There's certainly something faulty with my configuration, but I would love some help with this issue.
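One direction worth trying, sketched here with Hudi's documented small-file and compaction knobs; the values below are illustrative assumptions for this workload, not settings verified against this table:

```
# Illustrative values only -- tune against your own workload.
# Pack new inserts into existing files smaller than this threshold (bytes).
hoodie.parquet.small.file.limit=104857600
# Target size for base files written by ingest and compaction (bytes).
hoodie.parquet.max.file.size=125829120
# Schedule compaction after fewer delta commits so log files don't pile up.
hoodie.compact.inline.max.delta.commits=4
# A shuffle parallelism of 4 is very low for a table with up to 10,000 partitions.
hoodie.upsert.shuffle.parallelism=200
```

These would go into kafka-source.properties alongside the existing settings; reducing the number of distinct partitions (e.g. userId % 100 instead of % 10000) would likely do more for file sizes than any single knob.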
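For the stuck `20241207093706000` compaction, a hudi-cli session along these lines can help diagnose it; the command names are from the Hudi CLI guide, but this is a sketch to verify in a test environment before touching the production timeline:

```
hudi-> connect --path s3a://my-bucket/photo-data/hudi
hudi:photo_inf-> compactions show all
hudi:photo_inf-> compaction validate --instant 20241207093706000
hudi:photo_inf-> compaction repair --instant 20241207093706000
```

`compaction validate` reports which file slices in the plan are inconsistent with the current file system view, which would also shed light on the `FileNotFoundException` above.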
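For context, a back-of-the-envelope check using only the numbers reported above (112,889 files, 1.4 TB, partition = userId % 10000) shows how small the files are relative to Hudi's default parquet target of roughly 120 MB:

```python
# Rough average file size from the figures reported in this issue.
total_bytes = 1.4 * 1024**4        # 1.4 TB in bytes
num_files = 112_889                # files accumulated in the S3 prefix
avg_mb = total_bytes / num_files / 1024**2
print(f"average file size ~= {avg_mb:.1f} MB")   # roughly 13 MB per file

# With partition = userId % 10000, a single ingest round can touch up to
# 10,000 partitions, so each commit can create thousands of tiny files.
max_partitions_touched = 10_000
print(f"worst-case partitions touched per commit: {max_partitions_touched}")
```

At ~13 MB per file, roughly a tenth of the usual base-file target, most of each job's time likely goes to per-file S3 overhead rather than data volume, which is consistent with runtimes growing as the file count grows.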