Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5087#discussion_r156736950 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java --- @@ -208,27 +209,61 @@ public void setLocality(Locality locality) { // ------------------------------------------------------------------------ @Override - public void releaseSlot() { + public void releaseInstanceSlot() { + releaseSlot(); + } + + @Override + public CompletableFuture<?> releaseSlot() { if (!isCanceled()) { + final CompletableFuture<?> terminationFuture; - // kill all tasks currently running in this slot - Execution exec = this.executedTask; - if (exec != null && !exec.isFinished()) { - exec.fail(new Exception("TaskManager was lost/killed: " + getTaskManagerLocation())); - } + if (payload != null) { + // trigger the failure of the slot payload + payload.fail(new FlinkException("TaskManager was lost/killed: " + getTaskManagerLocation())); - // release directly (if we are directly allocated), - // otherwise release through the parent shared slot - if (getParent() == null) { - // we have to give back the slot to the owning instance - if (markCancelled()) { - getOwner().returnAllocatedSlot(this); - } + // wait for the termination of the payload before releasing the slot + terminationFuture = payload.getTerminalStateFuture(); } else { - // we have to ask our parent to dispose us - getParent().releaseChild(this); + terminationFuture = CompletableFuture.completedFuture(null); } + + terminationFuture.whenComplete( + (Object ignored, Throwable throwable) -> { + // release directly (if we are directly allocated), + // otherwise release through the parent shared slot + if (getParent() == null) { + // we have to give back the slot to the owning instance + if (markCancelled()) { --- End diff -- `markCancelled` shall only guard against multiple accesses. Either a `SimpleSlot` is not part of a `SharedSlot`, then there must be a single thread which enters the if branch if `markCancelled` returns `false` for another thread. Or `SimpleSlot` is part of a `SharedSlot`, then the `parent` is not null and, thus, the release future will be completed in the else branch.
---