[ https://issues.apache.org/jira/browse/FLINK-4490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15453958#comment-15453958 ]
ASF GitHub Bot commented on FLINK-4490: --------------------------------------- Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/2447#discussion_r77101490 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java --- @@ -20,73 +20,125 @@ import org.apache.flink.runtime.instance.SimpleSlot; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * + */ public class SlotAllocationFuture { - + private final Object monitor = new Object(); - + private volatile SimpleSlot slot; - + private volatile SlotAllocationFutureAction action; - + // -------------------------------------------------------------------------------------------- + /** + * Creates a future that is uncompleted. + */ public SlotAllocationFuture() {} - + + /** + * Creates a future that is immediately completed. + * + * @param slot The task slot that completes the future. 
+ */ public SlotAllocationFuture(SimpleSlot slot) { this.slot = slot; } - + // -------------------------------------------------------------------------------------------- - - public SimpleSlot waitTillAllocated() throws InterruptedException { - return waitTillAllocated(0); - } - - public SimpleSlot waitTillAllocated(long timeout) throws InterruptedException { + + public SimpleSlot waitTillCompleted() throws InterruptedException { synchronized (monitor) { while (slot == null) { - monitor.wait(timeout); + monitor.wait(); + } + return slot; + } + } + + public SimpleSlot waitTillCompleted(long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException { + checkArgument(timeout >= 0, "timeout may not be negative"); + checkNotNull(timeUnit, "timeUnit"); + + if (timeout == 0) { + return waitTillCompleted(); + } else { + final long deadline = System.nanoTime() + timeUnit.toNanos(timeout); + long millisToWait; + + synchronized (monitor) { + while (slot == null && (millisToWait = (deadline - System.nanoTime()) / 1_000_000) > 0) { + monitor.wait(millisToWait); + } + + if (slot != null) { + return slot; + } else { + throw new TimeoutException(); + } } - + } + } + + /** + * Gets the slot from this future. This method throws an exception, if the future has not been completed. + * This method never blocks. + * + * @return The slot with which this future was completed. + * @throws IllegalStateException Thrown, if this method is called before the future is completed. + */ + public SimpleSlot get() { + final SimpleSlot slot = this.slot; + if (slot != null) { return slot; + } else { + throw new IllegalStateException("The future is not complete - not slot available"); --- End diff -- Can we throw an explicit exception like SlotNotReadyException instead of RuntimeException? 
> Decouple Slot and Instance > -------------------------- > > Key: FLINK-4490 > URL: https://issues.apache.org/jira/browse/FLINK-4490 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination > Reporter: Kurt Young > Fix For: 1.2.0 > > > Currently, {{Slot}} and {{Instance}} hold references to each other. For {{Instance}} > holding {{Slot}}, it makes sense because it reflects how many resources it > can provide and how many are in use. > But it is not really necessary for {{Slot}} to hold the {{Instance}} which it > belongs to. It only needs to hold some connection information and a gateway to > talk to. Another downside of {{Slot}} holding {{Instance}} is that > {{Instance}} actually contains some allocation/de-allocation logic, so it will > be difficult if we want to refactor the allocation without > {{Slot}} noticing. > We should abstract the connection information of {{Instance}} for {{Slot}} > to hold. (Actually we have {{InstanceConnectionInfo}} now, but it lacks the > instance's akka gateway; maybe we can just add the akka gateway to the > {{InstanceConnectionInfo}}.) -- This message was sent by Atlassian JIRA (v6.3.4#6332)