I searched [1] for the keyword “reliable” and found nothing, so I cannot draw the same conclusion as you.
If an implementation claims to support reliable storage, it should implement the ShuffleDriverComponents interface and override the supportsReliableStorage method [2] to return true, as Apache Celeborn [3], a remote shuffle service for Spark, does.

Thanks,
Cheng Pan

[1] https://spark.apache.org/docs/latest/running-on-kubernetes.html#local-storage
[2] https://github.com/apache/spark/blob/v3.5.2/core/src/main/java/org/apache/spark/shuffle/api/ShuffleDriverComponents.java#L65-L72
[3] https://github.com/apache/celeborn/blob/v0.5.1/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/CelebornShuffleDataIO.java#L56
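For illustration, here is a minimal sketch of such a plugin against the Spark 3.5 shuffle plugin API. The package and class names are hypothetical, and the executor side is only a placeholder (a real reliable-storage plugin like Celeborn replaces the executor read/write path as well); the point is just to show where supportsReliableStorage is overridden.

// A minimal sketch (hypothetical package and class names) of a ShuffleDataIO
// plugin whose driver components declare reliable storage support, written
// against the Spark 3.5 shuffle plugin API. The executor side below is only
// a placeholder and is NOT reliable storage by itself.
package com.example.shuffle;

import java.util.Collections;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.shuffle.api.ShuffleDataIO;
import org.apache.spark.shuffle.api.ShuffleDriverComponents;
import org.apache.spark.shuffle.api.ShuffleExecutorComponents;
import org.apache.spark.shuffle.sort.io.LocalDiskShuffleExecutorComponents;

public class ReliableShuffleDataIO implements ShuffleDataIO {

  private final SparkConf conf;

  // Spark instantiates the plugin reflectively, passing in the SparkConf.
  public ReliableShuffleDataIO(SparkConf conf) {
    this.conf = conf;
  }

  @Override
  public ShuffleDriverComponents driver() {
    return new ShuffleDriverComponents() {
      @Override
      public Map<String, String> initializeApplication() {
        return Collections.emptyMap();
      }

      @Override
      public void cleanupApplication() {
        // nothing to clean up in this sketch
      }

      // This is the flag behind condition 4) of the dynamic allocation
      // requirement: returning true tells Spark that shuffle data survives
      // executor loss.
      @Override
      public boolean supportsReliableStorage() {
        return true;
      }
    };
  }

  @Override
  public ShuffleExecutorComponents executor() {
    // Placeholder only: a real plugin (e.g. Celeborn) reads and writes
    // shuffle data through its own reliable backend instead of local disk.
    return new LocalDiskShuffleExecutorComponents(conf);
  }
}

It would then be enabled with spark.shuffle.sort.io.plugin.class=com.example.shuffle.ReliableShuffleDataIO (again, a hypothetical name); Celeborn's actual implementation is the CelebornShuffleDataIO class linked in [3].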
> On Aug 20, 2024, at 18:42, Aaron Grubb <aa...@kaden.ai> wrote:
>
> Hi Cheng,
>
> Due to the documentation [1]. This is why I suggested at the end of the
> message you replied to that the documentation should be updated or
> clarified. Can you explain how persistent volume claims in Kubernetes are
> "unreliable" storage?
>
> Thanks,
> Aaron
>
> [1] https://spark.apache.org/docs/latest/running-on-kubernetes.html#local-storage
>
> On Tue, 2024-08-20 at 18:37 +0800, Cheng Pan wrote:
>> org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO does NOT support
>> reliable storage, so condition 4) is false even with this configuration.
>> I'm not sure why you think it does.
>>
>> Thanks,
>> Cheng Pan
>>
>>> On Aug 20, 2024, at 18:27, Aaron Grubb <aa...@kaden.ai> wrote:
>>>
>>> Adding spark.shuffle.useOldFetchProtocol=true changed the outcome of the
>>> job, but it still was not stable when spot instances went away. Adding
>>> spark.decommission.enabled=true, spark.storage.decommission.enabled=true
>>> and spark.executor.decommission.killInterval=110 appears to have
>>> completely stabilized the job (I'm not sure which setting did the trick,
>>> as I added them at the same time). Perhaps extra documentation or
>>> clarification should be added, as it doesn't seem clear to me how to
>>> arrive at job stability using dynamic allocation without trial and error.
>>>
>>> On Mon, 2024-08-19 at 13:01 +0000, Aaron Grubb wrote:
>>>> Hi all,
>>>>
>>>> I'm running Spark on Kubernetes on AWS, using only spot instances for
>>>> executors, with dynamic allocation enabled. This particular job is being
>>>> triggered by Airflow, and it hit this bug [1] 6 times in a row. However,
>>>> I had recently switched to using PersistentVolumeClaims in Spark with
>>>> spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO
>>>> but kept spark.dynamicAllocation.shuffleTracking.enabled=true. Upon
>>>> review, I see under the notes for spark.dynamicAllocation.enabled [2]
>>>> that these configurations are "or", not "and". However, when setting
>>>> spark.dynamicAllocation.shuffleTracking.enabled=false, my job crashes
>>>> with the message
>>>>
>>>> org.apache.spark.SparkException: Dynamic allocation of executors requires
>>>> one of the following conditions: 1) enabling external shuffle service
>>>> through spark.shuffle.service.enabled. 2) enabling shuffle tracking
>>>> through spark.dynamicAllocation.shuffleTracking.enabled. 3) enabling
>>>> shuffle blocks decommission through spark.decommission.enabled and
>>>> spark.storage.decommission.shuffleBlocks.enabled. 4) (Experimental)
>>>> configuring spark.shuffle.sort.io.plugin.class to use a custom
>>>> ShuffleDataIO who's ShuffleDriverComponents supports reliable storage.
>>>>
>>>> Am I hitting this bug unavoidably? Or is there a configuration I'm
>>>> missing to enable
>>>> spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO
>>>> to replace spark.dynamicAllocation.shuffleTracking.enabled=true?
>>>>
>>>> Using Spark 3.5.1 - here's my full spark-defaults.conf just in case:
>>>>
>>>> spark.checkpoint.compress                                   true
>>>> spark.driver.cores                                          1
>>>> spark.driver.maxResultSize                                  2g
>>>> spark.driver.memory                                         5140m
>>>> spark.dynamicAllocation.enabled                             true
>>>> spark.dynamicAllocation.executorAllocationRatio             0.33
>>>> spark.dynamicAllocation.maxExecutors                        20
>>>> spark.dynamicAllocation.sustainedSchedulerBacklogTimeout    30
>>>> spark.eventLog.enabled                                      true
>>>> spark.executor.cores                                        3
>>>> spark.executor.logs.rolling.enableCompression               true
>>>> spark.executor.logs.rolling.maxRetainedFiles                48
>>>> spark.executor.logs.rolling.strategy                        time
>>>> spark.executor.logs.rolling.time.interval                   hourly
>>>> spark.hadoop.fs.s3a.impl                                    org.apache.hadoop.fs.s3a.S3AFileSystem
>>>> spark.hadoop.fs.s3a.connection.ssl.enabled                  false
>>>> spark.hadoop.fs.s3a.fast.upload                             true
>>>> spark.kryo.registrationRequired                             false
>>>> spark.kryo.unsafe                                           false
>>>> spark.kryoserializer.buffer                                 1m
>>>> spark.kryoserializer.buffer.max                             1g
>>>> spark.kubernetes.driver.limit.cores                         750m
>>>> spark.kubernetes.driver.ownPersistentVolumeClaim            true
>>>> spark.kubernetes.driver.request.cores                       750m
>>>> spark.kubernetes.driver.reusePersistentVolumeClaim          true
>>>> spark.kubernetes.driver.waitToReusePersistentVolumeClaim    true
>>>> spark.kubernetes.executor.limit.cores                       3700m
>>>> spark.kubernetes.executor.request.cores                     3700m
>>>> spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.claimName       OnDemand
>>>> spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path              /data/spark-x/executor-x
>>>> spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.readOnly          false
>>>> spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.sizeLimit       20Gi
>>>> spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.storageClass    ebs-sc
>>>> spark.kubernetes.namespace                                  spark
>>>> spark.serializer                                            org.apache.spark.serializer.KryoSerializer
>>>> spark.shuffle.sort.io.plugin.class                          org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO
>>>> spark.sql.orc.compression.codec                             zlib
>>>> spark.sql.pyspark.jvmStacktrace.enabled                     true
>>>> spark.sql.sources.partitionOverwriteMode                    dynamic
>>>> spark.sql.streaming.kafka.useDeprecatedOffsetFetching       false
>>>> spark.submit.deployMode                                     cluster
>>>>
>>>> Thanks,
>>>> Aaron
>>>>
>>>> [1] https://issues.apache.org/jira/browse/SPARK-45858
>>>> [2] https://spark.apache.org/docs/latest/configuration.html#dynamic-allocation

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
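For readers who find this thread later: the settings Aaron reports in the quoted thread as stabilizing the job correspond to condition 3) of the exception (decommission-based shuffle block migration), not condition 4). A sketch of the relevant spark-defaults.conf entries follows; the killInterval value is specific to his spot-instance setup, and spark.storage.decommission.shuffleBlocks.enabled is listed explicitly even though I believe it defaults to true in recent Spark releases.

spark.decommission.enabled                          true
spark.storage.decommission.enabled                  true
spark.storage.decommission.shuffleBlocks.enabled    true
spark.executor.decommission.killInterval            110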