[ https://issues.apache.org/jira/browse/FLINK-10406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16641014#comment-16641014 ]

ASF GitHub Bot commented on FLINK-10406:
----------------------------------------

asfgit closed pull request #6765: [FLINK-10406] [tests] Port JobManagerTest to new code base
URL: https://github.com/apache/flink/pull/6765

This is a PR merged from a forked repository. As GitHub hides the original
diff on merge, it is displayed below for the sake of provenance:

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
deleted file mode 100644
index 098f564f6bf..00000000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ /dev/null
@@ -1,1577 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.CheckpointingOptions;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.queryablestate.KvStateID;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.akka.ListeningBehaviour;
-import org.apache.flink.runtime.checkpoint.CheckpointDeclineReason;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
-import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
-import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
-import org.apache.flink.runtime.checkpoint.CheckpointType;
-import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
-import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
-import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
-import org.apache.flink.runtime.clusterframework.messages.TriggerRegistrationAtJobManager;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.instance.HardwareDescription;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
-import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
-import org.apache.flink.runtime.jobmanager.JobManagerHARecoveryTest.BlockingStatefulInvokable;
-import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
-import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
-import org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
-import org.apache.flink.runtime.messages.JobManagerMessages.CancellationResponse;
-import org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
-import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionProducerState;
-import org.apache.flink.runtime.messages.JobManagerMessages.StopJob;
-import org.apache.flink.runtime.messages.JobManagerMessages.StoppingFailure;
-import org.apache.flink.runtime.messages.JobManagerMessages.StoppingSuccess;
-import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob;
-import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
-import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
-import org.apache.flink.runtime.messages.RegistrationMessages;
-import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
-import org.apache.flink.runtime.query.KvStateLocation;
-import org.apache.flink.runtime.query.KvStateMessage.LookupKvStateLocation;
-import org.apache.flink.runtime.query.KvStateMessage.NotifyKvStateRegistered;
-import org.apache.flink.runtime.query.KvStateMessage.NotifyKvStateUnregistered;
-import org.apache.flink.runtime.query.UnknownKvStateLocation;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.taskmanager.TaskManager;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-import org.apache.flink.runtime.testingUtils.TestingJobManager;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ExecutionGraphFound;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobStatus;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.RequestExecutionGraph;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished;
-import org.apache.flink.runtime.testingUtils.TestingMemoryArchivist;
-import org.apache.flink.runtime.testingUtils.TestingTaskManager;
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
-import org.apache.flink.runtime.testtasks.NoOpInvokable;
-import org.apache.flink.runtime.testutils.StoppableInvokable;
-import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-import org.apache.flink.util.TestLogger;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.PoisonPill;
-import akka.actor.Status;
-import akka.testkit.JavaTestKit;
-import akka.testkit.TestProbe;
-import com.typesafe.config.Config;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.File;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.Collections;
-import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import scala.Option;
-import scala.Some;
-import scala.Tuple2;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.Duration;
-import scala.concurrent.duration.FiniteDuration;
-import scala.reflect.ClassTag$;
-
-import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.PIPELINED;
-import static org.apache.flink.runtime.messages.JobManagerMessages.CancelJobWithSavepoint;
-import static org.apache.flink.runtime.messages.JobManagerMessages.JobResultFailure;
-import static org.apache.flink.runtime.messages.JobManagerMessages.JobResultSuccess;
-import static org.apache.flink.runtime.messages.JobManagerMessages.JobSubmitSuccess;
-import static org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
-import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.AllVerticesRunning;
-import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.JobStatusIs;
-import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered;
-import static org.apache.flink.runtime.testingUtils.TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT;
-import static org.apache.flink.runtime.testingUtils.TestingUtils.TESTING_TIMEOUT;
-import static org.apache.flink.runtime.testingUtils.TestingUtils.startTestingCluster;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-
-public class JobManagerTest extends TestLogger {
-
-       @Rule
-       public final TemporaryFolder tmpFolder = new TemporaryFolder();
-
-       private static ActorSystem system;
-
-       private HighAvailabilityServices highAvailabilityServices;
-
-       @BeforeClass
-       public static void setup() {
-               system = AkkaUtils.createLocalActorSystem(new Configuration());
-       }
-
-       @AfterClass
-       public static void teardown() {
-               JavaTestKit.shutdownActorSystem(system);
-       }
-
-       @Before
-       public void setupTest() {
-               highAvailabilityServices = new EmbeddedHaServices(TestingUtils.defaultExecutor());
-       }
-
-       @After
-       public void tearDownTest() throws Exception {
-               highAvailabilityServices.closeAndCleanupAllData();
-               highAvailabilityServices = null;
-       }
-
-       @Test
-       public void testNullHostnameGoesToLocalhost() {
-               try {
-                       Tuple2<String, Object> address = new Tuple2<String, Object>(null, 1772);
-                       Config cfg = AkkaUtils.getAkkaConfig(new Configuration(),
-                                       new Some<Tuple2<String, Object>>(address));
-
-                       String hostname = cfg.getString("akka.remote.netty.tcp.hostname");
-                       assertTrue(InetAddress.getByName(hostname).isLoopbackAddress());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       /**
-        * Tests responses to partition state requests.
-        */
-       @Test
-       public void testRequestPartitionState() throws Exception {
-               new JavaTestKit(system) {{
-                       new Within(duration("15 seconds")) {
-                               @Override
-                               protected void run() {
-                                       // Setup
-                                       TestingCluster cluster = null;
-
-                                       try {
-                                               cluster = 
startTestingCluster(2, 1, DEFAULT_AKKA_ASK_TIMEOUT());
-
-                                               final IntermediateDataSetID rid 
= new IntermediateDataSetID();
-
-                                               // Create a task
-                                               final JobVertex sender = new 
JobVertex("Sender");
-                                               sender.setParallelism(1);
-                                               
sender.setInvokableClass(BlockingNoOpInvokable.class); // just block
-                                               
sender.createAndAddResultDataSet(rid, PIPELINED);
-
-                                               final JobGraph jobGraph = new 
JobGraph("Blocking test job", sender);
-                                               final JobID jid = 
jobGraph.getJobID();
-
-                                               final ActorGateway 
jobManagerGateway = cluster.getLeaderGateway(
-                                                       
TestingUtils.TESTING_DURATION());
-
-                                               // we can set the leader 
session ID to None because we don't use this gateway to send messages
-                                               final ActorGateway 
testActorGateway = new AkkaActorGateway(getTestActor(), 
HighAvailabilityServices.DEFAULT_LEADER_ID);
-
-                                               // Submit the job and wait for 
all vertices to be running
-                                               jobManagerGateway.tell(
-                                                       new SubmitJob(
-                                                               jobGraph,
-                                                               
ListeningBehaviour.EXECUTION_RESULT),
-                                                       testActorGateway);
-                                               
expectMsgClass(JobSubmitSuccess.class);
-
-                                               jobManagerGateway.tell(
-                                                       new 
WaitForAllVerticesToBeRunningOrFinished(jid),
-                                                       testActorGateway);
-
-                                               
expectMsgClass(AllVerticesRunning.class);
-
-                                               // This is the mock execution 
ID of the task requesting the state of the partition
-                                               final ExecutionAttemptID 
receiver = new ExecutionAttemptID();
-
-                                               // Request the execution graph 
to get the runtime info
-                                               jobManagerGateway.tell(new 
RequestExecutionGraph(jid), testActorGateway);
-
-                                               final ExecutionGraph eg = 
(ExecutionGraph) expectMsgClass(ExecutionGraphFound.class)
-                                                       .executionGraph();
-
-                                               final ExecutionVertex vertex = 
eg.getJobVertex(sender.getID())
-                                                       .getTaskVertices()[0];
-
-                                               final 
IntermediateResultPartition partition = vertex.getProducedPartitions()
-                                                       
.values().iterator().next();
-
-                                               final ResultPartitionID 
partitionId = new ResultPartitionID(
-                                                       
partition.getPartitionId(),
-                                                       
vertex.getCurrentExecutionAttempt().getAttemptId());
-
-                                               // - The test 
----------------------------------------------------------------------
-
-                                               // 1. All execution states
-                                               RequestPartitionProducerState 
request = new RequestPartitionProducerState(
-                                                       jid, rid, partitionId);
-
-                                               for (ExecutionState state : 
ExecutionState.values()) {
-                                                       
ExecutionGraphTestUtils.setVertexState(vertex, state);
-
-                                                       Future<ExecutionState> 
futurePartitionState = jobManagerGateway
-                                                               .ask(request, 
getRemainingTime())
-                                                               
.mapTo(ClassTag$.MODULE$.<ExecutionState>apply(ExecutionState.class));
-
-                                                       ExecutionState resp = 
Await.result(futurePartitionState, getRemainingTime());
-                                                       assertEquals(state, 
resp);
-                                               }
-
-                                               // 2. Non-existing execution
-                                               request = new 
RequestPartitionProducerState(jid, rid, new ResultPartitionID());
-
-                                               Future<?> futurePartitionState 
= jobManagerGateway.ask(request, getRemainingTime());
-                                               try {
-                                                       
Await.result(futurePartitionState, getRemainingTime());
-                                                       fail("Did not fail with 
expected RuntimeException");
-                                               } catch (RuntimeException e) {
-                                                       
assertEquals(IllegalArgumentException.class, e.getCause().getClass());
-                                               }
-
-                                               // 3. Non-existing job
-                                               request = new 
RequestPartitionProducerState(new JobID(), rid, new ResultPartitionID());
-                                               futurePartitionState = 
jobManagerGateway.ask(request, getRemainingTime());
-
-                                               try {
-                                                       
Await.result(futurePartitionState, getRemainingTime());
-                                                       fail("Did not fail with 
expected IllegalArgumentException");
-                                               } catch 
(IllegalArgumentException ignored) {
-                                               }
-                                       } catch (Exception e) {
-                                               e.printStackTrace();
-                                               fail(e.getMessage());
-                                       } finally {
-                                               if (cluster != null) {
-                                                       cluster.stop();
-                                               }
-                                       }
-                               }
-                       };
-               }};
-       }
-
-       /**
-        * Tests the JobManager response when the execution is not registered with
-        * the ExecutionGraph.
-        */
-       @Test
-       public void testRequestPartitionStateUnregisteredExecution() throws Exception {
-               new JavaTestKit(system) {{
-                       new Within(duration("15 seconds")) {
-                               @Override
-                               protected void run() {
-                                       // Setup
-                                       TestingCluster cluster = null;
-
-                                       try {
-                                               cluster = 
startTestingCluster(4, 1, DEFAULT_AKKA_ASK_TIMEOUT());
-
-                                               final IntermediateDataSetID rid 
= new IntermediateDataSetID();
-
-                                               // Create a task
-                                               final JobVertex sender = new 
JobVertex("Sender");
-                                               sender.setParallelism(1);
-                                               
sender.setInvokableClass(NoOpInvokable.class); // just finish
-                                               
sender.createAndAddResultDataSet(rid, PIPELINED);
-
-                                               final JobVertex sender2 = new 
JobVertex("Blocking Sender");
-                                               sender2.setParallelism(1);
-                                               
sender2.setInvokableClass(BlockingNoOpInvokable.class); // just block
-                                               
sender2.createAndAddResultDataSet(new IntermediateDataSetID(), PIPELINED);
-
-                                               final JobGraph jobGraph = new 
JobGraph("Fast finishing producer test job", sender, sender2);
-                                               final JobID jid = 
jobGraph.getJobID();
-
-                                               final ActorGateway 
jobManagerGateway = cluster.getLeaderGateway(
-                                                       
TestingUtils.TESTING_DURATION());
-
-                                               // we can set the leader 
session ID to None because we don't use this gateway to send messages
-                                               final ActorGateway 
testActorGateway = new AkkaActorGateway(getTestActor(), 
HighAvailabilityServices.DEFAULT_LEADER_ID);
-
-                                               // Submit the job and wait for 
all vertices to be running
-                                               jobManagerGateway.tell(
-                                                       new SubmitJob(
-                                                               jobGraph,
-                                                               
ListeningBehaviour.EXECUTION_RESULT),
-                                                       testActorGateway);
-                                               
expectMsgClass(JobSubmitSuccess.class);
-
-                                               jobManagerGateway.tell(
-                                                       new 
WaitForAllVerticesToBeRunningOrFinished(jid),
-                                                       testActorGateway);
-
-                                               
expectMsgClass(AllVerticesRunning.class);
-
-                                               Future<Object> egFuture = 
jobManagerGateway.ask(
-                                                       new 
RequestExecutionGraph(jobGraph.getJobID()), remaining());
-
-                                               ExecutionGraphFound egFound = 
(ExecutionGraphFound) Await.result(egFuture, remaining());
-                                               ExecutionGraph eg = 
(ExecutionGraph) egFound.executionGraph();
-
-                                               ExecutionVertex vertex = 
eg.getJobVertex(sender.getID()).getTaskVertices()[0];
-                                               while 
(vertex.getExecutionState() != ExecutionState.FINISHED) {
-                                                       Thread.sleep(1);
-                                               }
-
-                                               IntermediateResultPartition 
partition = vertex.getProducedPartitions()
-                                                       
.values().iterator().next();
-
-                                               ResultPartitionID partitionId = 
new ResultPartitionID(
-                                                       
partition.getPartitionId(),
-                                                       
vertex.getCurrentExecutionAttempt().getAttemptId());
-
-                                               // Producer finished, request 
state
-                                               Object request = new 
RequestPartitionProducerState(jid, rid, partitionId);
-
-                                               Future<ExecutionState> 
producerStateFuture = jobManagerGateway
-                                                       .ask(request, 
getRemainingTime())
-                                                       
.mapTo(ClassTag$.MODULE$.<ExecutionState>apply(ExecutionState.class));
-
-                                               
assertEquals(ExecutionState.FINISHED, Await.result(producerStateFuture, 
getRemainingTime()));
-                                       } catch (Exception e) {
-                                               e.printStackTrace();
-                                               fail(e.getMessage());
-                                       } finally {
-                                               if (cluster != null) {
-                                                       cluster.stop();
-                                               }
-                                       }
-                               }
-                       };
-               }};
-       }
-
-       /**
-        * Tests the JobManager response when the execution is not registered with
-        * the ExecutionGraph anymore and a new execution attempt is available.
-        */
-       @Test
-       public void testRequestPartitionStateMoreRecentExecutionAttempt() throws Exception {
-               new JavaTestKit(system) {{
-                       new Within(duration("15 seconds")) {
-                               @Override
-                               protected void run() {
-                                       // Setup
-                                       TestingCluster cluster = null;
-
-                                       try {
-                                               cluster = 
startTestingCluster(4, 1, DEFAULT_AKKA_ASK_TIMEOUT());
-
-                                               final IntermediateDataSetID rid 
= new IntermediateDataSetID();
-
-                                               // Create a task
-                                               final JobVertex sender = new 
JobVertex("Sender");
-                                               sender.setParallelism(1);
-                                               
sender.setInvokableClass(NoOpInvokable.class); // just finish
-                                               
sender.createAndAddResultDataSet(rid, PIPELINED);
-
-                                               final JobVertex sender2 = new 
JobVertex("Blocking Sender");
-                                               sender2.setParallelism(1);
-                                               
sender2.setInvokableClass(BlockingNoOpInvokable.class); // just block
-                                               
sender2.createAndAddResultDataSet(new IntermediateDataSetID(), PIPELINED);
-
-                                               final JobGraph jobGraph = new 
JobGraph("Fast finishing producer test job", sender, sender2);
-                                               final JobID jid = 
jobGraph.getJobID();
-
-                                               final ActorGateway 
jobManagerGateway = cluster.getLeaderGateway(
-                                                       
TestingUtils.TESTING_DURATION());
-
-                                               // we can set the leader 
session ID to None because we don't use this gateway to send messages
-                                               final ActorGateway 
testActorGateway = new AkkaActorGateway(getTestActor(), 
HighAvailabilityServices.DEFAULT_LEADER_ID);
-
-                                               // Submit the job and wait for 
all vertices to be running
-                                               jobManagerGateway.tell(
-                                                       new SubmitJob(
-                                                               jobGraph,
-                                                               
ListeningBehaviour.EXECUTION_RESULT),
-                                                       testActorGateway);
-                                               
expectMsgClass(JobSubmitSuccess.class);
-
-                                               jobManagerGateway.tell(
-                                                       new 
WaitForAllVerticesToBeRunningOrFinished(jid),
-                                                       testActorGateway);
-
-                                               
expectMsgClass(TestingJobManagerMessages.AllVerticesRunning.class);
-
-                                               Future<Object> egFuture = 
jobManagerGateway.ask(
-                                                       new 
RequestExecutionGraph(jobGraph.getJobID()), remaining());
-
-                                               ExecutionGraphFound egFound = 
(ExecutionGraphFound) Await.result(egFuture, remaining());
-                                               ExecutionGraph eg = 
(ExecutionGraph) egFound.executionGraph();
-
-                                               ExecutionVertex vertex = 
eg.getJobVertex(sender.getID()).getTaskVertices()[0];
-                                               while 
(vertex.getExecutionState() != ExecutionState.FINISHED) {
-                                                       Thread.sleep(1);
-                                               }
-
-                                               IntermediateResultPartition 
partition = vertex.getProducedPartitions()
-                                                       
.values().iterator().next();
-
-                                               ResultPartitionID partitionId = 
new ResultPartitionID(
-                                                       
partition.getPartitionId(),
-                                                       
vertex.getCurrentExecutionAttempt().getAttemptId());
-
-                                               // Reset execution => new 
execution attempt
-                                               
vertex.resetForNewExecution(System.currentTimeMillis(), 1L);
-
-                                               // Producer finished, request 
state
-                                               Object request = new 
RequestPartitionProducerState(jid, rid, partitionId);
-
-                                               Future<?> producerStateFuture = 
jobManagerGateway.ask(request, getRemainingTime());
-
-                                               try {
-                                                       
Await.result(producerStateFuture, getRemainingTime());
-                                                       fail("Did not fail with 
expected Exception");
-                                               } catch 
(PartitionProducerDisposedException ignored) {
-                                               }
-                                       } catch (Exception e) {
-                                               e.printStackTrace();
-                                               fail(e.getMessage());
-                                       } finally {
-                                               if (cluster != null) {
-                                                       cluster.stop();
-                                               }
-                                       }
-                               }
-                       };
-               }};
-       }
-
-       @Test
-       public void testStopSignal() throws Exception {
-               new JavaTestKit(system) {{
-                       new Within(duration("15 seconds")) {
-                               @Override
-                               protected void run() {
-                                       // Setup
-                                       TestingCluster cluster = null;
-
-                                       try {
-                                               cluster = startTestingCluster(2, 1, DEFAULT_AKKA_ASK_TIMEOUT());
-
-                                               // Create a task
-                                               final JobVertex sender = new JobVertex("Sender");
-                                               sender.setParallelism(2);
-                                               sender.setInvokableClass(StoppableInvokable.class);
-
-                                               final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", sender);
-                                               final JobID jid = jobGraph.getJobID();
-
-                                               final ActorGateway jobManagerGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
-
-                                               // we can set the leader session ID to None because we don't use this gateway to send messages
-                                               final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), HighAvailabilityServices.DEFAULT_LEADER_ID);
-
-                                               // Submit the job and wait for all vertices to be running
-                                               jobManagerGateway.tell(
-                                                       new SubmitJob(
-                                                               jobGraph,
-                                                               ListeningBehaviour.EXECUTION_RESULT),
-                                                       testActorGateway);
-                                               expectMsgClass(JobSubmitSuccess.class);
-
-                                               jobManagerGateway.tell(new WaitForAllVerticesToBeRunning(jid), testActorGateway);
-                                               expectMsgClass(AllVerticesRunning.class);
-
-                                               jobManagerGateway.tell(new StopJob(jid), testActorGateway);
-
-                                               // - The test ----------------------------------------------------------------------
-                                               expectMsgClass(StoppingSuccess.class);
-
-                                               expectMsgClass(JobResultSuccess.class);
-                                       } finally {
-                                               if (cluster != null) {
-                                                       cluster.stop();
-                                               }
-                                       }
-                               }
-                       };
-               }};
-       }
-
-       @Test
-       public void testStopSignalFail() throws Exception {
-               new JavaTestKit(system) {{
-                       new Within(duration("15 seconds")) {
-                               @Override
-                               protected void run() {
-                                       // Setup
-                                       TestingCluster cluster = null;
-
-                                       try {
-                                               cluster = startTestingCluster(2, 1, DEFAULT_AKKA_ASK_TIMEOUT());
-
-                                               // Create a task
-                                               final JobVertex sender = new JobVertex("Sender");
-                                               sender.setParallelism(1);
-                                               sender.setInvokableClass(BlockingNoOpInvokable.class); // just block
-
-                                               final JobGraph jobGraph = new JobGraph("Non-Stoppable batching test job", sender);
-                                               final JobID jid = jobGraph.getJobID();
-
-                                               final ActorGateway jobManagerGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
-
-                                               // we can set the leader session ID to None because we don't use this gateway to send messages
-                                               final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), HighAvailabilityServices.DEFAULT_LEADER_ID);
-
-                                               // Submit the job and wait for all vertices to be running
-                                               jobManagerGateway.tell(
-                                                       new SubmitJob(
-                                                               jobGraph,
-                                                               ListeningBehaviour.EXECUTION_RESULT),
-                                                       testActorGateway);
-                                               expectMsgClass(JobSubmitSuccess.class);
-
-                                               jobManagerGateway.tell(new WaitForAllVerticesToBeRunning(jid), testActorGateway);
-                                               expectMsgClass(AllVerticesRunning.class);
-
-                                               jobManagerGateway.tell(new StopJob(jid), testActorGateway);
-
-                                               // - The test ----------------------------------------------------------------------
-                                               expectMsgClass(StoppingFailure.class);
-
-                                               jobManagerGateway.tell(new RequestExecutionGraph(jid), testActorGateway);
-
-                                               expectMsgClass(ExecutionGraphFound.class);
-                                       } finally {
-                                               if (cluster != null) {
-                                                       cluster.stop();
-                                               }
-                                       }
-                               }
-                       };
-               }};
-       }
-
-       /**
-        * Tests that the JobManager handles {@link org.apache.flink.runtime.query.KvStateMessage}
-        * instances as expected.
-        */
-       @Test
-       public void testKvStateMessages() throws Exception {
-               Deadline deadline = new FiniteDuration(100, 
TimeUnit.SECONDS).fromNow();
-
-               Configuration config = new Configuration();
-               config.setString(AkkaOptions.ASK_TIMEOUT, "100ms");
-
-               ActorRef jobManagerActor = JobManager.startJobManagerActors(
-                       config,
-                       system,
-                       TestingUtils.defaultExecutor(),
-                       TestingUtils.defaultExecutor(),
-                       highAvailabilityServices,
-                       NoOpMetricRegistry.INSTANCE,
-                       Option.empty(),
-                       TestingJobManager.class,
-                       MemoryArchivist.class)._1();
-
-               UUID leaderId = LeaderRetrievalUtils.retrieveLeaderSessionId(
-                       
highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
-                       TestingUtils.TESTING_TIMEOUT());
-
-               ActorGateway jobManager = new AkkaActorGateway(
-                               jobManagerActor,
-                               leaderId);
-
-               Configuration tmConfig = new Configuration();
-               tmConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, 
"4m");
-               tmConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 8);
-
-               ActorRef taskManager = 
TaskManager.startTaskManagerComponentsAndActor(
-                       tmConfig,
-                       ResourceID.generate(),
-                       system,
-                       highAvailabilityServices,
-                       NoOpMetricRegistry.INSTANCE,
-                       "localhost",
-                       scala.Option.<String>empty(),
-                       true,
-                       TestingTaskManager.class);
-
-               Future<Object> registrationFuture = jobManager
-                               .ask(new 
NotifyWhenAtLeastNumTaskManagerAreRegistered(1), deadline.timeLeft());
-
-               Await.ready(registrationFuture, deadline.timeLeft());
-
-               //
-               // Location lookup
-               //
-               LookupKvStateLocation lookupNonExistingJob = new 
LookupKvStateLocation(
-                               new JobID(),
-                               "any-name");
-
-               Future<KvStateLocation> lookupFuture = jobManager
-                               .ask(lookupNonExistingJob, deadline.timeLeft())
-                               
.mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class));
-
-               try {
-                       Await.result(lookupFuture, deadline.timeLeft());
-                       fail("Did not throw expected Exception");
-               } catch (FlinkJobNotFoundException ignored) {
-                       // Expected
-               }
-
-               JobGraph jobGraph = new JobGraph("croissant");
-               JobVertex jobVertex1 = new JobVertex("cappuccino");
-               jobVertex1.setParallelism(4);
-               jobVertex1.setMaxParallelism(16);
-               jobVertex1.setInvokableClass(BlockingNoOpInvokable.class);
-
-               JobVertex jobVertex2 = new JobVertex("americano");
-               jobVertex2.setParallelism(4);
-               jobVertex2.setMaxParallelism(16);
-               jobVertex2.setInvokableClass(BlockingNoOpInvokable.class);
-
-               jobGraph.addVertex(jobVertex1);
-               jobGraph.addVertex(jobVertex2);
-
-               Future<JobSubmitSuccess> submitFuture = jobManager
-                               .ask(new SubmitJob(jobGraph, 
ListeningBehaviour.DETACHED), deadline.timeLeft())
-                               
.mapTo(ClassTag$.MODULE$.<JobSubmitSuccess>apply(JobSubmitSuccess.class));
-
-               Await.result(submitFuture, deadline.timeLeft());
-
-               Object lookupUnknownRegistrationName = new 
LookupKvStateLocation(
-                               jobGraph.getJobID(),
-                               "unknown");
-
-               lookupFuture = jobManager
-                               .ask(lookupUnknownRegistrationName, 
deadline.timeLeft())
-                               
.mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class));
-
-               try {
-                       Await.result(lookupFuture, deadline.timeLeft());
-                       fail("Did not throw expected Exception");
-               } catch (UnknownKvStateLocation ignored) {
-                       // Expected
-               }
-
-               //
-               // Registration
-               //
-               NotifyKvStateRegistered registerNonExistingJob = new 
NotifyKvStateRegistered(
-                               new JobID(),
-                               new JobVertexID(),
-                               new KeyGroupRange(0, 0),
-                               "any-name",
-                               new KvStateID(),
-                               new 
InetSocketAddress(InetAddress.getLocalHost(), 1233));
-
-               jobManager.tell(registerNonExistingJob);
-
-               LookupKvStateLocation lookupAfterRegistration = new 
LookupKvStateLocation(
-                               registerNonExistingJob.getJobId(),
-                               registerNonExistingJob.getRegistrationName());
-
-               lookupFuture = jobManager
-                               .ask(lookupAfterRegistration, 
deadline.timeLeft())
-                               
.mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class));
-
-               try {
-                       Await.result(lookupFuture, deadline.timeLeft());
-                       fail("Did not throw expected Exception");
-               } catch (FlinkJobNotFoundException ignored) {
-                       // Expected
-               }
-
-               NotifyKvStateRegistered registerForExistingJob = new 
NotifyKvStateRegistered(
-                               jobGraph.getJobID(),
-                               jobVertex1.getID(),
-                               new KeyGroupRange(0, 0),
-                               "register-me",
-                               new KvStateID(),
-                               new 
InetSocketAddress(InetAddress.getLocalHost(), 1293));
-
-               jobManager.tell(registerForExistingJob);
-
-               lookupAfterRegistration = new LookupKvStateLocation(
-                               registerForExistingJob.getJobId(),
-                               registerForExistingJob.getRegistrationName());
-
-               lookupFuture = jobManager
-                               .ask(lookupAfterRegistration, 
deadline.timeLeft())
-                               
.mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class));
-
-               KvStateLocation location = Await.result(lookupFuture, 
deadline.timeLeft());
-               assertNotNull(location);
-
-               assertEquals(jobGraph.getJobID(), location.getJobId());
-               assertEquals(jobVertex1.getID(), location.getJobVertexId());
-               assertEquals(jobVertex1.getMaxParallelism(), 
location.getNumKeyGroups());
-               assertEquals(1, location.getNumRegisteredKeyGroups());
-               KeyGroupRange keyGroupRange = 
registerForExistingJob.getKeyGroupRange();
-               assertEquals(1, keyGroupRange.getNumberOfKeyGroups());
-               assertEquals(registerForExistingJob.getKvStateId(), 
location.getKvStateID(keyGroupRange.getStartKeyGroup()));
-               assertEquals(registerForExistingJob.getKvStateServerAddress(), 
location.getKvStateServerAddress(keyGroupRange.getStartKeyGroup()));
-
-               //
-               // Unregistration
-               //
-               NotifyKvStateUnregistered unregister = new 
NotifyKvStateUnregistered(
-                               registerForExistingJob.getJobId(),
-                               registerForExistingJob.getJobVertexId(),
-                               registerForExistingJob.getKeyGroupRange(),
-                               registerForExistingJob.getRegistrationName());
-
-               jobManager.tell(unregister);
-
-               lookupFuture = jobManager
-                               .ask(lookupAfterRegistration, 
deadline.timeLeft())
-                               
.mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class));
-
-               try {
-                       Await.result(lookupFuture, deadline.timeLeft());
-                       fail("Did not throw expected Exception");
-               } catch (UnknownKvStateLocation ignored) {
-                       // Expected
-               }
-
-               //
-               // Duplicate registration fails task
-               //
-               NotifyKvStateRegistered register = new NotifyKvStateRegistered(
-                               jobGraph.getJobID(),
-                               jobVertex1.getID(),
-                               new KeyGroupRange(0, 0),
-                               "duplicate-me",
-                               new KvStateID(),
-                               new 
InetSocketAddress(InetAddress.getLocalHost(), 1293));
-
-               NotifyKvStateRegistered duplicate = new NotifyKvStateRegistered(
-                               jobGraph.getJobID(),
-                               jobVertex2.getID(), // <--- different operator, 
but...
-                               new KeyGroupRange(0, 0),
-                               "duplicate-me", // ...same name
-                               new KvStateID(),
-                               new 
InetSocketAddress(InetAddress.getLocalHost(), 1293));
-
-               Future<TestingJobManagerMessages.JobStatusIs> failedFuture = 
jobManager
-                               .ask(new 
NotifyWhenJobStatus(jobGraph.getJobID(), JobStatus.FAILED), deadline.timeLeft())
-                               
.mapTo(ClassTag$.MODULE$.<JobStatusIs>apply(JobStatusIs.class));
-
-               jobManager.tell(register);
-               jobManager.tell(duplicate);
-
-               // Wait for failure
-               JobStatusIs jobStatus = Await.result(failedFuture, 
deadline.timeLeft());
-               assertEquals(JobStatus.FAILED, jobStatus.state());
-
-       }
-
-       @Test
-       public void testCancelWithSavepoint() throws Exception {
-               File defaultSavepointDir = tmpFolder.newFolder();
-
-               FiniteDuration timeout = new FiniteDuration(30, 
TimeUnit.SECONDS);
-               Configuration config = new Configuration();
-               config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, 
defaultSavepointDir.toURI().toString());
-
-               ActorSystem actorSystem = null;
-               ActorGateway jobManager = null;
-               ActorGateway archiver = null;
-               ActorGateway taskManager = null;
-               try {
-                       actorSystem = AkkaUtils.createLocalActorSystem(new 
Configuration());
-
-                       Tuple2<ActorRef, ActorRef> master = 
JobManager.startJobManagerActors(
-                               config,
-                               actorSystem,
-                               TestingUtils.defaultExecutor(),
-                               TestingUtils.defaultExecutor(),
-                               highAvailabilityServices,
-                               NoOpMetricRegistry.INSTANCE,
-                               Option.empty(),
-                               Option.apply("jm"),
-                               Option.apply("arch"),
-                               TestingJobManager.class,
-                               TestingMemoryArchivist.class);
-
-                       UUID leaderId = 
LeaderRetrievalUtils.retrieveLeaderSessionId(
-                               
highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
-                               TestingUtils.TESTING_TIMEOUT());
-
-                       jobManager = new AkkaActorGateway(master._1(), 
leaderId);
-                       archiver = new AkkaActorGateway(master._2(), leaderId);
-
-                       ActorRef taskManagerRef = 
TaskManager.startTaskManagerComponentsAndActor(
-                               config,
-                               ResourceID.generate(),
-                               actorSystem,
-                               highAvailabilityServices,
-                               NoOpMetricRegistry.INSTANCE,
-                               "localhost",
-                               Option.apply("tm"),
-                               true,
-                               TestingTaskManager.class);
-
-                       taskManager = new AkkaActorGateway(taskManagerRef, 
leaderId);
-
-                       // Wait until connected
-                       Object msg = new 
TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
-                       Await.ready(taskManager.ask(msg, timeout), timeout);
-
-                       // Create job graph
-                       JobVertex sourceVertex = new JobVertex("Source");
-                       
sourceVertex.setInvokableClass(BlockingStatefulInvokable.class);
-                       sourceVertex.setParallelism(1);
-
-                       JobGraph jobGraph = new JobGraph("TestingJob", 
sourceVertex);
-
-                       JobCheckpointingSettings snapshottingSettings = new 
JobCheckpointingSettings(
-                               Collections.singletonList(sourceVertex.getID()),
-                               Collections.singletonList(sourceVertex.getID()),
-                               Collections.singletonList(sourceVertex.getID()),
-                               new CheckpointCoordinatorConfiguration(
-                                       3600000,
-                                       3600000,
-                                       0,
-                                       Integer.MAX_VALUE,
-                                       
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
-                                       true),
-                                       null);
-
-                       jobGraph.setSnapshotSettings(snapshottingSettings);
-
-                       // Submit job graph
-                       msg = new SubmitJob(jobGraph, 
ListeningBehaviour.DETACHED);
-                       Await.result(jobManager.ask(msg, timeout), timeout);
-
-                       // Wait for all tasks to be running
-                       msg = new 
TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID());
-                       Await.result(jobManager.ask(msg, timeout), timeout);
-
-                       // Notify when cancelled
-                       msg = new NotifyWhenJobStatus(jobGraph.getJobID(), 
JobStatus.CANCELED);
-                       Future<Object> cancelled = jobManager.ask(msg, timeout);
-
-                       // Cancel with savepoint
-                       String savepointPath = null;
-
-                       for (int i = 0; i < 10; i++) {
-                               msg = new 
CancelJobWithSavepoint(jobGraph.getJobID(), null);
-                               CancellationResponse cancelResp = 
(CancellationResponse) Await.result(jobManager.ask(msg, timeout), timeout);
-
-                               if (cancelResp instanceof CancellationFailure) {
-                                       CancellationFailure failure = 
(CancellationFailure) cancelResp;
-                                       if 
(failure.cause().getMessage().contains(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING.message()))
 {
-                                               Thread.sleep(10); // wait and 
retry
-                                       } else {
-                                               
failure.cause().printStackTrace();
-                                               fail("Failed to cancel job: " + 
failure.cause().getMessage());
-                                       }
-                               } else {
-                                       savepointPath = ((CancellationSuccess) 
cancelResp).savepointPath();
-                                       break;
-                               }
-                       }
-
-                       // Verify savepoint path
-                       assertNotNull("Savepoint not triggered", savepointPath);
-
-                       // Wait for job status change
-                       Await.ready(cancelled, timeout);
-
-                       File savepointFile = new File(new 
Path(savepointPath).getPath());
-                       assertTrue(savepointFile.exists());
-               } finally {
-                       if (actorSystem != null) {
-                               actorSystem.terminate();
-                       }
-
-                       if (archiver != null) {
-                               archiver.actor().tell(PoisonPill.getInstance(), 
ActorRef.noSender());
-                       }
-
-                       if (jobManager != null) {
-                               
jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
-                       }
-
-                       if (taskManager != null) {
-                               
taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
-                       }
-
-                       if (actorSystem != null) {
-                               Await.result(actorSystem.whenTerminated(), 
TESTING_TIMEOUT());
-                       }
-               }
-       }
-
-       /**
-        * Tests that a failed savepoint does not cancel the job and new checkpoints are triggered
-        * after the failed cancel-with-savepoint.
-        */
-       @Test
-       public void testCancelJobWithSavepointFailurePeriodicCheckpoints() throws Exception {
-               File savepointTarget = tmpFolder.newFolder();
-
-               // A source that declines savepoints, simulating the behaviour 
of a
-               // failed savepoint.
-               JobVertex sourceVertex = new JobVertex("Source");
-               sourceVertex.setInvokableClass(FailOnSavepointSourceTask.class);
-               sourceVertex.setParallelism(1);
-               JobGraph jobGraph = new JobGraph("TestingJob", sourceVertex);
-
-               CheckpointCoordinatorConfiguration coordConfig = new 
CheckpointCoordinatorConfiguration(
-                       50,
-                       3600000,
-                       0,
-                       Integer.MAX_VALUE,
-                       
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
-                       true);
-
-               JobCheckpointingSettings snapshottingSettings = new 
JobCheckpointingSettings(
-                       Collections.singletonList(sourceVertex.getID()),
-                       Collections.singletonList(sourceVertex.getID()),
-                       Collections.singletonList(sourceVertex.getID()),
-                       coordConfig,
-                       null);
-
-               jobGraph.setSnapshotSettings(snapshottingSettings);
-
-               final TestingCluster testingCluster = new TestingCluster(
-                       new Configuration(),
-                       highAvailabilityServices,
-                       true,
-                       false);
-
-               try {
-                       testingCluster.start(true);
-
-                       FiniteDuration askTimeout = new FiniteDuration(30, 
TimeUnit.SECONDS);
-                       ActorGateway jobManager = 
testingCluster.getLeaderGateway(askTimeout);
-
-                       testingCluster.submitJobDetached(jobGraph);
-
-                       // Wait for the source to be running otherwise the 
savepoint
-                       // barrier will not reach the task.
-                       Future<Object> allTasksAlive = jobManager.ask(
-                               new 
WaitForAllVerticesToBeRunning(jobGraph.getJobID()),
-                               askTimeout);
-                       Await.ready(allTasksAlive, askTimeout);
-
-                       // Cancel with savepoint. The expected outcome is that 
cancellation
-                       // fails due to a failed savepoint. After this, 
periodic checkpoints
-                       // should resume.
-                       Future<Object> cancellationFuture = jobManager.ask(
-                               new CancelJobWithSavepoint(jobGraph.getJobID(), 
savepointTarget.getAbsolutePath()),
-                               askTimeout);
-                       Object cancellationResponse = 
Await.result(cancellationFuture, askTimeout);
-
-                       if (cancellationResponse instanceof 
CancellationFailure) {
-                               if 
(!FailOnSavepointSourceTask.CHECKPOINT_AFTER_SAVEPOINT_LATCH.await(30, 
TimeUnit.SECONDS)) {
-                                       fail("No checkpoint was triggered after 
failed savepoint within expected duration");
-                               }
-                       } else {
-                               fail("Unexpected cancellation response from 
JobManager: " + cancellationResponse);
-                       }
-               } finally {
-                       testingCluster.stop();
-               }
-       }
-
-       /**
-        * Tests that a meaningful exception is returned if no savepoint 
directory is
-        * configured.
-        */
-       @Test
-       public void testCancelWithSavepointNoDirectoriesConfigured() throws 
Exception {
-               FiniteDuration timeout = new FiniteDuration(30, 
TimeUnit.SECONDS);
-               Configuration config = new Configuration();
-
-               ActorSystem actorSystem = null;
-               ActorGateway jobManager = null;
-               ActorGateway archiver = null;
-               ActorGateway taskManager = null;
-               try {
-                       actorSystem = AkkaUtils.createLocalActorSystem(new 
Configuration());
-
-                       Tuple2<ActorRef, ActorRef> master = 
JobManager.startJobManagerActors(
-                               config,
-                               actorSystem,
-                               TestingUtils.defaultExecutor(),
-                               TestingUtils.defaultExecutor(),
-                               highAvailabilityServices,
-                               NoOpMetricRegistry.INSTANCE,
-                               Option.empty(),
-                               Option.apply("jm"),
-                               Option.apply("arch"),
-                               TestingJobManager.class,
-                               TestingMemoryArchivist.class);
-
-                       UUID leaderId = 
LeaderRetrievalUtils.retrieveLeaderSessionId(
-                               
highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
-                               TestingUtils.TESTING_TIMEOUT());
-
-                       jobManager = new AkkaActorGateway(master._1(), 
leaderId);
-                       archiver = new AkkaActorGateway(master._2(), leaderId);
-
-                       ActorRef taskManagerRef = 
TaskManager.startTaskManagerComponentsAndActor(
-                               config,
-                               ResourceID.generate(),
-                               actorSystem,
-                               highAvailabilityServices,
-                               NoOpMetricRegistry.INSTANCE,
-                               "localhost",
-                               Option.apply("tm"),
-                               true,
-                               TestingTaskManager.class);
-
-                       taskManager = new AkkaActorGateway(taskManagerRef, 
leaderId);
-
-                       // Wait until connected
-                       Object msg = new 
TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
-                       Await.ready(taskManager.ask(msg, timeout), timeout);
-
-                       // Create job graph
-                       JobVertex sourceVertex = new JobVertex("Source");
-                       
sourceVertex.setInvokableClass(BlockingStatefulInvokable.class);
-                       sourceVertex.setParallelism(1);
-
-                       JobGraph jobGraph = new JobGraph("TestingJob", 
sourceVertex);
-
-                       JobCheckpointingSettings snapshottingSettings = new 
JobCheckpointingSettings(
-                               Collections.singletonList(sourceVertex.getID()),
-                               Collections.singletonList(sourceVertex.getID()),
-                               Collections.singletonList(sourceVertex.getID()),
-                               new CheckpointCoordinatorConfiguration(
-                                       3600000,
-                                       3600000,
-                                       0,
-                                       Integer.MAX_VALUE,
-                                       
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
-                                       true),
-                               null);
-
-                       jobGraph.setSnapshotSettings(snapshottingSettings);
-
-                       // Submit job graph
-                       msg = new SubmitJob(jobGraph, 
ListeningBehaviour.DETACHED);
-                       Await.result(jobManager.ask(msg, timeout), timeout);
-
-                       // Wait for all tasks to be running
-                       msg = new 
TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID());
-                       Await.result(jobManager.ask(msg, timeout), timeout);
-
-                       // Cancel with savepoint
-                       msg = new CancelJobWithSavepoint(jobGraph.getJobID(), 
null);
-                       CancellationResponse cancelResp = 
(CancellationResponse) Await.result(jobManager.ask(msg, timeout), timeout);
-
-                       if (cancelResp instanceof CancellationFailure) {
-                               CancellationFailure failure = 
(CancellationFailure) cancelResp;
-                               assertTrue(failure.cause() instanceof 
IllegalStateException);
-                               
assertTrue(failure.cause().getMessage().contains("savepoint directory"));
-                       } else {
-                               fail("Unexpected cancellation response from 
JobManager: " + cancelResp);
-                       }
-               } finally {
-                       if (actorSystem != null) {
-                               actorSystem.terminate();
-                       }
-
-                       if (archiver != null) {
-                               archiver.actor().tell(PoisonPill.getInstance(), 
ActorRef.noSender());
-                       }
-
-                       if (jobManager != null) {
-                               
jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
-                       }
-
-                       if (taskManager != null) {
-                               
taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
-                       }
-               }
-       }
-
-       /**
-        * Tests that we can trigger a savepoint when periodic checkpoints are 
disabled.
-        */
-       @Test
-       public void testSavepointWithDeactivatedPeriodicCheckpointing() throws 
Exception {
-               File defaultSavepointDir = tmpFolder.newFolder();
-
-               FiniteDuration timeout = new FiniteDuration(30, 
TimeUnit.SECONDS);
-               Configuration config = new Configuration();
-               config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, 
defaultSavepointDir.toURI().toString());
-
-               ActorSystem actorSystem = null;
-               ActorGateway jobManager = null;
-               ActorGateway archiver = null;
-               ActorGateway taskManager = null;
-               try {
-                       actorSystem = AkkaUtils.createLocalActorSystem(new 
Configuration());
-
-                       Tuple2<ActorRef, ActorRef> master = 
JobManager.startJobManagerActors(
-                               config,
-                               actorSystem,
-                               TestingUtils.defaultExecutor(),
-                               TestingUtils.defaultExecutor(),
-                               highAvailabilityServices,
-                               NoOpMetricRegistry.INSTANCE,
-                               Option.empty(),
-                               Option.apply("jm"),
-                               Option.apply("arch"),
-                               TestingJobManager.class,
-                               TestingMemoryArchivist.class);
-
-                       UUID leaderId = 
LeaderRetrievalUtils.retrieveLeaderSessionId(
-                               
highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
-                               TestingUtils.TESTING_TIMEOUT());
-
-                       jobManager = new AkkaActorGateway(master._1(), 
leaderId);
-                       archiver = new AkkaActorGateway(master._2(), leaderId);
-
-                       ActorRef taskManagerRef = 
TaskManager.startTaskManagerComponentsAndActor(
-                               config,
-                               ResourceID.generate(),
-                               actorSystem,
-                               highAvailabilityServices,
-                               NoOpMetricRegistry.INSTANCE,
-                               "localhost",
-                               Option.apply("tm"),
-                               true,
-                               TestingTaskManager.class);
-
-                       taskManager = new AkkaActorGateway(taskManagerRef, 
leaderId);
-
-                       // Wait until connected
-                       Object msg = new 
TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
-                       Await.ready(taskManager.ask(msg, timeout), timeout);
-
-                       // Create job graph
-                       JobVertex sourceVertex = new JobVertex("Source");
-                       
sourceVertex.setInvokableClass(BlockingStatefulInvokable.class);
-                       sourceVertex.setParallelism(1);
-
-                       JobGraph jobGraph = new JobGraph("TestingJob", 
sourceVertex);
-
-                       JobCheckpointingSettings snapshottingSettings = new 
JobCheckpointingSettings(
-                                       
Collections.singletonList(sourceVertex.getID()),
-                                       
Collections.singletonList(sourceVertex.getID()),
-                                       
Collections.singletonList(sourceVertex.getID()),
-                                       new CheckpointCoordinatorConfiguration(
-                                               Long.MAX_VALUE, // deactivated 
checkpointing
-                                               360000,
-                                               0,
-                                               Integer.MAX_VALUE,
-                                               
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
-                                               true),
-                                       null);
-
-                       jobGraph.setSnapshotSettings(snapshottingSettings);
-
-                       // Submit job graph
-                       msg = new SubmitJob(jobGraph, 
ListeningBehaviour.DETACHED);
-                       Await.result(jobManager.ask(msg, timeout), timeout);
-
-                       // Wait for all tasks to be running
-                       msg = new 
TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID());
-                       Await.result(jobManager.ask(msg, timeout), timeout);
-
-                       // Trigger savepoint
-                       File targetDirectory = tmpFolder.newFolder();
-
-                       msg = new TriggerSavepoint(jobGraph.getJobID(), 
Option.apply(targetDirectory.getAbsolutePath()));
-                       Future<Object> future = jobManager.ask(msg, timeout);
-                       Object result = Await.result(future, timeout);
-
-                       assertTrue("Did not trigger savepoint", result 
instanceof TriggerSavepointSuccess);
-                       assertEquals(1, targetDirectory.listFiles().length);
-               } finally {
-                       if (actorSystem != null) {
-                               actorSystem.terminate();
-                       }
-
-                       if (archiver != null) {
-                               archiver.actor().tell(PoisonPill.getInstance(), 
ActorRef.noSender());
-                       }
-
-                       if (jobManager != null) {
-                               
jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
-                       }
-
-                       if (taskManager != null) {
-                               
taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
-                       }
-
-                       if (actorSystem != null) {
-                               Await.result(actorSystem.whenTerminated(), 
TestingUtils.TESTING_TIMEOUT());
-                       }
-               }
-       }
-
-       /**
-        * Tests that configured {@link SavepointRestoreSettings} are respected.
-        */
-       @Test
-       public void testSavepointRestoreSettings() throws Exception {
-               FiniteDuration timeout = new FiniteDuration(30, 
TimeUnit.SECONDS);
-
-               ActorSystem actorSystem = null;
-               ActorGateway jobManager = null;
-               ActorGateway archiver = null;
-               ActorGateway taskManager = null;
-               try {
-                       actorSystem = AkkaUtils.createLocalActorSystem(new 
Configuration());
-
-                       Tuple2<ActorRef, ActorRef> master = 
JobManager.startJobManagerActors(
-                               new Configuration(),
-                               actorSystem,
-                               TestingUtils.defaultExecutor(),
-                               TestingUtils.defaultExecutor(),
-                               highAvailabilityServices,
-                               NoOpMetricRegistry.INSTANCE,
-                               Option.empty(),
-                               Option.apply("jm"),
-                               Option.apply("arch"),
-                               TestingJobManager.class,
-                               TestingMemoryArchivist.class);
-
-                       UUID leaderId = 
LeaderRetrievalUtils.retrieveLeaderSessionId(
-                               
highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
-                               TestingUtils.TESTING_TIMEOUT());
-
-                       jobManager = new AkkaActorGateway(master._1(), 
leaderId);
-                       archiver = new AkkaActorGateway(master._2(), leaderId);
-
-                       Configuration tmConfig = new Configuration();
-                       tmConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 
4);
-
-                       ActorRef taskManagerRef = 
TaskManager.startTaskManagerComponentsAndActor(
-                               tmConfig,
-                               ResourceID.generate(),
-                               actorSystem,
-                               highAvailabilityServices,
-                               NoOpMetricRegistry.INSTANCE,
-                               "localhost",
-                               Option.apply("tm"),
-                               true,
-                               TestingTaskManager.class);
-
-                       taskManager = new AkkaActorGateway(taskManagerRef, 
leaderId);
-
-                       // Wait until connected
-                       Object msg = new 
TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
-                       Await.ready(taskManager.ask(msg, timeout), timeout);
-
-                       // Create job graph
-                       JobVertex sourceVertex = new JobVertex("Source");
-                       
sourceVertex.setInvokableClass(BlockingStatefulInvokable.class);
-                       sourceVertex.setParallelism(1);
-
-                       JobGraph jobGraph = new JobGraph("TestingJob", 
sourceVertex);
-
-                       JobCheckpointingSettings snapshottingSettings = new 
JobCheckpointingSettings(
-                                       
Collections.singletonList(sourceVertex.getID()),
-                                       
Collections.singletonList(sourceVertex.getID()),
-                                       
Collections.singletonList(sourceVertex.getID()),
-                                       new CheckpointCoordinatorConfiguration(
-                                               Long.MAX_VALUE, // deactivated 
checkpointing
-                                               360000,
-                                               0,
-                                               Integer.MAX_VALUE,
-                                               
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
-                                               true),
-                                       null);
-
-                       jobGraph.setSnapshotSettings(snapshottingSettings);
-
-                       // Submit job graph
-                       msg = new SubmitJob(jobGraph, 
ListeningBehaviour.DETACHED);
-                       Await.result(jobManager.ask(msg, timeout), timeout);
-
-                       // Wait for all tasks to be running
-                       msg = new 
TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID());
-                       Await.result(jobManager.ask(msg, timeout), timeout);
-
-                       // Trigger savepoint
-                       File targetDirectory = tmpFolder.newFolder();
-                       msg = new TriggerSavepoint(jobGraph.getJobID(), 
Option.apply(targetDirectory.getAbsolutePath()));
-                       Future<Object> future = jobManager.ask(msg, timeout);
-                       Object result = Await.result(future, timeout);
-
-                       String savepointPath = ((TriggerSavepointSuccess) 
result).savepointPath();
-
-                       // Cancel because of restarts
-                       msg = new 
TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID());
-                       Future<?> removedFuture = jobManager.ask(msg, timeout);
-
-                       Future<?> cancelFuture = jobManager.ask(new 
CancelJob(jobGraph.getJobID()), timeout);
-                       Object response = Await.result(cancelFuture, timeout);
-                       assertTrue("Unexpected response: " + response, response 
instanceof CancellationSuccess);
-
-                       Await.ready(removedFuture, timeout);
-
-                       // Adjust the job (we need a new operator ID)
-                       JobVertex newSourceVertex = new JobVertex("NewSource");
-                       
newSourceVertex.setInvokableClass(BlockingStatefulInvokable.class);
-                       newSourceVertex.setParallelism(1);
-
-                       JobGraph newJobGraph = new JobGraph("NewTestingJob", 
newSourceVertex);
-
-                       JobCheckpointingSettings newSnapshottingSettings = new 
JobCheckpointingSettings(
-                                       
Collections.singletonList(newSourceVertex.getID()),
-                                       
Collections.singletonList(newSourceVertex.getID()),
-                                       
Collections.singletonList(newSourceVertex.getID()),
-                                       new CheckpointCoordinatorConfiguration(
-                                               Long.MAX_VALUE, // deactivated 
checkpointing
-                                               360000,
-                                               0,
-                                               Integer.MAX_VALUE,
-                                               
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
-                                               true),
-                                       null);
-
-                       
newJobGraph.setSnapshotSettings(newSnapshottingSettings);
-
-                       SavepointRestoreSettings restoreSettings = 
SavepointRestoreSettings.forPath(savepointPath, false);
-                       
newJobGraph.setSavepointRestoreSettings(restoreSettings);
-
-                       msg = new SubmitJob(newJobGraph, 
ListeningBehaviour.DETACHED);
-                       response = Await.result(jobManager.ask(msg, timeout), 
timeout);
-
-                       assertTrue("Unexpected response: " + response, response 
instanceof JobResultFailure);
-
-                       JobResultFailure failure = (JobResultFailure) response;
-                       Throwable cause = 
failure.cause().deserializeError(ClassLoader.getSystemClassLoader());
-
-                       assertTrue(cause instanceof IllegalStateException);
-                       
assertTrue(cause.getMessage().contains("allowNonRestoredState"));
-
-                       // Wait until removed
-                       msg = new 
TestingJobManagerMessages.NotifyWhenJobRemoved(newJobGraph.getJobID());
-                       Await.ready(jobManager.ask(msg, timeout), timeout);
-
-                       // Resubmit, but allow non restored state now
-                       restoreSettings = 
SavepointRestoreSettings.forPath(savepointPath, true);
-                       
newJobGraph.setSavepointRestoreSettings(restoreSettings);
-
-                       msg = new SubmitJob(newJobGraph, 
ListeningBehaviour.DETACHED);
-                       response = Await.result(jobManager.ask(msg, timeout), 
timeout);
-
-                       assertTrue("Unexpected response: " + response, response 
instanceof JobSubmitSuccess);
-               } finally {
-                       if (actorSystem != null) {
-                               actorSystem.terminate();
-                       }
-
-                       if (archiver != null) {
-                               archiver.actor().tell(PoisonPill.getInstance(), 
ActorRef.noSender());
-                       }
-
-                       if (jobManager != null) {
-                               
jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
-                       }
-
-                       if (taskManager != null) {
-                               
taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
-                       }
-
-                       if (actorSystem != null) {
-                               Await.ready(actorSystem.whenTerminated(), 
TestingUtils.TESTING_TIMEOUT());
-                       }
-               }
-       }
-
-       /**
-        * This test makes sure that triggering a reconnection from the ResourceManager will stop after a new
-        * ResourceManager has connected. Furthermore it makes sure that there is no endless loop of reconnection
-        * commands (see FLINK-6341).
-        */
-       @Test
-       public void testResourceManagerConnection() throws TimeoutException, 
InterruptedException {
-               FiniteDuration testTimeout = new FiniteDuration(30L, 
TimeUnit.SECONDS);
-               final long reconnectionInterval = 200L;
-
-               final Configuration configuration = new Configuration();
-               
configuration.setLong(JobManagerOptions.RESOURCE_MANAGER_RECONNECT_INTERVAL, 
reconnectionInterval);
-
-               final ActorSystem actorSystem = 
AkkaUtils.createLocalActorSystem(configuration);
-
-               try {
-                       final ActorGateway jmGateway = 
TestingUtils.createJobManager(
-                               actorSystem,
-                               TestingUtils.defaultExecutor(),
-                               TestingUtils.defaultExecutor(),
-                               configuration,
-                               highAvailabilityServices);
-
-                       final TestProbe probe = TestProbe.apply(actorSystem);
-                       final AkkaActorGateway rmGateway = new 
AkkaActorGateway(probe.ref(), HighAvailabilityServices.DEFAULT_LEADER_ID);
-
-                       // wait for the JobManager to become the leader
-                       Future<?> leaderFuture = 
jmGateway.ask(TestingJobManagerMessages.getNotifyWhenLeader(), testTimeout);
-                       Await.ready(leaderFuture, testTimeout);
-
-                       jmGateway.tell(new 
RegisterResourceManager(probe.ref()), rmGateway);
-
-                       LeaderSessionMessage leaderSessionMessage = 
probe.expectMsgClass(LeaderSessionMessage.class);
-
-                       assertEquals(jmGateway.leaderSessionID(), 
leaderSessionMessage.leaderSessionID());
-                       assertTrue(leaderSessionMessage.message() instanceof 
RegisterResourceManagerSuccessful);
-
-                       jmGateway.tell(
-                               new RegistrationMessages.RegisterTaskManager(
-                                       ResourceID.generate(),
-                                       mock(TaskManagerLocation.class),
-                                       new HardwareDescription(1, 1L, 1L, 1L),
-                                       1));
-                       leaderSessionMessage = 
probe.expectMsgClass(LeaderSessionMessage.class);
-
-                       assertTrue(leaderSessionMessage.message() instanceof 
NotifyResourceStarted);
-
-                       // fail the NotifyResourceStarted so that we trigger 
the reconnection process on the JobManager's side
-                       probe.lastSender().tell(new Status.Failure(new 
Exception("Test exception")), ActorRef.noSender());
-
-                       Deadline reconnectionDeadline = new FiniteDuration(5L * 
reconnectionInterval, TimeUnit.MILLISECONDS).fromNow();
-                       boolean registered = false;
-
-                       while (reconnectionDeadline.hasTimeLeft()) {
-                               try {
-                                       leaderSessionMessage = 
probe.expectMsgClass(reconnectionDeadline.timeLeft(), 
LeaderSessionMessage.class);
-                               } catch (AssertionError ignored) {
-                                       // expected timeout after the 
reconnectionDeadline has been exceeded
-                                       continue;
-                               }
-
-                               if (leaderSessionMessage.message() instanceof 
TriggerRegistrationAtJobManager) {
-                                       if (registered) {
-                                               fail("A successful registration 
should not be followed by another TriggerRegistrationAtJobManager message.");
-                                       }
-
-                                       jmGateway.tell(new 
RegisterResourceManager(probe.ref()), rmGateway);
-                               } else if (leaderSessionMessage.message() 
instanceof RegisterResourceManagerSuccessful) {
-                                       // now we should no longer receive 
TriggerRegistrationAtJobManager messages
-                                       registered = true;
-                               } else {
-                                       fail("Received unknown message: " + 
leaderSessionMessage.message() + '.');
-                               }
-                       }
-
-                       assertTrue(registered);
-
-               } finally {
-                       // cleanup the actor system and with it all of the 
started actors if not already terminated
-                       actorSystem.terminate();
-                       Await.ready(actorSystem.whenTerminated(), 
Duration.Inf());
-               }
-       }
-
-       /**
-        * A blocking stateful source task that declines savepoints.
-        */
-       public static class FailOnSavepointSourceTask extends AbstractInvokable 
{
-
-               private static final CountDownLatch 
CHECKPOINT_AFTER_SAVEPOINT_LATCH = new CountDownLatch(1);
-
-               private boolean receivedSavepoint;
-
-               /**
-                * Create an Invokable task and set its environment.
-                *
-                * @param environment The environment assigned to this 
invokable.
-                */
-               public FailOnSavepointSourceTask(Environment environment) {
-                       super(environment);
-               }
-
-               @Override
-               public void invoke() throws Exception {
-                       new CountDownLatch(1).await();
-               }
-
-               @Override
-               public boolean triggerCheckpoint(
-                       CheckpointMetaData checkpointMetaData,
-                       CheckpointOptions checkpointOptions) throws Exception {
-                       if (checkpointOptions.getCheckpointType() == 
CheckpointType.SAVEPOINT) {
-                               receivedSavepoint = true;
-                               return false;
-                       } else if (receivedSavepoint) {
-                               CHECKPOINT_AFTER_SAVEPOINT_LATCH.countDown();
-                               return true;
-                       }
-                       return true;
-               }
-
-               @Override
-               public void triggerCheckpointOnBarrier(
-                       CheckpointMetaData checkpointMetaData,
-                       CheckpointOptions checkpointOptions,
-                       CheckpointMetrics checkpointMetrics) throws Exception {
-                       throw new UnsupportedOperationException("This is meant 
to be used as a source");
-               }
-
-               @Override
-               public void abortCheckpointOnBarrier(long checkpointId, 
Throwable cause) throws Exception {
-               }
-
-               @Override
-               public void notifyCheckpointComplete(long checkpointId) throws 
Exception {
-               }
-       }
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 462b1d16da9..8f88579434d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -32,6 +32,7 @@
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.core.io.InputSplitSource;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.checkpoint.CheckpointProperties;
@@ -66,7 +67,9 @@
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
@@ -76,6 +79,9 @@
 import org.apache.flink.runtime.jobmaster.slotpool.DefaultSlotPoolFactory;
 import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.query.KvStateLocation;
+import org.apache.flink.runtime.query.UnknownKvStateLocation;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
@@ -83,6 +89,7 @@
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
+import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
@@ -90,6 +97,7 @@
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.ExceptionUtils;
@@ -114,6 +122,8 @@
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -136,6 +146,7 @@
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -836,6 +847,215 @@ public int hashCode() {
                }
        }
 
+       @Test
+       public void testRequestKvStateWithoutRegistration() throws Exception {
+               final JobVertex vertex1 = new JobVertex("v1");
+               vertex1.setParallelism(4);
+               vertex1.setMaxParallelism(16);
+               vertex1.setInvokableClass(BlockingNoOpInvokable.class);
+
+               final JobVertex vertex2 = new JobVertex("v2");
+               vertex2.setParallelism(4);
+               vertex2.setMaxParallelism(16);
+               vertex2.setInvokableClass(BlockingNoOpInvokable.class);
+
+               final JobGraph graph = new JobGraph(vertex1, vertex2);
+
+               final JobMaster jobMaster = createJobMaster(
+                       configuration,
+                       graph,
+                       haServices,
+                       new TestingJobManagerSharedServicesBuilder().build(),
+                       heartbeatServices);
+
+               CompletableFuture<Acknowledge> startFuture = 
jobMaster.start(jobMasterId, testingTimeout);
+
+               try {
+                       // wait for the start to complete
+                       startFuture.get(testingTimeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+
+                       // lookup location
+                       try {
+                               
jobMaster.requestKvStateLocation(graph.getJobID(), "unknown").get();
+                       } catch (Exception e) {
+                               assertTrue(ExceptionUtils.findThrowable(e, 
UnknownKvStateLocation.class).isPresent());
+                       }
+               } finally {
+                       RpcUtils.terminateRpcEndpoint(jobMaster, 
testingTimeout);
+               }
+       }
+
+       @Test
+       public void testRequestKvStateWithIrrelevantRegistration() throws 
Exception {
+               final JobVertex vertex1 = new JobVertex("v1");
+               vertex1.setParallelism(4);
+               vertex1.setMaxParallelism(16);
+               vertex1.setInvokableClass(BlockingNoOpInvokable.class);
+
+               final JobVertex vertex2 = new JobVertex("v2");
+               vertex2.setParallelism(4);
+               vertex2.setMaxParallelism(16);
+               vertex2.setInvokableClass(BlockingNoOpInvokable.class);
+
+               final JobGraph graph = new JobGraph(vertex1, vertex2);
+
+               final JobMaster jobMaster = createJobMaster(
+                       configuration,
+                       graph,
+                       haServices,
+                       new TestingJobManagerSharedServicesBuilder().build(),
+                       heartbeatServices);
+
+               CompletableFuture<Acknowledge> startFuture = 
jobMaster.start(jobMasterId, testingTimeout);
+
+               try {
+                       // wait for the start to complete
+                       startFuture.get(testingTimeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+
+                       // register an irrelevant KvState
+                       try {
+                               jobMaster.notifyKvStateRegistered(
+                                       new JobID(),
+                                       new JobVertexID(),
+                                       new KeyGroupRange(0, 0),
+                                       "any-name",
+                                       new KvStateID(),
+                                       new 
InetSocketAddress(InetAddress.getLocalHost(), 1233)).get();
+                       } catch (Exception e) {
+                               assertTrue(ExceptionUtils.findThrowable(e, 
FlinkJobNotFoundException.class).isPresent());
+                       }
+               } finally {
+                       RpcUtils.terminateRpcEndpoint(jobMaster, 
testingTimeout);
+               }
+       }
+
+       @Test
+       public void testRegisterAndUnregisterKvState() throws Exception {
+               final JobVertex vertex1 = new JobVertex("v1");
+               vertex1.setParallelism(4);
+               vertex1.setMaxParallelism(16);
+               vertex1.setInvokableClass(BlockingNoOpInvokable.class);
+
+               final JobVertex vertex2 = new JobVertex("v2");
+               vertex2.setParallelism(4);
+               vertex2.setMaxParallelism(16);
+               vertex2.setInvokableClass(BlockingNoOpInvokable.class);
+
+               final JobGraph graph = new JobGraph(vertex1, vertex2);
+
+               final JobMaster jobMaster = createJobMaster(
+                       configuration,
+                       graph,
+                       haServices,
+                       new TestingJobManagerSharedServicesBuilder().build(),
+                       heartbeatServices);
+
+               CompletableFuture<Acknowledge> startFuture = 
jobMaster.start(jobMasterId, testingTimeout);
+
+               try {
+                       // wait for the start to complete
+                       startFuture.get(testingTimeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+
+                       try {
+                               // register a KvState
+                               final String registrationName = "register-me";
+                               final KvStateID kvStateID = new KvStateID();
+                               final KeyGroupRange keyGroupRange = new 
KeyGroupRange(0, 0);
+                               final InetSocketAddress address = new 
InetSocketAddress(InetAddress.getLocalHost(), 1029);
+
+                               jobMaster.notifyKvStateRegistered(
+                                       graph.getJobID(),
+                                       vertex1.getID(),
+                                       keyGroupRange,
+                                       registrationName,
+                                       kvStateID,
+                                       address).get();
+
+                               final KvStateLocation location = 
jobMaster.requestKvStateLocation(graph.getJobID(), registrationName).get();
+
+                               assertEquals(graph.getJobID(), 
location.getJobId());
+                               assertEquals(vertex1.getID(), 
location.getJobVertexId());
+                               assertEquals(vertex1.getMaxParallelism(), 
location.getNumKeyGroups());
+                               assertEquals(1, 
location.getNumRegisteredKeyGroups());
+                               assertEquals(1, 
keyGroupRange.getNumberOfKeyGroups());
+                               assertEquals(kvStateID, 
location.getKvStateID(keyGroupRange.getStartKeyGroup()));
+                               assertEquals(address, 
location.getKvStateServerAddress(keyGroupRange.getStartKeyGroup()));
+
+                               // unregister the KvState
+                               jobMaster.notifyKvStateUnregistered(
+                                       graph.getJobID(),
+                                       vertex1.getID(),
+                                       keyGroupRange,
+                                       registrationName).get();
+
+                               
jobMaster.requestKvStateLocation(graph.getJobID(), registrationName).get();
+                       } catch (Exception e) {
+                               assertTrue(ExceptionUtils.findThrowable(e, 
UnknownKvStateLocation.class).isPresent());
+                       }
+               } finally {
+                       RpcUtils.terminateRpcEndpoint(jobMaster, 
testingTimeout);
+               }
+       }
+
+       @Test
+       public void testDuplicatedKvStateRegistrationsFailTask() throws 
Exception {
+               final JobVertex vertex1 = new JobVertex("v1");
+               vertex1.setParallelism(4);
+               vertex1.setMaxParallelism(16);
+               vertex1.setInvokableClass(BlockingNoOpInvokable.class);
+
+               final JobVertex vertex2 = new JobVertex("v2");
+               vertex2.setParallelism(4);
+               vertex2.setMaxParallelism(16);
+               vertex2.setInvokableClass(BlockingNoOpInvokable.class);
+
+               final JobGraph graph = new JobGraph(vertex1, vertex2);
+
+               final JobMaster jobMaster = createJobMaster(
+                       configuration,
+                       graph,
+                       haServices,
+                       new TestingJobManagerSharedServicesBuilder().build(),
+                       heartbeatServices);
+
+               CompletableFuture<Acknowledge> startFuture = 
jobMaster.start(jobMasterId, testingTimeout);
+
+               try {
+                       // wait for the start to complete
+                       startFuture.get(testingTimeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+
+                       // duplicate registration fails task
+                       try {
+                               // register a KvState
+                               final String registrationName = "duplicate-me";
+                               final KvStateID kvStateID = new KvStateID();
+                               final KeyGroupRange keyGroupRange = new 
KeyGroupRange(0, 0);
+                               final InetSocketAddress address = new 
InetSocketAddress(InetAddress.getLocalHost(), 4396);
+
+                               jobMaster.notifyKvStateRegistered(
+                                       graph.getJobID(),
+                                       vertex1.getID(),
+                                       keyGroupRange,
+                                       registrationName,
+                                       kvStateID,
+                                       address).get();
+
+                               jobMaster.notifyKvStateRegistered(
+                                       graph.getJobID(),
+                                       vertex2.getID(), // <--- different 
operator, but...
+                                       keyGroupRange,
+                                       registrationName,  // ...same name
+                                       kvStateID,
+                                       address).get();
+                       } catch (Exception e) {
+                               
assertTrue(ExceptionUtils.findThrowableWithMessage(e, "Registration name 
clash").isPresent());
+                               assertEquals(JobStatus.FAILED, 
jobMaster.getExecutionGraph().getState());
+                       }
+               } finally {
+                       RpcUtils.terminateRpcEndpoint(jobMaster, 
testingTimeout);
+               }
+       }
+
        /**
         * Tests the {@link 
JobMaster#requestPartitionState(IntermediateDataSetID, ResultPartitionID)}
         * call for a finished result partition.
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
index d02a55483bf..784aa7df1e8 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.akka
 
-import java.net.InetSocketAddress
+import java.net.{InetAddress, InetSocketAddress}
 
 import org.apache.flink.configuration.{AkkaOptions, Configuration, 
IllegalConfigurationException}
 import 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution
@@ -167,4 +167,12 @@ class AkkaUtilsTest
     akkaConfig.getString("akka.remote.netty.tcp.hostname") should
       equal(NetUtils.unresolvedHostToNormalizedString(hostname))
   }
+
+  test("null hostname should go to localhost") {
+    val configure = AkkaUtils.getAkkaConfig(new Configuration(), Some((null, 
1772)))
+
+    val hostname = configure.getString("akka.remote.netty.tcp.hostname")
+
+    InetAddress.getByName(hostname).isLoopbackAddress should be (true)
+  }
 }
diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java
index 8ff85e05c21..c4139af2c16 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java
@@ -39,7 +39,6 @@
 import org.apache.flink.util.ExceptionUtils;
 
 import org.junit.Assume;
-import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -78,8 +77,7 @@
        private MiniClusterClient clusterClient;
        private JobGraph jobGraph;
 
-       @Before
-       public void setUp() throws Exception {
+       private void setUpWithCheckpointInterval(long checkpointInterval) 
throws Exception {
                invokeLatch = new CountDownLatch(1);
                triggerCheckpointLatch = new CountDownLatch(1);
                savepointDirectory = temporaryFolder.newFolder().toPath();
@@ -102,14 +100,13 @@ public void setUp() throws Exception {
                        Collections.singletonList(vertex.getID()),
                        Collections.singletonList(vertex.getID()),
                        new CheckpointCoordinatorConfiguration(
-                               10,
+                               checkpointInterval,
                                60_000,
                                10,
                                1,
                                
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
                                true),
-                       null
-               ));
+                       null));
 
                clusterClient.submitJob(jobGraph, 
ClassLoader.getSystemClassLoader());
                invokeLatch.await(60, TimeUnit.SECONDS);
@@ -118,6 +115,22 @@ public void setUp() throws Exception {
 
        @Test
        public void testStopJobAfterSavepoint() throws Exception {
+               setUpWithCheckpointInterval(10);
+
+               final String savepointLocation = cancelWithSavepoint();
+               final JobStatus jobStatus = 
clusterClient.getJobStatus(jobGraph.getJobID()).get(60, TimeUnit.SECONDS);
+
+               assertThat(jobStatus, isOneOf(JobStatus.CANCELED, 
JobStatus.CANCELLING));
+
+               final List<Path> savepoints = 
Files.list(savepointDirectory).map(Path::getFileName).collect(Collectors.toList());
+               assertThat(savepoints, 
hasItem(Paths.get(savepointLocation).getFileName()));
+       }
+
+       @Test
+       public void 
testStopJobAfterSavepointWithDeactivatedPeriodicCheckpointing() throws 
Exception {
+               // set checkpointInterval to Long.MAX_VALUE, which means 
deactivated checkpointing
+               setUpWithCheckpointInterval(Long.MAX_VALUE);
+
                final String savepointLocation = cancelWithSavepoint();
                final JobStatus jobStatus = 
clusterClient.getJobStatus(jobGraph.getJobID()).get(60, TimeUnit.SECONDS);
 
@@ -129,6 +142,8 @@ public void testStopJobAfterSavepoint() throws Exception {
 
        @Test
        public void testDoNotCancelJobIfSavepointFails() throws Exception {
+               setUpWithCheckpointInterval(10);
+
                try {
                        Files.setPosixFilePermissions(savepointDirectory, 
Collections.emptySet());
                } catch (IOException e) {
@@ -186,7 +201,7 @@ public void invoke() {
                }
 
                @Override
-               public boolean triggerCheckpoint(final CheckpointMetaData 
checkpointMetaData, final CheckpointOptions checkpointOptions) throws Exception 
{
+               public boolean triggerCheckpoint(final CheckpointMetaData 
checkpointMetaData, final CheckpointOptions checkpointOptions) {
                        final TaskStateSnapshot checkpointStateHandles = new 
TaskStateSnapshot();
                        checkpointStateHandles.putSubtaskStateByOperatorID(
                                
OperatorID.fromJobVertexID(getEnvironment().getJobVertexId()),
@@ -203,7 +218,7 @@ public boolean triggerCheckpoint(final CheckpointMetaData 
checkpointMetaData, fi
                }
 
                @Override
-               public void notifyCheckpointComplete(final long checkpointId) 
throws Exception {
+               public void notifyCheckpointComplete(final long checkpointId) {
                }
        }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Port JobManagerTest to new code base
> ------------------------------------
>
>                 Key: FLINK-10406
>                 URL: https://issues.apache.org/jira/browse/FLINK-10406
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Tests
>    Affects Versions: 1.7.0
>            Reporter: tison
>            Assignee: tison
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.7.0
>
>
> Port {{JobManagerTest}} to new code base
> Not all of its tests should be ported, since some of them are covered by 
> {{JobMasterTest}}.
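
For readers comparing the two code bases: the removed JobManagerTest methods drive the job through Akka actor messages (ActorGateway.ask plus Scala Await), while the ported JobMasterTest methods in the diff call the JobMaster RPC gateway directly and work with CompletableFutures. Below is a condensed sketch of the ported style, assuming the JobMasterTest harness fields and imports shown in the diff (configuration, haServices, heartbeatServices, jobMasterId, testingTimeout, createJobMaster); it is illustrative only and not an additional test from the PR:

    @Test
    public void exampleKvStateLookupInNewCodeBase() throws Exception {
        final JobVertex vertex = new JobVertex("v1");
        vertex.setParallelism(4);
        vertex.setMaxParallelism(16);
        vertex.setInvokableClass(BlockingNoOpInvokable.class);

        final JobGraph graph = new JobGraph(vertex);

        final JobMaster jobMaster = createJobMaster(
            configuration,
            graph,
            haServices,
            new TestingJobManagerSharedServicesBuilder().build(),
            heartbeatServices);

        try {
            // starting the master returns a CompletableFuture instead of an actor ask
            jobMaster.start(jobMasterId, testingTimeout)
                .get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);

            // querying a name that was never registered should surface UnknownKvStateLocation
            try {
                jobMaster.requestKvStateLocation(graph.getJobID(), "unknown").get();
                fail("Expected an UnknownKvStateLocation failure");
            } catch (Exception e) {
                assertTrue(ExceptionUtils.findThrowable(e, UnknownKvStateLocation.class).isPresent());
            }
        } finally {
            RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
        }
    }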



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
