xintongsong commented on a change in pull request #11323: [FLINK-16439][k8s] Make KubernetesResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11323#discussion_r409451251
########## File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java ########## @@ -337,6 +342,46 @@ public void testGetCpuCoresNumSlots() throws Exception { }}; } + @Test + public void testStartAndRecoverVariousResourceSpec() throws Exception { + new Context() {{ + final WorkerResourceSpec workerResourceSpec1 = new WorkerResourceSpec.Builder().setTaskHeapMemoryMB(100).build(); + final WorkerResourceSpec workerResourceSpec2 = new WorkerResourceSpec.Builder().setTaskHeapMemoryMB(99).build(); + slotManager = new TestingSlotManagerBuilder() + .setGetRequiredResourcesSupplier(() -> Collections.singletonMap(workerResourceSpec1, 1)) + .createSlotManager(); + + runTest(() -> { + // Start two workers with different resources + resourceManager.startNewWorker(workerResourceSpec1); + resourceManager.startNewWorker(workerResourceSpec2); + + // Verify two pods with both worker resources are started + final PodList initialPodList = kubeClient.pods().list(); + assertEquals(2, initialPodList.getItems().size()); + final Pod initialPod1 = getPodContainsStrInArgs(initialPodList, TaskManagerOptions.TASK_HEAP_MEMORY.key() + "=" + (100L << 20)); + final Pod initialPod2 = getPodContainsStrInArgs(initialPodList, TaskManagerOptions.TASK_HEAP_MEMORY.key() + "=" + (99L << 20)); + + // Notify resource manager about pods added. + final KubernetesPod initialKubernetesPod1 = new KubernetesPod(initialPod1); + final KubernetesPod initialKubernetesPod2 = new KubernetesPod(initialPod2); + resourceManager.onAdded(ImmutableList.of(initialKubernetesPod1, initialKubernetesPod2)); + + // Terminate pod1. + terminatePod(initialPod1); + resourceManager.onModified(Collections.singletonList(initialKubernetesPod1)); + + // Verify original pod1 is removed, a new pod1 with the same worker resource is requested. + // Meantime, pod2 is not changes. 
+ final PodList terminatedPodList = kubeClient.pods().list(); + assertEquals(2, terminatedPodList.getItems().size()); + assertFalse(terminatedPodList.getItems().contains(initialPod1)); + assertTrue(terminatedPodList.getItems().contains(initialPod2)); Review comment: I meant "pod list after the `terminatePod` action". I should have chosen better words. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services