GitHub user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4916#discussion_r148209063

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---
    @@ -878,113 +880,70 @@ private void scheduleEager(SlotProvider slotProvider, final Time timeout) {
             // that way we do not have any operation that can fail between allocating the slots
             // and adding them to the list. If we had a failure in between there, that would
             // cause the slots to get lost
    -        final ArrayList<ExecutionAndSlot[]> resources = new ArrayList<>(getNumberOfExecutionJobVertices());
             final boolean queued = allowQueuedScheduling;
    -        // we use this flag to handle failures in a 'finally' clause
    -        // that allows us to not go through clumsy cast-and-rethrow logic
    -        boolean successful = false;
    +        // collecting all the slots may resize and fail in that operation without slots getting lost
    +        final ArrayList<CompletableFuture<Execution>> allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
    -        try {
    -            // collecting all the slots may resize and fail in that operation without slots getting lost
    -            final ArrayList<CompletableFuture<SimpleSlot>> slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
    +        // allocate the slots (obtain all their futures
    +        for (ExecutionJobVertex ejv : getVerticesTopologically()) {
    +            // these calls are not blocking, they only return futures
    --- End diff --

    There is no specific reason why we iterate over the vertices in topological order. For eager scheduling we could just as well pick a completely random order, because the actual scheduling order is determined by the preferred location futures (which at the moment are based on inputs only). If we switch to state locations, we would effectively schedule the individual tasks independently, because the vertices would no longer depend on their input locations.
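
    To make the point about iteration order concrete, here is a minimal standalone sketch. It is not Flink code: `EagerSchedulingOrderSketch`, `Task`, `task` and `scheduleEager` are hypothetical names invented for this example, and a task's "preferred location" is modelled simply as a `CompletableFuture` that completes once all of its inputs have been placed. Under those assumptions, the deployment order comes out the same whether we iterate topologically or in reverse.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Standalone illustration only; none of these names exist in Flink.
final class EagerSchedulingOrderSketch {

    /** Hypothetical stand-in for an execution vertex: a name plus the names of its inputs. */
    static final class Task {
        final String name;
        final List<String> inputs;

        Task(String name, List<String> inputs) {
            this.name = name;
            this.inputs = inputs;
        }
    }

    static Task task(String name, String... inputs) {
        return new Task(name, Arrays.asList(inputs));
    }

    /**
     * Walks the tasks in the given iteration order and returns the order in which
     * they were actually "deployed". Everything runs on the calling thread.
     * Every input name must refer to a task in the list.
     */
    static List<String> scheduleEager(List<Task> iterationOrder) {
        // One location future per task, completed once that task has a slot.
        Map<String, CompletableFuture<String>> locations = new HashMap<>();
        for (Task t : iterationOrder) {
            locations.put(t.name, new CompletableFuture<>());
        }

        List<String> deployed = new ArrayList<>();
        for (Task t : iterationOrder) {
            // Preferred locations are based on inputs only: wait for all input locations.
            CompletableFuture<?>[] inputLocations = t.inputs.stream()
                .map(locations::get)
                .toArray(CompletableFuture[]::new);

            // Non-blocking: this only registers a callback, or runs it right away
            // if all inputs are already placed (always the case for sources).
            CompletableFuture.allOf(inputLocations).thenRun(() -> {
                deployed.add(t.name);
                locations.get(t.name).complete("slot-of-" + t.name);
            });
        }
        return deployed;
    }

    public static void main(String[] args) {
        List<Task> topological = Arrays.asList(
            task("source"), task("map", "source"), task("sink", "map"));
        List<Task> reversed = new ArrayList<>(topological);
        Collections.reverse(reversed);

        System.out.println(scheduleEager(topological)); // [source, map, sink]
        System.out.println(scheduleEager(reversed));    // [source, map, sink]
    }
}
```

    If the preferred locations came from state instead, every task in this sketch would have an empty `inputs` list, each callback would fire immediately, and the tasks would simply be deployed in whatever order we happen to iterate.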
---