xintongsong commented on a change in pull request #11323:
URL: https://github.com/apache/flink/pull/11323#discussion_r413842746



##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
##########
@@ -320,5 +333,16 @@ private void internalStopPod(String podName) {
                                        }
                                }
                        );
+
+               final KubernetesWorkerNode kubernetesWorkerNode = workerNodes.remove(resourceId);
+               final WorkerResourceSpec workerResourceSpec = podWorkerResources.remove(podName);
+
+               // If the stopped pod is requested in the current attempt (workerResourceSpec is known) and is not yet added,
+               // we need to notify ActiveResourceManager to decrease the pending worker count.
+               if (workerResourceSpec != null && kubernetesWorkerNode == null) {

Review comment:
       I think what we need to restart are the workers that were requested by the slot manager but have not yet registered with the slot manager, because in such cases the slot manager does not know that these pods have failed and will keep expecting them to register.
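
   A minimal standalone sketch of that rule, for illustration only (the class, field and method names such as `PodRestartRule`, `requestedSpecByPod` and `requestNewWorker` are simplified stand-ins, not the actual Flink code): a terminated pod triggers a replacement request only if it was requested in the current attempt and has not yet registered.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the restart rule described above; names are illustrative, not Flink APIs.
public class PodRestartRule {

    // Resource spec (simplified to a String) per pod requested in the current attempt.
    private final Map<String, String> requestedSpecByPod = new HashMap<>();
    // Pods whose TaskManager has registered with the slot manager.
    private final Map<String, Boolean> registeredPods = new HashMap<>();

    void onPodTerminated(String podName) {
        final String spec = requestedSpecByPod.remove(podName);
        final boolean registered = registeredPods.remove(podName) != null;

        if (spec != null && !registered) {
            // Requested in this attempt but never registered: the slot manager still
            // expects this worker, so request a replacement with the same spec.
            requestNewWorker(spec);
        }
        // Otherwise do nothing here:
        // - for a registered worker, lost slots are re-requested via the JM;
        // - for a pod recovered from a previous attempt, the slot manager never
        //   counted on it in the first place.
    }

    private void requestNewWorker(String spec) {
        // Placeholder for the actual pod request.
    }
}
```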
   
   For a worker that is already registered, in case of failure (see the sketch after this list):
   - If there are slots on the failed worker that are already in use, the JM will realize that the slots are lost and send out new slot requests, which will trigger starting new workers if needed.
   - If none of the slots on the failed worker are in use, then we don't lose anything. New workers will be started when new slot requests are received and cannot be satisfied by registered free slots.
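
   A toy illustration of these two cases, assuming single-slot workers for simplicity (the class and method names here are made up for this sketch, not Flink code): new workers are started only when pending slot requests exceed the free registered slots, so a registered worker's failure needs no explicit restart by the resource manager.

```java
// Toy model (not Flink code) of slot-request-driven worker startup.
public class SlotDrivenScaling {

    /** Number of new single-slot workers needed for the given demand (toy model). */
    static int workersToStart(int pendingSlotRequests, int freeRegisteredSlots) {
        return Math.max(0, pendingSlotRequests - freeRegisteredSlots);
    }

    public static void main(String[] args) {
        // Case 1: the failed worker held a slot that was in use. The JM re-issues
        // that slot request, and the unmet request drives a new worker.
        System.out.println(workersToStart(1, 0)); // 1

        // Case 2: none of the failed worker's slots were in use. Nothing is
        // re-requested now; a worker is started only once future requests cannot
        // be served by the remaining free slots.
        System.out.println(workersToStart(0, 2)); // 0
    }
}
```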
   
   For a pod recovered from the previous attempt, if it is not yet registered, the slot manager will not be aware of it at all. Thus, the slot manager should have already requested enough workers to satisfy the slot requests even without this failed pod.
   
   Moreover, we have not increased the `pendingWorkerCounter` for the recovered pod, and we cannot do that because we don't know its `workerResourceSpec` until it is registered. So we should also not decrease the `pendingWorkerCounter` when the recovered pod is added/failed, otherwise the `pendingWorkerCounter` will go out of sync. I guess that's exactly the reason why we also decreased the pending count for recovered pods before these changes: previously we assumed all the pods were identical.
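
   A minimal sketch of that bookkeeping symmetry (standalone Java; the map layout and method names are simplified assumptions, not the actual `pendingWorkerCounter` implementation): the counter for a spec is decremented only if a matching increment was recorded when the pod was requested, so recovered pods, whose spec was never recorded, cannot push it out of sync.

```java
import java.util.HashMap;
import java.util.Map;

// Toy sketch of per-spec pending-worker bookkeeping; types and names are
// simplified stand-ins, not the actual Flink classes.
public class PendingWorkerBookkeeping {

    // Pending count per worker resource spec (spec simplified to a String).
    private final Map<String, Integer> pendingWorkerCounter = new HashMap<>();
    // Spec recorded for each pod requested in the current attempt.
    private final Map<String, String> podWorkerResources = new HashMap<>();

    void onWorkerRequested(String podName, String spec) {
        podWorkerResources.put(podName, spec);
        pendingWorkerCounter.merge(spec, 1, Integer::sum);
    }

    void onPodAddedOrTerminated(String podName) {
        final String spec = podWorkerResources.remove(podName);
        if (spec == null) {
            // Recovered from a previous attempt: we never incremented the counter
            // for this pod, so we must not decrement it either.
            return;
        }
        // Symmetric decrement for a pod requested in the current attempt;
        // remove the entry once the count drops to zero.
        pendingWorkerCounter.compute(spec, (k, v) -> (v == null || v <= 1) ? null : v - 1);
    }
}
```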




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

