buska88 commented on code in PR #47949: URL: https://github.com/apache/spark/pull/47949#discussion_r1990965346
##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -308,13 +328,59 @@ private[spark] class ExecutorAllocationManager(
       tasksPerExecutor).toInt
     val maxNeededWithSpeculationLocalityOffset =
-      if (tasksPerExecutor > 1 && maxNeeded == 1 && pendingSpeculative > 0) {
-        // If we have pending speculative tasks and only need a single executor, allocate one more
-        // to satisfy the locality requirements of speculation
-        maxNeeded + 1
-      } else {
-        maxNeeded
-      }
+      if (pendingSpeculative > 0 && maxNeeded <= numExecutorsTarget &&
+          executorMonitor.executorHostsCount == 1 && scheduler.isInstanceOf[TaskSchedulerImpl]) {
+        val now = clock.nanoTime()
+        if (excludeNodesTriggerTime == NOT_SET) {
+          excludeNodesTriggerTime = now + TimeUnit.MINUTES.toNanos(excludeNodeTriggerTimeoutMin)
+          logDebug(log"Current timestamp ${MDC(TIMESTAMP, now)}, " +
+            log"set excludeNodesTriggerTime to ${MDC(TIMESTAMP, excludeNodesTriggerTime)}")
+          maxNeeded
+        } else if (now < excludeNodesTriggerTime) {
+          maxNeeded
+        } else {
+          if (executorMonitor.hasAdjustMaxNeed) {
+            adjustMaxNeed
+          } else {
+            logDebug(log"${MDC(TIMESTAMP, now)} exceeds" +
+              log" ${MDC(TIMESTAMP, excludeNodesTriggerTime)}, start exclude node!")
+            val node = executorMonitor.getExecutorHostsName(0)
+            // check if current remaining host has attempts of speculative task
+            val speculativeTasksInfo = listener.getPendingSpeculativeTasksInfo()
+            if (scheduler.asInstanceOf[TaskSchedulerImpl].
+                speculativeTasksHasAttemptOnHost(node, speculativeTasksInfo)) {
+              // make sure new maxNeed exceeds numExecutorsTarget and allocate executor
+              adjustMaxNeed = numExecutorsTarget + 1
+              // If hasAdjustMaxNeed, use last adjust value as
+              // maxNeededWithSpeculationLocalityOffset in case of numExecutorsTarget keeps
+              // increasing during maxNumExecutorsNeededPerResourceProfile method called
+              if (scheduler.asInstanceOf[TaskSchedulerImpl].handleExcludeNodes(node)) {
+                logDebug(log"Exclude ${MDC(HOST, node)} for speculative, " +
+                  log"old maxNeeded: ${MDC(COUNT, maxNeeded)}, " +
+                  log"old numExecutorsTarget: ${MDC(COUNT, numExecutorsTarget)}, " +
+                  log"current executors count: ${MDC(COUNT, executorMonitor.executorCount)}")
+                excludeSpeculativeNodes.add(node)
+                executorMonitor.setAdjustMaxNeed(true)
+                adjustMaxNeed
+              } else {
+                logDebug(log"${MDC(HOST, node)} has been excluded for other reason")
+                maxNeeded
+              }
+            } else {
+              logDebug(log"No speculative task found on ${MDC(HOST, node)}")
+              maxNeeded
+            }
+          }
+        }
+      } else if (tasksPerExecutor > 1 && maxNeeded == 1 && pendingSpeculative > 0) {
+        resetExcludeNodesTriggerTime()
+        // If we have pending speculative tasks and only need a single executor, allocate one more
+        // to satisfy the locality requirements of speculation
+        maxNeeded + 1
+      } else {
+        resetExcludeNodesTriggerTime()
+        maxNeeded
+      }

Review Comment:
   Thanks for reviewing. I noticed the review message today. What if we make this feature configurable? In some cases, users are more intolerant of a job being stuck than of some resource wastage.
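   For concreteness, here is a minimal sketch of what such a gate could look like, assuming a new boolean entry in `core/src/main/scala/org/apache/spark/internal/config/package.scala` (the key name, doc text, version, and default below are illustrative placeholders, not part of this PR):

   ```scala
   // Hypothetical config entry; the key, doc, version, and default are illustrative only.
   private[spark] val DYN_ALLOCATION_EXCLUDE_NODES_FOR_SPECULATION =
     ConfigBuilder("spark.dynamicAllocation.excludeNodesForSpeculation.enabled")
       .doc("When true, ExecutorAllocationManager may exclude the last remaining host and " +
         "request an extra executor if pending speculative tasks cannot be scheduled on it.")
       .version("4.1.0")
       .booleanConf
       .createWithDefault(false)
   ```

   ExecutorAllocationManager could then read the flag once and guard the new branch with it, e.g. `if (excludeNodesForSpeculationEnabled && pendingSpeculative > 0 && ...)`. With the default kept at `false`, current behavior is unchanged, and only users who care more about unblocking stuck speculative tasks than about the cost of an extra executor would opt in.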