xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w… URL: https://github.com/apache/flink/pull/11248#discussion_r386798964
########## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ########## @@ -464,7 +472,15 @@ public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer> @Override public void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus) { - // We are not interested in getting container status + // We fetch the status of the container from the previous attempts. + if (containerStatus.getState() == ContainerState.NEW) { Review comment: @TaoYang526 Are you suggesting that calling `NMClientAsync.getContainerStatusAsync` on a `NEW` container might result in `onGetContainerStatusError` on some Hadoop versions while `onContainerStatusReceived` on other versions? If that is the case, I think we can have a common method handling releasing the container and removing it from the worker node map, which should be called in both `onContainerStatusReceived` with `if (containerStatus.getState() == ContaienrState.NEW)` and `onGetContainerStatusError`. One more question, how do we now whether a container is `NEW` or there's some other problems in `onGetContainerStatusError`? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services