GitHub user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5980#discussion_r187306715 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java --- @@ -159,10 +159,51 @@ public SlotSharingGroupId getSlotSharingGroupId() { * the logical slot. * * @param cause of the payload release - * @return true if the logical slot's payload could be released, otherwise false */ @Override - public boolean release(Throwable cause) { - return releaseSlot(cause).isDone(); + public void release(Throwable cause) { + if (STATE_UPDATER.compareAndSet(this, State.ALIVE, State.RELEASING)) { + signalPayloadRelease(cause); + } + state = State.RELEASED; + releaseFuture.complete(null); + } + + private CompletableFuture<?> signalPayloadRelease(Throwable cause) { + tryAssignPayload(TERMINATED_PAYLOAD); + payload.fail(cause); + + return payload.getTerminalStateFuture(); + } + + private void returnSlotToOwner(CompletableFuture<?> terminalStateFuture) { + final CompletableFuture<Boolean> slotReturnFuture = terminalStateFuture.handle((Object ignored, Throwable throwable) -> { + if (state == State.RELEASING) { --- End diff -- True, this relies on a logical coupling that currently holds. I'll add the double check to make the code independent of this assumption.
---