[
https://issues.apache.org/jira/browse/FLINK-8085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16278898#comment-16278898
]
ASF GitHub Bot commented on FLINK-8085:
---------------------------------------
Github user GJL commented on a diff in the pull request:
https://github.com/apache/flink/pull/5087#discussion_r155018207
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
---
@@ -267,10 +270,77 @@ public void testAnyPreferredLocationCalculation()
throws ExecutionException, Int
assertThat(preferredLocations,
containsInAnyOrder(taskManagerLocation1, taskManagerLocation3));
}
+ /**
+ * Checks that the {@link Execution} termination future is only
completed after the
+ * assigned slot has been released.
+ *
+ * <p>NOTE: This test only fails spuriously without the fix of this
commit. Thus, one has
+ * to execute this test multiple times to see the failure.
+ */
+ @Test
+ public void testTerminationFutureIsCompletedAfterSlotRelease() throws
Exception {
+ final JobVertexID jobVertexId = new JobVertexID();
+ final JobVertex jobVertex = new JobVertex("Test vertex",
jobVertexId);
+ jobVertex.setInvokableClass(NoOpInvokable.class);
+
+ final SingleSlotTestingSlotOwner slotOwner = new
SingleSlotTestingSlotOwner();
+
+ final SimpleSlot slot = new SimpleSlot(
+ new JobID(),
+ slotOwner,
+ new LocalTaskManagerLocation(),
+ 0,
+ new SimpleAckingTaskManagerGateway());
+
+ final ProgrammedSlotProvider slotProvider = new
ProgrammedSlotProvider(1);
+ slotProvider.addSlot(jobVertexId, 0,
CompletableFuture.completedFuture(slot));
+
+ ExecutionGraph executionGraph =
ExecutionGraphTestUtils.createSimpleTestGraph(
+ new JobID(),
+ slotProvider,
+ new NoRestartStrategy(),
+ jobVertex);
+
+ ExecutionJobVertex executionJobVertex =
executionGraph.getJobVertex(jobVertexId);
+
+ ExecutionVertex executionVertex =
executionJobVertex.getTaskVertices()[0];
+
+ assertTrue(executionVertex.scheduleForExecution(slotProvider,
false, LocationPreferenceConstraint.ANY));
+
+ Execution currentExecutionAttempt =
executionVertex.getCurrentExecutionAttempt();
+
+ CompletableFuture<Slot> returnedSlotFuture =
slotOwner.getReturnedSlotFuture();
+ CompletableFuture<?> terminationFuture =
executionVertex.cancel();
+
+ // run canceling in a separate thread to allow an interleaving
between termination
+ // future callback registrations
+ CompletableFuture.runAsync(
+ () -> currentExecutionAttempt.cancelingComplete(),
+ TestingUtils.defaultExecutor());
--- End diff --
If an executor is created, where is it shutdown afterwards? I think this
should be just fine:
```
new Thread() {
@Override
public void run() {
currentExecutionAttempt.cancelingComplete();
}
}.start();
```
> Thin out the LogicalSlot interface
> ----------------------------------
>
> Key: FLINK-8085
> URL: https://issues.apache.org/jira/browse/FLINK-8085
> Project: Flink
> Issue Type: Improvement
> Components: Distributed Coordination
> Affects Versions: 1.5.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Minor
> Labels: flip-6
>
> The {{LogicalSlot}} interface contains method which we don't strictly need
> (e.g. {{isCanceled}}, {{isReleased}}). Moreover, we should decouple the
> {{LogicalSlot}} from the {{Execution}} by only setting the
> {{ExecutionAttemptID}} instead of {{Execution}}.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)