xintongsong commented on a change in pull request #11323: [FLINK-16439][k8s] 
Make KubernetesResourceManager starts workers using WorkerResourceSpec 
requested by SlotManager
URL: https://github.com/apache/flink/pull/11323#discussion_r409451251
 
 

 ##########
 File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java
 ##########
 @@ -337,6 +342,46 @@ public void testGetCpuCoresNumSlots() throws Exception {
                }};
        }
 
+       @Test
+       public void testStartAndRecoverVariousResourceSpec() throws Exception {
+               new Context() {{
+                       final WorkerResourceSpec workerResourceSpec1 = new 
WorkerResourceSpec.Builder().setTaskHeapMemoryMB(100).build();
+                       final WorkerResourceSpec workerResourceSpec2 = new 
WorkerResourceSpec.Builder().setTaskHeapMemoryMB(99).build();
+                       slotManager = new TestingSlotManagerBuilder()
+                               .setGetRequiredResourcesSupplier(() -> 
Collections.singletonMap(workerResourceSpec1, 1))
+                               .createSlotManager();
+
+                       runTest(() -> {
+                               // Start two workers with different resources
+                               
resourceManager.startNewWorker(workerResourceSpec1);
+                               
resourceManager.startNewWorker(workerResourceSpec2);
+
+                               // Verify two pods with both worker resources 
are started
+                               final PodList initialPodList = 
kubeClient.pods().list();
+                               assertEquals(2, 
initialPodList.getItems().size());
+                               final Pod initialPod1 = 
getPodContainsStrInArgs(initialPodList, 
TaskManagerOptions.TASK_HEAP_MEMORY.key() + "=" + (100L << 20));
+                               final Pod initialPod2 = 
getPodContainsStrInArgs(initialPodList, 
TaskManagerOptions.TASK_HEAP_MEMORY.key() + "=" + (99L << 20));
+
+                               // Notify resource manager about pods added.
+                               final KubernetesPod initialKubernetesPod1 = new 
KubernetesPod(initialPod1);
+                               final KubernetesPod initialKubernetesPod2 = new 
KubernetesPod(initialPod2);
+                               
resourceManager.onAdded(ImmutableList.of(initialKubernetesPod1, 
initialKubernetesPod2));
+
+                               // Terminate pod1.
+                               terminatePod(initialPod1);
+                               
resourceManager.onModified(Collections.singletonList(initialKubernetesPod1));
+
+                               // Verify original pod1 is removed, a new pod1 
with the same worker resource is requested.
+                               // Meantime, pod2 is not changes.
+                               final PodList terminatedPodList = 
kubeClient.pods().list();
+                               assertEquals(2, 
terminatedPodList.getItems().size());
+                               
assertFalse(terminatedPodList.getItems().contains(initialPod1));
+                               
assertTrue(terminatedPodList.getItems().contains(initialPod2));
 
 Review comment:
   I meant "pod list after the `terminatePod` action". I should choose better 
words.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to