GitHub user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2256#discussion_r71510844
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java ---
    @@ -243,6 +256,268 @@ public void testJobRecoveryWhenLosingLeadership() throws Exception {
                }
        }
     
    +   /**
    +    * Tests that the persisted job is not removed from the job graph store
    +    * after the postStop method of the JobManager. Furthermore, it checks
    +    * that BLOBs of the JobGraph are recovered properly and cleaned up after
    +    * the job finishes.
    +    */
    +   @Test
    +   public void testBlobRecoveryAfterLostJobManager() throws Exception {
    +           FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS);
    +           FiniteDuration jobRecoveryTimeout = new FiniteDuration(3, TimeUnit.SECONDS);
    +           Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
    +           Configuration flinkConfiguration = new Configuration();
    +           UUID leaderSessionID = UUID.randomUUID();
    +           UUID newLeaderSessionID = UUID.randomUUID();
    +           int slots = 2;
    +           ActorRef archiveRef = null;
    +           ActorRef jobManagerRef = null;
    +           ActorRef taskManagerRef = null;
    +
    +           String haStoragePath = temporaryFolder.newFolder().toString();
    +
    +           flinkConfiguration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
    +           flinkConfiguration.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, haStoragePath);
    +           flinkConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slots);
    +
    +           try {
    +                   MySubmittedJobGraphStore mySubmittedJobGraphStore = new MySubmittedJobGraphStore();
    +                   TestingLeaderElectionService myLeaderElectionService = new TestingLeaderElectionService();
    +                   TestingLeaderRetrievalService myLeaderRetrievalService = new TestingLeaderRetrievalService();
    +
    +                   archiveRef = system.actorOf(Props.create(
    +                                   MemoryArchivist.class,
    +                                   10), "archive");
    +
    +                   jobManagerRef = createJobManagerActor(
    +                                   "jobmanager-0",
    +                                   flinkConfiguration,
    +                                   myLeaderElectionService,
    +                                   mySubmittedJobGraphStore,
    +                                   3600000,
    +                                   timeout,
    +                                   jobRecoveryTimeout, archiveRef);
    +
    +                   ActorGateway jobManager = new AkkaActorGateway(jobManagerRef, leaderSessionID);
    +
    +                   taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
    +                                   flinkConfiguration,
    +                                   ResourceID.generate(),
    +                                   system,
    +                                   "localhost",
    +                                   Option.apply("taskmanager"),
    +                                   Option.apply((LeaderRetrievalService) myLeaderRetrievalService),
    +                                   true,
    +                                   TestingTaskManager.class);
    +
    +                   ActorGateway tmGateway = new AkkaActorGateway(taskManagerRef, leaderSessionID);
    +
    +                   Future<Object> tmAlive = tmGateway.ask(TestingMessages.getAlive(), deadline.timeLeft());
    +
    +                   Await.ready(tmAlive, deadline.timeLeft());
    +
    +                   JobVertex sourceJobVertex = new JobVertex("Source");
    +                   sourceJobVertex.setInvokableClass(BlockingInvokable.class);
    +                   sourceJobVertex.setParallelism(slots);
    +
    +                   JobGraph jobGraph = new JobGraph("TestingJob", sourceJobVertex);
    +
    +                   // Upload fake JAR file to first JobManager
    +                   File jarFile = temporaryFolder.newFile();
    +                   ZipOutputStream out = new ZipOutputStream(new FileOutputStream(jarFile));
    +                   out.close();
    +
    +                   jobGraph.addJar(new Path(jarFile.toURI()));
    +                   JobClient.uploadJarFiles(jobGraph, jobManager, deadline.timeLeft());
    +
    +                   Future<Object> isLeader = jobManager.ask(
    +                                   TestingJobManagerMessages.getNotifyWhenLeader(),
    +                                   deadline.timeLeft());
    +
    +                   Future<Object> isConnectedToJobManager = tmGateway.ask(
    +                                   new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManagerRef),
    +                                   deadline.timeLeft());
    +
    +                   // tell jobManager that he's the leader
    +                   myLeaderElectionService.isLeader(leaderSessionID);
    +                   // tell taskManager who's the leader
    +                   myLeaderRetrievalService.notifyListener(jobManager.path(), leaderSessionID);
    +
    +                   Await.ready(isLeader, deadline.timeLeft());
    +                   Await.ready(isConnectedToJobManager, deadline.timeLeft());
    +
    +                   // submit blocking job
    +                   Future<Object> jobSubmitted = jobManager.ask(
    +                                   new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED),
    +                                   deadline.timeLeft());
    +
    +                   Await.ready(jobSubmitted, deadline.timeLeft());
    +
    +                   // Wait for running
    +                   Future<Object> jobRunning = jobManager.ask(
    +                                   new TestingJobManagerMessages.NotifyWhenJobStatus(jobGraph.getJobID(), JobStatus.RUNNING),
    +                                   deadline.timeLeft());
    +
    +                   Await.ready(jobRunning, deadline.timeLeft());
    +
    +                   // terminate the job manager
    +                   jobManagerRef.tell(PoisonPill.getInstance(), ActorRef.noSender());
    +
    +                   Future<Boolean> terminatedFuture = Patterns.gracefulStop(jobManagerRef, deadline.timeLeft());
    +                   Boolean terminated = Await.result(terminatedFuture, deadline.timeLeft());
    +                   assertTrue("Failed to stop job manager", terminated);
    +
    +                   // job stays in the submitted job graph store
    +                   assertTrue(mySubmittedJobGraphStore.contains(jobGraph.getJobID()));
    +
    +                   // start new job manager
    +                   myLeaderElectionService.reset();
    +
    +                   jobManagerRef = createJobManagerActor(
    +                                   "jobmanager-1",
    +                                   flinkConfiguration,
    +                                   myLeaderElectionService,
    +                                   mySubmittedJobGraphStore,
    +                                   500,
    +                                   timeout,
    +                                   jobRecoveryTimeout,
    +                                   archiveRef);
    +
    +                   jobManager = new AkkaActorGateway(jobManagerRef, newLeaderSessionID);
    +
    +                   Future<Object> isAlive = jobManager.ask(TestingMessages.getAlive(), deadline.timeLeft());
    +
    +                   isLeader = jobManager.ask(
    +                                   TestingJobManagerMessages.getNotifyWhenLeader(),
    +                                   deadline.timeLeft());
    +
    +                   isConnectedToJobManager = tmGateway.ask(
    +                                   new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManagerRef),
    +                                   deadline.timeLeft());
    +
    +                   Await.ready(isAlive, deadline.timeLeft());
    +
    +                   // tell new jobManager that he's the leader
    +                   myLeaderElectionService.isLeader(newLeaderSessionID);
    +                   // tell taskManager who's the leader
    +                   myLeaderRetrievalService.notifyListener(jobManager.path(), newLeaderSessionID);
    +
    +                   Await.ready(isLeader, deadline.timeLeft());
    +                   Await.ready(isConnectedToJobManager, deadline.timeLeft());
    +
    +                   jobRunning = jobManager.ask(
    +                                   new TestingJobManagerMessages.NotifyWhenJobStatus(jobGraph.getJobID(), JobStatus.RUNNING),
    +                                   deadline.timeLeft());
    +
    +                   // wait that the job is recovered and reaches state RUNNING
    +                   Await.ready(jobRunning, deadline.timeLeft());
    +
    +                   Future<Object> jobFinished = jobManager.ask(
    +                                   new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()),
    +                                   deadline.timeLeft());
    +
    +                   BlockingInvokable.unblock();
    +
    +                   // wait til the job has finished
    +                   Await.ready(jobFinished, deadline.timeLeft());
    +
    +                   // check that the job has been removed from the submitted job graph store
    +                   assertFalse(mySubmittedJobGraphStore.contains(jobGraph.getJobID()));
    +
    +                   // Check that the BLOB store files are removed
    +                   File rootPath = new File(haStoragePath);
    +
    +                   boolean cleanedUpFiles = false;
    +                   while (deadline.hasTimeLeft()) {
    +                           if (listFiles(rootPath).isEmpty()) {
    --- End diff --
    
    We check that the directory no longer contains files, but we don't check
    for folders, right? I think we no longer delete the folders created by the
    `BlobStore`. Maybe we could check in `BlobStore.cleanUp` whether any empty
    folders are left over that we can delete. Do you think this could be relevant?
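A minimal sketch of what pruning those folders could look like, assuming all BLOB files live in nested directories below one storage root. `EmptyDirectoryCleanup`, `deleteEmptyDirectories` and `hasNoEntries` are hypothetical names, not part of Flink's `BlobStore`; the sketch only relies on `java.nio.file`. It deletes directories bottom-up and only when their whole subtree is empty, so folders that still hold other BLOBs are left alone:

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper (not part of Flink) sketching how a BLOB store cleanup
 * could prune directories that are left empty after their files were deleted.
 */
final class EmptyDirectoryCleanup {

	private EmptyDirectoryCleanup() {}

	/**
	 * Deletes, bottom-up, every directory below {@code root} whose subtree
	 * consists only of directories. {@code root} itself is never deleted.
	 *
	 * @return true if {@code root} has no entries left afterwards
	 */
	static boolean deleteEmptyDirectories(Path root) throws IOException {
		// Snapshot the children first so that deleting entries does not
		// interfere with the directory iteration.
		List<Path> children = new ArrayList<>();
		try (DirectoryStream<Path> stream = Files.newDirectoryStream(root)) {
			for (Path child : stream) {
				children.add(child);
			}
		}

		boolean empty = true;
		for (Path child : children) {
			// Do not follow symbolic links out of the storage directory.
			if (Files.isDirectory(child, LinkOption.NOFOLLOW_LINKS)
					&& deleteEmptyDirectories(child)) {
				// The subtree held no files, so the now-empty directory can go.
				Files.delete(child);
			} else {
				// A file, a symlink, or a directory that still holds files.
				empty = false;
			}
		}
		return empty;
	}

	/**
	 * Stricter than a "no files left" check: also returns false if empty
	 * directories remain directly below {@code dir}.
	 */
	static boolean hasNoEntries(Path dir) throws IOException {
		try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
			return !stream.iterator().hasNext();
		}
	}
}
```

In the test, `hasNoEntries(rootPath.toPath())` would additionally catch leftover empty folders, which a check that only lists files cannot see. If a file is written into a directory between the recursive check and `Files.delete`, the delete fails with an `IOException` (typically `DirectoryNotEmptyException`) instead of removing data.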


