xintongsong commented on a change in pull request #11323: URL: https://github.com/apache/flink/pull/11323#discussion_r414444236
##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
##########
@@ -320,5 +333,16 @@ private void internalStopPod(String podName) {
 				}
 			}
 		);
+
+		final KubernetesWorkerNode kubernetesWorkerNode = workerNodes.remove(resourceId);
+		final WorkerResourceSpec workerResourceSpec = podWorkerResources.remove(podName);
+
+		// If the stopped pod is requested in the current attempt (workerResourceSpec is known) and is not yet added,
+		// we need to notify ActiveResourceManager to decrease the pending worker count.
+		if (workerResourceSpec != null && kubernetesWorkerNode == null) {

Review comment:
If a recovered pod is being used by a job, there could be two kinds of situations.
- If the slots are already offered to JM, then JM will find out that the TM/slots are lost, and re-request the slots if needed.
- If the slots are already assigned by SM but not yet offered to JM, RM will discover the TM failure and notify JM about the allocation failure. If needed, JM will re-request the slot.

This behavior is covered by `SlotManagerImplTest#testNotifyFailedAllocationWhenTaskManagerTerminated`.
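
To make the condition in the diff easier to reason about, here is a minimal, standalone sketch of the bookkeeping it relies on. It is not the Flink implementation: the class `PodBookkeepingSketch`, its methods, and the use of plain strings in place of `KubernetesWorkerNode` and `WorkerResourceSpec` are all hypothetical simplifications. It only illustrates that a recovered pod (no known spec in the current attempt) never decreases the pending worker count, so its loss has to be handled through the JM/SM paths described above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the two maps touched in internalStopPod.
class PodBookkeepingSketch {
    // Stand-in for workerNodes: pods that have been added as workers.
    private final Map<String, String> workerNodes = new HashMap<>();
    // Stand-in for podWorkerResources: pods requested in the current attempt.
    private final Map<String, String> podWorkerResources = new HashMap<>();
    private int pendingWorkers = 0;

    // A pod requested in the current attempt: its spec is known and it is pending.
    void requestPod(String podName, String spec) {
        podWorkerResources.put(podName, spec);
        pendingWorkers++;
    }

    // The pod has been added as a worker; it stops counting as pending.
    void addWorker(String podName) {
        workerNodes.put(podName, podName);
        if (podWorkerResources.containsKey(podName)) {
            pendingWorkers--;
        }
    }

    // A pod recovered from a previous attempt: added as a worker, spec unknown.
    void recoverPod(String podName) {
        workerNodes.put(podName, podName);
    }

    // Mirrors the condition in the diff; returns true if the pending count was decreased.
    boolean stopPod(String podName) {
        String node = workerNodes.remove(podName);
        String spec = podWorkerResources.remove(podName);
        boolean wasPending = spec != null && node == null;
        if (wasPending) {
            pendingWorkers--;
        }
        return wasPending;
    }

    int pendingWorkers() {
        return pendingWorkers;
    }

    public static void main(String[] args) {
        PodBookkeepingSketch rm = new PodBookkeepingSketch();

        rm.requestPod("pod-a", "spec");
        System.out.println(rm.stopPod("pod-a")); // true: requested, not yet added

        rm.requestPod("pod-b", "spec");
        rm.addWorker("pod-b");
        System.out.println(rm.stopPod("pod-b")); // false: already added

        rm.recoverPod("pod-c");
        System.out.println(rm.stopPod("pod-c")); // false: recovered, spec unknown

        System.out.println(rm.pendingWorkers()); // 0
    }
}
```

In this model only the first case touches the pending count; the recovered-pod case falls through, which matches the two situations listed above where the JM re-requests slots on its own.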