Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5087#discussion_r155016510 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java --- @@ -267,10 +270,77 @@ public void testAnyPreferredLocationCalculation() throws ExecutionException, Int assertThat(preferredLocations, containsInAnyOrder(taskManagerLocation1, taskManagerLocation3)); } + /** + * Checks that the {@link Execution} termination future is only completed after the + * assigned slot has been released. + * + * <p>NOTE: This test only fails spuriously without the fix of this commit. Thus, one has + * to execute this test multiple times to see the failure. + */ + @Test + public void testTerminationFutureIsCompletedAfterSlotRelease() throws Exception { + final JobVertexID jobVertexId = new JobVertexID(); + final JobVertex jobVertex = new JobVertex("Test vertex", jobVertexId); + jobVertex.setInvokableClass(NoOpInvokable.class); + + final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner(); + + final SimpleSlot slot = new SimpleSlot( + new JobID(), + slotOwner, + new LocalTaskManagerLocation(), + 0, + new SimpleAckingTaskManagerGateway()); + + final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(1); + slotProvider.addSlot(jobVertexId, 0, CompletableFuture.completedFuture(slot)); + + ExecutionGraph executionGraph = ExecutionGraphTestUtils.createSimpleTestGraph( + new JobID(), + slotProvider, + new NoRestartStrategy(), + jobVertex); + + ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId); + + ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0]; + + assertTrue(executionVertex.scheduleForExecution(slotProvider, false, LocationPreferenceConstraint.ANY)); + + Execution currentExecutionAttempt = executionVertex.getCurrentExecutionAttempt(); + + CompletableFuture<Slot> returnedSlotFuture = slotOwner.getReturnedSlotFuture(); + CompletableFuture<?> 
terminationFuture = executionVertex.cancel(); + + // run canceling in a separate thread to allow an interleaving between termination + // future callback registrations + CompletableFuture.runAsync( + () -> currentExecutionAttempt.cancelingComplete(), + TestingUtils.defaultExecutor()); + + // to increase probability for problematic interleaving, let the current thread yield the processor + Thread.yield(); + + CompletableFuture<Boolean> restartFuture = terminationFuture.thenApply( + ignored -> { + try { + assertTrue(returnedSlotFuture.isDone()); + } catch (Exception e) { + throw new CompletionException(e); --- End diff -- `isDone` does not throw an exception. `assertTrue` throws an `AssertionError`, which is an `Error` and not an `Exception`, so the `catch (Exception e)` block will not intercept a failed assertion. Is it possible to get into this code path?
---