I searched [1] for the keyword “reliable” and found nothing, so I cannot draw 
the same conclusion as you.

If an implementation claims to support reliable storage, it should implement 
the ShuffleDriverComponents interface and override the supportsReliableStorage 
method [2] to return true. Apache Celeborn [3], a remote shuffle service for 
Spark, is an example of such an implementation.

Thanks,
Cheng Pan

[1] https://spark.apache.org/docs/latest/running-on-kubernetes.html#local-storage
[2] https://github.com/apache/spark/blob/v3.5.2/core/src/main/java/org/apache/spark/shuffle/api/ShuffleDriverComponents.java#L65-L72
[3] https://github.com/apache/celeborn/blob/v0.5.1/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/CelebornShuffleDataIO.java#L56

> On Aug 20, 2024, at 18:42, Aaron Grubb <aa...@kaden.ai> wrote:
> 
> Hi Cheng,
> 
> Due to the documentation [1]. This is why I suggested, at the end of the 
> message you replied to, that the documentation should be updated or 
> clarified. Can you explain how persistent volume claims in Kubernetes are 
> "unreliable" storage?
> 
> Thanks,
> Aaron
> 
> [1] https://spark.apache.org/docs/latest/running-on-kubernetes.html#local-storage
> 
> On Tue, 2024-08-20 at 18:37 +0800, Cheng Pan wrote:
>> org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO does NOT support 
>> reliable storage, so condition 4) is false even with this configuration.
>> I’m not sure why you think it does.
>> 
>> Thanks,
>> Cheng Pan
>> 
>>> On Aug 20, 2024, at 18:27, Aaron Grubb <aa...@kaden.ai> wrote:
>>> 
>>> Adding spark.shuffle.useOldFetchProtocol=true changed the outcome of the 
>>> job; however, it still was not stable in the face of spot instances going 
>>> away. Adding spark.decommission.enabled=true, 
>>> spark.storage.decommission.enabled=true and 
>>> spark.executor.decommission.killInterval=110 appears to have completely 
>>> stabilized the job (not sure which did the trick as I added them at the 
>>> same time). Perhaps extra documentation or clarifications should be added, 
>>> as it doesn't seem clear to me how to arrive at job stability using 
>>> dynamic allocation without trial and error.
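>>> 
>>> For reference, the settings I added look like this in spark-defaults.conf 
>>> (the killInterval value of 110 is just what I used, not a general 
>>> recommendation):
>>> 
>>> spark.decommission.enabled                  true
>>> spark.storage.decommission.enabled          true
>>> spark.executor.decommission.killInterval    110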
>>> 
>>> On Mon, 2024-08-19 at 13:01 +0000, Aaron Grubb wrote:
>>>> Hi all,
>>>> 
>>>> I'm running Spark on Kubernetes on AWS using only spot instances for 
>>>> executors with dynamic allocation enabled. This particular job is being 
>>>> triggered by Airflow and it hit this bug [1] 6 times in a row. However, 
>>>> I had recently switched to using PersistentVolumeClaims in Spark with 
>>>> spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO 
>>>> but kept spark.dynamicAllocation.shuffleTracking.enabled=true. Upon 
>>>> review, I see under the notes for spark.dynamicAllocation.enabled [2] 
>>>> that these configurations are "or" not "and". However, when setting 
>>>> spark.dynamicAllocation.shuffleTracking.enabled=false, my job crashes 
>>>> with the message
>>>> 
>>>> org.apache.spark.SparkException: Dynamic allocation of executors 
>>>> requires one of the following conditions: 1) enabling external shuffle 
>>>> service through spark.shuffle.service.enabled. 2) enabling shuffle 
>>>> tracking through spark.dynamicAllocation.shuffleTracking.enabled. 3) 
>>>> enabling shuffle blocks decommission through spark.decommission.enabled 
>>>> and spark.storage.decommission.shuffleBlocks.enabled. 4) (Experimental) 
>>>> configuring spark.shuffle.sort.io.plugin.class to use a custom 
>>>> ShuffleDataIO who's ShuffleDriverComponents supports reliable storage.
>>>> 
>>>> Am I hitting this bug unavoidably? Or is there a configuration I'm 
>>>> missing to enable 
>>>> spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO 
>>>> to replace spark.dynamicAllocation.shuffleTracking.enabled=true?
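>>>> 
>>>> For reference, my reading of that error is that the four alternatives 
>>>> map to configuration roughly like this (at least one has to hold):
>>>> 
>>>> # 1) external shuffle service
>>>> spark.shuffle.service.enabled                     true
>>>> # 2) shuffle tracking (what I have now)
>>>> spark.dynamicAllocation.shuffleTracking.enabled   true
>>>> # 3) shuffle block decommissioning
>>>> spark.decommission.enabled                        true
>>>> spark.storage.decommission.shuffleBlocks.enabled  true
>>>> # 4) a ShuffleDataIO plugin whose ShuffleDriverComponents supports
>>>> #    reliable storage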
>>>> 
>>>> Using Spark 3.5.1 - here's my full spark-defaults.conf just in case
>>>> 
>>>> spark.checkpoint.compress                                  true
>>>> spark.driver.cores                                         1
>>>> spark.driver.maxResultSize                                 2g
>>>> spark.driver.memory                                        5140m
>>>> spark.dynamicAllocation.enabled                            true
>>>> spark.dynamicAllocation.executorAllocationRatio            0.33
>>>> spark.dynamicAllocation.maxExecutors                       20
>>>> spark.dynamicAllocation.sustainedSchedulerBacklogTimeout   30
>>>> spark.eventLog.enabled                                     true
>>>> spark.executor.cores                                       3
>>>> spark.executor.logs.rolling.enableCompression              true
>>>> spark.executor.logs.rolling.maxRetainedFiles               48
>>>> spark.executor.logs.rolling.strategy                       time
>>>> spark.executor.logs.rolling.time.interval                  hourly
>>>> spark.hadoop.fs.s3a.impl                                   org.apache.hadoop.fs.s3a.S3AFileSystem
>>>> spark.hadoop.fs.s3a.connection.ssl.enabled                 false
>>>> spark.hadoop.fs.s3a.fast.upload                            true
>>>> spark.kryo.registrationRequired                            false
>>>> spark.kryo.unsafe                                          false
>>>> spark.kryoserializer.buffer                                1m
>>>> spark.kryoserializer.buffer.max                            1g
>>>> spark.kubernetes.driver.limit.cores                        750m
>>>> spark.kubernetes.driver.ownPersistentVolumeClaim           true
>>>> spark.kubernetes.driver.request.cores                      750m
>>>> spark.kubernetes.driver.reusePersistentVolumeClaim         true
>>>> spark.kubernetes.driver.waitToReusePersistentVolumeClaim   true
>>>> spark.kubernetes.executor.limit.cores                      3700m
>>>> spark.kubernetes.executor.request.cores                    3700m
>>>> spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.claimName      OnDemand
>>>> spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path             /data/spark-x/executor-x
>>>> spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.readOnly         false
>>>> spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.sizeLimit      20Gi
>>>> spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.storageClass   ebs-sc
>>>> spark.kubernetes.namespace                                 spark
>>>> spark.serializer                                           org.apache.spark.serializer.KryoSerializer
>>>> spark.shuffle.sort.io.plugin.class                         org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO
>>>> spark.sql.orc.compression.codec                            zlib
>>>> spark.sql.pyspark.jvmStacktrace.enabled                    true
>>>> spark.sql.sources.partitionOverwriteMode                   dynamic
>>>> spark.sql.streaming.kafka.useDeprecatedOffsetFetching      false
>>>> spark.submit.deployMode                                    cluster
>>>> 
>>>> Thanks,
>>>> Aaron
>>>> 
>>>> [1] https://issues.apache.org/jira/browse/SPARK-45858
>>>> [2] https://spark.apache.org/docs/latest/configuration.html#dynamic-allocation
>>>> 
>>>> 
>>> 
>>> 
>>> 
>> 
> 


---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
