Hi Spark users,
those of you interested in storage decommissioning on Kubernetes might want to
know about some known issues and how to fix them:
https://issues.apache.org/jira/browse/SPARK-52504
Cheers,
Enrico
On 19.02.25 at 12:23, Enrico Minack wrote:
Hi Spark users,
I am trying to use the executor and storage decommissioning feature in
conjunction with S3 fallback storage on a K8S cluster.
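For reference, this is roughly the configuration I use (just a sketch; the app name, bucket path and exact values are placeholders):

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder()
    .appName("decommission-test")  // placeholder
    // Let executors decommission gracefully and migrate their blocks away first.
    .config("spark.decommission.enabled", "true")
    .config("spark.storage.decommission.enabled", "true")
    .config("spark.storage.decommission.shuffleBlocks.enabled", "true")
    .config("spark.storage.decommission.rddBlocks.enabled", "true")
    // Fallback storage for blocks that cannot be migrated to a peer executor.
    .config("spark.storage.decommission.fallbackStorage.path", "s3a://my-bucket/spark-fallback/")
    .getOrCreate()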
I am wondering how mature this feature is considered to be, given it has been
around for quite some time. Has this been used in anger on K8S? What is your
experience with this setup?
The most severe issue I am seeing is that when an executor gets decommissioned
while other executors are reading from it, those executors see the following error:
FetchFailed(BlockManagerId(2, 21.33.215.240, 30001, None), shuffleId=0, mapIndex=1235, mapId=1235, reduceId=18, message=
org.apache.spark.shuffle.FetchFailedException
    at org.apache.spark.errors.SparkCoreErrors$.fetchFailedError(SparkCoreErrors.scala:437)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:1239)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:971)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:86)
    ...
Caused by: org.apache.spark.ExecutorDeadException: [INTERNAL_ERROR_NETWORK] The relative remote executor(Id: 2), which maintains the block data to fetch is dead.
    at org.apache.spark.network.netty.NettyBlockTransferService$$anon$2.createAndStart(NettyBlockTransferService.scala:141)
    at org.apache.spark.network.shuffle.RetryingBlockTransferor.transferAllOutstanding(RetryingBlockTransferor.java:173)
    at org.apache.spark.network.shuffle.RetryingBlockTransferor.start(RetryingBlockTransferor.java:152)
    at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:151)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:374)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.send$1(ShuffleBlockFetcherIterator.scala:1209)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:1201)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:715)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:194)
    at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:73)
    ...
This causes a stage retry, which is expensive and has a maximum retry limit.
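The limit I am referring to is, I believe, spark.stage.maxConsecutiveAttempts (default 4). Raising it would only be a stopgap that tolerates more of these expensive retries, something like:

  // Stopgap only: allow more FetchFailed-triggered stage retries before the job is aborted.
  // This does not avoid the expensive recomputation, it merely delays job failure.
  val spark = org.apache.spark.sql.SparkSession.builder()
    .config("spark.stage.maxConsecutiveAttempts", "10")
    .getOrCreate()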
This sounds exactly like what has been reported in
https://issues.apache.org/jira/browse/SPARK-44389. Has anyone seen this and
managed to work around it?
Ideally, the ShuffleBlockFetcherIterator would fetch the new location of the
shuffle block and immediately attempt reading from there.
Cheers,
Enrico