Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5881#discussion_r183243055

--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -356,17 +395,25 @@ public void onContainersAllocated(List<Container> containers) {
     workerNodeMap.put(new ResourceID(containerIdStr), new YarnWorkerNode(container));

+    ResourceID resourceID = new ResourceID(containerIdStr);
+
     try {
         // Context information used to start a TaskExecutor Java process
         ContainerLaunchContext taskExecutorLaunchContext = createTaskExecutorLaunchContext(
             container.getResource(), containerIdStr, container.getNodeId().getHost());

+        // remember the pending container that need to be registered with ResourceManager.
+        pendingContainersExpectedToRegister.put(resourceID, container);
+
         nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
     } catch (Throwable t) {
         log.error("Could not start TaskManager in container {}.", container.getId(), t);
+        // remove the failed container
+        pendingContainersExpectedToRegister.remove(resourceID);
--- End diff --

I think we also need to call `workerNodeMap.remove(resourceID);` here. The container is put into `workerNodeMap` before the `try` block, so if starting it fails, its entry would otherwise stay in the map.
---