[ https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16612253#comment-16612253 ]

ASF GitHub Bot commented on FLINK-10011:
----------------------------------------

tillrohrmann closed pull request #6588: [Backport 1.6][FLINK-10011] Release JobGraph from SubmittedJobGraphStore
URL: https://github.com/apache/flink/pull/6588

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
index 069cb833a3a..45d11412c50 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/services/ZooKeeperMesosServices.java
@@ -56,8 +56,7 @@ public MesosWorkerStore createMesosWorkerStore(Configuration configuration, Exec
 
                ZooKeeperStateHandleStore<MesosWorkerStore.Worker> zooKeeperStateHandleStore = zooKeeperUtilityFactory.createZooKeeperStateHandleStore(
                        "/workers",
-                       stateStorageHelper,
-                       executor);
+                       stateStorageHelper);
 
                ZooKeeperSharedValue frameworkId = zooKeeperUtilityFactory.createSharedValue("/frameworkId", new byte[0]);
                ZooKeeperSharedCount totalTaskCount = zooKeeperUtilityFactory.createSharedCount("/taskCount", 0);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java
index f2f905971c3..9a992814235 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java
@@ -19,9 +19,11 @@
 package org.apache.flink.runtime.akka;
 
 import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
 
 import akka.actor.ActorRef;
 import akka.actor.Kill;
+import akka.actor.PoisonPill;
 import akka.pattern.Patterns;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -85,5 +87,13 @@
                return FutureUtils.completeAll(terminationFutures);
        }
 
+       public static void stopActor(AkkaActorGateway akkaActorGateway) {
+               stopActor(akkaActorGateway.actor());
+       }
+
+       public static void stopActor(ActorRef actorRef) {
+               actorRef.tell(PoisonPill.getInstance(), ActorRef.noSender());
+       }
+
        private ActorUtils() {}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index f22127041d3..131733924ae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -25,14 +25,13 @@
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.function.ConsumerWithException;
 
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.utils.ZKPaths;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -86,6 +85,8 @@
         */
        private final ArrayDeque<CompletedCheckpoint> completedCheckpoints;
 
+       private final Executor executor;
+
        /**
         * Creates a {@link ZooKeeperCompletedCheckpointStore} instance.
         *
@@ -98,7 +99,7 @@
         *                                       start with a '/')
         * @param stateStorage                   State storage to be used to 
persist the completed
         *                                       checkpoint
-        * @param executor to give to the ZooKeeperStateHandleStore to run 
ZooKeeper callbacks
+        * @param executor to execute blocking calls
         * @throws Exception
         */
        public ZooKeeperCompletedCheckpointStore(
@@ -123,10 +124,12 @@ public ZooKeeperCompletedCheckpointStore(
                // All operations will have the path as root
                this.client = client.usingNamespace(client.getNamespace() + 
checkpointsPath);
 
-               this.checkpointsInZooKeeper = new 
ZooKeeperStateHandleStore<>(this.client, stateStorage, executor);
+               this.checkpointsInZooKeeper = new 
ZooKeeperStateHandleStore<>(this.client, stateStorage);
 
                this.completedCheckpoints = new 
ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1);
 
+               this.executor = checkNotNull(executor);
+
                LOG.info("Initialized in '{}'.", checkpointsPath);
        }
 
@@ -236,16 +239,30 @@ public void addCheckpoint(final CompletedCheckpoint 
checkpoint) throws Exception
 
                // Everything worked, let's remove a previous checkpoint if 
necessary.
                while (completedCheckpoints.size() > 
maxNumberOfCheckpointsToRetain) {
-                       try {
-                               
removeSubsumed(completedCheckpoints.removeFirst());
-                       } catch (Exception e) {
-                               LOG.warn("Failed to subsume the old 
checkpoint", e);
-                       }
+                       final CompletedCheckpoint completedCheckpoint = 
completedCheckpoints.removeFirst();
+                       tryRemoveCompletedCheckpoint(completedCheckpoint, 
CompletedCheckpoint::discardOnSubsume);
                }
 
                LOG.debug("Added {} to {}.", checkpoint, path);
        }
 
+       private void tryRemoveCompletedCheckpoint(CompletedCheckpoint 
completedCheckpoint, ConsumerWithException<CompletedCheckpoint, Exception> 
discardCallback) {
+               try {
+                       if (tryRemove(completedCheckpoint.getCheckpointID())) {
+                               executor.execute(() -> {
+                                       try {
+                                               
discardCallback.accept(completedCheckpoint);
+                                       } catch (Exception e) {
+                                               LOG.warn("Could not discard 
completed checkpoint {}.", completedCheckpoint.getCheckpointID(), e);
+                                       }
+                               });
+
+                       }
+               } catch (Exception e) {
+                       LOG.warn("Failed to subsume the old checkpoint", e);
+               }
+       }
+
        @Override
        public CompletedCheckpoint getLatestCheckpoint() {
                if (completedCheckpoints.isEmpty()) {
@@ -278,11 +295,9 @@ public void shutdown(JobStatus jobStatus) throws Exception 
{
                        LOG.info("Shutting down");
 
                        for (CompletedCheckpoint checkpoint : 
completedCheckpoints) {
-                               try {
-                                       removeShutdown(checkpoint, jobStatus);
-                               } catch (Exception e) {
-                                       LOG.error("Failed to discard 
checkpoint.", e);
-                               }
+                               tryRemoveCompletedCheckpoint(
+                                       checkpoint,
+                                       completedCheckpoint -> 
completedCheckpoint.discardOnShutdown(jobStatus));
                        }
 
                        completedCheckpoints.clear();
@@ -305,59 +320,13 @@ public void shutdown(JobStatus jobStatus) throws 
Exception {
        // 
------------------------------------------------------------------------
 
        /**
-        * Removes a subsumed checkpoint from ZooKeeper and drops the state.
-        */
-       private void removeSubsumed(
-               final CompletedCheckpoint completedCheckpoint) throws Exception 
{
-
-               if (completedCheckpoint == null) {
-                       return;
-               }
-
-               ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint> 
action =
-                       new 
ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint>() {
-                               @Override
-                               public void apply(@Nullable 
RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException {
-                                       if (value != null) {
-                                               try {
-                                                       
completedCheckpoint.discardOnSubsume();
-                                               } catch (Exception e) {
-                                                       throw new 
FlinkException("Could not discard the completed checkpoint on subsume.", e);
-                                               }
-                                       }
-                               }
-                       };
-
-               checkpointsInZooKeeper.releaseAndTryRemove(
-                       
checkpointIdToPath(completedCheckpoint.getCheckpointID()),
-                       action);
-       }
-
-       /**
-        * Removes a checkpoint from ZooKeeper because of Job shutdown and 
drops the state.
+        * Tries to remove the checkpoint identified by the given checkpoint id.
+        *
+        * @param checkpointId identifying the checkpoint to remove
+        * @return true if the checkpoint could be removed
         */
-       private void removeShutdown(
-                       final CompletedCheckpoint completedCheckpoint,
-                       final JobStatus jobStatus) throws Exception {
-
-               if (completedCheckpoint == null) {
-                       return;
-               }
-
-               ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint> 
removeAction = new 
ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint>() {
-                       @Override
-                       public void apply(@Nullable 
RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException {
-                               try {
-                                       
completedCheckpoint.discardOnShutdown(jobStatus);
-                               } catch (Exception e) {
-                                       throw new FlinkException("Could not 
discard the completed checkpoint on subsume.", e);
-                               }
-                       }
-               };
-
-               checkpointsInZooKeeper.releaseAndTryRemove(
-                       
checkpointIdToPath(completedCheckpoint.getCheckpointID()),
-                       removeAction);
+       private boolean tryRemove(long checkpointId) throws Exception {
+               return 
checkpointsInZooKeeper.releaseAndTryRemove(checkpointIdToPath(checkpointId));
        }
 
        /**
@@ -381,7 +350,7 @@ public static long pathToCheckpointId(String path) {
                        String numberString;
 
                        // check if we have a leading slash
-                       if ('/' == path.charAt(0) ) {
+                       if ('/' == path.charAt(0)) {
                                numberString = path.substring(1);
                        } else {
                                numberString = path;
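
For context, a simplified restatement of the new removal pattern in tryRemoveCompletedCheckpoint above (not part of this PR; all names in the sketch are hypothetical): the checkpoint's ZooKeeper node is removed synchronously in the calling thread, and only if that succeeds is the potentially blocking state discard handed off to the store's executor.

    import java.util.concurrent.Executor;

    class AsyncDiscardSketch {

            @FunctionalInterface
            interface Remover {
                    boolean tryRemove(long checkpointId) throws Exception;
            }

            // Remove the checkpoint's znode first; discard its state asynchronously
            // so blocking I/O never runs in the caller's (main) thread.
            static void tryRemoveThenDiscard(
                            long checkpointId,
                            Remover remover,
                            Runnable discardState,
                            Executor ioExecutor) {
                    try {
                            if (remover.tryRemove(checkpointId)) {
                                    ioExecutor.execute(discardState);
                            }
                    } catch (Exception e) {
                            // the PR logs a warning here and continues
                    }
            }
    }
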
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index c96acbd3192..c31e64c0adc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -572,30 +572,38 @@ private void registerJobManagerRunnerTerminationFuture(JobID jobId, CompletableF
                }
 
                return jobManagerRunnerTerminationFuture.thenRunAsync(
-                       () -> {
-                               jobManagerMetricGroup.removeJob(jobId);
+                       () -> cleanUpJobData(jobId, cleanupHA),
+                       getRpcService().getExecutor());
+       }
 
-                               boolean cleanupHABlobs = false;
-                               if (cleanupHA) {
-                                       try {
-                                               
submittedJobGraphStore.removeJobGraph(jobId);
+       private void cleanUpJobData(JobID jobId, boolean cleanupHA) {
+               jobManagerMetricGroup.removeJob(jobId);
 
-                                               // only clean up the HA blobs 
if we could remove the job from HA storage
-                                               cleanupHABlobs = true;
-                                       } catch (Exception e) {
-                                               log.warn("Could not properly 
remove job {} from submitted job graph store.", jobId, e);
-                                       }
+               boolean cleanupHABlobs = false;
+               if (cleanupHA) {
+                       try {
+                               submittedJobGraphStore.removeJobGraph(jobId);
 
-                                       try {
-                                               
runningJobsRegistry.clearJob(jobId);
-                                       } catch (IOException e) {
-                                               log.warn("Could not properly 
remove job {} from the running jobs registry.", jobId, e);
-                                       }
-                               }
+                               // only clean up the HA blobs if we could 
remove the job from HA storage
+                               cleanupHABlobs = true;
+                       } catch (Exception e) {
+                               log.warn("Could not properly remove job {} from 
submitted job graph store.", jobId, e);
+                       }
 
-                               blobServer.cleanupJob(jobId, cleanupHABlobs);
-                       },
-                       getRpcService().getExecutor());
+                       try {
+                               runningJobsRegistry.clearJob(jobId);
+                       } catch (IOException e) {
+                               log.warn("Could not properly remove job {} from 
the running jobs registry.", jobId, e);
+                       }
+               } else {
+                       try {
+                               submittedJobGraphStore.releaseJobGraph(jobId);
+                       } catch (Exception e) {
+                               log.warn("Could not properly release job {} 
from submitted job graph store.", jobId, e);
+                       }
+               }
+
+               blobServer.cleanupJob(jobId, cleanupHABlobs);
        }
 
        /**
@@ -806,8 +814,7 @@ public void grantLeadership(final UUID newLeaderSessionID) {
        }
 
        private CompletableFuture<Void> waitForTerminatingJobManager(JobID 
jobId, JobGraph jobGraph, ConsumerWithException<JobGraph, ?> action) {
-               final CompletableFuture<Void> jobManagerTerminationFuture = 
jobManagerTerminationFutures
-                       .getOrDefault(jobId, 
CompletableFuture.completedFuture(null))
+               final CompletableFuture<Void> jobManagerTerminationFuture = 
getJobTerminationFuture(jobId)
                        .exceptionally((Throwable throwable) -> {
                                throw new CompletionException(
                                        new DispatcherException(
@@ -822,6 +829,14 @@ public void grantLeadership(final UUID newLeaderSessionID) 
{
                        getMainThreadExecutor());
        }
 
+       protected CompletableFuture<Void> getJobTerminationFuture(JobID jobId) {
+               if (jobManagerRunners.containsKey(jobId)) {
+                       return FutureUtils.completedExceptionally(new 
DispatcherException(String.format("Job with job id %s is still running.", 
jobId)));
+               } else {
+                       return jobManagerTerminationFutures.getOrDefault(jobId, 
CompletableFuture.completedFuture(null));
+               }
+       }
+
        private void setNewFencingToken(@Nullable DispatcherId dispatcherId) {
                // clear the state if we've been the leader before
                if (getFencingToken() != null) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SingleJobSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SingleJobSubmittedJobGraphStore.java
index 26d3abc2f1f..fe7f5f14cc8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SingleJobSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SingleJobSubmittedJobGraphStore.java
@@ -66,12 +66,17 @@ public void putJobGraph(SubmittedJobGraph jobGraph) throws Exception {
        }
 
        @Override
-       public void removeJobGraph(JobID jobId) throws Exception {
+       public void removeJobGraph(JobID jobId) {
                // ignore
        }
 
        @Override
-       public Collection<JobID> getJobIds() throws Exception {
+       public void releaseJobGraph(JobID jobId) {
+               // ignore
+       }
+
+       @Override
+       public Collection<JobID> getJobIds() {
                return Collections.singleton(jobGraph.getJobID());
        }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
index 3882479ce95..ea96d7d43d2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
@@ -179,7 +179,7 @@ public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
 
        @Override
        public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
-               return ZooKeeperUtils.createSubmittedJobGraphs(client, configuration, executor);
+               return ZooKeeperUtils.createSubmittedJobGraphs(client, configuration);
        }
 
        @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
index d1ca1a38853..f28621f0d69 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
@@ -43,22 +43,27 @@ public void stop() {
        }
 
        @Override
-       public void putJobGraph(SubmittedJobGraph jobGraph) throws Exception {
+       public void putJobGraph(SubmittedJobGraph jobGraph) {
                // Nothing to do
        }
 
        @Override
-       public void removeJobGraph(JobID jobId) throws Exception {
+       public void removeJobGraph(JobID jobId) {
                // Nothing to do
        }
 
        @Override
-       public Collection<JobID> getJobIds() throws Exception {
+       public void releaseJobGraph(JobID jobId) {
+               // nothing to do
+       }
+
+       @Override
+       public Collection<JobID> getJobIds() {
                return Collections.emptyList();
        }
 
        @Override
-       public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
+       public SubmittedJobGraph recoverJobGraph(JobID jobId) {
                return null;
        }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
index 7e624ec6e1d..b40a4a2b95f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.jobmanager;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
 
 import javax.annotation.Nullable;
 
@@ -58,6 +59,17 @@
         */
        void removeJobGraph(JobID jobId) throws Exception;
 
+       /**
+        * Releases the locks on the specified {@link JobGraph}.
+        *
+        * Releasing the locks allows that another instance can delete the job 
from
+        * the {@link SubmittedJobGraphStore}.
+        *
+        * @param jobId specifying the job to release the locks for
+        * @throws Exception if the locks cannot be released
+        */
+       void releaseJobGraph(JobID jobId) throws Exception;
+
        /**
         * Get all job ids of submitted job graphs to the submitted job graph 
store.
         *
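
For context, a minimal caller-side sketch of the two removal paths on SubmittedJobGraphStore (not part of this PR; everything except the store interface methods is hypothetical): removeJobGraph deletes the graph for good, while the new releaseJobGraph only gives up this instance's lock so that another instance may delete the graph later.

    import org.apache.flink.api.common.JobID;
    import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;

    class JobGraphCleanupSketch {
            // Remove the graph only when the job reached a globally terminal state;
            // otherwise just release this instance's lock (e.g. on leadership loss).
            static void cleanUp(SubmittedJobGraphStore store, JobID jobId, boolean globallyTerminal) throws Exception {
                    if (globallyTerminal) {
                            store.removeJobGraph(jobId);   // deletes the graph (and its ZooKeeper node in the HA case)
                    } else {
                            store.releaseJobGraph(jobId);  // drops only this instance's lock; another instance can delete it later
                    }
            }
    }
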
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index 7ba5d481177..2b935af229a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -41,7 +41,6 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -69,13 +68,13 @@
        /** Lock to synchronize with the {@link SubmittedJobGraphListener}. */
        private final Object cacheLock = new Object();
 
-       /** Client (not a namespace facade) */
+       /** Client (not a namespace facade). */
        private final CuratorFramework client;
 
        /** The set of IDs of all added job graphs. */
        private final Set<JobID> addedJobGraphs = new HashSet<>();
 
-       /** Completed checkpoints in ZooKeeper */
+       /** Completed checkpoints in ZooKeeper. */
        private final ZooKeeperStateHandleStore<SubmittedJobGraph> 
jobGraphsInZooKeeper;
 
        /**
@@ -94,19 +93,17 @@
        private boolean isRunning;
 
        /**
-        * Submitted job graph store backed by ZooKeeper
+        * Submitted job graph store backed by ZooKeeper.
         *
         * @param client ZooKeeper client
         * @param currentJobsPath ZooKeeper path for current job graphs
         * @param stateStorage State storage used to persist the submitted jobs
-        * @param executor to give to the ZooKeeperStateHandleStore to run 
ZooKeeper callbacks
         * @throws Exception
         */
        public ZooKeeperSubmittedJobGraphStore(
                        CuratorFramework client,
                        String currentJobsPath,
-                       RetrievableStateStorageHelper<SubmittedJobGraph> 
stateStorage,
-                       Executor executor) throws Exception {
+                       RetrievableStateStorageHelper<SubmittedJobGraph> 
stateStorage) throws Exception {
 
                checkNotNull(currentJobsPath, "Current jobs path");
                checkNotNull(stateStorage, "State storage");
@@ -123,7 +120,7 @@ public ZooKeeperSubmittedJobGraphStore(
                CuratorFramework facade = 
client.usingNamespace(client.getNamespace() + currentJobsPath);
 
                this.zooKeeperFullBasePath = client.getNamespace() + 
currentJobsPath;
-               this.jobGraphsInZooKeeper = new 
ZooKeeperStateHandleStore<>(facade, stateStorage, executor);
+               this.jobGraphsInZooKeeper = new 
ZooKeeperStateHandleStore<>(facade, stateStorage);
 
                this.pathCache = new PathChildrenCache(facade, "/", false);
                pathCache.getListenable().addListener(new 
SubmittedJobGraphsPathCacheListener());
@@ -276,6 +273,24 @@ public void removeJobGraph(JobID jobId) throws Exception {
                LOG.info("Removed job graph {} from ZooKeeper.", jobId);
        }
 
+       @Override
+       public void releaseJobGraph(JobID jobId) throws Exception {
+               checkNotNull(jobId, "Job ID");
+               final String path = getPathForJob(jobId);
+
+               LOG.debug("Releasing locks of job graph {} from {}{}.", jobId, 
zooKeeperFullBasePath, path);
+
+               synchronized (cacheLock) {
+                       if (addedJobGraphs.contains(jobId)) {
+                               jobGraphsInZooKeeper.release(path);
+
+                               addedJobGraphs.remove(jobId);
+                       }
+               }
+
+               LOG.info("Released locks of job graph {} from ZooKeeper.", 
jobId);
+       }
+
        @Override
        public Collection<JobID> getJobIds() throws Exception {
                Collection<String> paths;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index 43c930e6fea..cc1ec7044c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -54,6 +54,9 @@
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
+/**
+ * Class containing helper functions to interact with ZooKeeper.
+ */
 public class ZooKeeperUtils {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(ZooKeeperUtils.class);
@@ -227,14 +230,12 @@ public static ZooKeeperLeaderElectionService 
createLeaderElectionService(
         *
         * @param client        The {@link CuratorFramework} ZooKeeper client 
to use
         * @param configuration {@link Configuration} object
-        * @param executor to run ZooKeeper callbacks
         * @return {@link ZooKeeperSubmittedJobGraphStore} instance
         * @throws Exception if the submitted job graph store cannot be created
         */
        public static ZooKeeperSubmittedJobGraphStore createSubmittedJobGraphs(
                        CuratorFramework client,
-                       Configuration configuration,
-                       Executor executor) throws Exception {
+                       Configuration configuration) throws Exception {
 
                checkNotNull(configuration, "Configuration");
 
@@ -244,7 +245,9 @@ public static ZooKeeperSubmittedJobGraphStore 
createSubmittedJobGraphs(
                String zooKeeperSubmittedJobsPath = 
configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_JOBGRAPHS_PATH);
 
                return new ZooKeeperSubmittedJobGraphStore(
-                               client, zooKeeperSubmittedJobsPath, 
stateStorage, executor);
+                       client,
+                       zooKeeperSubmittedJobsPath,
+                       stateStorage);
        }
 
        /**
@@ -344,6 +347,9 @@ public static String generateZookeeperPath(String root, 
String namespace) {
                return root + namespace;
        }
 
+       /**
+        * Secure {@link ACLProvider} implementation.
+        */
        public static class SecureAclProvider implements ACLProvider {
                @Override
                public List<ACL> getDefaultAcl() {
@@ -356,6 +362,9 @@ public static String generateZookeeperPath(String root, 
String namespace) {
                }
        }
 
+       /**
+        * ZooKeeper client ACL mode enum.
+        */
        public enum ZkClientACLMode {
                CREATOR,
                OPEN;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
index 87a433adace..8c3d31fc51b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
@@ -18,17 +18,13 @@
 
 package org.apache.flink.runtime.zookeeper;
 
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.api.BackgroundCallback;
-import org.apache.curator.framework.api.CuratorEvent;
-import org.apache.curator.framework.api.CuratorEventType;
-import org.apache.curator.utils.ZKPaths;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.Preconditions;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
@@ -36,6 +32,7 @@
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -44,7 +41,6 @@
 import java.util.ConcurrentModificationException;
 import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -68,13 +64,13 @@
  * State handle in ZooKeeper =&gt; State handle exists
  * </pre>
  *
- * But not:
+ * <p>But not:
  *
  * <pre>
  * State handle exists =&gt; State handle in ZooKeeper
  * </pre>
  *
- * There can be lingering state handles when failures happen during operation. 
They
+ * <p>There can be lingering state handles when failures happen during 
operation. They
  * need to be cleaned up manually (see <a 
href="https://issues.apache.org/jira/browse/FLINK-2513";>
  * FLINK-2513</a> about a possible way to overcome this).
  *
@@ -84,13 +80,11 @@
 
        private static final Logger LOG = 
LoggerFactory.getLogger(ZooKeeperStateHandleStore.class);
 
-       /** Curator ZooKeeper client */
+       /** Curator ZooKeeper client. */
        private final CuratorFramework client;
 
        private final RetrievableStateStorageHelper<T> storage;
 
-       private final Executor executor;
-
        /** Lock node name of this ZooKeeperStateHandleStore. The name should 
be unique among all other state handle stores. */
        private final String lockNode;
 
@@ -103,16 +97,13 @@
         *                            instance, e.g. 
<code>client.usingNamespace("/stateHandles")</code>
         * @param storage to persist the actual state and whose returned state 
handle is then written
         *                to ZooKeeper
-        * @param executor to run the ZooKeeper callbacks
         */
        public ZooKeeperStateHandleStore(
                CuratorFramework client,
-               RetrievableStateStorageHelper<T> storage,
-               Executor executor) {
+               RetrievableStateStorageHelper<T> storage) {
 
                this.client = checkNotNull(client, "Curator client");
                this.storage = checkNotNull(storage, "State storage");
-               this.executor = checkNotNull(executor);
 
                // Generate a unique lock node name
                lockNode = UUID.randomUUID().toString();
@@ -262,7 +253,7 @@ public int exists(String pathInZooKeeper) throws Exception {
        public Collection<String> getAllPaths() throws Exception {
                final String path = "/";
 
-               while(true) {
+               while (true) {
                        Stat stat = client.checkExists().forPath(path);
 
                        if (stat == null) {
@@ -393,33 +384,14 @@ public int exists(String pathInZooKeeper) throws 
Exception {
 
        /**
         * Releases the lock for the given state node and tries to remove the 
state node if it is no longer locked.
-        * The deletion of the state node is executed asynchronously.
-        *
-        * <p><strong>Important</strong>: This also discards the stored state 
handle after the given action
-        * has been executed.
-        *
-        * @param pathInZooKeeper Path of state handle to remove (expected to 
start with a '/')
-        * @throws Exception If the ZooKeeper operation fails
-        */
-       public void releaseAndTryRemove(String pathInZooKeeper) throws 
Exception {
-               releaseAndTryRemove(pathInZooKeeper, null);
-       }
-
-       /**
-        * Releases the lock for the given state node and tries to remove the 
state node if it is no longer locked.
-        * The deletion of the state node is executed asynchronously. After the 
state node has been deleted, the given
-        * callback is called with the {@link RetrievableStateHandle} of the 
deleted state node.
-        *
-        * <p><strong>Important</strong>: This also discards the stored state 
handle after the given action
-        * has been executed.
+        * It returns the {@link RetrievableStateHandle} stored under the given 
state node if any.
         *
         * @param pathInZooKeeper Path of state handle to remove
-        * @param callback The callback to execute after a successful deletion. 
Null if no action needs to be executed.
-        * @throws Exception If the ZooKeeper operation fails
+        * @return True if the state handle could be released
+        * @throws Exception If the ZooKeeper operation or discarding the state 
handle fails
         */
-       public void releaseAndTryRemove(
-                       String pathInZooKeeper,
-                       @Nullable final RemoveCallback<T> callback) throws 
Exception {
+       @Nullable
+       public boolean releaseAndTryRemove(String pathInZooKeeper) throws 
Exception {
                checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
 
                final String path = normalizePath(pathInZooKeeper);
@@ -429,14 +401,23 @@ public void releaseAndTryRemove(
                try {
                        stateHandle = get(path, false);
                } catch (Exception e) {
-                       LOG.warn("Could not retrieve the state handle from node 
" + path + '.', e);
+                       LOG.warn("Could not retrieve the state handle from node 
{}.", path, e);
                }
 
                release(pathInZooKeeper);
 
-               final BackgroundCallback backgroundCallback = new 
RemoveBackgroundCallback<>(stateHandle, callback, path);
+               try {
+                       client.delete().forPath(path);
+               } catch (KeeperException.NotEmptyException ignored) {
+                       LOG.debug("Could not delete znode {} because it is 
still locked.", path);
+                       return false;
+               }
+
+               if (stateHandle != null) {
+                       stateHandle.discardState();
+               }
 
-               client.delete().inBackground(backgroundCallback, 
executor).forPath(path);
+               return true;
        }
 
        /**
@@ -583,7 +564,7 @@ protected String getLockPath(String rootPath) {
        }
 
        /**
-        * Makes sure that every path starts with a "/"
+        * Makes sure that every path starts with a "/".
         *
         * @param path Path to normalize
         * @return Normalized path such that it starts with a "/"
@@ -595,103 +576,4 @@ private static String normalizePath(String path) {
                        return '/' + path;
                }
        }
-
-       // 
---------------------------------------------------------------------------------------------------------
-       // Utility classes
-       // 
---------------------------------------------------------------------------------------------------------
-
-       /**
-        * Callback which is executed when removing a node from ZooKeeper. The 
callback will call the given
-        * {@link RemoveCallback} if it is not null. Afterwards, it will 
discard the given {@link RetrievableStateHandle}
-        * if it is not null.
-        *
-        * @param <T> Type of the value stored in the RetrievableStateHandle
-        */
-       private static final class RemoveBackgroundCallback<T extends 
Serializable> implements BackgroundCallback {
-               @Nullable
-               private final RetrievableStateHandle<T> stateHandle;
-
-               @Nullable
-               private final RemoveCallback<T> callback;
-
-               private final String pathInZooKeeper;
-
-               private RemoveBackgroundCallback(
-                       @Nullable RetrievableStateHandle<T> stateHandle,
-                       @Nullable RemoveCallback<T> callback,
-                       String pathInZooKeeper) {
-
-                       this.stateHandle = stateHandle;
-                       this.callback = callback;
-                       this.pathInZooKeeper = 
Preconditions.checkNotNull(pathInZooKeeper);
-               }
-
-               @Override
-               public void processResult(CuratorFramework client, CuratorEvent 
event) throws Exception {
-                       try {
-                               if (event.getType() == CuratorEventType.DELETE) 
{
-                                       final KeeperException.Code resultCode = 
KeeperException.Code.get(event.getResultCode());
-
-                                       if (resultCode == 
KeeperException.Code.OK) {
-                                               Exception exception = null;
-
-                                               if (null != callback) {
-                                                       try {
-                                                               
callback.apply(stateHandle);
-                                                       } catch (Throwable e) {
-                                                               exception = new 
Exception("Could not execute delete action for node " +
-                                                                       
pathInZooKeeper + '.', e);
-                                                       }
-                                               }
-
-                                               if (stateHandle != null) {
-                                                       try {
-                                                               // Discard the 
state handle
-                                                               
stateHandle.discardState();
-                                                       } catch (Throwable e) {
-                                                               Exception 
newException = new Exception("Could not discard state handle of node " +
-                                                                       
pathInZooKeeper + '.', e);
-
-                                                               if (exception 
== null) {
-                                                                       
exception = newException;
-                                                               } else {
-                                                                       
exception.addSuppressed(newException);
-                                                               }
-                                                       }
-                                               }
-
-                                               if (exception != null) {
-                                                       throw exception;
-                                               }
-                                       } else if (resultCode == 
KeeperException.Code.NOTEMPTY) {
-                                               // Could not delete the node 
because it still contains children/locks
-                                               LOG.debug("Could not delete 
node " + pathInZooKeeper + " because it is still locked.");
-                                       } else {
-                                               throw new 
IllegalStateException("Unexpected result code " +
-                                                       resultCode.name() + " 
in '" + event + "' callback.");
-                                       }
-                               } else {
-                                       throw new 
IllegalStateException("Unexpected event type " +
-                                               event.getType() + " in '" + 
event + "' callback.");
-                               }
-                       } catch (Exception e) {
-                               LOG.warn("Failed to run callback for delete 
operation on node " + pathInZooKeeper + '.', e);
-                       }
-
-               }
-       }
-
-       /**
-        * Callback interface for remove calls
-        */
-       public interface RemoveCallback<T extends Serializable> {
-               /**
-                * Callback method. The parameter can be null if the {@link 
RetrievableStateHandle} could not be retrieved
-                * from ZooKeeper.
-                *
-                * @param value RetrievableStateHandle retrieved from 
ZooKeeper, null if it was not retrievable
-                * @throws FlinkException If the callback failed
-                */
-               void apply(@Nullable RetrievableStateHandle<T> value) throws 
FlinkException;
-       }
 }
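
For context, a minimal usage sketch of the reworked releaseAndTryRemove (not part of this PR; the wrapper class is hypothetical): the call is now synchronous, returns whether the node was actually deleted, and discards the stored state handle itself instead of invoking a RemoveCallback on a background executor.

    import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;

    import java.io.Serializable;

    class ReleaseAndRemoveSketch {
            // Releases this store's lock node and deletes the state node if no other
            // locks remain; the retrieved state handle is discarded in the same call.
            static <T extends Serializable> void releaseAndRemove(
                            ZooKeeperStateHandleStore<T> store,
                            String pathInZooKeeper) throws Exception {
                    final boolean removed = store.releaseAndTryRemove(pathInZooKeeper);
                    if (!removed) {
                            // Another instance still holds a lock: the znode (and its state)
                            // stays in ZooKeeper until the last lock is released.
                    }
            }
    }
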
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperUtilityFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperUtilityFactory.java
index d3b7dc5b379..3e294e0dbdd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperUtilityFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperUtilityFactory.java
@@ -18,15 +18,15 @@
 
 package org.apache.flink.runtime.zookeeper;
 
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.shared.SharedCount;
-import org.apache.curator.framework.recipes.shared.SharedValue;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.shared.SharedCount;
+import org.apache.curator.framework.recipes.shared.SharedValue;
+
 import java.io.Serializable;
-import java.util.concurrent.Executor;
 
 /**
  * Creates ZooKeeper utility classes without exposing the {@link 
CuratorFramework} dependency. The
@@ -71,7 +71,6 @@ public void close(boolean cleanup) throws Exception {
         *
         * @param zkStateHandleStorePath specifying the path in ZooKeeper to 
store the state handles to
         * @param stateStorageHelper storing the actual state data
-        * @param executor to run asynchronous callbacks of the state handle 
store
         * @param <T> Type of the state to be stored
         * @return a ZooKeeperStateHandleStore instance
         * @throws Exception if ZooKeeper could not create the provided state 
handle store path in
@@ -79,8 +78,7 @@ public void close(boolean cleanup) throws Exception {
         */
        public <T extends Serializable> ZooKeeperStateHandleStore<T> 
createZooKeeperStateHandleStore(
                        String zkStateHandleStorePath,
-                       RetrievableStateStorageHelper<T> stateStorageHelper,
-                       Executor executor) throws Exception {
+                       RetrievableStateStorageHelper<T> stateStorageHelper) 
throws Exception {
 
                
facade.newNamespaceAwareEnsurePath(zkStateHandleStorePath).ensure(facade.getZookeeperClient());
                CuratorFramework stateHandleStoreFacade = facade.usingNamespace(
@@ -88,7 +86,7 @@ public void close(boolean cleanup) throws Exception {
                                facade.getNamespace(),
                                zkStateHandleStorePath));
 
-               return new ZooKeeperStateHandleStore<>(stateHandleStoreFacade, 
stateStorageHelper, executor);
+               return new ZooKeeperStateHandleStore<>(stateHandleStoreFacade, 
stateStorageHelper);
        }
 
        /**
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 4f0709ed416..c588ecc7053 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1728,21 +1728,22 @@ class JobManager(
     val futureOption = currentJobs.remove(jobID) match {
       case Some((eg, _)) =>
         val cleanUpFuture: Future[Unit] = Future {
-          val cleanupHABlobs = if (removeJobFromStateBackend) {
-            try {
+          val cleanupHABlobs = try {
+            if (removeJobFromStateBackend) {
               // ...otherwise, we can have lingering resources when there is a 
 concurrent shutdown
               // and the ZooKeeper client is closed. Not removing the job 
immediately allow the
               // shutdown to release all resources.
               submittedJobGraphs.removeJobGraph(jobID)
               true
-            } catch {
-              case t: Throwable => {
-                log.warn(s"Could not remove submitted job graph $jobID.", t)
-                false
-              }
+            } else {
+              submittedJobGraphs.releaseJobGraph(jobID)
+              false
+            }
+          } catch {
+            case t: Throwable => {
+              log.warn(s"Could not remove submitted job graph $jobID.", t)
+              false
             }
-          } else {
-            false
           }
 
           blobServer.cleanupJob(jobID, cleanupHABlobs)
@@ -1777,19 +1778,23 @@ class JobManager(
     */
   private def cancelAndClearEverything(cause: Throwable)
     : Seq[Future[Unit]] = {
-    val futures = for ((jobID, (eg, jobInfo)) <- currentJobs) yield {
-      future {
-        eg.suspend(cause)
-        jobManagerMetricGroup.removeJob(eg.getJobID)
+
+    val futures = currentJobs.values.flatMap(
+      egJobInfo => {
+        val executionGraph = egJobInfo._1
+        val jobInfo = egJobInfo._2
+
+        executionGraph.suspend(cause)
+
+        val jobId = executionGraph.getJobID
 
         jobInfo.notifyNonDetachedClients(
           decorateMessage(
             Failure(
-              new JobExecutionException(jobID, "All jobs are cancelled and 
cleared.", cause))))
-      }(context.dispatcher)
-    }
+              new JobExecutionException(jobId, "All jobs are cancelled and 
cleared.", cause))))
 
-    currentJobs.clear()
+        removeJob(jobId, false)
+      })
 
     futures.toSeq
   }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
index 81569649663..c4d89030dc3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
@@ -193,7 +193,7 @@ public void testDiscardAllCheckpoints() throws Exception {
 
        // 
---------------------------------------------------------------------------------------------
 
-       protected TestCompletedCheckpoint createCheckpoint(
+       public static TestCompletedCheckpoint createCheckpoint(
                int id,
                SharedStateRegistry sharedStateRegistry) throws IOException {
 
@@ -226,7 +226,12 @@ protected void 
verifyCheckpointRegistered(Collection<OperatorState> operatorStat
                }
        }
 
-       protected void verifyCheckpointDiscarded(Collection<OperatorState> 
operatorStates) {
+       public static void verifyCheckpointDiscarded(TestCompletedCheckpoint 
completedCheckpoint) {
+               assertTrue(completedCheckpoint.isDiscarded());
+               
verifyCheckpointDiscarded(completedCheckpoint.getOperatorStates().values());
+       }
+
+       protected static void 
verifyCheckpointDiscarded(Collection<OperatorState> operatorStates) {
                for (OperatorState operatorState : operatorStates) {
                        for (OperatorSubtaskState subtaskState : 
operatorState.getStates()) {
                                
Assert.assertTrue(((TestOperatorSubtaskState)subtaskState).discarded);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
new file mode 100644
index 00000000000..1f7d3691e50
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+import 
org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
+import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
+import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.api.ErrorListenerPathable;
+import org.apache.curator.utils.EnsurePath;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.Executor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.doAnswer;
+import static org.powermock.api.mockito.PowerMockito.doThrow;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+/**
+ * Mockito based tests for the {@link ZooKeeperStateHandleStore}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(ZooKeeperCompletedCheckpointStore.class)
+public class ZooKeeperCompletedCheckpointStoreMockitoTest extends TestLogger {
+
+       /**
+        * Tests that the completed checkpoint store can retrieve all 
checkpoints stored in ZooKeeper
+        * and ignores those which cannot be retrieved via their state handles.
+        *
+        * <p>We have a timeout in case the ZooKeeper store get's into a 
deadlock/livelock situation.
+        */
+       @Test(timeout = 50000)
+       public void testCheckpointRecovery() throws Exception {
+               final JobID jobID = new JobID();
+               final long checkpoint1Id = 1L;
+               final long checkpoint2Id = 2;
+               final List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, 
String>> checkpointsInZooKeeper = new ArrayList<>(4);
+
+               final Collection<Long> expectedCheckpointIds = new HashSet<>(2);
+               expectedCheckpointIds.add(1L);
+               expectedCheckpointIds.add(2L);
+
+               final RetrievableStateHandle<CompletedCheckpoint> 
failingRetrievableStateHandle = mock(RetrievableStateHandle.class);
+               
when(failingRetrievableStateHandle.retrieveState()).thenThrow(new 
IOException("Test exception"));
+
+               final RetrievableStateHandle<CompletedCheckpoint> 
retrievableStateHandle1 = mock(RetrievableStateHandle.class);
+               when(retrievableStateHandle1.retrieveState()).then(
+                       (invocation) -> new CompletedCheckpoint(
+                               jobID,
+                               checkpoint1Id,
+                               1L,
+                               1L,
+                               new HashMap<>(),
+                               null,
+                               
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+                               new TestCompletedCheckpointStorageLocation()));
+
+               final RetrievableStateHandle<CompletedCheckpoint> 
retrievableStateHandle2 = mock(RetrievableStateHandle.class);
+               when(retrievableStateHandle2.retrieveState()).then(
+                       (invocation -> new CompletedCheckpoint(
+                               jobID,
+                               checkpoint2Id,
+                               2L,
+                               2L,
+                               new HashMap<>(),
+                               null,
+                               
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+                               new TestCompletedCheckpointStorageLocation())));
+
+               checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle1, 
"/foobar1"));
+               
checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, 
"/failing1"));
+               checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle2, 
"/foobar2"));
+               
checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, 
"/failing2"));
+
+               final CuratorFramework client = mock(CuratorFramework.class, 
Mockito.RETURNS_DEEP_STUBS);
+               final RetrievableStateStorageHelper<CompletedCheckpoint> 
storageHelperMock = mock(RetrievableStateStorageHelper.class);
+
+               ZooKeeperStateHandleStore<CompletedCheckpoint> 
zooKeeperStateHandleStoreMock = spy(new ZooKeeperStateHandleStore<>(client, 
storageHelperMock));
+               
whenNew(ZooKeeperStateHandleStore.class).withAnyArguments().thenReturn(zooKeeperStateHandleStoreMock);
+               
doReturn(checkpointsInZooKeeper).when(zooKeeperStateHandleStoreMock).getAllSortedByNameAndLock();
+
+               final int numCheckpointsToRetain = 1;
+
+               // Mocking for the delete operation on the CuratorFramework 
client
+               // It assures that the callback is executed synchronously
+
+               final EnsurePath ensurePathMock = mock(EnsurePath.class);
+               final CuratorEvent curatorEventMock = mock(CuratorEvent.class);
+               
when(curatorEventMock.getType()).thenReturn(CuratorEventType.DELETE);
+               when(curatorEventMock.getResultCode()).thenReturn(0);
+               
when(client.newNamespaceAwareEnsurePath(anyString())).thenReturn(ensurePathMock);
+
+               when(
+                       client
+                               .delete()
+                               .inBackground(any(BackgroundCallback.class), 
any(Executor.class))
+               ).thenAnswer(new Answer<ErrorListenerPathable<Void>>() {
+                       @Override
+                       public ErrorListenerPathable<Void> 
answer(InvocationOnMock invocation) throws Throwable {
+                               final BackgroundCallback callback = 
(BackgroundCallback) invocation.getArguments()[0];
+
+                               ErrorListenerPathable<Void> result = 
mock(ErrorListenerPathable.class);
+
+                               
when(result.forPath(anyString())).thenAnswer(new Answer<Void>() {
+                                       @Override
+                                       public Void answer(InvocationOnMock 
invocation) throws Throwable {
+
+                                               callback.processResult(client, 
curatorEventMock);
+
+                                               return null;
+                                       }
+                               });
+
+                               return result;
+                       }
+               });
+
+               final String checkpointsPath = "foobar";
+               final RetrievableStateStorageHelper<CompletedCheckpoint> 
stateStorage = mock(RetrievableStateStorageHelper.class);
+
+               ZooKeeperCompletedCheckpointStore 
zooKeeperCompletedCheckpointStore = new ZooKeeperCompletedCheckpointStore(
+                       numCheckpointsToRetain,
+                       client,
+                       checkpointsPath,
+                       stateStorage,
+                       Executors.directExecutor());
+
+               zooKeeperCompletedCheckpointStore.recover();
+
+               CompletedCheckpoint latestCompletedCheckpoint = 
zooKeeperCompletedCheckpointStore.getLatestCheckpoint();
+
+               // check that we return the latest retrievable checkpoint
+               // this should remove the latest checkpoint because it is broken
+               assertEquals(checkpoint2Id, 
latestCompletedCheckpoint.getCheckpointID());
+
+               // this should remove the second broken checkpoint because 
we're iterating over all checkpoints
+               List<CompletedCheckpoint> completedCheckpoints = 
zooKeeperCompletedCheckpointStore.getAllCheckpoints();
+
+               Collection<Long> actualCheckpointIds = new 
HashSet<>(completedCheckpoints.size());
+
+               for (CompletedCheckpoint completedCheckpoint : 
completedCheckpoints) {
+                       
actualCheckpointIds.add(completedCheckpoint.getCheckpointID());
+               }
+
+               assertEquals(expectedCheckpointIds, actualCheckpointIds);
+
+               // check that we did not discard any of the state handles
+               verify(retrievableStateHandle1, never()).discardState();
+               verify(retrievableStateHandle2, never()).discardState();
+
+               // Make sure that we also didn't discard any of the broken 
handles. Only when checkpoints
+               // are subsumed should they be discarded.
+               verify(failingRetrievableStateHandle, never()).discardState();
+       }
+
+       /**
+        * Tests that a checkpoint is not present in the store if adding it to the
+        * store fails (i.e., the add operation throws an exception).
+        */
+       @Test
+       public void testAddCheckpointWithFailedRemove() throws Exception {
+               final CuratorFramework client = mock(CuratorFramework.class, 
Mockito.RETURNS_DEEP_STUBS);
+               final RetrievableStateStorageHelper<CompletedCheckpoint> 
storageHelperMock = mock(RetrievableStateStorageHelper.class);
+
+               ZooKeeperStateHandleStore<CompletedCheckpoint> 
zookeeperStateHandleStoreMock =
+                       spy(new ZooKeeperStateHandleStore<>(client, 
storageHelperMock));
+               
whenNew(ZooKeeperStateHandleStore.class).withAnyArguments().thenReturn(zookeeperStateHandleStoreMock);
+
+               doAnswer(new 
Answer<RetrievableStateHandle<CompletedCheckpoint>>() {
+                       @Override
+                       public RetrievableStateHandle<CompletedCheckpoint> 
answer(InvocationOnMock invocationOnMock) throws Throwable {
+                               CompletedCheckpoint checkpoint = 
(CompletedCheckpoint) invocationOnMock.getArguments()[1];
+
+                               RetrievableStateHandle<CompletedCheckpoint> 
retrievableStateHandle = mock(RetrievableStateHandle.class);
+                               
when(retrievableStateHandle.retrieveState()).thenReturn(checkpoint);
+
+                               return retrievableStateHandle;
+                       }
+               }).when(zookeeperStateHandleStoreMock).addAndLock(anyString(), 
any(CompletedCheckpoint.class));
+
+               doThrow(new 
Exception()).when(zookeeperStateHandleStoreMock).releaseAndTryRemove(anyString());
+
+               final int numCheckpointsToRetain = 1;
+               final String checkpointsPath = "foobar";
+               final RetrievableStateStorageHelper<CompletedCheckpoint> 
stateStorage = mock(RetrievableStateStorageHelper.class);
+
+               ZooKeeperCompletedCheckpointStore 
zooKeeperCompletedCheckpointStore = new ZooKeeperCompletedCheckpointStore(
+                       numCheckpointsToRetain,
+                       client,
+                       checkpointsPath,
+                       stateStorage,
+                       Executors.directExecutor());
+
+               for (long i = 0; i <= numCheckpointsToRetain; ++i) {
+                       CompletedCheckpoint checkpointToAdd = 
mock(CompletedCheckpoint.class);
+                       doReturn(i).when(checkpointToAdd).getCheckpointID();
+                       
doReturn(Collections.emptyMap()).when(checkpointToAdd).getOperatorStates();
+
+                       try {
+                               
zooKeeperCompletedCheckpointStore.addCheckpoint(checkpointToAdd);
+
+                               // The checkpoint should be in the store if we 
successfully add it into the store.
+                               List<CompletedCheckpoint> addedCheckpoints = 
zooKeeperCompletedCheckpointStore.getAllCheckpoints();
+                               
assertTrue(addedCheckpoints.contains(checkpointToAdd));
+                       } catch (Exception e) {
+                               // The checkpoint should not be in the store if 
any exception is thrown.
+                               List<CompletedCheckpoint> addedCheckpoints = 
zooKeeperCompletedCheckpointStore.getAllCheckpoints();
+                               
assertFalse(addedCheckpoints.contains(checkpointToAdd));
+                       }
+               }
+       }
+}
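
Editor's note: the Curator delete stubbing in testCheckpointRecovery above follows a common Mockito idiom for making an asynchronous callback API complete inline, so the test never has to wait on a background thread. A minimal, self-contained sketch of that idiom under my own assumptions (AsyncClient and DeleteCallback are hypothetical placeholder types, not Flink or Curator classes):

    import static org.mockito.Matchers.any;
    import static org.mockito.Matchers.anyString;
    import static org.mockito.Mockito.doAnswer;
    import static org.mockito.Mockito.mock;

    public class SynchronousCallbackSketch {

        interface DeleteCallback {
            void processResult(int resultCode);
        }

        interface AsyncClient {
            void deleteInBackground(String path, DeleteCallback callback);
        }

        static AsyncClient clientWithInlineCallbacks() {
            AsyncClient client = mock(AsyncClient.class);
            // Invoke the supplied callback immediately with a success code, so the
            // code under test observes a completed delete before the stubbed call returns.
            doAnswer(invocation -> {
                DeleteCallback callback = (DeleteCallback) invocation.getArguments()[1];
                callback.processResult(0);
                return null;
            }).when(client).deleteInBackground(anyString(), any(DeleteCallback.class));
            return client;
        }
    }
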
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
index 0384733fdb1..f992d3b00c0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
@@ -18,60 +18,39 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
-import 
org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
-import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
+import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.api.BackgroundCallback;
-import org.apache.curator.framework.api.CuratorEvent;
-import org.apache.curator.framework.api.CuratorEventType;
-import org.apache.curator.framework.api.ErrorListenerPathable;
-import org.apache.curator.utils.EnsurePath;
+import org.hamcrest.Matchers;
+import org.junit.ClassRule;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
+
+import javax.annotation.Nonnull;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
+import java.io.Serializable;
 import java.util.List;
-import java.util.concurrent.Executor;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.powermock.api.mockito.PowerMockito.doAnswer;
-import static org.powermock.api.mockito.PowerMockito.doThrow;
-import static org.powermock.api.mockito.PowerMockito.whenNew;
+import static org.junit.Assert.assertThat;
 
 /**
  * Tests for {@link ZooKeeperCompletedCheckpointStore}.
  */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(ZooKeeperCompletedCheckpointStore.class)
 public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
 
+       @ClassRule
+       public static ZooKeeperResource zooKeeperResource = new 
ZooKeeperResource();
+
        @Test
        public void testPathConversion() {
                final long checkpointId = 42L;
@@ -82,188 +61,103 @@ public void testPathConversion() {
        }
 
        /**
-        * Tests that the completed checkpoint store can retrieve all 
checkpoints stored in ZooKeeper
-        * and ignores those which cannot be retrieved via their state handles.
-        *
-        * <p>We have a timeout in case the ZooKeeper store gets into a 
deadlock/livelock situation.
+        * Tests that subsumed checkpoints are discarded.
         */
-       @Test(timeout = 50000)
-       public void testCheckpointRecovery() throws Exception {
-               final JobID jobID = new JobID();
-               final long checkpoint1Id = 1L;
-               final long checkpoint2Id = 2;
-               final List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, 
String>> checkpointsInZooKeeper = new ArrayList<>(4);
-
-               final Collection<Long> expectedCheckpointIds = new HashSet<>(2);
-               expectedCheckpointIds.add(1L);
-               expectedCheckpointIds.add(2L);
-
-               final RetrievableStateHandle<CompletedCheckpoint> 
failingRetrievableStateHandle = mock(RetrievableStateHandle.class);
-               
when(failingRetrievableStateHandle.retrieveState()).thenThrow(new 
IOException("Test exception"));
-
-               final RetrievableStateHandle<CompletedCheckpoint> 
retrievableStateHandle1 = mock(RetrievableStateHandle.class);
-               when(retrievableStateHandle1.retrieveState()).then(
-                       (invocation) -> new CompletedCheckpoint(
-                               jobID,
-                               checkpoint1Id,
-                               1L,
-                               1L,
-                               new HashMap<>(),
-                               null,
-                               
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
-                               new TestCompletedCheckpointStorageLocation()));
-
-               final RetrievableStateHandle<CompletedCheckpoint> 
retrievableStateHandle2 = mock(RetrievableStateHandle.class);
-               when(retrievableStateHandle2.retrieveState()).then(
-                       (invocation -> new CompletedCheckpoint(
-                               jobID,
-                               checkpoint2Id,
-                               2L,
-                               2L,
-                               new HashMap<>(),
-                               null,
-                               
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
-                               new TestCompletedCheckpointStorageLocation())));
-
-               checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle1, 
"/foobar1"));
-               
checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, 
"/failing1"));
-               checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle2, 
"/foobar2"));
-               
checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, 
"/failing2"));
-
-               final CuratorFramework client = mock(CuratorFramework.class, 
Mockito.RETURNS_DEEP_STUBS);
-               final RetrievableStateStorageHelper<CompletedCheckpoint> 
storageHelperMock = mock(RetrievableStateStorageHelper.class);
-
-               ZooKeeperStateHandleStore<CompletedCheckpoint> 
zooKeeperStateHandleStoreMock = spy(new ZooKeeperStateHandleStore<>(client, 
storageHelperMock, Executors.directExecutor()));
-               
whenNew(ZooKeeperStateHandleStore.class).withAnyArguments().thenReturn(zooKeeperStateHandleStoreMock);
-               
doReturn(checkpointsInZooKeeper).when(zooKeeperStateHandleStoreMock).getAllSortedByNameAndLock();
-
-               final int numCheckpointsToRetain = 1;
-
-               // Mocking for the delete operation on the CuratorFramework 
client
-               // It assures that the callback is executed synchronously
-
-               final EnsurePath ensurePathMock = mock(EnsurePath.class);
-               final CuratorEvent curatorEventMock = mock(CuratorEvent.class);
-               
when(curatorEventMock.getType()).thenReturn(CuratorEventType.DELETE);
-               when(curatorEventMock.getResultCode()).thenReturn(0);
-               
when(client.newNamespaceAwareEnsurePath(anyString())).thenReturn(ensurePathMock);
-
-               when(
-                       client
-                               .delete()
-                               .inBackground(any(BackgroundCallback.class), 
any(Executor.class))
-               ).thenAnswer(new Answer<ErrorListenerPathable<Void>>() {
-                       @Override
-                       public ErrorListenerPathable<Void> 
answer(InvocationOnMock invocation) throws Throwable {
-                               final BackgroundCallback callback = 
(BackgroundCallback) invocation.getArguments()[0];
+       @Test
+       public void testDiscardingSubsumedCheckpoints() throws Exception {
+               final SharedStateRegistry sharedStateRegistry = new 
SharedStateRegistry();
+               final Configuration configuration = new Configuration();
+               
configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, 
zooKeeperResource.getConnectString());
+
+               final CuratorFramework client = 
ZooKeeperUtils.startCuratorFramework(configuration);
+               final ZooKeeperCompletedCheckpointStore checkpointStore = 
createZooKeeperCheckpointStore(client);
+
+               try {
+                       final 
CompletedCheckpointStoreTest.TestCompletedCheckpoint checkpoint1 = 
CompletedCheckpointStoreTest.createCheckpoint(0, sharedStateRegistry);
+
+                       checkpointStore.addCheckpoint(checkpoint1);
+                       assertThat(checkpointStore.getAllCheckpoints(), 
Matchers.contains(checkpoint1));
+
+                       final 
CompletedCheckpointStoreTest.TestCompletedCheckpoint checkpoint2 = 
CompletedCheckpointStoreTest.createCheckpoint(1, sharedStateRegistry);
+                       checkpointStore.addCheckpoint(checkpoint2);
+                       final List<CompletedCheckpoint> allCheckpoints = 
checkpointStore.getAllCheckpoints();
+                       assertThat(allCheckpoints, 
Matchers.contains(checkpoint2));
+                       assertThat(allCheckpoints, 
Matchers.not(Matchers.contains(checkpoint1)));
+
+                       // verify that the subsumed checkpoint is discarded
+                       
CompletedCheckpointStoreTest.verifyCheckpointDiscarded(checkpoint1);
+               } finally {
+                       client.close();
+               }
+       }
 
-                               ErrorListenerPathable<Void> result = 
mock(ErrorListenerPathable.class);
+       /**
+        * Tests that checkpoints are discarded when the completed checkpoint 
store is shut
+        * down with a globally terminal state.
+        */
+       @Test
+       public void testDiscardingCheckpointsAtShutDown() throws Exception {
+               final SharedStateRegistry sharedStateRegistry = new 
SharedStateRegistry();
+               final Configuration configuration = new Configuration();
+               
configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, 
zooKeeperResource.getConnectString());
 
-                               
when(result.forPath(anyString())).thenAnswer(new Answer<Void>() {
-                                       @Override
-                                       public Void answer(InvocationOnMock 
invocation) throws Throwable {
+               final CuratorFramework client = 
ZooKeeperUtils.startCuratorFramework(configuration);
+               final ZooKeeperCompletedCheckpointStore checkpointStore = 
createZooKeeperCheckpointStore(client);
 
-                                               callback.processResult(client, 
curatorEventMock);
+               try {
+                       final 
CompletedCheckpointStoreTest.TestCompletedCheckpoint checkpoint1 = 
CompletedCheckpointStoreTest.createCheckpoint(0, sharedStateRegistry);
 
-                                               return null;
-                                       }
-                               });
+                       checkpointStore.addCheckpoint(checkpoint1);
+                       assertThat(checkpointStore.getAllCheckpoints(), 
Matchers.contains(checkpoint1));
 
-                               return result;
-                       }
-               });
+                       checkpointStore.shutdown(JobStatus.FINISHED);
 
-               final String checkpointsPath = "foobar";
-               final RetrievableStateStorageHelper<CompletedCheckpoint> 
stateStorage = mock(RetrievableStateStorageHelper.class);
+                       // verify that the checkpoint is discarded
+                       
CompletedCheckpointStoreTest.verifyCheckpointDiscarded(checkpoint1);
+               } finally {
+                       client.close();
+               }
+       }
 
-               ZooKeeperCompletedCheckpointStore 
zooKeeperCompletedCheckpointStore = new ZooKeeperCompletedCheckpointStore(
-                       numCheckpointsToRetain,
+       @Nonnull
+       private ZooKeeperCompletedCheckpointStore 
createZooKeeperCheckpointStore(CuratorFramework client) throws Exception {
+               return new ZooKeeperCompletedCheckpointStore(
+                       1,
                        client,
-                       checkpointsPath,
-                       stateStorage,
+                       "/checkpoints",
+                       new TestingRetrievableStateStorageHelper<>(),
                        Executors.directExecutor());
+       }
 
-               zooKeeperCompletedCheckpointStore.recover();
-
-               CompletedCheckpoint latestCompletedCheckpoint = 
zooKeeperCompletedCheckpointStore.getLatestCheckpoint();
-
-               // check that we return the latest retrievable checkpoint
-               // this should remove the latest checkpoint because it is broken
-               assertEquals(checkpoint2Id, 
latestCompletedCheckpoint.getCheckpointID());
-
-               // this should remove the second broken checkpoint because 
we're iterating over all checkpoints
-               List<CompletedCheckpoint> completedCheckpoints = 
zooKeeperCompletedCheckpointStore.getAllCheckpoints();
-
-               Collection<Long> actualCheckpointIds = new 
HashSet<>(completedCheckpoints.size());
-
-               for (CompletedCheckpoint completedCheckpoint : 
completedCheckpoints) {
-                       
actualCheckpointIds.add(completedCheckpoint.getCheckpointID());
+       private static final class TestingRetrievableStateStorageHelper<T 
extends Serializable> implements RetrievableStateStorageHelper<T> {
+               @Override
+               public RetrievableStateHandle<T> store(T state) {
+                       return new TestingRetrievableStateHandle<>(state);
                }
 
-               assertEquals(expectedCheckpointIds, actualCheckpointIds);
-
-               // check that we did not discard any of the state handles
-               verify(retrievableStateHandle1, never()).discardState();
-               verify(retrievableStateHandle2, never()).discardState();
+               private static class TestingRetrievableStateHandle<T extends 
Serializable> implements RetrievableStateHandle<T> {
 
-               // Make sure that we also didn't discard any of the broken 
handles. Only when checkpoints
-               // are subsumed should they be discarded.
-               verify(failingRetrievableStateHandle, never()).discardState();
-       }
+                       private static final long serialVersionUID = 
137053380713794300L;
 
-       /**
-        * Tests that the checkpoint does not exist in the store when we fail 
to add
-        * it into the store (i.e., there exists an exception thrown by the 
method).
-        */
-       @Test
-       public void testAddCheckpointWithFailedRemove() throws Exception {
-               final CuratorFramework client = mock(CuratorFramework.class, 
Mockito.RETURNS_DEEP_STUBS);
-               final RetrievableStateStorageHelper<CompletedCheckpoint> 
storageHelperMock = mock(RetrievableStateStorageHelper.class);
+                       private final T state;
 
-               ZooKeeperStateHandleStore<CompletedCheckpoint> 
zookeeperStateHandleStoreMock =
-                       spy(new ZooKeeperStateHandleStore<>(client, 
storageHelperMock, Executors.directExecutor()));
-               
whenNew(ZooKeeperStateHandleStore.class).withAnyArguments().thenReturn(zookeeperStateHandleStoreMock);
+                       private TestingRetrievableStateHandle(T state) {
+                               this.state = state;
+                       }
 
-               doAnswer(new 
Answer<RetrievableStateHandle<CompletedCheckpoint>>() {
                        @Override
-                       public RetrievableStateHandle<CompletedCheckpoint> 
answer(InvocationOnMock invocationOnMock) throws Throwable {
-                               CompletedCheckpoint checkpoint = 
(CompletedCheckpoint) invocationOnMock.getArguments()[1];
-
-                               RetrievableStateHandle<CompletedCheckpoint> 
retrievableStateHandle = mock(RetrievableStateHandle.class);
-                               
when(retrievableStateHandle.retrieveState()).thenReturn(checkpoint);
-
-                               return retrievableStateHandle;
+                       public T retrieveState() throws IOException, 
ClassNotFoundException {
+                               return state;
                        }
-               }).when(zookeeperStateHandleStoreMock).addAndLock(anyString(), 
any(CompletedCheckpoint.class));
-
-               doThrow(new 
Exception()).when(zookeeperStateHandleStoreMock).releaseAndTryRemove(anyString(),
 any(ZooKeeperStateHandleStore.RemoveCallback.class));
 
-               final int numCheckpointsToRetain = 1;
-               final String checkpointsPath = "foobar";
-               final RetrievableStateStorageHelper<CompletedCheckpoint> 
stateSotrage = mock(RetrievableStateStorageHelper.class);
-
-               ZooKeeperCompletedCheckpointStore 
zooKeeperCompletedCheckpointStore = new ZooKeeperCompletedCheckpointStore(
-                       numCheckpointsToRetain,
-                       client,
-                       checkpointsPath,
-                       stateSotrage,
-                       Executors.directExecutor());
+                       @Override
+                       public void discardState() throws Exception {
+                               // no op
+                       }
 
-               for (long i = 0; i <= numCheckpointsToRetain; ++i) {
-                       CompletedCheckpoint checkpointToAdd = 
mock(CompletedCheckpoint.class);
-                       doReturn(i).when(checkpointToAdd).getCheckpointID();
-                       
doReturn(Collections.emptyMap()).when(checkpointToAdd).getOperatorStates();
-
-                       try {
-                               
zooKeeperCompletedCheckpointStore.addCheckpoint(checkpointToAdd);
-
-                               // The checkpoint should be in the store if we 
successfully add it into the store.
-                               List<CompletedCheckpoint> addedCheckpoints = 
zooKeeperCompletedCheckpointStore.getAllCheckpoints();
-                               
assertTrue(addedCheckpoints.contains(checkpointToAdd));
-                       } catch (Exception e) {
-                               // The checkpoint should not be in the store if 
any exception is thrown.
-                               List<CompletedCheckpoint> addedCheckpoints = 
zooKeeperCompletedCheckpointStore.getAllCheckpoints();
-                               
assertFalse(addedCheckpoints.contains(checkpointToAdd));
+                       @Override
+                       public long getStateSize() {
+                               return 0;
                        }
                }
        }
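
Editor's note: for context on what testDiscardingSubsumedCheckpoints exercises above: with a retention count of 1, adding a newer checkpoint subsumes the older one, which must then be discarded. A rough, stand-alone sketch of that retention rule (my own illustration, not the Flink implementation):

    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.Deque;
    import java.util.List;

    public class RetainedCheckpointsSketch<T> {

        private final int maxRetained;
        private final Deque<T> retained = new ArrayDeque<>();

        public RetainedCheckpointsSketch(int maxRetained) {
            this.maxRetained = maxRetained;
        }

        /** Adds a checkpoint and returns the older ones that are now subsumed and should be discarded. */
        public List<T> add(T checkpoint) {
            retained.addLast(checkpoint);
            List<T> subsumed = new ArrayList<>();
            while (retained.size() > maxRetained) {
                subsumed.add(retained.removeFirst());
            }
            return subsumed;
        }
    }
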
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
index 2c030d24c49..cb26f4862b1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
@@ -30,9 +30,11 @@
 import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
@@ -41,6 +43,7 @@
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
@@ -110,8 +113,6 @@ public static void teardownClass() throws 
ExecutionException, InterruptedExcepti
         */
        @Test
        public void testGrantingRevokingLeadership() throws Exception {
-
-               final Configuration configuration = new Configuration();
                final TestingHighAvailabilityServices highAvailabilityServices 
= new TestingHighAvailabilityServices();
                final JobGraph nonEmptyJobGraph = createNonEmptyJobGraph();
                final SubmittedJobGraph submittedJobGraph = new 
SubmittedJobGraph(nonEmptyJobGraph, null);
@@ -124,7 +125,34 @@ public void testGrantingRevokingLeadership() throws 
Exception {
 
                final BlockingQueue<DispatcherId> fencingTokens = new 
ArrayBlockingQueue<>(2);
 
-               final HATestingDispatcher dispatcher = new HATestingDispatcher(
+               final HATestingDispatcher dispatcher = 
createHADispatcher(highAvailabilityServices, fencingTokens);
+
+               dispatcher.start();
+
+               try {
+                       final UUID leaderId = UUID.randomUUID();
+                       dispatcherLeaderElectionService.isLeader(leaderId);
+
+                       dispatcherLeaderElectionService.notLeader();
+
+                       final DispatcherId firstFencingToken = 
fencingTokens.take();
+
+                       assertThat(firstFencingToken, 
equalTo(NULL_FENCING_TOKEN));
+
+                       enterGetJobIdsLatch.await();
+                       proceedGetJobIdsLatch.trigger();
+
+                       assertThat(dispatcher.getNumberJobs(timeout).get(), 
is(0));
+
+               } finally {
+                       RpcUtils.terminateRpcEndpoint(dispatcher, timeout);
+               }
+       }
+
+       @Nonnull
+       private HATestingDispatcher 
createHADispatcher(TestingHighAvailabilityServices highAvailabilityServices, 
BlockingQueue<DispatcherId> fencingTokens) throws Exception {
+               final Configuration configuration = new Configuration();
+               return new HATestingDispatcher(
                        rpcService,
                        UUID.randomUUID().toString(),
                        configuration,
@@ -138,33 +166,63 @@ public void testGrantingRevokingLeadership() throws 
Exception {
                        new TestingJobManagerRunnerFactory(new 
CompletableFuture<>(), new CompletableFuture<>()),
                        testingFatalErrorHandler,
                        fencingTokens);
+       }
+
+       /**
+        * Tests that all JobManagerRunners are terminated if the leadership of 
the
+        * Dispatcher is revoked.
+        */
+       @Test
+       public void testRevokeLeadershipTerminatesJobManagerRunners() throws 
Exception {
+
+               final TestingHighAvailabilityServices highAvailabilityServices 
= new TestingHighAvailabilityServices();
+               highAvailabilityServices.setSubmittedJobGraphStore(new 
StandaloneSubmittedJobGraphStore());
+
+               final TestingLeaderElectionService leaderElectionService = new 
TestingLeaderElectionService();
+               
highAvailabilityServices.setDispatcherLeaderElectionService(leaderElectionService);
+
+               final ArrayBlockingQueue<DispatcherId> fencingTokens = new 
ArrayBlockingQueue<>(2);
+               final HATestingDispatcher dispatcher = createHADispatcher(
+                       highAvailabilityServices,
+                       fencingTokens);
 
                dispatcher.start();
 
                try {
-                       final UUID leaderId = UUID.randomUUID();
-                       dispatcherLeaderElectionService.isLeader(leaderId);
+                       // grant leadership and submit a single job
+                       final DispatcherId expectedDispatcherId = 
DispatcherId.generate();
 
-                       dispatcherLeaderElectionService.notLeader();
+                       
leaderElectionService.isLeader(expectedDispatcherId.toUUID()).get();
 
-                       final DispatcherId firstFencingToken = 
fencingTokens.take();
+                       assertThat(fencingTokens.take(), 
is(equalTo(expectedDispatcherId)));
 
-                       assertThat(firstFencingToken, 
equalTo(NULL_FENCING_TOKEN));
+                       final DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
 
-                       enterGetJobIdsLatch.await();
-                       proceedGetJobIdsLatch.trigger();
+                       final CompletableFuture<Acknowledge> submissionFuture = 
dispatcherGateway.submitJob(createNonEmptyJobGraph(), timeout);
 
-                       assertThat(dispatcher.getNumberJobs(timeout).get(), 
is(0));
+                       submissionFuture.get();
+
+                       assertThat(dispatcher.getNumberJobs(timeout).get(), 
is(1));
+
+                       // revoke the leadership --> this should stop all 
running JobManagerRunners
+                       leaderElectionService.notLeader();
+
+                       assertThat(fencingTokens.take(), 
is(equalTo(NULL_FENCING_TOKEN)));
 
+                       assertThat(dispatcher.getNumberJobs(timeout).get(), 
is(0));
                } finally {
                        RpcUtils.terminateRpcEndpoint(dispatcher, timeout);
                }
        }
 
        @Nonnull
-       private JobGraph createNonEmptyJobGraph() {
+       public static JobGraph createNonEmptyJobGraph() {
                final JobVertex noOpVertex = new JobVertex("NoOp vertex");
-               return new JobGraph(noOpVertex);
+               noOpVertex.setInvokableClass(NoOpInvokable.class);
+               final JobGraph jobGraph = new JobGraph(noOpVertex);
+               jobGraph.setAllowQueuedScheduling(true);
+
+               return jobGraph;
        }
 
        private static class HATestingDispatcher extends TestingDispatcher {
@@ -243,6 +301,11 @@ public void removeJobGraph(JobID jobId) throws Exception {
                        throw new UnsupportedOperationException("Should not be 
called.");
                }
 
+               @Override
+               public void releaseJobGraph(JobID jobId) throws Exception {
+                       throw new UnsupportedOperationException("Should not be 
called.");
+               }
+
                @Override
                public Collection<JobID> getJobIds() throws Exception {
                        enterGetJobIdsLatch.trigger();
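
Editor's note: testGrantingRevokingLeadership and testRevokeLeadershipTerminatesJobManagerRunners above both observe leadership changes by having the dispatcher under test push its fencing tokens into a BlockingQueue that the test thread drains. A minimal sketch of that hand-off pattern (FencingTokenProbe is a hypothetical helper using plain UUIDs, not part of the PR):

    import java.util.UUID;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    public class FencingTokenProbe {

        private final BlockingQueue<UUID> tokens = new ArrayBlockingQueue<>(2);

        /** Called by the component under test whenever its fencing token changes. */
        public void onFencingTokenChanged(UUID newToken) {
            tokens.offer(newToken);
        }

        /** Called from the test thread; blocks until the next token change has been observed. */
        public UUID awaitNextToken() throws InterruptedException {
            return tokens.take();
        }
    }
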
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/NoOpSubmittedJobGraphListener.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/NoOpSubmittedJobGraphListener.java
new file mode 100644
index 00000000000..493534dd93c
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/NoOpSubmittedJobGraphListener.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
+
+/**
+ * No operation {@link 
org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener}
+ * implementation for testing purposes.
+ */
+public enum NoOpSubmittedJobGraphListener implements 
SubmittedJobGraphStore.SubmittedJobGraphListener {
+       INSTANCE;
+
+       @Override
+       public void onAddedJobGraph(JobID jobId) {
+               // No op
+       }
+
+       @Override
+       public void onRemovedJobGraph(JobID jobId) {
+               // No op
+       }
+}
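
Editor's note: NoOpSubmittedJobGraphListener above uses the enum-singleton idiom, which yields a stateless, serialization-safe instance that tests can share freely. The same idiom in a generic form (illustrative only):

    public enum NoOpRunnable implements Runnable {
        INSTANCE;

        @Override
        public void run() {
            // intentionally does nothing
        }
    }
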
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
index f5091ea5b10..5141be039f7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
@@ -29,8 +31,12 @@
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
 /**
  * {@link Dispatcher} implementation used for testing purposes.
  */
@@ -72,4 +78,11 @@ void completeJobExecution(ArchivedExecutionGraph 
archivedExecutionGraph) {
                runAsync(
                        () -> 
jobReachedGloballyTerminalState(archivedExecutionGraph));
        }
+
+       @VisibleForTesting
+       public CompletableFuture<Void> getJobTerminationFuture(@Nonnull JobID 
jobId, @Nonnull Time timeout) {
+               return callAsyncWithoutFencing(
+                       () -> getJobTerminationFuture(jobId),
+                       timeout).thenCompose(Function.identity());
+       }
 }
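
Editor's note: getJobTerminationFuture above gets a CompletableFuture<CompletableFuture<Void>> back from callAsyncWithoutFencing and flattens it with thenCompose(Function.identity()). That flattening idiom in isolation (a generic sketch, not Flink code):

    import java.util.concurrent.CompletableFuture;
    import java.util.function.Function;

    public class FlattenFutureSketch {

        /** Turns a future of a future into a single future that completes with the inner value. */
        public static <V> CompletableFuture<V> flatten(CompletableFuture<CompletableFuture<V>> nested) {
            return nested.thenCompose(Function.identity());
        }
    }
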
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
new file mode 100644
index 00000000000..dd0375886a9
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/ZooKeeperHADispatcherTest.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
+import org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.hamcrest.Matchers;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertThat;
+
+/**
+ * Test cases for the interaction between ZooKeeper HA and the {@link 
Dispatcher}.
+ */
+public class ZooKeeperHADispatcherTest extends TestLogger {
+
+       private static final Time TIMEOUT = Time.seconds(10L);
+
+       @ClassRule
+       public static final ZooKeeperResource ZOO_KEEPER_RESOURCE = new 
ZooKeeperResource();
+
+       @ClassRule
+       public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+       private static Configuration configuration;
+
+       private static TestingRpcService rpcService;
+
+       private static BlobServer blobServer;
+
+       @Rule
+       public TestName name = new TestName();
+
+       private TestingFatalErrorHandler testingFatalErrorHandler;
+
+       @BeforeClass
+       public static void setupClass() throws IOException {
+               configuration = new Configuration();
+               
configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, 
ZOO_KEEPER_RESOURCE.getConnectString());
+               
configuration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, 
TEMPORARY_FOLDER.newFolder().getAbsolutePath());
+               rpcService = new TestingRpcService();
+               blobServer = new BlobServer(configuration, new VoidBlobStore());
+       }
+
+       @Before
+       public void setup() {
+               testingFatalErrorHandler = new TestingFatalErrorHandler();
+       }
+
+       @After
+       public void teardown() throws Exception {
+               if (testingFatalErrorHandler != null) {
+                       testingFatalErrorHandler.rethrowError();
+               }
+       }
+
+       @AfterClass
+       public static void teardownClass() throws Exception {
+               if (rpcService != null) {
+                       RpcUtils.terminateRpcService(rpcService, TIMEOUT);
+                       rpcService = null;
+               }
+
+               if (blobServer != null) {
+                       blobServer.close();
+                       blobServer = null;
+               }
+       }
+
+       /**
+        * Tests that the {@link Dispatcher} releases a locked {@link 
SubmittedJobGraph} if it
+        * loses its leadership.
+        */
+       @Test
+       public void testSubmittedJobGraphRelease() throws Exception {
+               final CuratorFramework client = 
ZooKeeperUtils.startCuratorFramework(configuration);
+               final CuratorFramework otherClient = 
ZooKeeperUtils.startCuratorFramework(configuration);
+
+               try (final TestingHighAvailabilityServices 
testingHighAvailabilityServices = new TestingHighAvailabilityServices()) {
+                       
testingHighAvailabilityServices.setSubmittedJobGraphStore(ZooKeeperUtils.createSubmittedJobGraphs(client,
 configuration));
+
+                       final ZooKeeperSubmittedJobGraphStore 
otherSubmittedJobGraphStore = ZooKeeperUtils.createSubmittedJobGraphs(
+                               otherClient,
+                               configuration);
+
+                       
otherSubmittedJobGraphStore.start(NoOpSubmittedJobGraphListener.INSTANCE);
+
+                       final TestingLeaderElectionService 
leaderElectionService = new TestingLeaderElectionService();
+                       
testingHighAvailabilityServices.setDispatcherLeaderElectionService(leaderElectionService);
+
+                       final TestingDispatcher dispatcher = 
createDispatcher(testingHighAvailabilityServices);
+
+                       dispatcher.start();
+
+                       try {
+                               final DispatcherId expectedLeaderId = 
DispatcherId.generate();
+                               
leaderElectionService.isLeader(expectedLeaderId.toUUID()).get();
+
+                               final DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
+
+                               final JobGraph nonEmptyJobGraph = 
DispatcherHATest.createNonEmptyJobGraph();
+                               final CompletableFuture<Acknowledge> 
submissionFuture = dispatcherGateway.submitJob(nonEmptyJobGraph, TIMEOUT);
+                               submissionFuture.get();
+
+                               Collection<JobID> jobIds = 
otherSubmittedJobGraphStore.getJobIds();
+
+                               final JobID jobId = nonEmptyJobGraph.getJobID();
+                               assertThat(jobIds, Matchers.contains(jobId));
+
+                               leaderElectionService.notLeader();
+
+                               // wait for the job to properly terminate
+                               final CompletableFuture<Void> 
jobTerminationFuture = dispatcher.getJobTerminationFuture(jobId, TIMEOUT);
+                               jobTerminationFuture.get();
+
+                               // recover the job
+                               final SubmittedJobGraph submittedJobGraph = 
otherSubmittedJobGraphStore.recoverJobGraph(jobId);
+
+                               assertThat(submittedJobGraph, 
Matchers.is(Matchers.notNullValue()));
+
+                               // check that the other submitted job graph 
store can remove the job graph after the original leader
+                               // has lost its leadership
+                               
otherSubmittedJobGraphStore.removeJobGraph(jobId);
+
+                               jobIds = 
otherSubmittedJobGraphStore.getJobIds();
+
+                               assertThat(jobIds, 
Matchers.not(Matchers.contains(jobId)));
+                       } finally {
+                               RpcUtils.terminateRpcEndpoint(dispatcher, 
TIMEOUT);
+                               client.close();
+                               otherClient.close();
+                       }
+               }
+       }
+
+       @Nonnull
+       private TestingDispatcher 
createDispatcher(TestingHighAvailabilityServices 
testingHighAvailabilityServices) throws Exception {
+               return new TestingDispatcher(
+                       rpcService,
+                       Dispatcher.DISPATCHER_NAME + '_' + name.getMethodName(),
+                       configuration,
+                       testingHighAvailabilityServices,
+                       new TestingResourceManagerGateway(),
+                       blobServer,
+                       new HeartbeatServices(1000L, 1000L),
+                       
UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
+                       null,
+                       new MemoryArchivedExecutionGraphStore(),
+                       new TestingJobManagerRunnerFactory(new 
CompletableFuture<>(), new CompletableFuture<>()),
+                       testingFatalErrorHandler);
+       }
+}
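
Editor's note: testSubmittedJobGraphRelease above relies on the distinction between releasing a job graph (dropping the lock when leadership is lost) and removing it; only after the release can the second ZooKeeperSubmittedJobGraphStore remove the entry. A toy in-memory sketch of that distinction (a hypothetical class of my own, not the ZooKeeper-backed implementation):

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class LockedJobGraphStoreSketch {

        private final Map<String, byte[]> jobGraphs = new HashMap<>();
        private final Set<String> locked = new HashSet<>();

        public void putAndLock(String jobId, byte[] serializedJobGraph) {
            jobGraphs.put(jobId, serializedJobGraph);
            locked.add(jobId);
        }

        /** Releases the lock but keeps the job graph, e.g. when losing leadership. */
        public void release(String jobId) {
            locked.remove(jobId);
        }

        /** Removal succeeds only if no instance still holds the lock on the entry. */
        public boolean tryRemove(String jobId) {
            if (locked.contains(jobId)) {
                return false;
            }
            return jobGraphs.remove(jobId) != null;
        }
    }
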
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 873a4f1f4f4..c499f265416 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -32,8 +32,8 @@
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
 import 
org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
 import 
org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
 import 
org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
@@ -151,7 +151,6 @@
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
 import static org.mockito.Mockito.mock;
 
 public class JobManagerTest extends TestLogger {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperHAJobManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperHAJobManagerTest.java
new file mode 100644
index 00000000000..8e5b1b9b392
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperHAJobManagerTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.akka.ActorUtils;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.dispatcher.DispatcherHATest;
+import org.apache.flink.runtime.dispatcher.NoOpSubmittedJobGraphListener;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.testingUtils.TestingJobManager;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
+import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.ExtendedActorSystem;
+import akka.actor.Identify;
+import akka.actor.Terminated;
+import akka.pattern.Patterns;
+import org.apache.curator.framework.CuratorFramework;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the ZooKeeper HA service and {@link JobManager} interaction.
+ */
+public class ZooKeeperHAJobManagerTest extends TestLogger {
+
+       @ClassRule
+       public static final ZooKeeperResource ZOO_KEEPER_RESOURCE = new 
ZooKeeperResource();
+
+       @ClassRule
+       public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+       private static final FiniteDuration TIMEOUT = FiniteDuration.apply(10L, 
TimeUnit.SECONDS);
+
+       private static ActorSystem system;
+
+       @BeforeClass
+       public static void setup() {
+               system = AkkaUtils.createLocalActorSystem(new Configuration());
+       }
+
+       @AfterClass
+       public static void teardown() throws Exception {
+               final Future<Terminated> terminationFuture = system.terminate();
+               Await.ready(terminationFuture, TIMEOUT);
+       }
+
+       /**
+        * Tests that the {@link JobManager} releases all locked {@link 
JobGraph}s if it loses
+        * leadership.
+        */
+       @Test
+       public void testJobGraphReleaseWhenLosingLeadership() throws Exception {
+               final Configuration configuration = new Configuration();
+               
configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, 
ZOO_KEEPER_RESOURCE.getConnectString());
+               
configuration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, 
TEMPORARY_FOLDER.newFolder().getAbsolutePath());
+
+               try (TestingHighAvailabilityServices highAvailabilityServices = 
new TestingHighAvailabilityServices()) {
+
+                       final CuratorFramework client = 
ZooKeeperUtils.startCuratorFramework(configuration);
+                       final TestingLeaderElectionService 
leaderElectionService = new TestingLeaderElectionService();
+                       
highAvailabilityServices.setJobMasterLeaderElectionService(HighAvailabilityServices.DEFAULT_JOB_ID,
 leaderElectionService);
+                       
highAvailabilityServices.setSubmittedJobGraphStore(ZooKeeperUtils.createSubmittedJobGraphs(client,
 configuration));
+                       
highAvailabilityServices.setCheckpointRecoveryFactory(new 
StandaloneCheckpointRecoveryFactory());
+
+                       final CuratorFramework otherClient = 
ZooKeeperUtils.startCuratorFramework(configuration);
+                       final ZooKeeperSubmittedJobGraphStore 
otherSubmittedJobGraphStore = 
ZooKeeperUtils.createSubmittedJobGraphs(otherClient, configuration);
+                       
otherSubmittedJobGraphStore.start(NoOpSubmittedJobGraphListener.INSTANCE);
+
+                       ActorRef jobManagerActorRef = null;
+                       try {
+                               jobManagerActorRef = 
JobManager.startJobManagerActors(
+                                       configuration,
+                                       system,
+                                       TestingUtils.defaultExecutor(),
+                                       TestingUtils.defaultExecutor(),
+                                       highAvailabilityServices,
+                                       NoOpMetricRegistry.INSTANCE,
+                                       Option.empty(),
+                                       TestingJobManager.class,
+                                       MemoryArchivist.class)._1();
+
+                               waitForActorToBeStarted(jobManagerActorRef, 
TIMEOUT);
+
+                               final ActorGateway jobManager = new 
AkkaActorGateway(jobManagerActorRef, 
HighAvailabilityServices.DEFAULT_LEADER_ID);
+
+                               
leaderElectionService.isLeader(HighAvailabilityServices.DEFAULT_LEADER_ID).get();
+
+                               final JobGraph nonEmptyJobGraph = 
DispatcherHATest.createNonEmptyJobGraph();
+
+                               final JobManagerMessages.SubmitJob 
submitJobMessage = new JobManagerMessages.SubmitJob(nonEmptyJobGraph, 
ListeningBehaviour.DETACHED);
+
+                               Await.result(jobManager.ask(submitJobMessage, 
TIMEOUT), TIMEOUT);
+
+                               Collection<JobID> jobIds = 
otherSubmittedJobGraphStore.getJobIds();
+
+                               final JobID jobId = nonEmptyJobGraph.getJobID();
+                               assertThat(jobIds, contains(jobId));
+
+                               // revoke the leadership
+                               leaderElectionService.notLeader();
+
+                               
Await.result(jobManager.ask(TestingJobManagerMessages.getWaitForBackgroundTasksToFinish(),
 TIMEOUT), TIMEOUT);
+
+                               final SubmittedJobGraph recoveredJobGraph = 
akka.serialization.JavaSerializer.currentSystem().withValue(
+                                       ((ExtendedActorSystem) system),
+                                       () -> 
otherSubmittedJobGraphStore.recoverJobGraph(jobId));
+
+                               assertThat(recoveredJobGraph, 
is(notNullValue()));
+
+                               
otherSubmittedJobGraphStore.removeJobGraph(jobId);
+
+                               jobIds = 
otherSubmittedJobGraphStore.getJobIds();
+
+                               assertThat(jobIds, not(contains(jobId)));
+                       } finally {
+                               client.close();
+                               otherClient.close();
+
+                               if (jobManagerActorRef != null) {
+                                       
ActorUtils.stopActor(jobManagerActorRef);
+                               }
+                       }
+               }
+       }
+
+       private void waitForActorToBeStarted(ActorRef jobManagerActorRef, 
FiniteDuration timeout) throws InterruptedException, 
java.util.concurrent.TimeoutException {
+               Await.ready(Patterns.ask(jobManagerActorRef, new Identify(42), 
timeout.toMillis()), timeout);
+       }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
index c1a7b536721..e9be145c37f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
@@ -20,7 +20,6 @@
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
-import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import 
org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener;
@@ -90,8 +89,7 @@ public void testPutAndRemoveJobGraph() throws Exception {
                ZooKeeperSubmittedJobGraphStore jobGraphs = new 
ZooKeeperSubmittedJobGraphStore(
                        ZooKeeper.createClient(),
                        "/testPutAndRemoveJobGraph",
-                       localStateStorage,
-                       Executors.directExecutor());
+                       localStateStorage);
 
                try {
                        SubmittedJobGraphListener listener = 
mock(SubmittedJobGraphListener.class);
@@ -147,7 +145,7 @@ public void testPutAndRemoveJobGraph() throws Exception {
        @Test
        public void testRecoverJobGraphs() throws Exception {
                ZooKeeperSubmittedJobGraphStore jobGraphs = new 
ZooKeeperSubmittedJobGraphStore(
-                               ZooKeeper.createClient(), 
"/testRecoverJobGraphs", localStateStorage, Executors.directExecutor());
+                               ZooKeeper.createClient(), 
"/testRecoverJobGraphs", localStateStorage);
 
                try {
                        SubmittedJobGraphListener listener = 
mock(SubmittedJobGraphListener.class);
@@ -198,10 +196,10 @@ public void testConcurrentAddJobGraph() throws Exception {
 
                try {
                        jobGraphs = new ZooKeeperSubmittedJobGraphStore(
-                                       ZooKeeper.createClient(), 
"/testConcurrentAddJobGraph", localStateStorage, Executors.directExecutor());
+                                       ZooKeeper.createClient(), 
"/testConcurrentAddJobGraph", localStateStorage);
 
                        otherJobGraphs = new ZooKeeperSubmittedJobGraphStore(
-                                       ZooKeeper.createClient(), 
"/testConcurrentAddJobGraph", localStateStorage, Executors.directExecutor());
+                                       ZooKeeper.createClient(), 
"/testConcurrentAddJobGraph", localStateStorage);
 
 
                        SubmittedJobGraph jobGraph = 
createSubmittedJobGraph(new JobID(), 0);
@@ -257,10 +255,10 @@ public Void answer(InvocationOnMock invocation) throws 
Throwable {
        @Test(expected = IllegalStateException.class)
        public void testUpdateJobGraphYouDidNotGetOrAdd() throws Exception {
                ZooKeeperSubmittedJobGraphStore jobGraphs = new 
ZooKeeperSubmittedJobGraphStore(
-                               ZooKeeper.createClient(), 
"/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage, 
Executors.directExecutor());
+                               ZooKeeper.createClient(), 
"/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage);
 
                ZooKeeperSubmittedJobGraphStore otherJobGraphs = new 
ZooKeeperSubmittedJobGraphStore(
-                               ZooKeeper.createClient(), 
"/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage, 
Executors.directExecutor());
+                               ZooKeeper.createClient(), 
"/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage);
 
                jobGraphs.start(null);
                otherJobGraphs.start(null);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
index ba0dc80fbb5..3b9c5786ca4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InMemorySubmittedJobGraphStore.java
@@ -96,6 +96,11 @@ public synchronized void removeJobGraph(JobID jobId) throws 
Exception {
                storedJobs.remove(jobId);
        }
 
+       @Override
+       public void releaseJobGraph(JobID jobId) {
+               verifyIsStarted();
+       }
+
        @Override
        public synchronized Collection<JobID> getJobIds() throws Exception {
                verifyIsStarted();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperResource.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperResource.java
new file mode 100644
index 00000000000..c4c56949cd9
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperResource.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper;
+
+import org.apache.flink.util.Preconditions;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/**
+ * {@link ExternalResource} which starts a {@link 
org.apache.zookeeper.server.ZooKeeperServer}.
+ */
+public class ZooKeeperResource extends ExternalResource {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(ZooKeeperResource.class);
+
+       @Nullable
+       private TestingServer zooKeeperServer;
+
+       public String getConnectString() {
+               verifyIsRunning();
+               return zooKeeperServer.getConnectString();
+       }
+
+       private void verifyIsRunning() {
+               Preconditions.checkState(zooKeeperServer != null);
+       }
+
+       @Override
+       protected void before() throws Throwable {
+               terminateZooKeeperServer();
+               zooKeeperServer = new TestingServer(true);
+       }
+
+       private void terminateZooKeeperServer() throws IOException {
+               if (zooKeeperServer != null) {
+                       zooKeeperServer.stop();
+                       zooKeeperServer = null;
+               }
+       }
+
+       @Override
+       protected void after() {
+               try {
+                       terminateZooKeeperServer();
+               } catch (IOException e) {
+                       LOG.warn("Could not properly terminate the {}.", 
getClass().getSimpleName(), e);
+               }
+       }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
index fd39b25991c..2dd27e7c897 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
@@ -18,21 +18,19 @@
 
 package org.apache.flink.runtime.zookeeper;
 
-import org.apache.curator.framework.CuratorFramework;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.TestLogger;
+
+import org.apache.curator.framework.CuratorFramework;
 import org.apache.zookeeper.data.Stat;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -41,7 +39,6 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.CountDownLatch;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
@@ -49,12 +46,7 @@
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 /**
@@ -88,8 +80,8 @@ public void cleanUp() throws Exception {
        @Test
        public void testAddAndLock() throws Exception {
                LongStateStorage longStateStorage = new LongStateStorage();
-               ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<Long>(
-                               ZOOKEEPER.getClient(), longStateStorage, 
Executors.directExecutor());
+               ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
+                       ZOOKEEPER.getClient(), longStateStorage);
 
                // Config
                final String pathInZooKeeper = "/testAdd";
@@ -136,7 +128,7 @@ public void testAddAlreadyExistingPath() throws Exception {
                LongStateStorage stateHandleProvider = new LongStateStorage();
 
                ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
-                               ZOOKEEPER.getClient(), stateHandleProvider, 
Executors.directExecutor());
+                               ZOOKEEPER.getClient(), stateHandleProvider);
 
                
ZOOKEEPER.getClient().create().forPath("/testAddAlreadyExistingPath");
 
@@ -161,7 +153,7 @@ public void testAddDiscardStateHandleAfterFailure() throws 
Exception {
                when(client.inTransaction().create()).thenThrow(new 
RuntimeException("Expected test Exception."));
 
                ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
-                               client, stateHandleProvider, 
Executors.directExecutor());
+                               client, stateHandleProvider);
 
                // Config
                final String pathInZooKeeper = 
"/testAddDiscardStateHandleAfterFailure";
@@ -191,7 +183,7 @@ public void testReplace() throws Exception {
                LongStateStorage stateHandleProvider = new LongStateStorage();
 
                ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
-                               ZOOKEEPER.getClient(), stateHandleProvider, 
Executors.directExecutor());
+                               ZOOKEEPER.getClient(), stateHandleProvider);
 
                // Config
                final String pathInZooKeeper = "/testReplace";
@@ -230,7 +222,7 @@ public void testReplaceNonExistingPath() throws Exception {
                RetrievableStateStorageHelper<Long> stateStorage = new 
LongStateStorage();
 
                ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
-                               ZOOKEEPER.getClient(), stateStorage, 
Executors.directExecutor());
+                               ZOOKEEPER.getClient(), stateStorage);
 
                store.replace("/testReplaceNonExistingPath", 0, 1L);
        }
@@ -247,7 +239,7 @@ public void testReplaceDiscardStateHandleAfterFailure() 
throws Exception {
                when(client.setData()).thenThrow(new RuntimeException("Expected 
test Exception."));
 
                ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
-                               client, stateHandleProvider, 
Executors.directExecutor());
+                               client, stateHandleProvider);
 
                // Config
                final String pathInZooKeeper = 
"/testReplaceDiscardStateHandleAfterFailure";
@@ -289,7 +281,7 @@ public void testGetAndExists() throws Exception {
                LongStateStorage stateHandleProvider = new LongStateStorage();
 
                ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
-                               ZOOKEEPER.getClient(), stateHandleProvider, 
Executors.directExecutor());
+                               ZOOKEEPER.getClient(), stateHandleProvider);
 
                // Config
                final String pathInZooKeeper = "/testGetAndExists";
@@ -314,7 +306,7 @@ public void testGetNonExistingPath() throws Exception {
                LongStateStorage stateHandleProvider = new LongStateStorage();
 
                ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
-                               ZOOKEEPER.getClient(), stateHandleProvider, 
Executors.directExecutor());
+                               ZOOKEEPER.getClient(), stateHandleProvider);
 
                store.getAndLock("/testGetNonExistingPath");
        }
@@ -328,7 +320,7 @@ public void testGetAll() throws Exception {
                LongStateStorage stateHandleProvider = new LongStateStorage();
 
                ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
-                               ZOOKEEPER.getClient(), stateHandleProvider, 
Executors.directExecutor());
+                               ZOOKEEPER.getClient(), stateHandleProvider);
 
                // Config
                final String pathInZooKeeper = "/testGetAll";
@@ -359,7 +351,7 @@ public void testGetAllSortedByName() throws Exception {
                LongStateStorage stateHandleProvider = new LongStateStorage();
 
                ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
-                               ZOOKEEPER.getClient(), stateHandleProvider, 
Executors.directExecutor());
+                               ZOOKEEPER.getClient(), stateHandleProvider);
 
                // Config
                final String basePath = "/testGetAllSortedByName";
@@ -393,7 +385,7 @@ public void testRemove() throws Exception {
                LongStateStorage stateHandleProvider = new LongStateStorage();
 
                ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
-                               ZOOKEEPER.getClient(), stateHandleProvider, 
Executors.directExecutor());
+                               ZOOKEEPER.getClient(), stateHandleProvider);
 
                // Config
                final String pathInZooKeeper = "/testRemove";
@@ -401,50 +393,14 @@ public void testRemove() throws Exception {
 
                store.addAndLock(pathInZooKeeper, state);
 
+               final int numberOfGlobalDiscardCalls = 
LongRetrievableStateHandle.getNumberOfGlobalDiscardCalls();
+
                // Test
                store.releaseAndTryRemove(pathInZooKeeper);
 
                // Verify discarded
                assertEquals(0, 
ZOOKEEPER.getClient().getChildren().forPath("/").size());
-       }
-
-       /**
-        * Tests that state handles are correctly removed with a callback.
-        */
-       @Test
-       public void testRemoveWithCallback() throws Exception {
-               // Setup
-               LongStateStorage stateHandleProvider = new LongStateStorage();
-
-               ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
-                               ZOOKEEPER.getClient(), stateHandleProvider, 
Executors.directExecutor());
-
-               // Config
-               final String pathInZooKeeper = "/testRemoveWithCallback";
-               final Long state = 27255442L;
-
-               store.addAndLock(pathInZooKeeper, state);
-
-               final CountDownLatch sync = new CountDownLatch(1);
-               ZooKeeperStateHandleStore.RemoveCallback<Long> callback = 
mock(ZooKeeperStateHandleStore.RemoveCallback.class);
-               doAnswer(new Answer<Void>() {
-                       @Override
-                       public Void answer(InvocationOnMock invocation) throws 
Throwable {
-                               sync.countDown();
-                               return null;
-                       }
-               }).when(callback).apply(any(RetrievableStateHandle.class));
-
-               // Test
-               store.releaseAndTryRemove(pathInZooKeeper, callback);
-
-               // Verify discarded and callback called
-               assertEquals(0, 
ZOOKEEPER.getClient().getChildren().forPath("/").size());
-
-               sync.await();
-
-               verify(callback, times(1))
-                               .apply(any(RetrievableStateHandle.class));
+               assertEquals(numberOfGlobalDiscardCalls + 1, 
LongRetrievableStateHandle.getNumberOfGlobalDiscardCalls());
        }
 
        /** Tests that all state handles are correctly discarded. */
@@ -454,7 +410,7 @@ public void testReleaseAndTryRemoveAll() throws Exception {
                LongStateStorage stateHandleProvider = new LongStateStorage();
 
                ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
-                               ZOOKEEPER.getClient(), stateHandleProvider, 
Executors.directExecutor());
+                               ZOOKEEPER.getClient(), stateHandleProvider);
 
                // Config
                final String pathInZooKeeper = "/testDiscardAll";
@@ -486,8 +442,7 @@ public void testCorruptedData() throws Exception {
 
                ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
                        ZOOKEEPER.getClient(),
-                       stateStorage,
-                       Executors.directExecutor());
+                       stateStorage);
 
                final Collection<Long> input = new HashSet<>();
                input.add(1L);
@@ -543,13 +498,11 @@ public void testConcurrentDeleteOperation() throws 
Exception {
 
                ZooKeeperStateHandleStore<Long> zkStore1 = new 
ZooKeeperStateHandleStore<>(
                        ZOOKEEPER.getClient(),
-                       longStateStorage,
-                       Executors.directExecutor());
+                       longStateStorage);
 
                ZooKeeperStateHandleStore<Long> zkStore2 = new 
ZooKeeperStateHandleStore<>(
                        ZOOKEEPER.getClient(),
-                       longStateStorage,
-                       Executors.directExecutor());
+                       longStateStorage);
 
                final String statePath = "/state";
 
@@ -586,13 +539,11 @@ public void testLockCleanupWhenGetAndLockFails() throws 
Exception {
 
                ZooKeeperStateHandleStore<Long> zkStore1 = new 
ZooKeeperStateHandleStore<>(
                        ZOOKEEPER.getClient(),
-                       longStateStorage,
-                       Executors.directExecutor());
+                       longStateStorage);
 
                ZooKeeperStateHandleStore<Long> zkStore2 = new 
ZooKeeperStateHandleStore<>(
                        ZOOKEEPER.getClient(),
-                       longStateStorage,
-                       Executors.directExecutor());
+                       longStateStorage);
 
                final String path = "/state";
 
@@ -649,8 +600,7 @@ public void testLockCleanupWhenClientTimesOut() throws 
Exception {
 
                        ZooKeeperStateHandleStore<Long> zkStore = new 
ZooKeeperStateHandleStore<>(
                                client,
-                               longStateStorage,
-                               Executors.directExecutor());
+                               longStateStorage);
 
                        final String path = "/state";
 
@@ -682,8 +632,7 @@ public void testRelease() throws Exception {
 
                ZooKeeperStateHandleStore<Long> zkStore = new 
ZooKeeperStateHandleStore<>(
                        ZOOKEEPER.getClient(),
-                       longStateStorage,
-                       Executors.directExecutor());
+                       longStateStorage);
 
                final String path = "/state";
 
@@ -720,8 +669,7 @@ public void testReleaseAll() throws Exception {
 
                ZooKeeperStateHandleStore<Long> zkStore = new 
ZooKeeperStateHandleStore<>(
                        ZOOKEEPER.getClient(),
-                       longStateStorage,
-                       Executors.directExecutor());
+                       longStateStorage);
 
                final Collection<String> paths = Arrays.asList("/state1", 
"/state2", "/state3");
 
@@ -775,9 +723,11 @@ public void testReleaseAll() throws Exception {
 
                private static final long serialVersionUID = 
-3555329254423838912L;
 
+               private static int numberOfGlobalDiscardCalls = 0;
+
                private final Long state;
 
-               private int numberOfDiscardCalls;
+               private int numberOfDiscardCalls = 0;
 
                public LongRetrievableStateHandle(Long state) {
                        this.state = state;
@@ -790,6 +740,7 @@ public Long retrieveState() {
 
                @Override
                public void discardState() throws Exception {
+                       numberOfGlobalDiscardCalls++;
                        numberOfDiscardCalls++;
                }
 
@@ -798,8 +749,12 @@ public long getStateSize() {
                        return 0;
                }
 
-               public int getNumberOfDiscardCalls() {
+               int getNumberOfDiscardCalls() {
                        return numberOfDiscardCalls;
                }
+
+               public static int getNumberOfGlobalDiscardCalls() {
+                       return numberOfGlobalDiscardCalls;
+               }
        }
 }
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
index 0640f39f4cf..ebe46399395 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
@@ -454,6 +454,14 @@ trait TestingJobManagerLike extends FlinkActor {
         val receiver = waitForNumRegisteredTaskManagers.dequeue()._2
         receiver ! Acknowledge.get()
       }
+
+    case WaitForBackgroundTasksToFinish =>
+      val future = futuresToComplete match {
+        case Some(futures) => Future.sequence(futures)
+        case None => Future.successful(Seq())
+      }
+
+      future.pipeTo(sender())
   }
 
   def checkIfAllVerticesRunning(jobID: JobID): Boolean = {
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
index c8529a9e07a..64af056f24d 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
@@ -59,6 +59,8 @@ object TestingJobManagerMessages {
 
   case object NotifyListeners
 
+  case object WaitForBackgroundTasksToFinish
+
   case class NotifyWhenTaskManagerTerminated(taskManager: ActorRef)
   case class TaskManagerTerminated(taskManager: ActorRef)
 
@@ -164,4 +166,5 @@ object TestingJobManagerMessages {
   def getClientConnected(): AnyRef = ClientConnected
   def getClassLoadingPropsDelivered(): AnyRef = ClassLoadingPropsDelivered
 
+  def getWaitForBackgroundTasksToFinish(): AnyRef = 
WaitForBackgroundTasksToFinish
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Old job resurrected during HA failover
> --------------------------------------
>
>                 Key: FLINK-10011
>                 URL: https://issues.apache.org/jira/browse/FLINK-10011
>             Project: Flink
>          Issue Type: Bug
>          Components: JobManager
>    Affects Versions: 1.4.2, 1.5.2, 1.6.0
>            Reporter: Elias Levy
>            Assignee: Till Rohrmann
>            Priority: Blocker
>              Labels: pull-request-available
>
> For the second time we've observed Flink resurrect an old job during 
> JobManager high-availability failover.
> h4. Configuration
>  * AWS environment
>  * Flink 1.4.2 standalone cluster in HA mode
>  * 2 JMs, 3 TMs
>  * 3 node ZK ensemble
>  * 1 job consuming from and producing to Kafka
>  * Checkpoints in S3 using the Presto file system adaptor
> h4. Timeline 
>  * 15:18:10 JM 2 completes checkpoint 69256.
>  * 15:19:10 JM 2 completes checkpoint 69257.
>  * 15:19:57 ZK 1 (follower) loses connectivity to the leader as a result of a 
> SocketTimeoutException
>  * 15:19:57 ZK 1 closes connection to JM 2 (leader)
>  * 15:19:57 ZK 2 (leader) reports a network error and closes connection to ZK 
> 1
>  * 15:19:57 JM 2 reports it can't read data from ZK
>  ** {{Unable to read additional data from server sessionid 0x30000003f4a0003, 
> likely server has closed socket, closing socket connection and attempting 
> reconnect)}}
>  ** {{org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn}}
>  * 15:19:57 JM 2 ZK Curator changes connection state to SUSPENDED
>  ** {{Connection to ZooKeeper suspended. Can no longer retrieve the leader 
> from ZooKeeper.}}
>  ** {{ZooKeeper connection SUSPENDED. }}{{Changes to the submitted job graphs 
> are not monitored (temporarily).}}
>  ** {{Connection to ZooKeeper suspended. The contender 
> akka.tcp://flink@flink-jm-2:6123/user/jobmanager no longer participates in 
> the leader election}}{{ }}
>  ** {{Connection to ZooKeeper suspended. Can no longer retrieve the leader 
> from ZooKeeper.}}
>  * 15:19:57 JM 2 gives up leadership
>  ** {{JobManager akka://flink/user/jobmanager#33755521 was revoked 
> leadership.}}
>  * 15:19:57 JM 2 changes job 
> {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color} status to SUSPENDED
>  ** {{Stopping checkpoint coordinator for job 
> {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}}}
>  * 15:19:57 TMs start disassociating from JM 2, but JM 2 discards the messages 
> because there is no leader
>  ** {{Discard message 
> LeaderSessionMessage(d29e9f38-a16d-4c87-b34f-5212caab0473,Disconnect(b97363d53ad22aedfebdc8e5ba3c672f,java.lang.Exception:
>  TaskManager akka://flink/user/taskmanager is disassociating)) because there 
> is currently no valid leader id known.}}
>  * 15:19:57 JM 2 connects to ZK 2 and renews its session
>  ** {{Opening socket connection to server 
> ip-10-210-43-221.ec2.internal/10.210.43.221:2181}}
>  ** {{Socket connection established to 
> ip-10-210-43-221.ec2.internal/10.210.43.221:2181, initiating session}}
>  ** {{Connection to ZooKeeper was reconnected. Leader retrieval can be 
> restarted.}}
>  ** {{Session establishment complete on server 
> ip-10-210-43-221.ec2.internal/10.210.43.221:2181, sessionid = 
> 0x30000003f4a0003, negotiated timeout = 40000}}
>  ** {{Connection to ZooKeeper was reconnected. Leader election can be 
> restarted.}}
>  ** {{ZooKeeper connection RECONNECTED. Changes to the submitted job graphs 
> are monitored again.}}
>  ** {{State change: RECONNECTED}}
>  * 15:19:57 JM 1 reports it has been granted leadership:
>  ** {{JobManager akka.tcp://flink@flink-jm-1:6123/user/jobmanager was granted 
> leadership with leader session ID 
> Some(ae0a1a17-eccc-40b4-985d-93bc59f5b936).}}
>  * 15:19:57 JM 2 reports the job has been suspended
>  ** {{org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter 
> Shutting down.}}
>  ** {{Job 2a4eff355aef849c5ca37dbac04f2ff1 has been suspended.}}
>  * 15:19:57 JM 2 reports it has lost leadership:
>  ** {{Associated JobManager 
> Actor[akka://flink/user/jobmanager#33755521|#33755521] lost leader status}}
>  ** {{Received leader address but not running in leader ActorSystem. 
> Cancelling registration.}}
>  * 15:19:57 TMs register with JM 1
>  * 15:20:07 JM 1 attempts to recover jobs and finds there are two jobs:
>  ** {{Attempting to recover all jobs.}}
>  ** {{There are 2 jobs to recover. Starting the job recovery.}}
>  ** {{Attempting to recover job 
> {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}.}}
>  ** {{Attempting to recover job 
> {color:#d04437}61bca496065cd05e4263070a5e923a05{color}.}}
>  * 15:20:08 – 15:32:27 ZK 2 reports a large number of errors of the form:
>  ** {{Got user-level KeeperException when processing 
> sessionid:0x2000001d2330001 type:create cxid:0x4211 zxid:0x60009dc70 
> txntype:-1 reqpath:n/a Error 
> Path:/flink/cluster_a/checkpoint-counter/2a4eff355aef849c5ca37dbac04f2ff1 
> Error:KeeperErrorCode = NodeExists for 
> /flink/cluster_a/checkpoint-counter/2a4eff355aef849c5ca37dbac04f2ff1}}
>  ** {{Got user-level KeeperException when processing 
> sessionid:0x2000001d2330001 type:create cxid:0x4230 zxid:0x60009dc78 
> txntype:-1 reqpath:n/a Error 
> Path:/flink/cluster_a/checkpoints/2a4eff355aef849c5ca37dbac04f2ff1/0000000000000069255/37d25086-374f-4969-b763-4261e87022a2
>  Error:KeeperErrorCode = NodeExists for 
> /flink/cluster_a/checkpoints/2a4eff355aef849c5ca37dbac04f2ff1/0000000000000069255/37d25086-374f-4969-b763-4261e87022a2}}
>  * 15:29:08 JM 1 starts to recover job 
> {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}
>  ** {{Recovered SubmittedJobGraph(2a4eff355aef849c5ca37dbac04f2ff1, 
> JobInfo(clients: 
> Set((Actor[akka.tcp://flink@ip-10-210-42-62.ec2.internal:37564/temp/$c],DETACHED)),
>  start: 1528833686265)).}}
>  ** {{Submitting recovered job 2a4eff355aef849c5ca37dbac04f2ff1.}}
>  ** {{Submitting job 2a4eff355aef849c5ca37dbac04f2ff1 (Some Job) (Recovery).}}
>  ** {{Using restart strategy 
> FixedDelayRestartStrategy(maxNumberRestartAttempts=2147483647, 
> delayBetweenRestartAttempts=30000) for 2a4eff355aef849c5ca37dbac04f2ff1.}}
>  ** {{Successfully ran initialization on master in 0 ms.}}
>  ** {{Job recovers via failover strategy: full graph restart}}
>  ** {{Running initialization on master for job Some Job 
> (2a4eff355aef849c5ca37dbac04f2ff1).}}
>  ** {{Initialized in '/checkpoints/2a4eff355aef849c5ca37dbac04f2ff1'.}}
>  ** {{Using application-defined state backend for checkpoint/savepoint 
> metadata: File State Backend @ s3://bucket/flink/cluster_1/checkpoints.}}
>  ** {{Persisting periodic checkpoints externally at 
> s3://bucket/flink/cluster_1/checkpoints-external.}}
>  ** {{Recovering checkpoints from ZooKeeper.}}
>  ** {{Found 3 checkpoints in ZooKeeper.}}
>  ** {{Trying to retrieve checkpoint 69255.}}
>  ** {{Trying to fetch 3 checkpoints from storage.}}
>  ** {{Trying to retrieve checkpoint 69256.}}
>  ** {{Trying to retrieve checkpoint 69257.}}
>  ** {{Restoring from latest valid checkpoint: Checkpoint 69257 @ 
> 1532989148882 for 2a4eff355aef849c5ca37dbac04f2ff1.}}
>  ** {{Scheduling job 2a4eff355aef849c5ca37dbac04f2ff1 (Some Job).}}
>  ** {{Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state 
> CREATED to RUNNING.}}
>  ** {{Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state 
> RUNNING to FAILING.}}{{ 
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
> Not enough free slots available to run the job. You can decrease the operator 
> parallelism or increase the number of slots per TaskManager in the 
> configuration.}}
>  * 15:20:09 JM 1 starts to recover 
> {color:#d04437}61bca496065cd05e4263070a5e923a05{color}
>  ** {{Recovered SubmittedJobGraph(61bca496065cd05e4263070a5e923a05, 
> JobInfo(clients: 
> Set((Actor[akka.tcp://flink@ip-10-210-22-167.ec2.internal:41001/temp/$c],DETACHED)),
>  start: 1525728377900)).}}
>  ** {{Submitting recovered job 61bca496065cd05e4263070a5e923a05.}}
>  ** {{Submitting job 61bca496065cd05e4263070a5e923a05 (Some Job) (Recovery).}}
>  ** {{Using restart strategy 
> FixedDelayRestartStrategy(maxNumberRestartAttempts=2147483647, 
> delayBetweenRestartAttempts=30000) for 61bca496065cd05e4263070a5e923a05.}}
>  ** {{Job recovers via failover strategy: full graph restart}}
>  ** {{Successfully ran initialization on master in 0 ms.}}
>  ** {{Running initialization on master for job Some Job 
> (61bca496065cd05e4263070a5e923a05).}}
>  ** {{Initialized in '/checkpoints/61bca496065cd05e4263070a5e923a05'.}}
>  ** {{Using application-defined state backend for checkpoint/savepoint 
> metadata: File State Backend @ s3://bucket/flink/cluster_1/checkpoints.}}
>  ** {{Persisting periodic checkpoints externally at 
> s3://bucket/flink/cluster_1/checkpoints-external.}}
>  ** {{Recovering checkpoints from ZooKeeper.}}
>  ** {{Scheduling job 61bca496065cd05e4263070a5e923a05 (Some Job).}}
>  ** {{Job Some Job (61bca496065cd05e4263070a5e923a05) switched from state 
> CREATED to RUNNING.}}
>  ** {{Trying to fetch 0 checkpoints from storage}}
>  ** {{Found 0 checkpoints in ZooKeeper.}}
>  * 15:20:09 JM 1 reports a bunch of metric collisions because of the two jobs:
>  ** {{Name collision: Group already contains a Metric with the name 
> 'lastCheckpointSize'. Metric will not be reported.[jobmanager, job]}}
>  ** {{Name collision: Group already contains a Metric with the name 
> 'lastCheckpointAlignmentBuffered'. Metric will not be reported.[jobmanager, 
> job]}}
>  ** etc
>  * 15:20:39 JM 1 begins attempting to restart the 
> {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color} job repeatedly
>  ** {{Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state 
> FAILING to RESTARTING.}}
>  ** {{Restarting the job Some Job (2a4eff355aef849c5ca37dbac04f2ff1).}}
>  ** {{Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state 
> RESTARTING to CREATED.}}
>  ** {{Recovering checkpoints from ZooKeeper.}}
>  ** {{Found 3 checkpoints in ZooKeeper.}}
>  ** {{Trying to fetch 3 checkpoints from storage.}}
>  ** {{Trying to retrieve checkpoint 69255.}}
>  ** {{Trying to retrieve checkpoint 69256.}}
>  ** {{Trying to retrieve checkpoint 69257.}}
>  ** {{Restoring from latest valid checkpoint: Checkpoint 69257 @ 
> 1532989148882 for 2a4eff355aef849c5ca37dbac04f2ff1.}}
>  ** {{Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state 
> CREATED to RUNNING.}}
>  ** {{Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state 
> RUNNING to FAILING.}}
>  * 15:35:39 ZK 1 reestablishes connection with ZK 2 and 3 and becomes a 
> follower
>  
> h4. Analysis
>  
> The cluster was running job 
> {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}.  The JM HA leader was 
> JM 2, which was connected to ZK 1.  ZK 1 was a follower in the ZK ensemble.  
> The ZK leader was ZK 2.  
> ZK 1 lost network connectivity for about 16 minutes.  Upon loss of 
> connectivity to ZK 1, JM 2 gives up leadership and shuts down the 
> {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color} job.  JM 2 then 
> immediately connects to ZK 2, without its ZK session having expired.  
> Nonetheless, as it gave up leadership, JM 1 is elected the new JM leader.
> JM 1 then erroneously decides there are two jobs to restore: the previously 
> running job, {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}, and 
> {color:#d04437}61bca496065cd05e4263070a5e923a05{color}.  JM 1 decides there 
> are no checkpoints for 
> {color:#d04437}61bca496065cd05e4263070a5e923a05{color}, which therefore starts 
> right away.  JM 1 attempts to restore 
> {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color} from the latest 
> checkpoint, but it fails because there aren't enough task slots in the 
> cluster as a result of the other job starting.  Thus, 
> {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color} enters a restart loop.
> We manually stopped both jobs and started a new job based on the last known 
> checkpoint for  {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}.
>  
> Job {color:#d04437}61bca496065cd05e4263070a5e923a05{color}  is an old job 
> that we ran for a month between May 7th and June 5th.
> After our manual intervention, the HA state directory in S3 looks like 
> this:
> {{s3cmd ls s3://bucket/flink/cluster_1/recovery/}}
> {{ DIR s3://bucket/flink/cluster_1/recovery/some_job/}}
> {{2018-07-31 17:33 35553 
> s3://bucket/flink/cluster_1/recovery/completedCheckpoint12e06bef01c5}}
> {{2018-07-31 17:34 35553 
> s3://bucket/flink/cluster_1/recovery/completedCheckpoint187e0d2ae7cb}}
> {{2018-07-31 17:32 35553 
> s3://bucket/flink/cluster_1/recovery/completedCheckpoint22fc8ca46f02}}
> {{2018-06-12 20:01 284626 
> s3://bucket/flink/cluster_1/recovery/submittedJobGraph7f627a661cec}}
> {{2018-07-30 23:01 285257 
> s3://bucket/flink/cluster_1/recovery/submittedJobGraphf3767780c00c}}
> submittedJobGraph7f627a661cec appears to be job 
> {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}, the long-running job 
> that failed during the ZK failover.
> submittedJobGraphf3767780c00c appears to be job 
> {color:#205081}d77948df92813a68ea6dfd6783f40e7e{color}, the job we started 
> restoring from a checkpoint after shutting down the duplicate jobs.
>  
> A few questions come to mind.
> h5. Why does the JM terminate running jobs when it can immediately connect to 
> another ZK node and its ZK session has not expired?
> This seems to be a result of using the LeaderLatch recipe in Curator.  Its 
> [docs|https://github.com/Netflix/curator/wiki/Leader-Latch] state: 
> {quote}LeaderLatch instances add a ConnectionStateListener to watch for 
> connection problems. If SUSPENDED or LOST is reported, the LeaderLatch that 
> is the *leader will report that it is no longer the leader* (i.e. there will 
> not be a leader until the connection is re-established). If a LOST connection 
> is RECONNECTED, the LeaderLatch *will delete its previous ZNode and create a 
> new one*.
> Users of LeaderLatch must take account that connection issues can cause 
> leadership to be lost. i.e. hasLeadership() returns true but some time later 
> the connection is SUSPENDED or LOST. At that point hasLeadership() will 
> return false. It is highly recommended that LeaderLatch users register a 
> ConnectionStateListener.
> {quote}
> So not only is leadership lost while disconnected; it will likely also stay 
> lost upon reconnecting as a result of the znode deletion and recreation.
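> As a rough illustration of that behaviour (this is an invented demo, not Flink's 
> actual leader election code; the connect string, paths and class name are made 
> up), an application using LeaderLatch sees notLeader() as soon as the connection 
> state becomes SUSPENDED, even though the underlying ZooKeeper session may still 
> be alive:
> {code:java}
> import org.apache.curator.framework.CuratorFramework;
> import org.apache.curator.framework.CuratorFrameworkFactory;
> import org.apache.curator.framework.recipes.leader.LeaderLatch;
> import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
> import org.apache.curator.framework.state.ConnectionState;
> import org.apache.curator.framework.state.ConnectionStateListener;
> import org.apache.curator.retry.ExponentialBackoffRetry;
> 
> public class LeaderLatchSuspendDemo {
> 
>     public static void main(String[] args) throws Exception {
>         // Invented connect string and paths, for illustration only.
>         CuratorFramework client = CuratorFrameworkFactory.newClient(
>             "zk1:2181,zk2:2181,zk3:2181", new ExponentialBackoffRetry(1000, 3));
>         client.start();
> 
>         // Log the raw connection state transitions (SUSPENDED, RECONNECTED, LOST).
>         client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
>             @Override
>             public void stateChanged(CuratorFramework c, ConnectionState newState) {
>                 System.out.println("Connection state: " + newState);
>             }
>         });
> 
>         LeaderLatch latch = new LeaderLatch(client, "/demo/leaderlatch");
> 
>         latch.addListener(new LeaderLatchListener() {
>             @Override
>             public void isLeader() {
>                 System.out.println("Granted leadership");
>             }
> 
>             @Override
>             public void notLeader() {
>                 // Per the LeaderLatch docs quoted above, this fires on SUSPENDED
>                 // as well as LOST, i.e. also for a transient connection blip.
>                 System.out.println("Revoked leadership");
>             }
>         });
> 
>         latch.start();
>         Thread.currentThread().join(); // keep the demo process alive
>     }
> }
> {code}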
> This can probably be solved by using the Curator LeaderElection recipe 
> instead, which 
> [states|https://github.com/Netflix/curator/wiki/Leader-Election]:
> {quote}The {{LeaderSelectorListener}} class extends 
> {{ConnectionStateListener}}. When the LeaderSelector is started, it adds the 
> listener to the Curator instance. Users of the {{LeaderSelector}} must pay 
> attention to any connection state changes. If an instance becomes the leader, 
> it should respond to notification of being SUSPENDED or LOST.
> If the SUSPENDED state is reported, the instance must assume that it might no 
> longer be the leader until it receives a RECONNECTED state. If the LOST state 
> is reported, the instance is no longer the leader and its {{takeLeadership}} 
> method should exit.
> {quote}
> So with LeaderElection it seems that what to do during the SUSPENDED state is 
> left up to the application, which may choose to continue being leader until 
> the state becomes LOST.
> Obviously there are dangers with doing so, but at the very least you would 
> expect the JM not to give up leadership until it tried to connect to other ZK 
> members within the remaining ZK session timeout.
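> To make that idea concrete, here is a minimal sketch of the alternative (again 
> invented code, not a proposed Flink patch): a LeaderSelector listener that 
> tolerates SUSPENDED and only gives up leadership once the state becomes LOST, 
> betting on a reconnect within the remaining session timeout:
> {code:java}
> import org.apache.curator.framework.CuratorFramework;
> import org.apache.curator.framework.recipes.leader.CancelLeadershipException;
> import org.apache.curator.framework.recipes.leader.LeaderSelector;
> import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
> import org.apache.curator.framework.state.ConnectionState;
> 
> public class SuspendTolerantContender extends LeaderSelectorListenerAdapter {
> 
>     @Override
>     public void takeLeadership(CuratorFramework client) throws Exception {
>         // Run the leader's duties here; returning from this method
>         // relinquishes leadership.
>         Thread.currentThread().join();
>     }
> 
>     @Override
>     public void stateChanged(CuratorFramework client, ConnectionState newState) {
>         // The default adapter cancels leadership on SUSPENDED *and* LOST.
>         // Here only LOST (session gone) cancels it; SUSPENDED is tolerated
>         // in the hope of a RECONNECTED within the session timeout.
>         if (newState == ConnectionState.LOST) {
>             throw new CancelLeadershipException();
>         }
>     }
> 
>     public static LeaderSelector start(CuratorFramework client, String path) {
>         LeaderSelector selector = new LeaderSelector(client, path, new SuspendTolerantContender());
>         selector.autoRequeue();
>         selector.start();
>         return selector;
>     }
> }
> {code}
> The trade-off is the one noted above: a contender that ignores SUSPENDED keeps 
> acting as leader while its session may in fact already be gone.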
> This problem has been [previously 
> discussed|http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Zookeeper-failure-handling-td19611.html]
>  in the mailing list, which led to FLINK-6174 and this 
> [PR|https://github.com/apache/flink/pull/3599], which appears to be a 
> modification of the Curator LeaderLatch recipe.  It also led to FLINK-5703, 
> which seems like an attempt to keep jobs running during JM failover.  While 
> that is a valuable addition, I argue that it is not required to avoid this 
> failure, as the previous leader can continue being leader so long as it 
> connects to a new ZK before its ZK session expires.
>  
> h5. Why did JM 1 resurrect the old job?
> Something must have been off with the state stored in the S3 HA recovery 
> directory.  The job must not have been fully removed.
> In fact, right now the recovery directory has the file 
> submittedJobGraph7f627a661cec, which appears to be job 
> {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}. Is that expected?  
> That job is no longer running.  Shouldn't that file no longer exist in the 
> recovery directory?
>  
>  
>  


