[ 
https://issues.apache.org/jira/browse/FLINK-9567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16527034#comment-16527034
 ] 

ASF GitHub Bot commented on FLINK-9567:
---------------------------------------

Github user Clarkkkkk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6192#discussion_r199035271
  
    --- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
    @@ -421,4 +425,139 @@ public void testDeleteApplicationFiles() throws Exception {
                        assertFalse("YARN application directory was not removed", Files.exists(applicationDir.toPath()));
                }};
        }
    +
    +   @Test
    +   public void testOnContainerCompleted() throws Exception {
    +           new Context() {{
    +                   startResourceManager();
    +                   CompletableFuture<?> registerSlotRequestFuture = resourceManager.runInMainThread(() -> {
    +                           rmServices.slotManager.registerSlotRequest(
    +                                   new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, taskHost));
    +                           return null;
    +                   });
    +                   // wait for the registerSlotRequest completion
    +                   registerSlotRequestFuture.get();
    +                   // Callback from YARN when container is allocated.
    +                   Container testingContainer = mock(Container.class);
    +                   when(testingContainer.getId()).thenReturn(
    +                           ContainerId.newInstance(
    +                                   ApplicationAttemptId.newInstance(
    +                                           ApplicationId.newInstance(System.currentTimeMillis(), 1),
    +                                           1),
    +                                   1));
    +                   when(testingContainer.getNodeId()).thenReturn(NodeId.newInstance("container", 1234));
    +                   when(testingContainer.getResource()).thenReturn(Resource.newInstance(200, 1));
    +                   when(testingContainer.getPriority()).thenReturn(Priority.UNDEFINED);
    +
    +                   ImmutableList<Container> testingContainerList = ImmutableList.of(testingContainer);
    +                   resourceManager.onContainersAllocated(testingContainerList);
    +                   verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
    +                   verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class));
    +
    +                   // Remote task executor registers with YarnResourceManager.
    +                   TaskExecutorGateway mockTaskExecutorGateway = mock(TaskExecutorGateway.class);
    --- End diff --
    
    Sure, I will modify it later.


> Flink does not release resource in Yarn Cluster mode
> ----------------------------------------------------
>
>                 Key: FLINK-9567
>                 URL: https://issues.apache.org/jira/browse/FLINK-9567
>             Project: Flink
>          Issue Type: Bug
>          Components: Cluster Management, YARN
>    Affects Versions: 1.5.0
>            Reporter: Shimin Yang
>            Assignee: Shimin Yang
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.6.0
>
>         Attachments: FlinkYarnProblem, fulllog.txt
>
>
> After restarting the Job Manager in Yarn Cluster mode, Flink sometimes does
> not release task manager containers. In the worst case, I had a job
> configured for 5 task managers that ended up holding more than 100
> containers. Although the job did not fail, it affected other jobs in the
> Yarn Cluster.
> In the first log I posted, the container with id 24 is the reason the
> resources were not released. The container was killed before the restart,
> but Flink had not yet received the *onContainerComplete* callback in
> *YarnResourceManager*, which should be called by Yarn's *AMRMAsyncClient*.
> After the restart, as we can see in line 347 of the FlinkYarnProblem log,
> 2018-06-14 22:50:47,846 WARN akka.remote.ReliableDeliverySupervisor - 
> Association with remote system [akka.tcp://flink@bd-r1hdp69:30609] has 
> failed, address is now gated for [50] ms. Reason: [Disassociated]
> Flink lost the connection to container 24, which is on machine bd-r1hdp69.
> When it tried to call *closeTaskManagerConnection* in *onContainerComplete*,
> it had no connection to the TaskManager on container 24, so it simply
> ignored the close of the TaskManager.
> 2018-06-14 22:50:51,812 DEBUG org.apache.flink.yarn.YarnResourceManager - No 
> open TaskExecutor connection container_1528707394163_29461_02_000024. 
> Ignoring close TaskExecutor connection.
> However, before calling *closeTaskManagerConnection*, it had already called
> *requestYarnContainer*, which increased the *numPendingContainerRequests*
> variable in *YarnResourceManager* by 1.
> Because the return of excess containers is determined by the
> *numPendingContainerRequests* variable in *YarnResourceManager*, Flink cannot
> return this container although it is not required. Meanwhile, the restart
> logic has already allocated enough containers for the Task Managers, so Flink
> holds on to the extra container for a long time for nothing.
> In the full log, the job ended with 7 containers while only 3 were running
> TaskManagers.
> PS: Another strange thing I found is that a request for a Yarn container
> sometimes returns many more containers than requested. Is that a normal
> scenario for AMRMAsyncClient?
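
The bookkeeping problem described above can be sketched with a small model.
This is only an illustration under stated assumptions, not Flink's actual
code: the class PendingRequestModel and its methods are hypothetical, and only
the counter name numPendingContainerRequests and the order "request a
replacement first, then try to close the connection" are taken from the
description.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of the counter logic; not Flink's YarnResourceManager. */
public class PendingRequestModel {
    private int numPendingContainerRequests = 0;
    private final List<String> heldContainers = new ArrayList<>();

    /** Models a container request: only the pending counter is tracked here. */
    public void requestYarnContainer() {
        numPendingContainerRequests++;
    }

    /** Keeps the container if a request is pending, otherwise treats it as excess. */
    public boolean onContainerAllocated(String containerId) {
        if (numPendingContainerRequests > 0) {
            numPendingContainerRequests--;
            heldContainers.add(containerId);
            return true;   // container is kept
        }
        return false;      // excess container would be returned to YARN
    }

    /** Assumed order from the report: request a replacement, then try to close. */
    public void onContainerCompleted(String containerId, boolean hasOpenConnection) {
        requestYarnContainer();
        if (hasOpenConnection) {
            heldContainers.remove(containerId);
        }
        // Without an open connection the close is ignored, but the
        // replacement request above has already been counted.
    }

    public int heldContainerCount() {
        return heldContainers.size();
    }

    public int pendingRequests() {
        return numPendingContainerRequests;
    }

    public static void main(String[] args) {
        PendingRequestModel rm = new PendingRequestModel();
        // The restart logic requests and receives the 3 containers the job needs.
        for (int i = 0; i < 3; i++) {
            rm.requestYarnContainer();
        }
        for (int i = 0; i < 3; i++) {
            rm.onContainerAllocated("new-" + i);
        }
        // Late completion callback for the old container, connection already lost.
        rm.onContainerCompleted("old-24", false);
        // The next container offered by YARN is kept although nothing needs it.
        boolean kept = rm.onContainerAllocated("extra");
        System.out.println("extra kept: " + kept + ", held: " + rm.heldContainerCount());
    }
}
```

In this model the late callback raises the pending count to 1 after the job
already has its 3 containers, so the next allocation is kept instead of being
returned as excess.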



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
