Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5931#discussion_r187227128
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
    @@ -293,21 +293,16 @@ public void startNewWorker(ResourceProfile resourceProfile) {
        }
     
        @Override
    -   public boolean stopWorker(YarnWorkerNode workerNode) {
    -           if (workerNode != null) {
    -                   Container container = workerNode.getContainer();
    -                   log.info("Stopping container {}.", container.getId());
    -                   // release the container on the node manager
    -                   try {
    -                           nodeManagerClient.stopContainer(container.getId(), container.getNodeId());
    -                   } catch (Throwable t) {
    -                           log.warn("Error while calling YARN Node Manager to stop container", t);
    -                   }
    -                   resourceManagerClient.releaseAssignedContainer(container.getId());
    -                   workerNodeMap.remove(workerNode.getResourceID());
    -           } else {
    -                   log.error("Can not find container for null workerNode.");
    +   public boolean stopWorker(final YarnWorkerNode workerNode) {
    +           final Container container = workerNode.getContainer();
    +           log.info("Stopping container {}.", container.getId());
    +           try {
    +                   nodeManagerClient.stopContainer(container.getId(), container.getNodeId());
    +           } catch (final Exception e) {
    +                   log.warn("Error while calling YARN Node Manager to stop container", e);
                }
    +           resourceManagerClient.releaseAssignedContainer(container.getId());
    +           workerNodeMap.remove(workerNode.getResourceID());
                return true;
    --- End diff --
    
    Ah, I think I am a bit confused here: even though the `stopWorker` method is
    only called from the main thread, `onContainersCompleted` is not.
    
    So, imagine that we insert a `TimeUnit.MILLISECONDS.sleep(1000)` (representing
    the latency of code execution in reality) between lines 304 and 305, i.e.
    between `resourceManagerClient.releaseAssignedContainer(container.getId())`
    and `workerNodeMap.remove(workerNode.getResourceID())`. Then
    `onContainersCompleted` could be called while the main thread is sleeping,
    and it would request a new, unnecessary container, because
    `workerNodeMap.remove(workerNode.getResourceID())` has not been called yet
    (due to the sleep). Am I misunderstanding something? What do you think?
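To make the interleaving concrete, here is a minimal, deterministic sketch. It is a simplified, hypothetical model and not Flink's `YarnResourceManager`: the class `StopWorkerRaceSketch`, the `window` parameter, the `onContainerCompleted` helper and the single-threaded executor standing in for "the main thread" are all assumptions for illustration, and joining a callback thread replaces the `sleep(1000)` so the outcome does not depend on timing. When the callback runs on its own thread inside the window, it sees the worker still in `workerNodeMap` and requests one unnecessary container; when the callback thread only enqueues work on the main-thread executor (one possible way to confine both paths to a single thread, assumed here rather than taken from the PR), no request is made.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of the interleaving in the review comment.
// NOT Flink's YarnResourceManager: the YARN clients are replaced by an
// in-memory map and a counter of container requests.
public class StopWorkerRaceSketch {

    private final Map<String, String> workerNodeMap = new ConcurrentHashMap<>();
    private final AtomicInteger requestedContainers = new AtomicInteger();

    // Main-thread path: release the container, then drop the bookkeeping entry.
    // `window` runs between the two steps, where the comment inserts a sleep.
    void stopWorker(String resourceId, Runnable window) {
        // (the YARN container would be released here)
        window.run();
        workerNodeMap.remove(resourceId);
    }

    // Callback path: a completed container whose worker is still tracked is
    // treated as lost unexpectedly, so a replacement container is requested.
    void onContainerCompleted(String resourceId) {
        if (workerNodeMap.containsKey(resourceId)) {
            requestedContainers.incrementAndGet();
        }
    }

    // The callback body runs directly on its own thread inside the window.
    static int runOnSeparateThreads() {
        StopWorkerRaceSketch rm = new StopWorkerRaceSketch();
        rm.workerNodeMap.put("worker-1", "container-1");
        rm.stopWorker("worker-1", () -> {
            Thread callback = new Thread(() -> rm.onContainerCompleted("worker-1"));
            callback.start();
            joinQuietly(callback); // deterministic stand-in for the 1 s sleep
        });
        return rm.requestedContainers.get();
    }

    // The callback thread only enqueues work on a single "main thread" executor,
    // so the callback body cannot run between release and remove.
    static int runOnMainThread() {
        StopWorkerRaceSketch rm = new StopWorkerRaceSketch();
        rm.workerNodeMap.put("worker-1", "container-1");
        ExecutorService mainThread = Executors.newSingleThreadExecutor();
        try {
            mainThread.submit(() -> rm.stopWorker("worker-1", () -> {
                Thread callback = new Thread(
                        () -> mainThread.execute(() -> rm.onContainerCompleted("worker-1")));
                callback.start();
                joinQuietly(callback);
            })).get(); // stopWorker has fully finished once get() returns
            mainThread.shutdown(); // the already-queued callback still runs
            mainThread.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            mainThread.shutdownNow(); // no-op if already terminated
        }
        return rm.requestedContainers.get();
    }

    private static void joinQuietly(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("separate threads: " + runOnSeparateThreads()); // prints 1
        System.out.println("main thread only: " + runOnMainThread());      // prints 0
    }
}
```

The second variant avoids the extra request only because a single-threaded executor runs tasks one at a time, so `stopWorker` completes both steps before the queued callback inspects `workerNodeMap`.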

