[ https://issues.apache.org/jira/browse/FLINK-4150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15385722#comment-15385722 ]
ASF GitHub Bot commented on FLINK-4150:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2256#discussion_r71510844

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java ---
@@ -243,6 +256,268 @@ public void testJobRecoveryWhenLosingLeadership() throws Exception {
 		}
 	}
 
+	/**
+	 * Tests that the persisted job is not removed from the job graph store
+	 * after the postStop method of the JobManager. Furthermore, it checks
+	 * that BLOBs of the JobGraph are recovered properly and cleaned up after
+	 * the job finishes.
+	 */
+	@Test
+	public void testBlobRecoveryAfterLostJobManager() throws Exception {
+		FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS);
+		FiniteDuration jobRecoveryTimeout = new FiniteDuration(3, TimeUnit.SECONDS);
+		Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
+		Configuration flinkConfiguration = new Configuration();
+		UUID leaderSessionID = UUID.randomUUID();
+		UUID newLeaderSessionID = UUID.randomUUID();
+		int slots = 2;
+		ActorRef archiveRef = null;
+		ActorRef jobManagerRef = null;
+		ActorRef taskManagerRef = null;
+
+		String haStoragePath = temporaryFolder.newFolder().toString();
+
+		flinkConfiguration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
+		flinkConfiguration.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, haStoragePath);
+		flinkConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slots);
+
+		try {
+			MySubmittedJobGraphStore mySubmittedJobGraphStore = new MySubmittedJobGraphStore();
+			TestingLeaderElectionService myLeaderElectionService = new TestingLeaderElectionService();
+			TestingLeaderRetrievalService myLeaderRetrievalService = new TestingLeaderRetrievalService();
+
+			archiveRef = system.actorOf(Props.create(
+				MemoryArchivist.class,
+				10), "archive");
+
+			jobManagerRef = createJobManagerActor(
+				"jobmanager-0",
+				flinkConfiguration,
+				myLeaderElectionService,
+				mySubmittedJobGraphStore,
+				3600000,
+				timeout,
+				jobRecoveryTimeout, archiveRef);
+
+			ActorGateway jobManager = new AkkaActorGateway(jobManagerRef, leaderSessionID);
+
+			taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
+				flinkConfiguration,
+				ResourceID.generate(),
+				system,
+				"localhost",
+				Option.apply("taskmanager"),
+				Option.apply((LeaderRetrievalService) myLeaderRetrievalService),
+				true,
+				TestingTaskManager.class);
+
+			ActorGateway tmGateway = new AkkaActorGateway(taskManagerRef, leaderSessionID);
+
+			Future<Object> tmAlive = tmGateway.ask(TestingMessages.getAlive(), deadline.timeLeft());
+
+			Await.ready(tmAlive, deadline.timeLeft());
+
+			JobVertex sourceJobVertex = new JobVertex("Source");
+			sourceJobVertex.setInvokableClass(BlockingInvokable.class);
+			sourceJobVertex.setParallelism(slots);
+
+			JobGraph jobGraph = new JobGraph("TestingJob", sourceJobVertex);
+
+			// Upload fake JAR file to first JobManager
+			File jarFile = temporaryFolder.newFile();
+			ZipOutputStream out = new ZipOutputStream(new FileOutputStream(jarFile));
+			out.close();
+
+			jobGraph.addJar(new Path(jarFile.toURI()));
+			JobClient.uploadJarFiles(jobGraph, jobManager, deadline.timeLeft());
+
+			Future<Object> isLeader = jobManager.ask(
+				TestingJobManagerMessages.getNotifyWhenLeader(),
+				deadline.timeLeft());
+
+			Future<Object> isConnectedToJobManager = tmGateway.ask(
+				new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManagerRef),
+				deadline.timeLeft());
+
+			// tell jobManager that he's the leader
+			myLeaderElectionService.isLeader(leaderSessionID);
+			// tell taskManager who's the leader
+			myLeaderRetrievalService.notifyListener(jobManager.path(), leaderSessionID);
+
+			Await.ready(isLeader, deadline.timeLeft());
+			Await.ready(isConnectedToJobManager, deadline.timeLeft());
+
+			// submit blocking job
+			Future<Object> jobSubmitted = jobManager.ask(
+				new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED),
+				deadline.timeLeft());
+
+			Await.ready(jobSubmitted, deadline.timeLeft());
+
+			// Wait for running
+			Future<Object> jobRunning = jobManager.ask(
+				new TestingJobManagerMessages.NotifyWhenJobStatus(jobGraph.getJobID(), JobStatus.RUNNING),
+				deadline.timeLeft());
+
+			Await.ready(jobRunning, deadline.timeLeft());
+
+			// terminate the job manager
+			jobManagerRef.tell(PoisonPill.getInstance(), ActorRef.noSender());
+
+			Future<Boolean> terminatedFuture = Patterns.gracefulStop(jobManagerRef, deadline.timeLeft());
+			Boolean terminated = Await.result(terminatedFuture, deadline.timeLeft());
+			assertTrue("Failed to stop job manager", terminated);
+
+			// job stays in the submitted job graph store
+			assertTrue(mySubmittedJobGraphStore.contains(jobGraph.getJobID()));
+
+			// start new job manager
+			myLeaderElectionService.reset();
+
+			jobManagerRef = createJobManagerActor(
+				"jobmanager-1",
+				flinkConfiguration,
+				myLeaderElectionService,
+				mySubmittedJobGraphStore,
+				500,
+				timeout,
+				jobRecoveryTimeout,
+				archiveRef);
+
+			jobManager = new AkkaActorGateway(jobManagerRef, newLeaderSessionID);
+
+			Future<Object> isAlive = jobManager.ask(TestingMessages.getAlive(), deadline.timeLeft());
+
+			isLeader = jobManager.ask(
+				TestingJobManagerMessages.getNotifyWhenLeader(),
+				deadline.timeLeft());
+
+			isConnectedToJobManager = tmGateway.ask(
+				new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManagerRef),
+				deadline.timeLeft());
+
+			Await.ready(isAlive, deadline.timeLeft());
+
+			// tell new jobManager that he's the leader
+			myLeaderElectionService.isLeader(newLeaderSessionID);
+			// tell taskManager who's the leader
+			myLeaderRetrievalService.notifyListener(jobManager.path(), newLeaderSessionID);
+
+			Await.ready(isLeader, deadline.timeLeft());
+			Await.ready(isConnectedToJobManager, deadline.timeLeft());
+
+			jobRunning = jobManager.ask(
+				new TestingJobManagerMessages.NotifyWhenJobStatus(jobGraph.getJobID(), JobStatus.RUNNING),
+				deadline.timeLeft());
+
+			// wait until the job is recovered and reaches state RUNNING
+			Await.ready(jobRunning, deadline.timeLeft());
+
+			Future<Object> jobFinished = jobManager.ask(
+				new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()),
+				deadline.timeLeft());
+
+			BlockingInvokable.unblock();
+
+			// wait until the job has finished
+			Await.ready(jobFinished, deadline.timeLeft());
+
+			// check that the job has been removed from the submitted job graph store
+			assertFalse(mySubmittedJobGraphStore.contains(jobGraph.getJobID()));
+
+			// Check that the BLOB store files are removed
+			File rootPath = new File(haStoragePath);
+
+			boolean cleanedUpFiles = false;
+			while (deadline.hasTimeLeft()) {
+				if (listFiles(rootPath).isEmpty()) {
--- End diff --

We check that the directory no longer contains files, but we don't check for folders, right? I think that we no longer delete the folders created by the BlobStore. We could maybe check in `BlobStore.cleanUp` whether there are any empty folders that we can delete. Do you think that this could be relevant?
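For illustration only, a minimal sketch of the kind of empty-folder cleanup suggested above. The helper name `deleteEmptyDirectories` and the idea of invoking it from `BlobStore.cleanUp` on the HA storage root are assumptions for this sketch, not the actual Flink implementation:

{code}
// Hypothetical helper (not Flink code): after the blob files have been removed,
// prune any directories under the storage root that were left behind empty.
private static void deleteEmptyDirectories(File dir) {
	File[] children = dir.listFiles();
	if (children == null) {
		return;
	}
	for (File child : children) {
		if (child.isDirectory()) {
			deleteEmptyDirectories(child);
		}
	}
	// re-list after recursing; File#delete() only succeeds on empty directories
	File[] remaining = dir.listFiles();
	if (remaining != null && remaining.length == 0) {
		dir.delete();
	}
}
{code}

With something like this in place, the test could tighten its check from "no files left" to "no files and no sub-directories left" under the HA storage path.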
> Problem with Blobstore in Yarn HA setting on recovery after cluster shutdown
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-4150
>                 URL: https://issues.apache.org/jira/browse/FLINK-4150
>             Project: Flink
>          Issue Type: Bug
>          Components: Job-Submission
>            Reporter: Stefan Richter
>            Assignee: Ufuk Celebi
>            Priority: Blocker
>             Fix For: 1.1.0
>
>
> Submitting a job in Yarn with HA can lead to the following exception:
> {code}
> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
> ClassLoader info: URL ClassLoader:
>     file: '/tmp/blobStore-ccec0f4a-3e07-455f-945b-4fcd08f5bac1/cache/blob_7fafffe9595cd06aff213b81b5da7b1682e1d6b0' (invalid JAR: zip file is empty)
> Class not resolvable through given classloader.
> 	at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:207)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:222)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:588)
> 	at java.lang.Thread.run(Thread.java:745)
> {code}
> Some job information, including the Blob ids, is stored in Zookeeper. The actual Blobs are stored in a dedicated BlobStore if the recovery mode is set to Zookeeper. This BlobStore is typically located in a FS like HDFS. When the cluster is shut down, the path for the BlobStore is deleted. When the cluster is then restarted, recovering jobs cannot be restored because their Blob ids stored in Zookeeper now point to deleted files.
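A toy reproduction of the failure mode described above, for illustration only (none of these names are Flink code): the recovery metadata in ZooKeeper keeps a reference to a blob that lives on a shared file system, so deleting the blob storage path at shutdown leaves that reference dangling when the job is recovered.

{code}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class DanglingBlobReferenceExample {
	public static void main(String[] args) throws IOException {
		// stands in for the HA blob storage path on a shared FS
		Path blobRoot = Files.createTempDirectory("blobStore");
		// stands in for a blob whose key is recorded in ZooKeeper
		Path blob = Files.createFile(blobRoot.resolve("blob_key"));

		// cluster shutdown deletes the blob storage path ...
		Files.delete(blob);
		Files.delete(blobRoot);

		// ... so on restart the recovered metadata points at bytes that are gone
		if (!Files.exists(blob)) {
			System.err.println("Recovered reference points to a deleted blob: " + blob);
		}
	}
}
{code}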