attilapiros commented on code in PR #50630:
URL: https://github.com/apache/spark/pull/50630#discussion_r2069585506
##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2362,12 +2363,15 @@ private[spark] class DAGScheduler(
if (numMissingPartitions < resultStage.numTasks) {
// TODO: support to rollback result tasks.
abortStage(resultStage, generateErrorMessage(resultStage), None)
- numAbortedJobs += 1
}
case _ =>
}
- (numAbortedJobs >= numJobsWithStage, rollingBackStages)
+
+ val numActiveJobsWithStageAfterRollback =
+ activeJobs.count(job => stagesToRollback.contains(job.finalStage))
Review Comment:
@mridulm I made this change because an abort of the shuffle map stage
caused by `SHUFFLE_USE_OLD_FETCH_PROTOCOL` should be counted in
`numAbortedJobs` too.
It is also easier to consult `activeJobs` again, since it is updated at:
https://github.com/apache/spark/blob/3028c56467352924e3c44554b28b508355616e8b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L907
That line is reached via the call chain `abortStage()` ->
`failJobAndIndependentStages()` -> `cleanupStateForJobAndIndependentStages()`.
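
To illustrate the idea, here is a minimal sketch of re-counting from `activeJobs` after the aborts instead of maintaining a separate counter. `Stage`, `ActiveJob`, `RollbackCountSketch`, and `countActiveJobsWithStage` are hypothetical, simplified stand-ins rather than the real `DAGScheduler` types, and the sketch's `abortStage` only models the side effect that matters here (removing the affected jobs from `activeJobs`):

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins for the scheduler's types.
final case class Stage(id: Int)
final case class ActiveJob(jobId: Int, finalStage: Stage)

object RollbackCountSketch {
  // Plays the role of DAGScheduler.activeJobs: aborted jobs are removed from it.
  val activeJobs: mutable.HashSet[ActiveJob] = mutable.HashSet.empty

  // Stand-in for abortStage(): assumed here to drop every active job
  // whose final stage is the aborted stage.
  def abortStage(stage: Stage): Unit = {
    val toRemove = activeJobs.filter(_.finalStage == stage).toList
    activeJobs --= toRemove
  }

  // Counts the jobs that are still active after the aborts, so every abort
  // path is reflected without incrementing a counter at each call site.
  def countActiveJobsWithStage(stagesToRollback: Set[Stage]): Int =
    activeJobs.count(job => stagesToRollback.contains(job.finalStage))
}
```

Because the count is derived from the state that the abort path already updates, a new abort reason does not need its own bookkeeping.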
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]