Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6279#discussion_r201869163 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java --- @@ -536,7 +540,25 @@ private JobManagerRunner createJobManagerRunner(JobGraph jobGraph) throws Except private void removeJobAndRegisterTerminationFuture(JobID jobId, boolean cleanupHA) { final CompletableFuture<Void> cleanupFuture = removeJob(jobId, cleanupHA); - registerOrphanedJobManagerTerminationFuture(cleanupFuture); + registerJobManagerRunnerTerminationFuture(jobId, cleanupFuture); + } + + private void registerJobManagerRunnerTerminationFuture(JobID jobId, CompletableFuture<Void> jobManagerRunnerTerminationFuture) { + Preconditions.checkState(!jobManagerTerminationFutures.containsKey(jobId)); + + jobManagerTerminationFutures.put(jobId, jobManagerRunnerTerminationFuture); + + // clean up the pending termination future + jobManagerRunnerTerminationFuture.thenRunAsync( + () -> { + final CompletableFuture<Void> terminationFuture = jobManagerTerminationFutures.remove(jobId); + + //noinspection ObjectEquality + if (terminationFuture != null && terminationFuture != jobManagerRunnerTerminationFuture) { + jobManagerTerminationFutures.put(jobId, terminationFuture); --- End diff -- It can happen because we also clear the termination future in the callback of the `Dispatcher#waitForTerminatingJobManager` method.
---