tillrohrmann commented on a change in pull request #11323:
URL: https://github.com/apache/flink/pull/11323#discussion_r411374113



##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
##########
@@ -239,70 +247,73 @@ private void recoverWorkerNodesFromPreviousAttempts() throws ResourceManagerExce
                        ++currentMaxAttemptId);
        }
 
-       private void requestKubernetesPod() {
-               numPendingPodRequests++;
+       private void requestKubernetesPod(WorkerResourceSpec workerResourceSpec) {
+               final KubernetesTaskManagerParameters parameters =
+                       createKubernetesTaskManagerParameters(workerResourceSpec);
+
+               final KubernetesPod taskManagerPod =
+                       KubernetesTaskManagerFactory.createTaskManagerComponent(parameters);
+               kubeClient.createTaskManagerPod(taskManagerPod)
+                       .whenComplete(
+                               (ignore, throwable) -> {
+                                       if (throwable != null) {
+                                               final Time retryInterval = configuration.getPodCreationRetryInterval();
+                                               log.error("Could not start TaskManager in pod {}, retry in {}. ",
+                                                       taskManagerPod.getName(), retryInterval, throwable);
+                                               scheduleRunAsync(
+                                                       () -> requestKubernetesPodIfRequired(workerResourceSpec),
+                                                       retryInterval);
+                                       } else {
+                                               podWorkerResources.put(parameters.getPodName(), workerResourceSpec);
+                                               final int pendingWorkerNum = notifyNewWorkerRequested(workerResourceSpec);

Review comment:
       State changing operations should be run by the main thread.
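
For illustration, a minimal sketch of what that could look like (assuming a `runAsync` helper is available on this class, just like the `scheduleRunAsync` already used in the failure branch; this is not code from the PR):
```java
kubeClient.createTaskManagerPod(taskManagerPod)
	.whenComplete((ignore, throwable) -> runAsync(() -> {
		// now running in the main thread, so touching shared state is safe
		if (throwable != null) {
			// ... retry handling as before ...
		} else {
			podWorkerResources.put(parameters.getPodName(), workerResourceSpec);
			notifyNewWorkerRequested(workerResourceSpec);
		}
	}));
```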

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
##########
@@ -239,70 +247,73 @@ private void recoverWorkerNodesFromPreviousAttempts() throws ResourceManagerExce
                        ++currentMaxAttemptId);
        }
 
-       private void requestKubernetesPod() {
-               numPendingPodRequests++;
+       private void requestKubernetesPod(WorkerResourceSpec workerResourceSpec) {
+               final KubernetesTaskManagerParameters parameters =
+                       createKubernetesTaskManagerParameters(workerResourceSpec);
+
+               final KubernetesPod taskManagerPod =
+                       KubernetesTaskManagerFactory.createTaskManagerComponent(parameters);
+               kubeClient.createTaskManagerPod(taskManagerPod)
+                       .whenComplete(
+                               (ignore, throwable) -> {
+                                       if (throwable != null) {
+                                               final Time retryInterval = configuration.getPodCreationRetryInterval();
+                                               log.error("Could not start TaskManager in pod {}, retry in {}. ",

Review comment:
       ```suggestion
                                                log.warn("Could not start TaskManager in pod {}, retry in {}. ",
       ```

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
##########
@@ -239,70 +247,73 @@ private void recoverWorkerNodesFromPreviousAttempts() throws ResourceManagerExce
                        ++currentMaxAttemptId);
        }
 
-       private void requestKubernetesPod() {
-               numPendingPodRequests++;
+       private void requestKubernetesPod(WorkerResourceSpec workerResourceSpec) {
+               final KubernetesTaskManagerParameters parameters =
+                       createKubernetesTaskManagerParameters(workerResourceSpec);
+
+               final KubernetesPod taskManagerPod =
+                       KubernetesTaskManagerFactory.createTaskManagerComponent(parameters);
+               kubeClient.createTaskManagerPod(taskManagerPod)
+                       .whenComplete(
+                               (ignore, throwable) -> {
+                                       if (throwable != null) {
+                                               final Time retryInterval = configuration.getPodCreationRetryInterval();
+                                               log.error("Could not start TaskManager in pod {}, retry in {}. ",
+                                                       taskManagerPod.getName(), retryInterval, throwable);
+                                               scheduleRunAsync(
+                                                       () -> requestKubernetesPodIfRequired(workerResourceSpec),
+                                                       retryInterval);
+                                       } else {
+                                               podWorkerResources.put(parameters.getPodName(), workerResourceSpec);
+                                               final int pendingWorkerNum = notifyNewWorkerRequested(workerResourceSpec);
+
+                                               log.info("Requesting new TaskManager pod with <{},{}>. Number pending requests {}.",
+                                                       parameters.getTaskManagerMemoryMB(),
+                                                       parameters.getTaskManagerCPU(),
+                                                       pendingWorkerNum);
+                                               log.info("TaskManager {} will be started with {}.", parameters.getPodName(), workerResourceSpec);
+                                       }
+                               }
+                       );
+       }
 
-               log.info("Requesting new TaskManager pod with <{},{}>. Number pending requests {}.",
-                       defaultMemoryMB,
-                       defaultCpus,
-                       numPendingPodRequests);
+       private KubernetesTaskManagerParameters createKubernetesTaskManagerParameters(WorkerResourceSpec workerResourceSpec) {
+               final TaskExecutorProcessSpec taskExecutorProcessSpec =
+                       TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, workerResourceSpec);
 
                final String podName = String.format(
                        TASK_MANAGER_POD_FORMAT,
                        clusterId,
                        currentMaxAttemptId,
                        ++currentMaxPodId);
 
+               final ContaineredTaskManagerParameters taskManagerParameters =
+                       ContaineredTaskManagerParameters.create(flinkConfig, taskExecutorProcessSpec);
+
                final String dynamicProperties =
                        BootstrapTools.getDynamicPropertiesAsString(flinkClientConfig, flinkConfig);
 
-               final KubernetesTaskManagerParameters kubernetesTaskManagerParameters = new KubernetesTaskManagerParameters(
+               return new KubernetesTaskManagerParameters(
                        flinkConfig,
                        podName,
                        dynamicProperties,
                        taskManagerParameters);
-
-               final KubernetesPod taskManagerPod =
-                       KubernetesTaskManagerFactory.createTaskManagerComponent(kubernetesTaskManagerParameters);
-
-               log.info("TaskManager {} will be started with {}.", podName, taskExecutorProcessSpec);
-               kubeClient.createTaskManagerPod(taskManagerPod)
-                       .whenComplete(
-                               (ignore, throwable) -> {
-                                       if (throwable != null) {
-                                               log.error("Could not start TaskManager in pod {}.", podName, throwable);
-                                               scheduleRunAsync(
-                                                       this::decreasePendingAndRequestKubernetesPodIfRequired,
-                                                       configuration.getPodCreationRetryInterval());
-                                       }
-                               }
-                       );
-       }
-
-       private void decreasePendingAndRequestKubernetesPodIfRequired() {
-               validateRunsInMainThread();
-               numPendingPodRequests--;
-               requestKubernetesPodIfRequired();
        }
 
        /**
         * Request new pod if pending pods cannot satisfy pending slot requests.
         */
-       private void requestKubernetesPodIfRequired() {
-               final int requiredTaskManagers = getNumberRequiredTaskManagers();
+       private void requestKubernetesPodIfRequired(WorkerResourceSpec workerResourceSpec) {
+               final int requiredTaskManagers = getRequiredResources().get(workerResourceSpec);
 
-               while (requiredTaskManagers > numPendingPodRequests) {
-                       requestKubernetesPod();
+               while (requiredTaskManagers > getNumPendingWorkersFor(workerResourceSpec)) {
+                       requestKubernetesPod(workerResourceSpec);
                }
        }
 
        private void removePodIfTerminated(KubernetesPod pod) {

Review comment:
       Unrelated: I think a better name could be `removePodAndTryRestartIfFailed` or so.

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
##########
@@ -239,70 +247,73 @@ private void recoverWorkerNodesFromPreviousAttempts() throws ResourceManagerExce
                        ++currentMaxAttemptId);
        }
 
-       private void requestKubernetesPod() {
-               numPendingPodRequests++;
+       private void requestKubernetesPod(WorkerResourceSpec workerResourceSpec) {
+               final KubernetesTaskManagerParameters parameters =
+                       createKubernetesTaskManagerParameters(workerResourceSpec);
+
+               final KubernetesPod taskManagerPod =
+                       KubernetesTaskManagerFactory.createTaskManagerComponent(parameters);
+               kubeClient.createTaskManagerPod(taskManagerPod)
+                       .whenComplete(
+                               (ignore, throwable) -> {
+                                       if (throwable != null) {
+                                               final Time retryInterval = configuration.getPodCreationRetryInterval();
+                                               log.error("Could not start TaskManager in pod {}, retry in {}. ",
+                                                       taskManagerPod.getName(), retryInterval, throwable);
+                                               scheduleRunAsync(
+                                                       () -> requestKubernetesPodIfRequired(workerResourceSpec),
+                                                       retryInterval);
+                                       } else {
+                                               podWorkerResources.put(parameters.getPodName(), workerResourceSpec);
+                                               final int pendingWorkerNum = notifyNewWorkerRequested(workerResourceSpec);

Review comment:
       Are we sure that this callback will be executed before `onAdded` is called? If not, then we will fail at one of the `checkState` statements.

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
##########
@@ -239,70 +247,73 @@ private void recoverWorkerNodesFromPreviousAttempts() throws ResourceManagerExce
                        ++currentMaxAttemptId);
        }
 
-       private void requestKubernetesPod() {
-               numPendingPodRequests++;
+       private void requestKubernetesPod(WorkerResourceSpec workerResourceSpec) {
+               final KubernetesTaskManagerParameters parameters =
+                       createKubernetesTaskManagerParameters(workerResourceSpec);
+
+               final KubernetesPod taskManagerPod =
+                       KubernetesTaskManagerFactory.createTaskManagerComponent(parameters);
+               kubeClient.createTaskManagerPod(taskManagerPod)
+                       .whenComplete(
+                               (ignore, throwable) -> {
+                                       if (throwable != null) {
+                                               final Time retryInterval = configuration.getPodCreationRetryInterval();
+                                               log.error("Could not start TaskManager in pod {}, retry in {}. ",
+                                                       taskManagerPod.getName(), retryInterval, throwable);
+                                               scheduleRunAsync(
+                                                       () -> requestKubernetesPodIfRequired(workerResourceSpec),
+                                                       retryInterval);
+                                       } else {
+                                               podWorkerResources.put(parameters.getPodName(), workerResourceSpec);
+                                               final int pendingWorkerNum = notifyNewWorkerRequested(workerResourceSpec);
+
+                                               log.info("Requesting new TaskManager pod with <{},{}>. Number pending requests {}.",
+                                                       parameters.getTaskManagerMemoryMB(),
+                                                       parameters.getTaskManagerCPU(),
+                                                       pendingWorkerNum);
+                                               log.info("TaskManager {} will be started with {}.", parameters.getPodName(), workerResourceSpec);
+                                       }
+                               }
+                       );
+       }
 
-               log.info("Requesting new TaskManager pod with <{},{}>. Number pending requests {}.",
-                       defaultMemoryMB,
-                       defaultCpus,
-                       numPendingPodRequests);
+       private KubernetesTaskManagerParameters createKubernetesTaskManagerParameters(WorkerResourceSpec workerResourceSpec) {
+               final TaskExecutorProcessSpec taskExecutorProcessSpec =
+                       TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, workerResourceSpec);
 
                final String podName = String.format(
                        TASK_MANAGER_POD_FORMAT,
                        clusterId,
                        currentMaxAttemptId,
                        ++currentMaxPodId);
 
+               final ContaineredTaskManagerParameters taskManagerParameters =
+                       ContaineredTaskManagerParameters.create(flinkConfig, taskExecutorProcessSpec);
+
                final String dynamicProperties =
                        BootstrapTools.getDynamicPropertiesAsString(flinkClientConfig, flinkConfig);
 
-               final KubernetesTaskManagerParameters kubernetesTaskManagerParameters = new KubernetesTaskManagerParameters(
+               return new KubernetesTaskManagerParameters(
                        flinkConfig,
                        podName,
                        dynamicProperties,
                        taskManagerParameters);
-
-               final KubernetesPod taskManagerPod =
-                       KubernetesTaskManagerFactory.createTaskManagerComponent(kubernetesTaskManagerParameters);
-
-               log.info("TaskManager {} will be started with {}.", podName, taskExecutorProcessSpec);
-               kubeClient.createTaskManagerPod(taskManagerPod)
-                       .whenComplete(
-                               (ignore, throwable) -> {
-                                       if (throwable != null) {
-                                               log.error("Could not start TaskManager in pod {}.", podName, throwable);
-                                               scheduleRunAsync(
-                                                       this::decreasePendingAndRequestKubernetesPodIfRequired,
-                                                       configuration.getPodCreationRetryInterval());
-                                       }
-                               }
-                       );
-       }
-
-       private void decreasePendingAndRequestKubernetesPodIfRequired() {
-               validateRunsInMainThread();
-               numPendingPodRequests--;
-               requestKubernetesPodIfRequired();
        }
 
        /**
         * Request new pod if pending pods cannot satisfy pending slot requests.
         */
-       private void requestKubernetesPodIfRequired() {
-               final int requiredTaskManagers = getNumberRequiredTaskManagers();
+       private void requestKubernetesPodIfRequired(WorkerResourceSpec workerResourceSpec) {
+               final int requiredTaskManagers = getRequiredResources().get(workerResourceSpec);
 
-               while (requiredTaskManagers > numPendingPodRequests) {
-                       requestKubernetesPod();
+               while (requiredTaskManagers > getNumPendingWorkersFor(workerResourceSpec)) {

Review comment:
       I think this will be a busy loop until we call `notifyNewWorkerRequested` in the asynchronous callback.
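
A sketch of one possible fix (assuming `notifyNewWorkerRequested` updates the count that `getNumPendingWorkersFor` reads; this is an assumption, not code from the PR): do the bookkeeping synchronously in the main thread before issuing the asynchronous request, so the loop condition makes progress:
```java
private void requestKubernetesPodIfRequired(WorkerResourceSpec workerResourceSpec) {
	final int requiredTaskManagers = getRequiredResources().get(workerResourceSpec);
	while (requiredTaskManagers > getNumPendingWorkersFor(workerResourceSpec)) {
		// count the request before the async createTaskManagerPod call,
		// so the next loop iteration sees the updated pending count
		notifyNewWorkerRequested(workerResourceSpec);
		requestKubernetesPod(workerResourceSpec);
	}
}
```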

##########
File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java
##########
@@ -368,6 +377,133 @@ public void testCreateTaskManagerPodFailedAndRetry() throws Exception {
                }};
        }
 
+       @Test
+       public void testStartAndRecoverVariousResourceSpec() throws Exception {
+               new Context() {{
+                       final WorkerResourceSpec workerResourceSpec1 = new WorkerResourceSpec.Builder().setTaskHeapMemoryMB(100).build();
+                       final WorkerResourceSpec workerResourceSpec2 = new WorkerResourceSpec.Builder().setTaskHeapMemoryMB(99).build();
+                       slotManager = new TestingSlotManagerBuilder()
+                               .setGetRequiredResourcesSupplier(() -> Collections.singletonMap(workerResourceSpec1, 1))
+                               .createSlotManager();
+
+                       runTest(() -> {
+                               // Start two workers with different resources
+                               resourceManager.startNewWorker(workerResourceSpec1);
+                               resourceManager.startNewWorker(workerResourceSpec2);
+
+                               // Verify two pods with both worker resources are started
+                               final PodList initialPodList = kubeClient.pods().list();
+                               assertEquals(2, initialPodList.getItems().size());
+                               final Pod initialPod1 = getPodContainsStrInArgs(initialPodList, TaskManagerOptions.TASK_HEAP_MEMORY.key() + "=" + (100L << 20));
+                               final Pod initialPod2 = getPodContainsStrInArgs(initialPodList, TaskManagerOptions.TASK_HEAP_MEMORY.key() + "=" + (99L << 20));
+
+                               // Notify resource manager about pods added.
+                               final KubernetesPod initialKubernetesPod1 = new KubernetesPod(initialPod1);
+                               final KubernetesPod initialKubernetesPod2 = new KubernetesPod(initialPod2);
+                               resourceManager.onAdded(ImmutableList.of(initialKubernetesPod1, initialKubernetesPod2));
+
+                               // Terminate pod1.
+                               terminatePod(initialPod1);
+                               resourceManager.onModified(Collections.singletonList(initialKubernetesPod1));
+
+                               // Verify original pod1 is removed, a new pod1 with the same worker resource is requested.
+                               // Meanwhile, pod2 is not changed.
+                               final PodList activePodList = kubeClient.pods().list();
+                               assertEquals(2, activePodList.getItems().size());
+                               assertFalse(activePodList.getItems().contains(initialPod1));
+                               assertTrue(activePodList.getItems().contains(initialPod2));
+                               getPodContainsStrInArgs(initialPodList, TaskManagerOptions.TASK_HEAP_MEMORY.key() + "=" + (100L << 20));
+                       });
+               }};
+       }
+
+       @Test
+       public void testPreviousAttemptPodAdded() throws Exception {
+               new Context() {{
+                       runTest(() -> {
+                               // Prepare previous attempt pod
+                               final String previousAttemptPodName = CLUSTER_ID + "-taskmanager-1-1";
+                               final Pod previousAttemptPod = new PodBuilder()
+                                       .editOrNewMetadata()
+                                       .withName(previousAttemptPodName)
+                                       .withLabels(KubernetesUtils.getTaskManagerLabels(CLUSTER_ID))
+                                       .endMetadata()
+                                       .editOrNewSpec()
+                                       .endSpec()
+                                       .build();
+                               flinkKubeClient.createTaskManagerPod(new KubernetesPod(previousAttemptPod));
+                               assertEquals(1, kubeClient.pods().list().getItems().size());
+
+                               // Call initialize method to recover worker nodes from previous attempt.
+                               resourceManager.initialize();
+
+                               registerSlotRequest();
+                               assertThat(resourceManager.getNumPendingWorkersForTesting(), is(1));
+
+                               // Adding previous attempt pod should not decrease pending worker count
+                               resourceManager.onAdded(Collections.singletonList(new KubernetesPod(previousAttemptPod)));
+                               assertThat(resourceManager.getNumPendingWorkersForTesting(), is(1));
+
+                               final Optional<Pod> currentAttemptPodOpt = kubeClient.pods().list().getItems().stream()
+                                       .filter(pod -> pod.getMetadata().getName().contains("-taskmanager-2-1"))
+                                       .findAny();
+                               assertTrue(currentAttemptPodOpt.isPresent());
+                               final Pod currentAttemptPod = currentAttemptPodOpt.get();
+
+                               // Adding current attempt pod should decrease the pending worker count
+                               resourceManager.onAdded(Collections.singletonList(new KubernetesPod(currentAttemptPod)));
+                               assertThat(resourceManager.getNumPendingWorkersForTesting(), is(0));
+                       });
+               }};
+       }
+
+       @Test
+       public void testDuplicatedPodAdded() throws Exception {
+               new Context() {{
+                       runTest(() -> {
+                               registerSlotRequest();
+                               registerSlotRequest();
+                               assertThat(resourceManager.getNumPendingWorkersForTesting(), is(2));
+
+                               assertThat(kubeClient.pods().list().getItems().size(), is(2));
+                               final Pod pod1 = kubeClient.pods().list().getItems().get(0);
+                               final Pod pod2 = kubeClient.pods().list().getItems().get(1);
+
+                               resourceManager.onAdded(Collections.singletonList(new KubernetesPod(pod1)));
+                               assertThat(resourceManager.getNumPendingWorkersForTesting(), is(1));
+
+                               // Adding duplicated pod should not increase pending worker count
+                               resourceManager.onAdded(Collections.singletonList(new KubernetesPod(pod1)));
+                               assertThat(resourceManager.getNumPendingWorkersForTesting(), is(1));
+
+                               resourceManager.onAdded(Collections.singletonList(new KubernetesPod(pod2)));
+                               assertThat(resourceManager.getNumPendingWorkersForTesting(), is(0));
+                       });
+               }};
+       }
+
+       @Test
+       public void testPodTerminatedBeforeAdded() throws Exception {
+               new Context() {{
+                       runTest(() -> {
+                               registerSlotRequest();
+                               assertThat(resourceManager.getNumPendingWorkersForTesting(), is(1));
+
+                               final Pod pod = kubeClient.pods().list().getItems().get(0);
+                               terminatePod(pod);
+
+                               resourceManager.onModified(Collections.singletonList(new KubernetesPod(pod)));
+                               assertThat(resourceManager.getNumPendingWorkersForTesting(), is(1));
+
+                               resourceManager.onDeleted(Collections.singletonList(new KubernetesPod(pod)));
+                               assertThat(resourceManager.getNumPendingWorkersForTesting(), is(1));
+
+                               resourceManager.onError(Collections.singletonList(new KubernetesPod(pod)));
+                               assertThat(resourceManager.getNumPendingWorkersForTesting(), is(1));

Review comment:
       I think we should also assert that a new pod is being requested.
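
For example (a sketch; it assumes the replacement request shows up as a new pod in the fake `kubeClient`, so the observable pod count would be back to 1 after the terminated pod is removed):
```java
// after onError removed the terminated pod, a replacement should have been requested
assertThat(kubeClient.pods().list().getItems().size(), is(1));
```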

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
##########
@@ -320,5 +333,16 @@ private void internalStopPod(String podName) {
                                        }
                                }
                        );
+
+               final KubernetesWorkerNode kubernetesWorkerNode = workerNodes.remove(resourceId);
+               final WorkerResourceSpec workerResourceSpec = podWorkerResources.remove(podName);
+
+               // If the stopped pod is requested in the current attempt (workerResourceSpec is known) and is not yet added,
+               // we need to notify ActiveResourceManager to decrease the pending worker count.
+               if (workerResourceSpec != null && kubernetesWorkerNode == null) {

Review comment:
       I think this logic won't play nicely with recovered pods. The problem is that `workerResourceSpec` is null for a recovered pod.

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
##########
@@ -320,5 +333,16 @@ private void internalStopPod(String podName) {
                                        }

Review comment:
       Unrelated: Not being able to stop a container should not be an error log statement because Flink can still work afterwards.

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
##########
@@ -320,5 +333,16 @@ private void internalStopPod(String podName) {
                                        }
                                }
                        );
+
+               final KubernetesWorkerNode kubernetesWorkerNode = workerNodes.remove(resourceId);
+               final WorkerResourceSpec workerResourceSpec = podWorkerResources.remove(podName);

Review comment:
       I think this won't work with updating `podWorkerResources` in the asynchronous callback in `requestKubernetesPod`.

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
##########
@@ -239,70 +247,73 @@ private void recoverWorkerNodesFromPreviousAttempts() throws ResourceManagerExce
                        ++currentMaxAttemptId);
        }
 
-       private void requestKubernetesPod() {
-               numPendingPodRequests++;
+       private void requestKubernetesPod(WorkerResourceSpec workerResourceSpec) {
+               final KubernetesTaskManagerParameters parameters =
+                       createKubernetesTaskManagerParameters(workerResourceSpec);
+
+               final KubernetesPod taskManagerPod =
+                       KubernetesTaskManagerFactory.createTaskManagerComponent(parameters);
+               kubeClient.createTaskManagerPod(taskManagerPod)
+                       .whenComplete(
+                               (ignore, throwable) -> {
+                                       if (throwable != null) {
+                                               final Time retryInterval = configuration.getPodCreationRetryInterval();
+                                               log.error("Could not start TaskManager in pod {}, retry in {}. ",
+                                                       taskManagerPod.getName(), retryInterval, throwable);
+                                               scheduleRunAsync(
+                                                       () -> requestKubernetesPodIfRequired(workerResourceSpec),
+                                                       retryInterval);
+                                       } else {
+                                               podWorkerResources.put(parameters.getPodName(), workerResourceSpec);
+                                               final int pendingWorkerNum = notifyNewWorkerRequested(workerResourceSpec);
+
+                                               log.info("Requesting new TaskManager pod with <{},{}>. Number pending requests {}.",
+                                                       parameters.getTaskManagerMemoryMB(),
+                                                       parameters.getTaskManagerCPU(),
+                                                       pendingWorkerNum);
+                                               log.info("TaskManager {} will be started with {}.", parameters.getPodName(), workerResourceSpec);
+                                       }
+                               }
+                       );
+       }
 
-               log.info("Requesting new TaskManager pod with <{},{}>. Number pending requests {}.",
-                       defaultMemoryMB,
-                       defaultCpus,
-                       numPendingPodRequests);
+       private KubernetesTaskManagerParameters createKubernetesTaskManagerParameters(WorkerResourceSpec workerResourceSpec) {
+               final TaskExecutorProcessSpec taskExecutorProcessSpec =
+                       TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, workerResourceSpec);
 
                final String podName = String.format(
                        TASK_MANAGER_POD_FORMAT,
                        clusterId,
                        currentMaxAttemptId,
                        ++currentMaxPodId);
 
+               final ContaineredTaskManagerParameters taskManagerParameters =
+                       ContaineredTaskManagerParameters.create(flinkConfig, taskExecutorProcessSpec);
+
                final String dynamicProperties =
                        BootstrapTools.getDynamicPropertiesAsString(flinkClientConfig, flinkConfig);
 
-               final KubernetesTaskManagerParameters kubernetesTaskManagerParameters = new KubernetesTaskManagerParameters(
+               return new KubernetesTaskManagerParameters(
                        flinkConfig,
                        podName,
                        dynamicProperties,
                        taskManagerParameters);
-
-               final KubernetesPod taskManagerPod =
-                       KubernetesTaskManagerFactory.createTaskManagerComponent(kubernetesTaskManagerParameters);
-
-               log.info("TaskManager {} will be started with {}.", podName, taskExecutorProcessSpec);
-               kubeClient.createTaskManagerPod(taskManagerPod)
-                       .whenComplete(
-                               (ignore, throwable) -> {
-                                       if (throwable != null) {
-                                               log.error("Could not start TaskManager in pod {}.", podName, throwable);
-                                               scheduleRunAsync(
-                                                       this::decreasePendingAndRequestKubernetesPodIfRequired,
-                                                       configuration.getPodCreationRetryInterval());
-                                       }
-                               }
-                       );
-       }
-
-       private void decreasePendingAndRequestKubernetesPodIfRequired() {
-               validateRunsInMainThread();
-               numPendingPodRequests--;
-               requestKubernetesPodIfRequired();
        }
 
        /**
         * Request new pod if pending pods cannot satisfy pending slot requests.
         */
-       private void requestKubernetesPodIfRequired() {
-               final int requiredTaskManagers = getNumberRequiredTaskManagers();
+       private void requestKubernetesPodIfRequired(WorkerResourceSpec workerResourceSpec) {
+               final int requiredTaskManagers = getRequiredResources().get(workerResourceSpec);

Review comment:
       Instead of asking for a specific resource here, we could ask the `SlotManager` for all of its required resources and request new pods if needed. This would also solve the problem with recovered pods which don't have a `WorkerResourceSpec`.
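
Roughly like this (a sketch; it assumes `getRequiredResources()` returns a `Map<WorkerResourceSpec, Integer>`, as its usage above suggests, and the method name is illustrative):
```java
private void requestKubernetesPodsIfRequired() {
	// walk over everything the SlotManager currently requires,
	// instead of reacting to a single WorkerResourceSpec
	for (Map.Entry<WorkerResourceSpec, Integer> required : getRequiredResources().entrySet()) {
		final WorkerResourceSpec workerResourceSpec = required.getKey();
		while (required.getValue() > getNumPendingWorkersFor(workerResourceSpec)) {
			requestKubernetesPod(workerResourceSpec);
		}
	}
}
```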

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
##########
@@ -320,5 +333,16 @@ private void internalStopPod(String podName) {
                                        }
                                }
                        );
+
+               final KubernetesWorkerNode kubernetesWorkerNode = workerNodes.remove(resourceId);
+               final WorkerResourceSpec workerResourceSpec = podWorkerResources.remove(podName);
+
+               // If the stopped pod is requested in the current attempt (workerResourceSpec is known) and is not yet added,
+               // we need to notify ActiveResourceManager to decrease the pending worker count.
+               if (workerResourceSpec != null && kubernetesWorkerNode == null) {

Review comment:
       Could we give this condition a name? Something like `!hasPodBeenAdded(podName)` or `!hasPodBeenStarted(podName)`?
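
For example (a sketch; it assumes `workerNodes` is keyed by a `ResourceID` derived from the pod name, as the `resourceId` usage above suggests):
```java
// a pod counts as "added" once onAdded registered it as a worker node
private boolean hasPodBeenAdded(String podName) {
	return workerNodes.containsKey(new ResourceID(podName));
}
```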

##########
File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java
##########
@@ -368,6 +377,133 @@ public void testCreateTaskManagerPodFailedAndRetry() throws Exception {
                }};
        }
 
+       @Test
+       public void testStartAndRecoverVariousResourceSpec() throws Exception {
+               new Context() {{
+                       final WorkerResourceSpec workerResourceSpec1 = new WorkerResourceSpec.Builder().setTaskHeapMemoryMB(100).build();
+                       final WorkerResourceSpec workerResourceSpec2 = new WorkerResourceSpec.Builder().setTaskHeapMemoryMB(99).build();
+                       slotManager = new TestingSlotManagerBuilder()
+                               .setGetRequiredResourcesSupplier(() -> Collections.singletonMap(workerResourceSpec1, 1))
+                               .createSlotManager();
+
+                       runTest(() -> {
+                               // Start two workers with different resources
+                               resourceManager.startNewWorker(workerResourceSpec1);
+                               resourceManager.startNewWorker(workerResourceSpec2);
+
+                               // Verify two pods with both worker resources are started
+                               final PodList initialPodList = kubeClient.pods().list();
+                               assertEquals(2, initialPodList.getItems().size());
+                               final Pod initialPod1 = getPodContainsStrInArgs(initialPodList, TaskManagerOptions.TASK_HEAP_MEMORY.key() + "=" + (100L << 20));
+                               final Pod initialPod2 = getPodContainsStrInArgs(initialPodList, TaskManagerOptions.TASK_HEAP_MEMORY.key() + "=" + (99L << 20));
+
+                               // Notify resource manager about pods added.
+                               final KubernetesPod initialKubernetesPod1 = new KubernetesPod(initialPod1);
+                               final KubernetesPod initialKubernetesPod2 = new KubernetesPod(initialPod2);
+                               resourceManager.onAdded(ImmutableList.of(initialKubernetesPod1, initialKubernetesPod2));
+
+                               // Terminate pod1.
+                               terminatePod(initialPod1);
+                               resourceManager.onModified(Collections.singletonList(initialKubernetesPod1));
+
+                               // Verify original pod1 is removed, a new pod1 with the same worker resource is requested.
+                               // Meanwhile, pod2 is not changed.
+                               final PodList activePodList = kubeClient.pods().list();
+                               assertEquals(2, activePodList.getItems().size());
+                               assertFalse(activePodList.getItems().contains(initialPod1));
+                               assertTrue(activePodList.getItems().contains(initialPod2));
+                               getPodContainsStrInArgs(initialPodList, TaskManagerOptions.TASK_HEAP_MEMORY.key() + "=" + (100L << 20));
+                       });
+               }};
+       }
+
+       @Test
+       public void testPreviousAttemptPodAdded() throws Exception {
+               new Context() {{
+                       runTest(() -> {
+                               // Prepare previous attempt pod
+                               final String previousAttemptPodName = CLUSTER_ID + "-taskmanager-1-1";
+                               final Pod previousAttemptPod = new PodBuilder()
+                                       .editOrNewMetadata()
+                                       .withName(previousAttemptPodName)
+                                       .withLabels(KubernetesUtils.getTaskManagerLabels(CLUSTER_ID))
+                                       .endMetadata()
+                                       .editOrNewSpec()
+                                       .endSpec()
+                                       .build();
+                               flinkKubeClient.createTaskManagerPod(new KubernetesPod(previousAttemptPod));
+                               assertEquals(1, kubeClient.pods().list().getItems().size());
+
+                               // Call initialize method to recover worker nodes from previous attempt.
+                               resourceManager.initialize();
+
+                               registerSlotRequest();
+                               assertThat(resourceManager.getNumPendingWorkersForTesting(), is(1));
+
+                               // Adding previous attempt pod should not decrease pending worker count
+                               resourceManager.onAdded(Collections.singletonList(new KubernetesPod(previousAttemptPod)));
+                               assertThat(resourceManager.getNumPendingWorkersForTesting(), is(1));
+
+                               final Optional<Pod> currentAttemptPodOpt = kubeClient.pods().list().getItems().stream()
+                                       .filter(pod -> pod.getMetadata().getName().contains("-taskmanager-2-1"))
+                                       .findAny();
+                               assertTrue(currentAttemptPodOpt.isPresent());
+                               final Pod currentAttemptPod = currentAttemptPodOpt.get();
+
+                               // Adding current attempt pod should decrease the pending worker count
+                               resourceManager.onAdded(Collections.singletonList(new KubernetesPod(currentAttemptPod)));
+                               assertThat(resourceManager.getNumPendingWorkersForTesting(), is(0));
+                       });
+               }};
+       }
+
+       @Test
+       public void testDuplicatedPodAdded() throws Exception {
+               new Context() {{
+                       runTest(() -> {
+                               registerSlotRequest();
+                               registerSlotRequest();
+                               assertThat(resourceManager.getNumPendingWorkersForTesting(), is(2));
+
+                               assertThat(kubeClient.pods().list().getItems().size(), is(2));
+                               final Pod pod1 = kubeClient.pods().list().getItems().get(0);
+                               final Pod pod2 = kubeClient.pods().list().getItems().get(1);
+
+                               resourceManager.onAdded(Collections.singletonList(new KubernetesPod(pod1)));
+                               assertThat(resourceManager.getNumPendingWorkersForTesting(), is(1));
+
+                               // Adding duplicated pod should not increase pending worker count
+                               resourceManager.onAdded(Collections.singletonList(new KubernetesPod(pod1)));
+                               assertThat(resourceManager.getNumPendingWorkersForTesting(), is(1));
+
+                               resourceManager.onAdded(Collections.singletonList(new KubernetesPod(pod2)));
+                               assertThat(resourceManager.getNumPendingWorkersForTesting(), is(0));
+                       });
+               }};
+       }
+
+       @Test
+       public void testPodTerminatedBeforeAdded() throws Exception {
+               new Context() {{
+                       runTest(() -> {
+                               registerSlotRequest();
+                               assertThat(resourceManager.getNumPendingWorkersForTesting(), is(1));
+
+                               final Pod pod = kubeClient.pods().list().getItems().get(0);
+                               terminatePod(pod);
+
+                               resourceManager.onModified(Collections.singletonList(new KubernetesPod(pod)));
+                               assertThat(resourceManager.getNumPendingWorkersForTesting(), is(1));
+
+                               resourceManager.onDeleted(Collections.singletonList(new KubernetesPod(pod)));
+                               assertThat(resourceManager.getNumPendingWorkersForTesting(), is(1));
+
+                               resourceManager.onError(Collections.singletonList(new KubernetesPod(pod)));
+                               assertThat(resourceManager.getNumPendingWorkersForTesting(), is(1));

Review comment:
       That way we would not have to rely on internal methods such as `getNumPendingWorkersForTesting`.

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
##########
@@ -311,7 +322,9 @@ protected double getCpuCores(Configuration configuration) {
                return TaskExecutorProcessUtils.getCpuCoresWithFallbackConfigOption(configuration, KubernetesConfigOptions.TASK_MANAGER_CPU);
        }
 
-       private void internalStopPod(String podName) {
+       private Optional<WorkerResourceSpec> internalStopPod(String podName) {

Review comment:
       I think this method needs more tests.
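
For example, something in this direction (a sketch reusing the test `Context` above; the test name and the final assertion are illustrative, not code from the PR):
```java
@Test
public void testStopPendingPodReleasesWorkerResourceSpec() throws Exception {
	new Context() {{
		runTest(() -> {
			registerSlotRequest();
			final Pod pod = kubeClient.pods().list().getItems().get(0);
			terminatePod(pod);

			// the pod fails before onAdded, so internalStopPod should release its
			// WorkerResourceSpec and a replacement pod should be requested
			resourceManager.onError(Collections.singletonList(new KubernetesPod(pod)));
			assertThat(kubeClient.pods().list().getItems().size(), is(1));
		});
	}};
}
```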




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

