tillrohrmann commented on a change in pull request #11323: URL: https://github.com/apache/flink/pull/11323#discussion_r411374113
########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java ########## @@ -239,70 +247,73 @@ private void recoverWorkerNodesFromPreviousAttempts() throws ResourceManagerExce ++currentMaxAttemptId); } - private void requestKubernetesPod() { - numPendingPodRequests++; + private void requestKubernetesPod(WorkerResourceSpec workerResourceSpec) { + final KubernetesTaskManagerParameters parameters = + createKubernetesTaskManagerParameters(workerResourceSpec); + + final KubernetesPod taskManagerPod = + KubernetesTaskManagerFactory.createTaskManagerComponent(parameters); + kubeClient.createTaskManagerPod(taskManagerPod) + .whenComplete( + (ignore, throwable) -> { + if (throwable != null) { + final Time retryInterval = configuration.getPodCreationRetryInterval(); + log.error("Could not start TaskManager in pod {}, retry in {}. ", + taskManagerPod.getName(), retryInterval, throwable); + scheduleRunAsync( + () -> requestKubernetesPodIfRequired(workerResourceSpec), + retryInterval); + } else { + podWorkerResources.put(parameters.getPodName(), workerResourceSpec); + final int pendingWorkerNum = notifyNewWorkerRequested(workerResourceSpec); Review comment: State changing operations should be run by the main thread. ########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java ########## @@ -239,70 +247,73 @@ private void recoverWorkerNodesFromPreviousAttempts() throws ResourceManagerExce ++currentMaxAttemptId); } - private void requestKubernetesPod() { - numPendingPodRequests++; + private void requestKubernetesPod(WorkerResourceSpec workerResourceSpec) { + final KubernetesTaskManagerParameters parameters = + createKubernetesTaskManagerParameters(workerResourceSpec); + + final KubernetesPod taskManagerPod = + KubernetesTaskManagerFactory.createTaskManagerComponent(parameters); + kubeClient.createTaskManagerPod(taskManagerPod) + .whenComplete( + (ignore, throwable) -> { + if (throwable != null) { + final Time retryInterval = configuration.getPodCreationRetryInterval(); + log.error("Could not start TaskManager in pod {}, retry in {}. ", Review comment: ```suggestion log.warn("Could not start TaskManager in pod {}, retry in {}. ", ``` ########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java ########## @@ -239,70 +247,73 @@ private void recoverWorkerNodesFromPreviousAttempts() throws ResourceManagerExce ++currentMaxAttemptId); } - private void requestKubernetesPod() { - numPendingPodRequests++; + private void requestKubernetesPod(WorkerResourceSpec workerResourceSpec) { + final KubernetesTaskManagerParameters parameters = + createKubernetesTaskManagerParameters(workerResourceSpec); + + final KubernetesPod taskManagerPod = + KubernetesTaskManagerFactory.createTaskManagerComponent(parameters); + kubeClient.createTaskManagerPod(taskManagerPod) + .whenComplete( + (ignore, throwable) -> { + if (throwable != null) { + final Time retryInterval = configuration.getPodCreationRetryInterval(); + log.error("Could not start TaskManager in pod {}, retry in {}. ", + taskManagerPod.getName(), retryInterval, throwable); + scheduleRunAsync( + () -> requestKubernetesPodIfRequired(workerResourceSpec), + retryInterval); + } else { + podWorkerResources.put(parameters.getPodName(), workerResourceSpec); + final int pendingWorkerNum = notifyNewWorkerRequested(workerResourceSpec); + + log.info("Requesting new TaskManager pod with <{},{}>. 
Number pending requests {}.", + parameters.getTaskManagerMemoryMB(), + parameters.getTaskManagerCPU(), + pendingWorkerNum); + log.info("TaskManager {} will be started with {}.", parameters.getPodName(), workerResourceSpec); + } + } + ); + } - log.info("Requesting new TaskManager pod with <{},{}>. Number pending requests {}.", - defaultMemoryMB, - defaultCpus, - numPendingPodRequests); + private KubernetesTaskManagerParameters createKubernetesTaskManagerParameters(WorkerResourceSpec workerResourceSpec) { + final TaskExecutorProcessSpec taskExecutorProcessSpec = + TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, workerResourceSpec); final String podName = String.format( TASK_MANAGER_POD_FORMAT, clusterId, currentMaxAttemptId, ++currentMaxPodId); + final ContaineredTaskManagerParameters taskManagerParameters = + ContaineredTaskManagerParameters.create(flinkConfig, taskExecutorProcessSpec); + final String dynamicProperties = BootstrapTools.getDynamicPropertiesAsString(flinkClientConfig, flinkConfig); - final KubernetesTaskManagerParameters kubernetesTaskManagerParameters = new KubernetesTaskManagerParameters( + return new KubernetesTaskManagerParameters( flinkConfig, podName, dynamicProperties, taskManagerParameters); - - final KubernetesPod taskManagerPod = - KubernetesTaskManagerFactory.createTaskManagerComponent(kubernetesTaskManagerParameters); - - log.info("TaskManager {} will be started with {}.", podName, taskExecutorProcessSpec); - kubeClient.createTaskManagerPod(taskManagerPod) - .whenComplete( - (ignore, throwable) -> { - if (throwable != null) { - log.error("Could not start TaskManager in pod {}.", podName, throwable); - scheduleRunAsync( - this::decreasePendingAndRequestKubernetesPodIfRequired, - configuration.getPodCreationRetryInterval()); - } - } - ); - } - - private void decreasePendingAndRequestKubernetesPodIfRequired() { - validateRunsInMainThread(); - numPendingPodRequests--; - requestKubernetesPodIfRequired(); } /** * Request new pod if pending pods cannot satisfy pending slot requests. */ - private void requestKubernetesPodIfRequired() { - final int requiredTaskManagers = getNumberRequiredTaskManagers(); + private void requestKubernetesPodIfRequired(WorkerResourceSpec workerResourceSpec) { + final int requiredTaskManagers = getRequiredResources().get(workerResourceSpec); - while (requiredTaskManagers > numPendingPodRequests) { - requestKubernetesPod(); + while (requiredTaskManagers > getNumPendingWorkersFor(workerResourceSpec)) { + requestKubernetesPod(workerResourceSpec); } } private void removePodIfTerminated(KubernetesPod pod) { Review comment: Unrelated: I think a better name could be `removePodAndTryRestartIfFailed` or so. 
########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java ########## @@ -239,70 +247,73 @@ private void recoverWorkerNodesFromPreviousAttempts() throws ResourceManagerExce ++currentMaxAttemptId); } - private void requestKubernetesPod() { - numPendingPodRequests++; + private void requestKubernetesPod(WorkerResourceSpec workerResourceSpec) { + final KubernetesTaskManagerParameters parameters = + createKubernetesTaskManagerParameters(workerResourceSpec); + + final KubernetesPod taskManagerPod = + KubernetesTaskManagerFactory.createTaskManagerComponent(parameters); + kubeClient.createTaskManagerPod(taskManagerPod) + .whenComplete( + (ignore, throwable) -> { + if (throwable != null) { + final Time retryInterval = configuration.getPodCreationRetryInterval(); + log.error("Could not start TaskManager in pod {}, retry in {}. ", + taskManagerPod.getName(), retryInterval, throwable); + scheduleRunAsync( + () -> requestKubernetesPodIfRequired(workerResourceSpec), + retryInterval); + } else { + podWorkerResources.put(parameters.getPodName(), workerResourceSpec); + final int pendingWorkerNum = notifyNewWorkerRequested(workerResourceSpec); Review comment: Are we sure that this callback will be executed before `onAdded` will be called? If not, then we will fail at one of the check state statements. ########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java ########## @@ -239,70 +247,73 @@ private void recoverWorkerNodesFromPreviousAttempts() throws ResourceManagerExce ++currentMaxAttemptId); } - private void requestKubernetesPod() { - numPendingPodRequests++; + private void requestKubernetesPod(WorkerResourceSpec workerResourceSpec) { + final KubernetesTaskManagerParameters parameters = + createKubernetesTaskManagerParameters(workerResourceSpec); + + final KubernetesPod taskManagerPod = + KubernetesTaskManagerFactory.createTaskManagerComponent(parameters); + kubeClient.createTaskManagerPod(taskManagerPod) + .whenComplete( + (ignore, throwable) -> { + if (throwable != null) { + final Time retryInterval = configuration.getPodCreationRetryInterval(); + log.error("Could not start TaskManager in pod {}, retry in {}. ", + taskManagerPod.getName(), retryInterval, throwable); + scheduleRunAsync( + () -> requestKubernetesPodIfRequired(workerResourceSpec), + retryInterval); + } else { + podWorkerResources.put(parameters.getPodName(), workerResourceSpec); + final int pendingWorkerNum = notifyNewWorkerRequested(workerResourceSpec); + + log.info("Requesting new TaskManager pod with <{},{}>. Number pending requests {}.", + parameters.getTaskManagerMemoryMB(), + parameters.getTaskManagerCPU(), + pendingWorkerNum); + log.info("TaskManager {} will be started with {}.", parameters.getPodName(), workerResourceSpec); + } + } + ); + } - log.info("Requesting new TaskManager pod with <{},{}>. 
Number pending requests {}.", - defaultMemoryMB, - defaultCpus, - numPendingPodRequests); + private KubernetesTaskManagerParameters createKubernetesTaskManagerParameters(WorkerResourceSpec workerResourceSpec) { + final TaskExecutorProcessSpec taskExecutorProcessSpec = + TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, workerResourceSpec); final String podName = String.format( TASK_MANAGER_POD_FORMAT, clusterId, currentMaxAttemptId, ++currentMaxPodId); + final ContaineredTaskManagerParameters taskManagerParameters = + ContaineredTaskManagerParameters.create(flinkConfig, taskExecutorProcessSpec); + final String dynamicProperties = BootstrapTools.getDynamicPropertiesAsString(flinkClientConfig, flinkConfig); - final KubernetesTaskManagerParameters kubernetesTaskManagerParameters = new KubernetesTaskManagerParameters( + return new KubernetesTaskManagerParameters( flinkConfig, podName, dynamicProperties, taskManagerParameters); - - final KubernetesPod taskManagerPod = - KubernetesTaskManagerFactory.createTaskManagerComponent(kubernetesTaskManagerParameters); - - log.info("TaskManager {} will be started with {}.", podName, taskExecutorProcessSpec); - kubeClient.createTaskManagerPod(taskManagerPod) - .whenComplete( - (ignore, throwable) -> { - if (throwable != null) { - log.error("Could not start TaskManager in pod {}.", podName, throwable); - scheduleRunAsync( - this::decreasePendingAndRequestKubernetesPodIfRequired, - configuration.getPodCreationRetryInterval()); - } - } - ); - } - - private void decreasePendingAndRequestKubernetesPodIfRequired() { - validateRunsInMainThread(); - numPendingPodRequests--; - requestKubernetesPodIfRequired(); } /** * Request new pod if pending pods cannot satisfy pending slot requests. */ - private void requestKubernetesPodIfRequired() { - final int requiredTaskManagers = getNumberRequiredTaskManagers(); + private void requestKubernetesPodIfRequired(WorkerResourceSpec workerResourceSpec) { + final int requiredTaskManagers = getRequiredResources().get(workerResourceSpec); - while (requiredTaskManagers > numPendingPodRequests) { - requestKubernetesPod(); + while (requiredTaskManagers > getNumPendingWorkersFor(workerResourceSpec)) { Review comment: I think this will be a busy loop until we call `notifyNewWorkerRequested` in the asynchronous call back. 
########## File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java ########## @@ -368,6 +377,133 @@ public void testCreateTaskManagerPodFailedAndRetry() throws Exception { }}; } + @Test + public void testStartAndRecoverVariousResourceSpec() throws Exception { + new Context() {{ + final WorkerResourceSpec workerResourceSpec1 = new WorkerResourceSpec.Builder().setTaskHeapMemoryMB(100).build(); + final WorkerResourceSpec workerResourceSpec2 = new WorkerResourceSpec.Builder().setTaskHeapMemoryMB(99).build(); + slotManager = new TestingSlotManagerBuilder() + .setGetRequiredResourcesSupplier(() -> Collections.singletonMap(workerResourceSpec1, 1)) + .createSlotManager(); + + runTest(() -> { + // Start two workers with different resources + resourceManager.startNewWorker(workerResourceSpec1); + resourceManager.startNewWorker(workerResourceSpec2); + + // Verify two pods with both worker resources are started + final PodList initialPodList = kubeClient.pods().list(); + assertEquals(2, initialPodList.getItems().size()); + final Pod initialPod1 = getPodContainsStrInArgs(initialPodList, TaskManagerOptions.TASK_HEAP_MEMORY.key() + "=" + (100L << 20)); + final Pod initialPod2 = getPodContainsStrInArgs(initialPodList, TaskManagerOptions.TASK_HEAP_MEMORY.key() + "=" + (99L << 20)); + + // Notify resource manager about pods added. + final KubernetesPod initialKubernetesPod1 = new KubernetesPod(initialPod1); + final KubernetesPod initialKubernetesPod2 = new KubernetesPod(initialPod2); + resourceManager.onAdded(ImmutableList.of(initialKubernetesPod1, initialKubernetesPod2)); + + // Terminate pod1. + terminatePod(initialPod1); + resourceManager.onModified(Collections.singletonList(initialKubernetesPod1)); + + // Verify original pod1 is removed, a new pod1 with the same worker resource is requested. + // Meantime, pod2 is not changes. + final PodList activePodList = kubeClient.pods().list(); + assertEquals(2, activePodList.getItems().size()); + assertFalse(activePodList.getItems().contains(initialPod1)); + assertTrue(activePodList.getItems().contains(initialPod2)); + getPodContainsStrInArgs(initialPodList, TaskManagerOptions.TASK_HEAP_MEMORY.key() + "=" + (100L << 20)); + }); + }}; + } + + @Test + public void testPreviousAttemptPodAdded() throws Exception { + new Context() {{ + runTest(() -> { + // Prepare previous attempt pod + final String previousAttemptPodName = CLUSTER_ID + "-taskmanager-1-1"; + final Pod previousAttemptPod = new PodBuilder() + .editOrNewMetadata() + .withName(previousAttemptPodName) + .withLabels(KubernetesUtils.getTaskManagerLabels(CLUSTER_ID)) + .endMetadata() + .editOrNewSpec() + .endSpec() + .build(); + flinkKubeClient.createTaskManagerPod(new KubernetesPod(previousAttemptPod)); + assertEquals(1, kubeClient.pods().list().getItems().size()); + + // Call initialize method to recover worker nodes from previous attempt. 
+ resourceManager.initialize(); + + registerSlotRequest(); + assertThat(resourceManager.getNumPendingWorkersForTesting(), is(1)); + + // adding previous attempt pod should not decrease pending worker count + resourceManager.onAdded(Collections.singletonList(new KubernetesPod(previousAttemptPod))); + assertThat(resourceManager.getNumPendingWorkersForTesting(), is(1)); + + final Optional<Pod> currentAttemptPodOpt = kubeClient.pods().list().getItems().stream() + .filter(pod -> pod.getMetadata().getName().contains("-taskmanager-2-1")) + .findAny(); + assertTrue(currentAttemptPodOpt.isPresent()); + final Pod currentAttemptPod = currentAttemptPodOpt.get(); + + // adding current attempt pod should decrease the pending worker count + resourceManager.onAdded(Collections.singletonList(new KubernetesPod(currentAttemptPod))); + assertThat(resourceManager.getNumPendingWorkersForTesting(), is(0)); + }); + }}; + } + + @Test + public void testDuplicatedPodAdded() throws Exception { + new Context() {{ + runTest(() -> { + registerSlotRequest(); + registerSlotRequest(); + assertThat(resourceManager.getNumPendingWorkersForTesting(), is(2)); + + assertThat(kubeClient.pods().list().getItems().size(), is(2)); + final Pod pod1 = kubeClient.pods().list().getItems().get(0); + final Pod pod2 = kubeClient.pods().list().getItems().get(1); + + resourceManager.onAdded(Collections.singletonList(new KubernetesPod(pod1))); + assertThat(resourceManager.getNumPendingWorkersForTesting(), is(1)); + + // Adding duplicated pod should not increase pending worker count + resourceManager.onAdded(Collections.singletonList(new KubernetesPod(pod1))); + assertThat(resourceManager.getNumPendingWorkersForTesting(), is(1)); + + resourceManager.onAdded(Collections.singletonList(new KubernetesPod(pod2))); + assertThat(resourceManager.getNumPendingWorkersForTesting(), is(0)); + }); + }}; + } + + @Test + public void testPodTerminatedBeforeAdded() throws Exception { + new Context() {{ + runTest(() -> { + registerSlotRequest(); + assertThat(resourceManager.getNumPendingWorkersForTesting(), is(1)); + + final Pod pod = kubeClient.pods().list().getItems().get(0); + terminatePod(pod); + + resourceManager.onModified(Collections.singletonList(new KubernetesPod(pod))); + assertThat(resourceManager.getNumPendingWorkersForTesting(), is(1)); + + resourceManager.onDeleted(Collections.singletonList(new KubernetesPod(pod))); + assertThat(resourceManager.getNumPendingWorkersForTesting(), is(1)); + + resourceManager.onError(Collections.singletonList(new KubernetesPod(pod))); + assertThat(resourceManager.getNumPendingWorkersForTesting(), is(1)); Review comment: I think we should also assert that a new pod is being requested. ########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java ########## @@ -320,5 +333,16 @@ private void internalStopPod(String podName) { } } ); + + final KubernetesWorkerNode kubernetesWorkerNode = workerNodes.remove(resourceId); + final WorkerResourceSpec workerResourceSpec = podWorkerResources.remove(podName); + + // If the stopped pod is requested in the current attempt (workerResourceSpec is known) and is not yet added, + // we need to notify ActiveResourceManager to decrease the pending worker count. + if (workerResourceSpec != null && kubernetesWorkerNode == null) { Review comment: I think this logic won't play nicely together with recovered pods. The problem is that `workerResourceSpec` is null for a recovered pod. 
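For the missing assertion, a sketch of what it could look like at the end of `testPodTerminatedBeforeAdded`, assuming the fake kube client used by the `Context` reflects the replacement pod (the matcher choice is illustrative):

```java
// A replacement pod should have been requested after the terminated pod was cleaned up,
// so the fake client should again contain exactly one pod, with a different name.
final List<Pod> podsAfterRecovery = kubeClient.pods().list().getItems();
assertThat(podsAfterRecovery.size(), is(1));
assertThat(podsAfterRecovery.get(0).getMetadata().getName(), not(is(pod.getMetadata().getName())));
```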
########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java ########## @@ -320,5 +333,16 @@ private void internalStopPod(String podName) { } Review comment: Unrelated: Not being able to stop a container should not be an error log statement because Flink can still work afterwards. ########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java ########## @@ -320,5 +333,16 @@ private void internalStopPod(String podName) { } } ); + + final KubernetesWorkerNode kubernetesWorkerNode = workerNodes.remove(resourceId); + final WorkerResourceSpec workerResourceSpec = podWorkerResources.remove(podName); Review comment: I think this won't work with updating `podWorkerResources` in the asynchronous callback in `requestKubernetesPod`. ########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java ########## @@ -239,70 +247,73 @@ private void recoverWorkerNodesFromPreviousAttempts() throws ResourceManagerExce ++currentMaxAttemptId); } - private void requestKubernetesPod() { - numPendingPodRequests++; + private void requestKubernetesPod(WorkerResourceSpec workerResourceSpec) { + final KubernetesTaskManagerParameters parameters = + createKubernetesTaskManagerParameters(workerResourceSpec); + + final KubernetesPod taskManagerPod = + KubernetesTaskManagerFactory.createTaskManagerComponent(parameters); + kubeClient.createTaskManagerPod(taskManagerPod) + .whenComplete( + (ignore, throwable) -> { + if (throwable != null) { + final Time retryInterval = configuration.getPodCreationRetryInterval(); + log.error("Could not start TaskManager in pod {}, retry in {}. ", + taskManagerPod.getName(), retryInterval, throwable); + scheduleRunAsync( + () -> requestKubernetesPodIfRequired(workerResourceSpec), + retryInterval); + } else { + podWorkerResources.put(parameters.getPodName(), workerResourceSpec); + final int pendingWorkerNum = notifyNewWorkerRequested(workerResourceSpec); + + log.info("Requesting new TaskManager pod with <{},{}>. Number pending requests {}.", + parameters.getTaskManagerMemoryMB(), + parameters.getTaskManagerCPU(), + pendingWorkerNum); + log.info("TaskManager {} will be started with {}.", parameters.getPodName(), workerResourceSpec); + } + } + ); + } - log.info("Requesting new TaskManager pod with <{},{}>. 
Number pending requests {}.", - defaultMemoryMB, - defaultCpus, - numPendingPodRequests); + private KubernetesTaskManagerParameters createKubernetesTaskManagerParameters(WorkerResourceSpec workerResourceSpec) { + final TaskExecutorProcessSpec taskExecutorProcessSpec = + TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, workerResourceSpec); final String podName = String.format( TASK_MANAGER_POD_FORMAT, clusterId, currentMaxAttemptId, ++currentMaxPodId); + final ContaineredTaskManagerParameters taskManagerParameters = + ContaineredTaskManagerParameters.create(flinkConfig, taskExecutorProcessSpec); + final String dynamicProperties = BootstrapTools.getDynamicPropertiesAsString(flinkClientConfig, flinkConfig); - final KubernetesTaskManagerParameters kubernetesTaskManagerParameters = new KubernetesTaskManagerParameters( + return new KubernetesTaskManagerParameters( flinkConfig, podName, dynamicProperties, taskManagerParameters); - - final KubernetesPod taskManagerPod = - KubernetesTaskManagerFactory.createTaskManagerComponent(kubernetesTaskManagerParameters); - - log.info("TaskManager {} will be started with {}.", podName, taskExecutorProcessSpec); - kubeClient.createTaskManagerPod(taskManagerPod) - .whenComplete( - (ignore, throwable) -> { - if (throwable != null) { - log.error("Could not start TaskManager in pod {}.", podName, throwable); - scheduleRunAsync( - this::decreasePendingAndRequestKubernetesPodIfRequired, - configuration.getPodCreationRetryInterval()); - } - } - ); - } - - private void decreasePendingAndRequestKubernetesPodIfRequired() { - validateRunsInMainThread(); - numPendingPodRequests--; - requestKubernetesPodIfRequired(); } /** * Request new pod if pending pods cannot satisfy pending slot requests. */ - private void requestKubernetesPodIfRequired() { - final int requiredTaskManagers = getNumberRequiredTaskManagers(); + private void requestKubernetesPodIfRequired(WorkerResourceSpec workerResourceSpec) { + final int requiredTaskManagers = getRequiredResources().get(workerResourceSpec); Review comment: Instead of asking for a specific resource here, we could ask the `SlotManager` for all of its required resources and request new pods if needed. This would also solve the problem with recovered pods which don't have a `WorkerResourceSpec`. ########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java ########## @@ -320,5 +333,16 @@ private void internalStopPod(String podName) { } } ); + + final KubernetesWorkerNode kubernetesWorkerNode = workerNodes.remove(resourceId); + final WorkerResourceSpec workerResourceSpec = podWorkerResources.remove(podName); + + // If the stopped pod is requested in the current attempt (workerResourceSpec is known) and is not yet added, + // we need to notify ActiveResourceManager to decrease the pending worker count. + if (workerResourceSpec != null && kubernetesWorkerNode == null) { Review comment: Could we give this condition a name? Something like `!hasPodBeenAdded(podName)` or `!hasPodBeenStarted(podName)`? 
########## File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java ########## @@ -368,6 +377,133 @@ public void testCreateTaskManagerPodFailedAndRetry() throws Exception { }}; } + @Test + public void testStartAndRecoverVariousResourceSpec() throws Exception { + new Context() {{ + final WorkerResourceSpec workerResourceSpec1 = new WorkerResourceSpec.Builder().setTaskHeapMemoryMB(100).build(); + final WorkerResourceSpec workerResourceSpec2 = new WorkerResourceSpec.Builder().setTaskHeapMemoryMB(99).build(); + slotManager = new TestingSlotManagerBuilder() + .setGetRequiredResourcesSupplier(() -> Collections.singletonMap(workerResourceSpec1, 1)) + .createSlotManager(); + + runTest(() -> { + // Start two workers with different resources + resourceManager.startNewWorker(workerResourceSpec1); + resourceManager.startNewWorker(workerResourceSpec2); + + // Verify two pods with both worker resources are started + final PodList initialPodList = kubeClient.pods().list(); + assertEquals(2, initialPodList.getItems().size()); + final Pod initialPod1 = getPodContainsStrInArgs(initialPodList, TaskManagerOptions.TASK_HEAP_MEMORY.key() + "=" + (100L << 20)); + final Pod initialPod2 = getPodContainsStrInArgs(initialPodList, TaskManagerOptions.TASK_HEAP_MEMORY.key() + "=" + (99L << 20)); + + // Notify resource manager about pods added. + final KubernetesPod initialKubernetesPod1 = new KubernetesPod(initialPod1); + final KubernetesPod initialKubernetesPod2 = new KubernetesPod(initialPod2); + resourceManager.onAdded(ImmutableList.of(initialKubernetesPod1, initialKubernetesPod2)); + + // Terminate pod1. + terminatePod(initialPod1); + resourceManager.onModified(Collections.singletonList(initialKubernetesPod1)); + + // Verify original pod1 is removed, a new pod1 with the same worker resource is requested. + // Meantime, pod2 is not changes. + final PodList activePodList = kubeClient.pods().list(); + assertEquals(2, activePodList.getItems().size()); + assertFalse(activePodList.getItems().contains(initialPod1)); + assertTrue(activePodList.getItems().contains(initialPod2)); + getPodContainsStrInArgs(initialPodList, TaskManagerOptions.TASK_HEAP_MEMORY.key() + "=" + (100L << 20)); + }); + }}; + } + + @Test + public void testPreviousAttemptPodAdded() throws Exception { + new Context() {{ + runTest(() -> { + // Prepare previous attempt pod + final String previousAttemptPodName = CLUSTER_ID + "-taskmanager-1-1"; + final Pod previousAttemptPod = new PodBuilder() + .editOrNewMetadata() + .withName(previousAttemptPodName) + .withLabels(KubernetesUtils.getTaskManagerLabels(CLUSTER_ID)) + .endMetadata() + .editOrNewSpec() + .endSpec() + .build(); + flinkKubeClient.createTaskManagerPod(new KubernetesPod(previousAttemptPod)); + assertEquals(1, kubeClient.pods().list().getItems().size()); + + // Call initialize method to recover worker nodes from previous attempt. 
+ resourceManager.initialize(); + + registerSlotRequest(); + assertThat(resourceManager.getNumPendingWorkersForTesting(), is(1)); + + // adding previous attempt pod should not decrease pending worker count + resourceManager.onAdded(Collections.singletonList(new KubernetesPod(previousAttemptPod))); + assertThat(resourceManager.getNumPendingWorkersForTesting(), is(1)); + + final Optional<Pod> currentAttemptPodOpt = kubeClient.pods().list().getItems().stream() + .filter(pod -> pod.getMetadata().getName().contains("-taskmanager-2-1")) + .findAny(); + assertTrue(currentAttemptPodOpt.isPresent()); + final Pod currentAttemptPod = currentAttemptPodOpt.get(); + + // adding current attempt pod should decrease the pending worker count + resourceManager.onAdded(Collections.singletonList(new KubernetesPod(currentAttemptPod))); + assertThat(resourceManager.getNumPendingWorkersForTesting(), is(0)); + }); + }}; + } + + @Test + public void testDuplicatedPodAdded() throws Exception { + new Context() {{ + runTest(() -> { + registerSlotRequest(); + registerSlotRequest(); + assertThat(resourceManager.getNumPendingWorkersForTesting(), is(2)); + + assertThat(kubeClient.pods().list().getItems().size(), is(2)); + final Pod pod1 = kubeClient.pods().list().getItems().get(0); + final Pod pod2 = kubeClient.pods().list().getItems().get(1); + + resourceManager.onAdded(Collections.singletonList(new KubernetesPod(pod1))); + assertThat(resourceManager.getNumPendingWorkersForTesting(), is(1)); + + // Adding duplicated pod should not increase pending worker count + resourceManager.onAdded(Collections.singletonList(new KubernetesPod(pod1))); + assertThat(resourceManager.getNumPendingWorkersForTesting(), is(1)); + + resourceManager.onAdded(Collections.singletonList(new KubernetesPod(pod2))); + assertThat(resourceManager.getNumPendingWorkersForTesting(), is(0)); + }); + }}; + } + + @Test + public void testPodTerminatedBeforeAdded() throws Exception { + new Context() {{ + runTest(() -> { + registerSlotRequest(); + assertThat(resourceManager.getNumPendingWorkersForTesting(), is(1)); + + final Pod pod = kubeClient.pods().list().getItems().get(0); + terminatePod(pod); + + resourceManager.onModified(Collections.singletonList(new KubernetesPod(pod))); + assertThat(resourceManager.getNumPendingWorkersForTesting(), is(1)); + + resourceManager.onDeleted(Collections.singletonList(new KubernetesPod(pod))); + assertThat(resourceManager.getNumPendingWorkersForTesting(), is(1)); + + resourceManager.onError(Collections.singletonList(new KubernetesPod(pod))); + assertThat(resourceManager.getNumPendingWorkersForTesting(), is(1)); Review comment: That way we would not have to rely on internal method such as `getNumPendingWorkersForTesting`. ########## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java ########## @@ -311,7 +322,9 @@ protected double getCpuCores(Configuration configuration) { return TaskExecutorProcessUtils.getCpuCoresWithFallbackConfigOption(configuration, KubernetesConfigOptions.TASK_MANAGER_CPU); } - private void internalStopPod(String podName) { + private Optional<WorkerResourceSpec> internalStopPod(String podName) { Review comment: I think this method needs more tests. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org