ivoson commented on code in PR #54136:
URL: https://github.com/apache/spark/pull/54136#discussion_r2894314522
##########
core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala:
##########
@@ -155,33 +161,44 @@ private[storage] class BlockManagerDecommissioner(
} else if (e.getCause != null && e.getCause.getMessage != null
&& e.getCause.getMessage
.contains(blockSavedOnDecommissionedBlockManagerException)) {
- isTargetDecommissioned = true
+ // Target is decommissioned, don't penalize the block.
keepRunning = false
+ needRetry = true
+ newRetryCount = retryCount
+ } else if (e.getCause != null && e.getCause.getMessage != null
+ && e.getCause.getMessage
+ .contains(shuffleManagerNotInitializedException)) {
+ // Target executor's ShuffleManager is not yet initialized.
+ // This is transient, so requeue without incrementing failure count
+ // and keep the migration thread running for this peer.
+ logWarning(log"Target executor's ShuffleManager not initialized for " +
+ log"${MDC(SHUFFLE_BLOCK_INFO, shuffleBlockInfo)}. Will retry.")
+ needRetry = true
+ newRetryCount = retryCount
Review Comment:
Will still increment the retry count to avoid infinite retries. That way the
block is either picked up by another node or retried a bounded number of times
(default 30) on the current node.
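
A minimal sketch of the bounded-requeue idea described above, not the actual `BlockManagerDecommissioner` code. `BlockInfo`, `BoundedRequeueSketch`, and `requeueAfterFailure` are hypothetical names, and the cap is passed in as a plain parameter rather than read from any Spark config. The point it illustrates: if every failure increments the count, a block that keeps hitting a transient error is requeued only a bounded number of times.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the shuffle block descriptor; not a Spark class.
final case class BlockInfo(shuffleId: Int, mapId: Long)

object BoundedRequeueSketch {

  /**
   * Requeue a block after a failed migration attempt, always incrementing
   * its retry count. Returns true if the block was requeued, false if the
   * cap (maxFailures, an assumed parameter) was reached.
   */
  def requeueAfterFailure(
      queue: mutable.Queue[(BlockInfo, Int)],
      block: BlockInfo,
      retryCount: Int,
      maxFailures: Int): Boolean = {
    val newRetryCount = retryCount + 1
    if (newRetryCount < maxFailures) {
      // Back on the shared queue, so any migration thread may pick it up.
      queue.enqueue((block, newRetryCount))
      true
    } else {
      false
    }
  }

  def main(args: Array[String]): Unit = {
    val queue = mutable.Queue[(BlockInfo, Int)]()
    queue.enqueue((BlockInfo(0, 1L), 0))
    var attempts = 0
    while (queue.nonEmpty) {
      val (block, count) = queue.dequeue()
      attempts += 1
      // Simulate an attempt that always fails with a transient error.
      requeueAfterFailure(queue, block, count, maxFailures = 30)
    }
    // The loop terminates because the count grows on every failure.
    println(s"gave up after $attempts attempts")
  }
}
```

If the count were left unchanged on the transient branch, the `while` loop in this sketch would never terminate for a target that never finishes initializing.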
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]