Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5980#discussion_r187191338
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java
 ---
    @@ -159,10 +159,51 @@ public SlotSharingGroupId getSlotSharingGroupId() {
         * the logical slot.
         *
         * @param cause of the payload release
    -    * @return true if the logical slot's payload could be released, 
otherwise false
         */
        @Override
    -   public boolean release(Throwable cause) {
    -           return releaseSlot(cause).isDone();
    +   public void release(Throwable cause) {
    +           if (STATE_UPDATER.compareAndSet(this, State.ALIVE, 
State.RELEASING)) {
    +                   signalPayloadRelease(cause);
    +           }
    +           state = State.RELEASED;
    +           releaseFuture.complete(null);
    +   }
    +
    +   private CompletableFuture<?> signalPayloadRelease(Throwable cause) {
    +           tryAssignPayload(TERMINATED_PAYLOAD);
    +           payload.fail(cause);
    +
    +           return payload.getTerminalStateFuture();
    +   }
    +
    +   private void returnSlotToOwner(CompletableFuture<?> 
terminalStateFuture) {
    +           final CompletableFuture<Boolean> slotReturnFuture = 
terminalStateFuture.handle((Object ignored, Throwable throwable) -> {
    +                   if (state == State.RELEASING) {
    --- End diff --
    
    What happens if this gets set concurrently to `RELEASED`? This would only 
work if `slotOwner.returnAllocatedSlot(this)` works in both cases (releasing 
and released) and the second path (returning the completed future) is the 
optimization/fast path if it is already released. (double checking the 
assumption).


---

Reply via email to