I'm having trouble getting dynamic resource allocation to properly
terminate idle executors when using FSx Lustre for shuffle persistence on
EMR 7.8 (Spark 3.5.4) on EKS. I'm trying this strategy to control costs
driven by very severe data skew (I don't really care if a couple of nodes
run for hours while the rest of the fleet deprovisions during a particular
stage). We have massive jobs that will sometimes shuffle 20 TB+.

My expectation is that by using FSx for external shuffle storage, executors
can come and go as they please, while the shuffle is persisted on FSx.
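Concretely, with the local dir pointed at the FSx mount, I do see the
executors' shuffle output landing there; the listing below is roughly what
it looks like (directory and file names are illustrative, not copied from a
real run):

$ ls /data/spark-tmp
blockmgr-6f2c.../
$ ls /data/spark-tmp/blockmgr-6f2c.../0c
shuffle_3_1042_0.data  shuffle_3_1042_0.index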

*Setup:*

   - EMR on EKS with FSx Lustre mounted as persistent storage
   - Using KubernetesLocalDiskShuffleDataIO plugin for shuffle data recovery
   - *Goal:* Cost optimization by terminating executors during long-tail
   operations

*Issue:*
Executors scale up fine, the FSx mount works, and I can see shuffle data
spilling to FSx, but idle executors (0 active tasks) are not being
terminated despite the 60s idle timeout. They just sit there consuming
resources. The job itself runs successfully, with shuffle data persisting
correctly in FSx. I previously had DRA working without FSx, but most of
the executors held shuffle data on their local NVMe drives, so they never
deprovisioned (although some did).
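
For what it's worth, this is how I'm confirming they are idle but still
running: the Executors tab in the Spark UI shows 0 active tasks on them,
and the pods are still up (the namespace below is a placeholder;
spark-role=executor is the label Spark on Kubernetes puts on executor
pods):

kubectl get pods -n <my-namespace> -l spark-role=executor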

*Questions:*

   1. Is the KubernetesLocalDiskShuffleDataIO plugin preventing termination
   because it thinks shuffle data is still needed?
   2. Are my timeout settings too conservative? Should I be more
   aggressive (see the sketch just after this list)?
   3. Any EMR-specific configurations that might override dynamic
   allocation behavior?
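
For question 2, this is the more aggressive variant I'd try next, although
I suspect shorter timeouts aren't really the lever here (these values are
just guesses on my part):

"spark.dynamicAllocation.executorIdleTimeout": "30s"
"spark.dynamicAllocation.shuffleTracking.timeout": "1s"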

Has anyone successfully implemented dynamic allocation with persistent
shuffle storage on EMR on EKS? What am I missing?

*Configuration:*

"spark.dynamicAllocation.enabled": "true"
"spark.dynamicAllocation.shuffleTracking.enabled": "true"
"spark.dynamicAllocation.minExecutors": "1"
"spark.dynamicAllocation.maxExecutors": "200"
"spark.dynamicAllocation.initialExecutors": "3"
"spark.dynamicAllocation.executorIdleTimeout": "60s"
"spark.dynamicAllocation.cachedExecutorIdleTimeout": "90s"
"spark.dynamicAllocation.shuffleTracking.timeout": "30s"
"spark.local.dir": "/data/spark-tmp"
"spark.shuffle.sort.io.plugin.class":
"org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO"
"spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.claimName":
"fsx-lustre-pvc"
"spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path":
"/data"
"spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.readOnly":
"false"
"spark.kubernetes.driver.ownPersistentVolumeClaim": "true"
"spark.kubernetes.driver.waitToReusePersistentVolumeClaim": "true"

*Environment:*
EMR 7.8.0, Spark 3.5.4, Kubernetes 1.32, FSx Lustre


Thanks!

Andrew


-- 
amm7...@gmail.com
(860) 748-3985
