GitHub user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5980#discussion_r187305983

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java ---
    @@ -159,10 +159,51 @@ public SlotSharingGroupId getSlotSharingGroupId() {
          * the logical slot.
          *
          * @param cause of the payload release
    -     * @return true if the logical slot's payload could be released, otherwise false
          */
         @Override
    -    public boolean release(Throwable cause) {
    -        return releaseSlot(cause).isDone();
    +    public void release(Throwable cause) {
    +        if (STATE_UPDATER.compareAndSet(this, State.ALIVE, State.RELEASING)) {
    +            signalPayloadRelease(cause);
    +        }
    +        state = State.RELEASED;
    +        releaseFuture.complete(null);
    +    }
    +
    +    private CompletableFuture<?> signalPayloadRelease(Throwable cause) {
    +        tryAssignPayload(TERMINATED_PAYLOAD);
    +        payload.fail(cause);
    +
    +        return payload.getTerminalStateFuture();
    +    }
    +
    +    private void returnSlotToOwner(CompletableFuture<?> terminalStateFuture) {
    +        final CompletableFuture<Boolean> slotReturnFuture = terminalStateFuture.handle((Object ignored, Throwable throwable) -> {
    +            if (state == State.RELEASING) {
    +                return slotOwner.returnAllocatedSlot(this);
    +            } else {
    +                return CompletableFuture.completedFuture(true);
    +            }
    +        }).thenCompose(Function.identity());
    +
    +        slotReturnFuture.whenComplete(
    +            (Object ignored, Throwable throwable) -> {
    +                state = State.RELEASED;
    --- End diff --

    yes, this is cleaner
---