Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2256#discussion_r71510844 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java --- @@ -243,6 +256,268 @@ public void testJobRecoveryWhenLosingLeadership() throws Exception { } } + /** + * Tests that the persisted job is not removed from the job graph store + * after the postStop method of the JobManager. Furthermore, it checks + * that BLOBs of the JobGraph are recovered properly and cleaned up after + * the job finishes. + */ + @Test + public void testBlobRecoveryAfterLostJobManager() throws Exception { + FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS); + FiniteDuration jobRecoveryTimeout = new FiniteDuration(3, TimeUnit.SECONDS); + Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow(); + Configuration flinkConfiguration = new Configuration(); + UUID leaderSessionID = UUID.randomUUID(); + UUID newLeaderSessionID = UUID.randomUUID(); + int slots = 2; + ActorRef archiveRef = null; + ActorRef jobManagerRef = null; + ActorRef taskManagerRef = null; + + String haStoragePath = temporaryFolder.newFolder().toString(); + + flinkConfiguration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper"); + flinkConfiguration.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, haStoragePath); + flinkConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slots); + + try { + MySubmittedJobGraphStore mySubmittedJobGraphStore = new MySubmittedJobGraphStore(); + TestingLeaderElectionService myLeaderElectionService = new TestingLeaderElectionService(); + TestingLeaderRetrievalService myLeaderRetrievalService = new TestingLeaderRetrievalService(); + + archiveRef = system.actorOf(Props.create( + MemoryArchivist.class, + 10), "archive"); + + jobManagerRef = createJobManagerActor( + "jobmanager-0", + flinkConfiguration, + myLeaderElectionService, + mySubmittedJobGraphStore, + 3600000, + 
timeout, + jobRecoveryTimeout, archiveRef); + + ActorGateway jobManager = new AkkaActorGateway(jobManagerRef, leaderSessionID); + + taskManagerRef = TaskManager.startTaskManagerComponentsAndActor( + flinkConfiguration, + ResourceID.generate(), + system, + "localhost", + Option.apply("taskmanager"), + Option.apply((LeaderRetrievalService) myLeaderRetrievalService), + true, + TestingTaskManager.class); + + ActorGateway tmGateway = new AkkaActorGateway(taskManagerRef, leaderSessionID); + + Future<Object> tmAlive = tmGateway.ask(TestingMessages.getAlive(), deadline.timeLeft()); + + Await.ready(tmAlive, deadline.timeLeft()); + + JobVertex sourceJobVertex = new JobVertex("Source"); + sourceJobVertex.setInvokableClass(BlockingInvokable.class); + sourceJobVertex.setParallelism(slots); + + JobGraph jobGraph = new JobGraph("TestingJob", sourceJobVertex); + + // Upload fake JAR file to first JobManager + File jarFile = temporaryFolder.newFile(); + ZipOutputStream out = new ZipOutputStream(new FileOutputStream(jarFile)); + out.close(); + + jobGraph.addJar(new Path(jarFile.toURI())); + JobClient.uploadJarFiles(jobGraph, jobManager, deadline.timeLeft()); + + Future<Object> isLeader = jobManager.ask( + TestingJobManagerMessages.getNotifyWhenLeader(), + deadline.timeLeft()); + + Future<Object> isConnectedToJobManager = tmGateway.ask( + new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManagerRef), + deadline.timeLeft()); + + // tell jobManager that he's the leader + myLeaderElectionService.isLeader(leaderSessionID); + // tell taskManager who's the leader + myLeaderRetrievalService.notifyListener(jobManager.path(), leaderSessionID); + + Await.ready(isLeader, deadline.timeLeft()); + Await.ready(isConnectedToJobManager, deadline.timeLeft()); + + // submit blocking job + Future<Object> jobSubmitted = jobManager.ask( + new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED), + deadline.timeLeft()); + + Await.ready(jobSubmitted, deadline.timeLeft()); + + 
// Wait for running + Future<Object> jobRunning = jobManager.ask( + new TestingJobManagerMessages.NotifyWhenJobStatus(jobGraph.getJobID(), JobStatus.RUNNING), + deadline.timeLeft()); + + Await.ready(jobRunning, deadline.timeLeft()); + + // terminate the job manager + jobManagerRef.tell(PoisonPill.getInstance(), ActorRef.noSender()); + + Future<Boolean> terminatedFuture = Patterns.gracefulStop(jobManagerRef, deadline.timeLeft()); + Boolean terminated = Await.result(terminatedFuture, deadline.timeLeft()); + assertTrue("Failed to stop job manager", terminated); + + // job stays in the submitted job graph store + assertTrue(mySubmittedJobGraphStore.contains(jobGraph.getJobID())); + + // start new job manager + myLeaderElectionService.reset(); + + jobManagerRef = createJobManagerActor( + "jobmanager-1", + flinkConfiguration, + myLeaderElectionService, + mySubmittedJobGraphStore, + 500, + timeout, + jobRecoveryTimeout, + archiveRef); + + jobManager = new AkkaActorGateway(jobManagerRef, newLeaderSessionID); + + Future<Object> isAlive = jobManager.ask(TestingMessages.getAlive(), deadline.timeLeft()); + + isLeader = jobManager.ask( + TestingJobManagerMessages.getNotifyWhenLeader(), + deadline.timeLeft()); + + isConnectedToJobManager = tmGateway.ask( + new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManagerRef), + deadline.timeLeft()); + + Await.ready(isAlive, deadline.timeLeft()); + + // tell new jobManager that he's the leader + myLeaderElectionService.isLeader(newLeaderSessionID); + // tell taskManager who's the leader + myLeaderRetrievalService.notifyListener(jobManager.path(), newLeaderSessionID); + + Await.ready(isLeader, deadline.timeLeft()); + Await.ready(isConnectedToJobManager, deadline.timeLeft()); + + jobRunning = jobManager.ask( + new TestingJobManagerMessages.NotifyWhenJobStatus(jobGraph.getJobID(), JobStatus.RUNNING), + deadline.timeLeft()); + + // wait that the job is recovered and reaches state RUNNING + Await.ready(jobRunning, 
deadline.timeLeft()); + + Future<Object> jobFinished = jobManager.ask( + new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()), + deadline.timeLeft()); + + BlockingInvokable.unblock(); + + // wait til the job has finished + Await.ready(jobFinished, deadline.timeLeft()); + + // check that the job has been removed from the submitted job graph store + assertFalse(mySubmittedJobGraphStore.contains(jobGraph.getJobID())); + + // Check that the BLOB store files are removed + File rootPath = new File(haStoragePath); + + boolean cleanedUpFiles = false; + while (deadline.hasTimeLeft()) { + if (listFiles(rootPath).isEmpty()) { --- End diff -- We check that the directory no longer contains files. But we don't check for folders, right? I think that we no longer delete the folders created by the BlobStore. We could maybe check in `BlobStore.cleanUp` whether there are any empty folders which we can delete. Do you think that this could be relevant?
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and would like it to be, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---