xintongsong commented on a change in pull request #11323: [FLINK-16439][k8s] Make KubernetesResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11323#discussion_r409447429
########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java ##########
@@ -189,15 +180,17 @@ public boolean stopWorker(final KubernetesWorkerNode worker) {
     public void onAdded(List<KubernetesPod> pods) {
         runAsync(() -> {
             for (KubernetesPod pod : pods) {
-                if (numPendingPodRequests > 0) {
-                    numPendingPodRequests--;
+                WorkerResourceSpec workerResourceSpec = podWorkerResources.get(pod.getName());
+                final int pendingNum = getNumPendingWorkersFor(workerResourceSpec);
+                if (pendingNum > 0) {

Review comment:
I double-checked with @wangyang0918 under what conditions `onAdded` is called. There are two possible reasons for `onAdded` to be called while there is no pending worker request:

- A duplicated `onAdded` message is received. In this case, `workerNodes` should already contain the pod name, so we can check for that and simply ignore the duplicated message.
- An `onAdded` message arrives for a pod requested in a previous attempt. Such pods are guaranteed to have been added to `workerNodes` in `recoverWorkerNodesFromPreviousAttempts` before `onAdded` is called, so the same check lets us ignore them as duplicates.

I'll update this method, adding the duplication check and a `checkState` for `pendingNum > 0`.
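The handling proposed above can be modeled in isolation as follows. This is a minimal, hypothetical sketch, not the actual Flink change: the class `OnAddedSketch`, its maps, and the plain-string stand-in for `WorkerResourceSpec` are assumptions for illustration only. It shows the two-step logic: first skip any pod already in `workerNodes` (a duplicate message or a pod recovered from a previous attempt), then require via a `checkState`-style check that every remaining pod matches a pending request.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical, simplified model of the proposed onAdded handling (not Flink's API). */
public class OnAddedSketch {

    // pod name -> resource spec the pod was requested with (spec modeled as a String)
    private final Map<String, String> podWorkerResources = new HashMap<>();
    // pod names already registered as workers (added earlier or recovered)
    private final Set<String> workerNodes = new HashSet<>();
    // resource spec -> number of pending pod requests for that spec
    private final Map<String, Integer> pendingWorkerCounts = new HashMap<>();

    /** Records a new pod request for the given spec. */
    public void requestPod(String podName, String spec) {
        podWorkerResources.put(podName, spec);
        pendingWorkerCounts.merge(spec, 1, Integer::sum);
    }

    /** Models recoverWorkerNodesFromPreviousAttempts registering an existing pod. */
    public void recoverFromPreviousAttempt(String podName) {
        workerNodes.add(podName);
    }

    public void onAdded(List<String> podNames) {
        for (String podName : podNames) {
            // Duplicate message, or pod already recovered from a previous attempt: ignore it.
            if (workerNodes.contains(podName)) {
                continue;
            }
            String spec = podWorkerResources.get(podName);
            int pendingNum = pendingWorkerCounts.getOrDefault(spec, 0);
            // Every remaining pod must correspond to a pending request.
            checkState(pendingNum > 0, "No pending worker request for pod " + podName);
            pendingWorkerCounts.put(spec, pendingNum - 1);
            workerNodes.add(podName);
        }
    }

    public int pendingCount(String spec) {
        return pendingWorkerCounts.getOrDefault(spec, 0);
    }

    public int workerCount() {
        return workerNodes.size();
    }

    // Local stand-in for a Preconditions.checkState-style helper.
    private static void checkState(boolean condition, String message) {
        if (!condition) {
            throw new IllegalStateException(message);
        }
    }

    public static void main(String[] args) {
        OnAddedSketch rm = new OnAddedSketch();
        rm.requestPod("pod-1", "spec-a");
        rm.recoverFromPreviousAttempt("pod-0");
        // pod-1 is delivered twice and pod-0 was recovered; only the first pod-1 counts.
        rm.onAdded(List.of("pod-1", "pod-1", "pod-0"));
        System.out.println(rm.pendingCount("spec-a")); // prints 0
        System.out.println(rm.workerCount());          // prints 2
    }
}
```

With the duplicate check in front, the `checkState` only fires for a pod that is neither known nor pending, which per the reasoning above should indicate a real bookkeeping bug rather than a benign repeated event.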