tillrohrmann commented on a change in pull request #13227:
URL: https://github.com/apache/flink/pull/13227#discussion_r481852784
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
##########
@@ -91,37 +91,21 @@ public MiniDispatcher(
ApplicationStatus status =
result.getSerializedThrowable().isPresent() ?
ApplicationStatus.FAILED :
ApplicationStatus.SUCCEEDED;
- LOG.debug("Shutting down per-job cluster
because someone retrieved the job result.");
+ LOG.debug("Shutting down cluster because
someone retrieved the job result.");
shutDownFuture.complete(status);
});
} else {
- LOG.debug("Not shutting down per-job cluster after
someone retrieved the job result.");
+ LOG.debug("Not shutting down cluster after someone
retrieved the job result.");
}
return jobResultFuture;
}
- @Override
- public CompletableFuture<Acknowledge> cancelJob(
- JobID jobId, Time timeout) {
- CompletableFuture<Acknowledge> cancelFuture =
super.cancelJob(jobId, timeout);
-
- cancelFuture.thenAccept((ignored) -> {
- LOG.debug("Shutting down per-job cluster because the
job was canceled.");
- shutDownFuture.complete(ApplicationStatus.CANCELED);
- });
-
- return cancelFuture;
- }
-
@Override
protected void jobReachedGloballyTerminalState(ArchivedExecutionGraph
archivedExecutionGraph) {
super.jobReachedGloballyTerminalState(archivedExecutionGraph);
- if (executionMode == ClusterEntrypoint.ExecutionMode.DETACHED) {
- // shut down since we don't have to wait for the
execution result retrieval
-
shutDownFuture.complete(ApplicationStatus.fromJobStatus(archivedExecutionGraph.getState()));
- }
+
shutDownFuture.complete(ApplicationStatus.fromJobStatus(archivedExecutionGraph.getState()));
Review comment:
I would suggest to add a field which stores whether `cancelJob` has been
called. If this is the case or if `executionMode ==
ClusterEntrypoint.ExecutionMode.DETACHED`, then we complete the shut down
future. It would also be great to add a test for it.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]