xintongsong commented on a change in pull request #11323: [FLINK-16439][k8s] Make KubernetesResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11323#discussion_r410127268
########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java ########## @@ -237,57 +230,73 @@ private void recoverWorkerNodesFromPreviousAttempts() throws ResourceManagerExce ++currentMaxAttemptId); } - private void requestKubernetesPod() { - numPendingPodRequests++; + private void requestKubernetesPod(WorkerResourceSpec workerResourceSpec) { + final KubernetesTaskManagerParameters parameters = + createKubernetesTaskManagerParameters(workerResourceSpec); + + final KubernetesPod taskManagerPod = + KubernetesTaskManagerFactory.createTaskManagerComponent(parameters); + kubeClient.createTaskManagerPod(taskManagerPod); + + podWorkerResources.put(parameters.getPodName(), workerResourceSpec); + final int pendingWorkerNum = notifyNewWorkerRequested(workerResourceSpec); log.info("Requesting new TaskManager pod with <{},{}>. Number pending requests {}.", - defaultMemoryMB, - defaultCpus, - numPendingPodRequests); + parameters.getTaskManagerMemoryMB(), + parameters.getTaskManagerCPU(), + pendingWorkerNum); + log.info("TaskManager {} will be started with {}.", parameters.getPodName(), workerResourceSpec); + } + + private KubernetesTaskManagerParameters createKubernetesTaskManagerParameters(WorkerResourceSpec workerResourceSpec) { + final TaskExecutorProcessSpec taskExecutorProcessSpec = + TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, workerResourceSpec); final String podName = String.format( TASK_MANAGER_POD_FORMAT, clusterId, currentMaxAttemptId, ++currentMaxPodId); + final ContaineredTaskManagerParameters taskManagerParameters = + ContaineredTaskManagerParameters.create(flinkConfig, taskExecutorProcessSpec); + final String dynamicProperties = BootstrapTools.getDynamicPropertiesAsString(flinkClientConfig, flinkConfig); - final KubernetesTaskManagerParameters kubernetesTaskManagerParameters = new KubernetesTaskManagerParameters( + return new KubernetesTaskManagerParameters( flinkConfig, podName, 
dynamicProperties, taskManagerParameters); - - final KubernetesPod taskManagerPod = - KubernetesTaskManagerFactory.createTaskManagerComponent(kubernetesTaskManagerParameters); - - log.info("TaskManager {} will be started with {}.", podName, taskExecutorProcessSpec); - kubeClient.createTaskManagerPod(taskManagerPod); } /** * Request new pod if pending pods cannot satisfy pending slot requests. */ - private void requestKubernetesPodIfRequired() { - final int requiredTaskManagers = getNumberRequiredTaskManagers(); + private void requestKubernetesPodIfRequired(WorkerResourceSpec workerResourceSpec) { + final int pendingWorkerNum = getNumPendingWorkersFor(workerResourceSpec); + int requiredTaskManagers = getRequiredResources().get(workerResourceSpec); - while (requiredTaskManagers > numPendingPodRequests) { - requestKubernetesPod(); + while (requiredTaskManagers-- > pendingWorkerNum) { + requestKubernetesPod(workerResourceSpec); } } private void removePodIfTerminated(KubernetesPod pod) { if (pod.isTerminated()) { kubeClient.stopPod(pod.getName()); Review comment: I'm actually thinking about a similar approach. I think we should do two things for all the stopped / terminated workers. - Check whether `onAdded` is called by looking into `workerNodes`. If it's not called, we should decrease the pending count. - Try to restart the pod if required. As for indicating whether to restart the worker or not, I'm in favor of relying on `getRequiredResources`, which always fetches the up-to-date number of required resources from `SlotManager`, rather than relying on `podWorkerResources`. But ideally these two should be consistent if we properly clean up `podWorkerResources`. I'm trying to rebase this PR onto the latest master to resolve conflicts. And there's a newly introduced `internalStopPod` which might be a good place for applying the above two actions for all stopped / terminated workers. 
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services