tillrohrmann commented on a change in pull request #11323: URL: https://github.com/apache/flink/pull/11323#discussion_r413779728
########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java ########## @@ -251,29 +250,32 @@ private void requestKubernetesPod(WorkerResourceSpec workerResourceSpec) { final KubernetesTaskManagerParameters parameters = createKubernetesTaskManagerParameters(workerResourceSpec); + podWorkerResources.put(parameters.getPodName(), workerResourceSpec); + final int pendingWorkerNum = notifyNewWorkerRequested(workerResourceSpec); + + log.info("Requesting new TaskManager pod with <{},{}>. Number pending requests {}.", + parameters.getTaskManagerMemoryMB(), + parameters.getTaskManagerCPU(), + pendingWorkerNum); + final KubernetesPod taskManagerPod = KubernetesTaskManagerFactory.buildTaskManagerKubernetesPod(parameters); kubeClient.createTaskManagerPod(taskManagerPod) .whenComplete( - (ignore, throwable) -> { + (ignore, throwable) -> runAsync(() -> { if (throwable != null) { final Time retryInterval = configuration.getPodCreationRetryInterval(); - log.error("Could not start TaskManager in pod {}, retry in {}. ", + log.warn("Could not start TaskManager in pod {}, retry in {}. ", taskManagerPod.getName(), retryInterval, throwable); + podWorkerResources.remove(parameters.getPodName()); + notifyNewWorkerAllocationFailed(workerResourceSpec); scheduleRunAsync( - () -> requestKubernetesPodIfRequired(workerResourceSpec), + this::requestKubernetesPodIfRequired, retryInterval); } else { - podWorkerResources.put(parameters.getPodName(), workerResourceSpec); - final int pendingWorkerNum = notifyNewWorkerRequested(workerResourceSpec); - - log.info("Requesting new TaskManager pod with <{},{}>. 
Number pending requests {}.", - parameters.getTaskManagerMemoryMB(), - parameters.getTaskManagerCPU(), - pendingWorkerNum); log.info("TaskManager {} will be started with {}.", parameters.getPodName(), workerResourceSpec); } - } + }) Review comment: ```suggestion }, getMainThreadExecutor()) ``` ########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java ########## @@ -251,29 +250,32 @@ private void requestKubernetesPod(WorkerResourceSpec workerResourceSpec) { final KubernetesTaskManagerParameters parameters = createKubernetesTaskManagerParameters(workerResourceSpec); + podWorkerResources.put(parameters.getPodName(), workerResourceSpec); + final int pendingWorkerNum = notifyNewWorkerRequested(workerResourceSpec); + + log.info("Requesting new TaskManager pod with <{},{}>. Number pending requests {}.", + parameters.getTaskManagerMemoryMB(), + parameters.getTaskManagerCPU(), + pendingWorkerNum); + final KubernetesPod taskManagerPod = KubernetesTaskManagerFactory.buildTaskManagerKubernetesPod(parameters); kubeClient.createTaskManagerPod(taskManagerPod) .whenComplete( Review comment: ```suggestion .whenCompleteAsync( ``` ########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java ########## @@ -251,29 +250,32 @@ private void requestKubernetesPod(WorkerResourceSpec workerResourceSpec) { final KubernetesTaskManagerParameters parameters = createKubernetesTaskManagerParameters(workerResourceSpec); + podWorkerResources.put(parameters.getPodName(), workerResourceSpec); + final int pendingWorkerNum = notifyNewWorkerRequested(workerResourceSpec); + + log.info("Requesting new TaskManager pod with <{},{}>. 
Number pending requests {}.", + parameters.getTaskManagerMemoryMB(), + parameters.getTaskManagerCPU(), + pendingWorkerNum); + final KubernetesPod taskManagerPod = KubernetesTaskManagerFactory.buildTaskManagerKubernetesPod(parameters); kubeClient.createTaskManagerPod(taskManagerPod) .whenComplete( - (ignore, throwable) -> { + (ignore, throwable) -> runAsync(() -> { Review comment: ```suggestion (ignore, throwable) -> { ``` ########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java ########## @@ -251,29 +250,32 @@ private void requestKubernetesPod(WorkerResourceSpec workerResourceSpec) { final KubernetesTaskManagerParameters parameters = createKubernetesTaskManagerParameters(workerResourceSpec); + podWorkerResources.put(parameters.getPodName(), workerResourceSpec); + final int pendingWorkerNum = notifyNewWorkerRequested(workerResourceSpec); + + log.info("Requesting new TaskManager pod with <{},{}>. Number pending requests {}.", + parameters.getTaskManagerMemoryMB(), + parameters.getTaskManagerCPU(), + pendingWorkerNum); + final KubernetesPod taskManagerPod = KubernetesTaskManagerFactory.buildTaskManagerKubernetesPod(parameters); kubeClient.createTaskManagerPod(taskManagerPod) .whenComplete( - (ignore, throwable) -> { + (ignore, throwable) -> runAsync(() -> { if (throwable != null) { final Time retryInterval = configuration.getPodCreationRetryInterval(); - log.error("Could not start TaskManager in pod {}, retry in {}. ", + log.warn("Could not start TaskManager in pod {}, retry in {}. 
", taskManagerPod.getName(), retryInterval, throwable); + podWorkerResources.remove(parameters.getPodName()); + notifyNewWorkerAllocationFailed(workerResourceSpec); scheduleRunAsync( - () -> requestKubernetesPodIfRequired(workerResourceSpec), + this::requestKubernetesPodIfRequired, retryInterval); } else { Review comment: I think that, in order to properly guard this change, we need a test in which we request a pod, the kube client takes some time to complete the returned future, and in the meantime we call `onAdded`. Previously, we did not test this scenario and therefore did not catch the problem. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org