buska88 commented on code in PR #47949:
URL: https://github.com/apache/spark/pull/47949#discussion_r1990965346


##########
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala:
##########
@@ -308,13 +328,59 @@ private[spark] class ExecutorAllocationManager(
       tasksPerExecutor).toInt
 
     val maxNeededWithSpeculationLocalityOffset =
-      if (tasksPerExecutor > 1 && maxNeeded == 1 && pendingSpeculative > 0) {
-      // If we have pending speculative tasks and only need a single executor, allocate one more
-      // to satisfy the locality requirements of speculation
-      maxNeeded + 1
-    } else {
-      maxNeeded
-    }
+      if (pendingSpeculative > 0 && maxNeeded <= numExecutorsTarget &&
+        executorMonitor.executorHostsCount == 1 && scheduler.isInstanceOf[TaskSchedulerImpl]) {
+        val now = clock.nanoTime()
+        if (excludeNodesTriggerTime == NOT_SET) {
+          excludeNodesTriggerTime = now + TimeUnit.MINUTES.toNanos(excludeNodeTriggerTimeoutMin)
+          logDebug(log"Current timestamp ${MDC(TIMESTAMP, now)}, " +
+            log"set excludeNodesTriggerTime to ${MDC(TIMESTAMP, excludeNodesTriggerTime)}")
+          maxNeeded
+        } else if (now < excludeNodesTriggerTime) {
+          maxNeeded
+        } else {
+          if (executorMonitor.hasAdjustMaxNeed) {
+            adjustMaxNeed
+          } else {
+            logDebug(log"${MDC(TIMESTAMP, now)} exceeds" +
+              log" ${MDC(TIMESTAMP, excludeNodesTriggerTime)}, start exclude node!")
+            val node = executorMonitor.getExecutorHostsName(0)
+            // Check whether the sole remaining host has attempts of pending speculative tasks
+            val speculativeTasksInfo = listener.getPendingSpeculativeTasksInfo()
+            if (scheduler.asInstanceOf[TaskSchedulerImpl].
+              speculativeTasksHasAttemptOnHost(node, speculativeTasksInfo)) {
+              // Make sure the new maxNeed exceeds numExecutorsTarget so an executor is allocated
+              adjustMaxNeed = numExecutorsTarget + 1
+              // Once hasAdjustMaxNeed is set, reuse the last adjusted value as
+              // maxNeededWithSpeculationLocalityOffset, in case numExecutorsTarget keeps
+              // increasing across calls to maxNumExecutorsNeededPerResourceProfile
+              if (scheduler.asInstanceOf[TaskSchedulerImpl].handleExcludeNodes(node)) {
+                logDebug(log"Exclude ${MDC(HOST, node)} for speculative, " +
+                  log"old maxNeeded: ${MDC(COUNT, maxNeeded)}, " +
+                  log"old numExecutorsTarget: ${MDC(COUNT, numExecutorsTarget)}, " +
+                  log"current executors count: ${MDC(COUNT, executorMonitor.executorCount)}")
+                excludeSpeculativeNodes.add(node)
+                executorMonitor.setAdjustMaxNeed(true)
+                adjustMaxNeed
+              } else {
+                logDebug(log"${MDC(HOST, node)} has been excluded for another reason")
+                maxNeeded
+              }
+            } else {
+              logDebug(log"No speculative task found on ${MDC(HOST, node)}")
+              maxNeeded
+            }
+          }
+        }
+      } else if (tasksPerExecutor > 1 && maxNeeded == 1 && pendingSpeculative > 0) {
+        resetExcludeNodesTriggerTime()
+        // If we have pending speculative tasks and only need a single executor, allocate one more
+        // to satisfy the locality requirements of speculation
+        maxNeeded + 1
+      } else {
+        resetExcludeNodesTriggerTime()
+        maxNeeded
+      }

Review Comment:
   Thanks for reviewing. I noticed the review message today. What if we made this feature configurable?
   In some cases, users are more intolerant of being stuck than of resource wastage.
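   For illustration, here is a minimal sketch of what such a flag could look like, assuming it would be defined in the `org.apache.spark.internal.config` package object alongside the other `spark.dynamicAllocation.*` entries; the key name, doc text, version, and default below are placeholders, not part of this PR:
   
   ```scala
   // Hypothetical config entry (key, doc, version, and default are placeholders, not from this PR)
   private[spark] val DYN_ALLOCATION_EXCLUDE_NODES_FOR_SPECULATION =
     ConfigBuilder("spark.dynamicAllocation.excludeNodesForSpeculation.enabled")
       .doc("When true, ExecutorAllocationManager may exclude the last remaining " +
         "executor host after a timeout so that pending speculative tasks can be " +
         "scheduled on a fresh executor, trading some resource wastage for not " +
         "getting stuck.")
       .version("4.0.0")
       .booleanConf
       .createWithDefault(false)
   ```
   
   The new branch above would then only run when `conf.get(DYN_ALLOCATION_EXCLUDE_NODES_FOR_SPECULATION)` is true, so the default behavior stays unchanged for users who would rather avoid the extra executor.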


