GitHub user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2256#discussion_r71711908
  
    --- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
 ---
    @@ -243,6 +256,268 @@ public void testJobRecoveryWhenLosingLeadership() 
throws Exception {
                }
        }
     
    +   /**
    +    * Tests that the persisted job is not removed from the job graph store
    +    * after the postStop method of the JobManager. Furthermore, it checks
    +    * that BLOBs of the JobGraph are recovered properly and cleaned up 
after
    +    * the job finishes.
    +    */
    +   @Test
    +   public void testBlobRecoveryAfterLostJobManager() throws Exception {
    +           FiniteDuration timeout = new FiniteDuration(30, 
TimeUnit.SECONDS);
    +           FiniteDuration jobRecoveryTimeout = new FiniteDuration(3, 
TimeUnit.SECONDS);
    +           Deadline deadline = new FiniteDuration(2, 
TimeUnit.MINUTES).fromNow();
    +           Configuration flinkConfiguration = new Configuration();
    +           UUID leaderSessionID = UUID.randomUUID();
    +           UUID newLeaderSessionID = UUID.randomUUID();
    +           int slots = 2;
    +           ActorRef archiveRef = null;
    +           ActorRef jobManagerRef = null;
    +           ActorRef taskManagerRef = null;
    +
    +           String haStoragePath = temporaryFolder.newFolder().toString();
    +
    +           flinkConfiguration.setString(ConfigConstants.RECOVERY_MODE, 
"zookeeper");
    +           
flinkConfiguration.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, 
haStoragePath);
    +           
flinkConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
slots);
    +
    +           try {
    +                   MySubmittedJobGraphStore mySubmittedJobGraphStore = new 
MySubmittedJobGraphStore();
    +                   TestingLeaderElectionService myLeaderElectionService = 
new TestingLeaderElectionService();
    +                   TestingLeaderRetrievalService myLeaderRetrievalService 
= new TestingLeaderRetrievalService();
    +
    +                   archiveRef = system.actorOf(Props.create(
    +                                   MemoryArchivist.class,
    +                                   10), "archive");
    +
    +                   jobManagerRef = createJobManagerActor(
    +                                   "jobmanager-0",
    +                                   flinkConfiguration,
    +                                   myLeaderElectionService,
    +                                   mySubmittedJobGraphStore,
    +                                   3600000,
    +                                   timeout,
    +                                   jobRecoveryTimeout, archiveRef);
    +
    +                   ActorGateway jobManager = new 
AkkaActorGateway(jobManagerRef, leaderSessionID);
    +
    +                   taskManagerRef = 
TaskManager.startTaskManagerComponentsAndActor(
    +                                   flinkConfiguration,
    +                                   ResourceID.generate(),
    +                                   system,
    +                                   "localhost",
    +                                   Option.apply("taskmanager"),
    +                                   Option.apply((LeaderRetrievalService) 
myLeaderRetrievalService),
    +                                   true,
    +                                   TestingTaskManager.class);
    +
    +                   ActorGateway tmGateway = new 
AkkaActorGateway(taskManagerRef, leaderSessionID);
    +
    +                   Future<Object> tmAlive = 
tmGateway.ask(TestingMessages.getAlive(), deadline.timeLeft());
    +
    +                   Await.ready(tmAlive, deadline.timeLeft());
    +
    +                   JobVertex sourceJobVertex = new JobVertex("Source");
    +                   
sourceJobVertex.setInvokableClass(BlockingInvokable.class);
    +                   sourceJobVertex.setParallelism(slots);
    +
    +                   JobGraph jobGraph = new JobGraph("TestingJob", 
sourceJobVertex);
    +
    +                   // Upload fake JAR file to first JobManager
    +                   File jarFile = temporaryFolder.newFile();
    +                   ZipOutputStream out = new ZipOutputStream(new 
FileOutputStream(jarFile));
    +                   out.close();
    +
    +                   jobGraph.addJar(new Path(jarFile.toURI()));
    +                   JobClient.uploadJarFiles(jobGraph, jobManager, 
deadline.timeLeft());
    +
    +                   Future<Object> isLeader = jobManager.ask(
    +                                   
TestingJobManagerMessages.getNotifyWhenLeader(),
    +                                   deadline.timeLeft());
    +
    +                   Future<Object> isConnectedToJobManager = tmGateway.ask(
    +                                   new 
TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManagerRef),
    +                                   deadline.timeLeft());
    +
    +                   // tell jobManager that he's the leader
    +                   myLeaderElectionService.isLeader(leaderSessionID);
    +                   // tell taskManager who's the leader
    +                   
myLeaderRetrievalService.notifyListener(jobManager.path(), leaderSessionID);
    +
    +                   Await.ready(isLeader, deadline.timeLeft());
    +                   Await.ready(isConnectedToJobManager, 
deadline.timeLeft());
    +
    +                   // submit blocking job
    +                   Future<Object> jobSubmitted = jobManager.ask(
    +                                   new 
JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED),
    +                                   deadline.timeLeft());
    +
    +                   Await.ready(jobSubmitted, deadline.timeLeft());
    +
    +                   // Wait for running
    +                   Future<Object> jobRunning = jobManager.ask(
    +                                   new 
TestingJobManagerMessages.NotifyWhenJobStatus(jobGraph.getJobID(), 
JobStatus.RUNNING),
    +                                   deadline.timeLeft());
    +
    +                   Await.ready(jobRunning, deadline.timeLeft());
    +
    +                   // terminate the job manager
    +                   jobManagerRef.tell(PoisonPill.getInstance(), 
ActorRef.noSender());
    +
    +                   Future<Boolean> terminatedFuture = 
Patterns.gracefulStop(jobManagerRef, deadline.timeLeft());
    +                   Boolean terminated = Await.result(terminatedFuture, 
deadline.timeLeft());
    +                   assertTrue("Failed to stop job manager", terminated);
    +
    +                   // job stays in the submitted job graph store
    +                   
assertTrue(mySubmittedJobGraphStore.contains(jobGraph.getJobID()));
    +
    +                   // start new job manager
    +                   myLeaderElectionService.reset();
    +
    +                   jobManagerRef = createJobManagerActor(
    +                                   "jobmanager-1",
    +                                   flinkConfiguration,
    +                                   myLeaderElectionService,
    +                                   mySubmittedJobGraphStore,
    +                                   500,
    +                                   timeout,
    +                                   jobRecoveryTimeout,
    +                                   archiveRef);
    +
    +                   jobManager = new AkkaActorGateway(jobManagerRef, 
newLeaderSessionID);
    +
    +                   Future<Object> isAlive = 
jobManager.ask(TestingMessages.getAlive(), deadline.timeLeft());
    +
    +                   isLeader = jobManager.ask(
    +                                   
TestingJobManagerMessages.getNotifyWhenLeader(),
    +                                   deadline.timeLeft());
    +
    +                   isConnectedToJobManager = tmGateway.ask(
    +                                   new 
TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManagerRef),
    +                                   deadline.timeLeft());
    +
    +                   Await.ready(isAlive, deadline.timeLeft());
    +
    +                   // tell new jobManager that he's the leader
    +                   myLeaderElectionService.isLeader(newLeaderSessionID);
    +                   // tell taskManager who's the leader
    +                   
myLeaderRetrievalService.notifyListener(jobManager.path(), newLeaderSessionID);
    +
    +                   Await.ready(isLeader, deadline.timeLeft());
    +                   Await.ready(isConnectedToJobManager, 
deadline.timeLeft());
    +
    +                   jobRunning = jobManager.ask(
    +                                   new 
TestingJobManagerMessages.NotifyWhenJobStatus(jobGraph.getJobID(), 
JobStatus.RUNNING),
    +                                   deadline.timeLeft());
    +
    +                   // wait that the job is recovered and reaches state 
RUNNING
    +                   Await.ready(jobRunning, deadline.timeLeft());
    +
    +                   Future<Object> jobFinished = jobManager.ask(
    +                                   new 
TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()),
    +                                   deadline.timeLeft());
    +
    +                   BlockingInvokable.unblock();
    +
    +                   // wait til the job has finished
    +                   Await.ready(jobFinished, deadline.timeLeft());
    +
    +                   // check that the job has been removed from the 
submitted job graph store
    +                   
assertFalse(mySubmittedJobGraphStore.contains(jobGraph.getJobID()));
    +
    +                   // Check that the BLOB store files are removed
    +                   File rootPath = new File(haStoragePath);
    +
    +                   boolean cleanedUpFiles = false;
    +                   while (deadline.hasTimeLeft()) {
    +                           if (listFiles(rootPath).isEmpty()) {
    --- End diff --
    
    Yes, that is true. For example, we will have empty folders
`<root>/blob/cache` in this test. I've added a method that tries to delete the
parent directory when deleting a BLOB (the same approach we currently use in
`AbstractFileStateHandle`). I will adjust this check so that it verifies the
directory is empty.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to