Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5088#discussion_r155296898 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java --- @@ -144,6 +144,78 @@ public TaskManagerGateway getTaskManagerGateway() { return taskManagerGateway; } + /** + * Triggers the release of the logical slot. + */ + public void triggerLogicalSlotRelease() { + final LogicalSlot logicalSlot = logicalSlotReference.get(); + + if (logicalSlot != null) { + logicalSlot.releaseSlot(); + } + } + + /** + * Releases the logical slot. + * + * @return true if the logical slot could be released, false otherwise. + */ + public boolean releaseLogicalSlot() { + final LogicalSlot logicalSlot = logicalSlotReference.get(); + + if (logicalSlot != null) { + if (logicalSlot instanceof Slot) { + final Slot slot = (Slot) logicalSlot; + if (slot.markReleased()) { + logicalSlotReference.set(null); + return true; + } + } else { + throw new RuntimeException("Unsupported logical slot type encounterd " + logicalSlot.getClass()); --- End diff -- Typo: * encounterd*
---