rkhachatryan commented on code in PR #22169:
URL: https://github.com/apache/flink/pull/22169#discussion_r1149689034


##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java:
##########
@@ -951,6 +949,138 @@ void testConsistentMaxParallelism() throws Exception {
         assertThat(resubmittedArchivedVertex.getMaxParallelism()).isEqualTo(expectedMaxParallelism);
     }
 
+    @Test
+    void testRequirementIncreaseTriggersScaleUp() throws Exception {
+        final JobGraph jobGraph = createJobGraph();
+
+        final DefaultDeclarativeSlotPool declarativeSlotPool =
+                createDeclarativeSlotPool(jobGraph.getJobID());
+
+        final Configuration configuration = new Configuration();
+        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L));
+
+        final AdaptiveScheduler scheduler =
+                new AdaptiveSchedulerBuilder(jobGraph, singleThreadMainThreadExecutor)
+                        .setDeclarativeSlotPool(declarativeSlotPool)
+                        .setJobMasterConfiguration(configuration)
+                        .build(EXECUTOR_RESOURCE.getExecutor());
+
+        final int scaledUpParallelism = PARALLELISM * 2;
+
+        final SubmissionBufferingTaskManagerGateway taskManagerGateway =
+                createSubmissionBufferingTaskManagerGateway(scaledUpParallelism, scheduler);
+
+        startJobWithSlotsMatchingParallelism(
+                scheduler, declarativeSlotPool, taskManagerGateway, PARALLELISM);
+        awaitJobReachingParallelism(taskManagerGateway, scheduler, PARALLELISM);
+
+        JobResourceRequirements newJobResourceRequirements =
+                createRequirementsWithUpperParallelism(scaledUpParallelism);
+        singleThreadMainThreadExecutor.execute(
+                () -> {
+                    // first update requirements as otherwise slots are rejected!
+                    scheduler.updateJobResourceRequirements(newJobResourceRequirements);
+                    offerSlots(
+                            declarativeSlotPool,
+                            createSlotOffersForResourceRequirements(
+                                    ResourceCounter.withResource(
+                                            ResourceProfile.UNKNOWN, PARALLELISM)),
+                            taskManagerGateway);
+                });
+
+        awaitJobReachingParallelism(taskManagerGateway, scheduler, scaledUpParallelism);
+    }
+
+    @Test
+    void testRequirementDecreaseTriggersScaleDown() throws Exception {

Review Comment:
   This test duplicates most of the logic of `testRequirementIncreaseTriggersScaleUp` above.
   How about extracting the common code from these two tests?
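
One possible shape for that extraction, as a rough stand-alone sketch rather than a concrete proposal. `FakeScheduler`, `runRescale`, and the value `PARALLELISM = 4` are made up for illustration and are not Flink APIs; in the real test class the shared helper would wrap the existing setup calls (`createJobGraph`, `createDeclarativeSlotPool`, `startJobWithSlotsMatchingParallelism`, and so on), and each test would pass in only the part that differs.

```java
import java.util.function.Consumer;

public class RescaleTestSketch {
    // Assumed value for illustration; the real constant lives in the test class.
    static final int PARALLELISM = 4;

    /** Hypothetical stand-in for the scheduler plus slot pool; not a Flink class. */
    static final class FakeScheduler {
        private int upperParallelism;
        private int slots;

        FakeScheduler(int parallelism) {
            this.upperParallelism = parallelism;
            this.slots = parallelism;
        }

        void updateRequirements(int newUpperParallelism) {
            this.upperParallelism = newUpperParallelism;
        }

        void offerSlots(int additionalSlots) {
            this.slots += additionalSlots;
        }

        int currentParallelism() {
            // The job can run no wider than its requirements or its slots allow.
            return Math.min(upperParallelism, slots);
        }
    }

    /** Shared body: common setup, then the test-specific change, then the result. */
    static int runRescale(Consumer<FakeScheduler> change) {
        FakeScheduler scheduler = new FakeScheduler(PARALLELISM);
        if (scheduler.currentParallelism() != PARALLELISM) {
            throw new IllegalStateException("job did not start at initial parallelism");
        }
        change.accept(scheduler);
        return scheduler.currentParallelism();
    }

    static int scaleUp() {
        int scaledUpParallelism = PARALLELISM * 2;
        return runRescale(
                scheduler -> {
                    // Update requirements first, then offer the extra slots.
                    scheduler.updateRequirements(scaledUpParallelism);
                    scheduler.offerSlots(PARALLELISM);
                });
    }

    static int scaleDown() {
        int scaledDownParallelism = PARALLELISM / 2;
        return runRescale(scheduler -> scheduler.updateRequirements(scaledDownParallelism));
    }

    public static void main(String[] args) {
        System.out.println(scaleUp());
        System.out.println(scaleDown());
    }
}
```

With a helper like this, each test body shrinks to the requirement change it exercises plus the expected parallelism.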



##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java:
##########
@@ -951,6 +949,138 @@ void testConsistentMaxParallelism() throws Exception {
         assertThat(resubmittedArchivedVertex.getMaxParallelism()).isEqualTo(expectedMaxParallelism);
     }
 
+    @Test
+    void testRequirementIncreaseTriggersScaleUp() throws Exception {
+        final JobGraph jobGraph = createJobGraph();
+
+        final DefaultDeclarativeSlotPool declarativeSlotPool =
+                createDeclarativeSlotPool(jobGraph.getJobID());
+
+        final Configuration configuration = new Configuration();
+        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L));
+
+        final AdaptiveScheduler scheduler =
+                new AdaptiveSchedulerBuilder(jobGraph, singleThreadMainThreadExecutor)
+                        .setDeclarativeSlotPool(declarativeSlotPool)
+                        .setJobMasterConfiguration(configuration)
+                        .build(EXECUTOR_RESOURCE.getExecutor());
+
+        final int scaledUpParallelism = PARALLELISM * 2;
+
+        final SubmissionBufferingTaskManagerGateway taskManagerGateway =
+                createSubmissionBufferingTaskManagerGateway(scaledUpParallelism, scheduler);
+
+        startJobWithSlotsMatchingParallelism(
+                scheduler, declarativeSlotPool, taskManagerGateway, PARALLELISM);
+        awaitJobReachingParallelism(taskManagerGateway, scheduler, PARALLELISM);
+
+        JobResourceRequirements newJobResourceRequirements =
+                createRequirementsWithUpperParallelism(scaledUpParallelism);
+        singleThreadMainThreadExecutor.execute(
+                () -> {
+                    // first update requirements as otherwise slots are rejected!
+                    scheduler.updateJobResourceRequirements(newJobResourceRequirements);
+                    offerSlots(
+                            declarativeSlotPool,
+                            createSlotOffersForResourceRequirements(
+                                    ResourceCounter.withResource(
+                                            ResourceProfile.UNKNOWN, PARALLELISM)),
+                            taskManagerGateway);
+                });
+
+        awaitJobReachingParallelism(taskManagerGateway, scheduler, scaledUpParallelism);
+    }
+
+    @Test
+    void testRequirementDecreaseTriggersScaleDown() throws Exception {
+        final JobGraph jobGraph = createJobGraph();
+
+        final DefaultDeclarativeSlotPool declarativeSlotPool =
+                createDeclarativeSlotPool(jobGraph.getJobID());
+
+        final Configuration configuration = new Configuration();
+        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L));
+
+        final AdaptiveScheduler scheduler =
+                new AdaptiveSchedulerBuilder(jobGraph, singleThreadMainThreadExecutor)
+                        .setDeclarativeSlotPool(declarativeSlotPool)
+                        .setJobMasterConfiguration(configuration)
+                        .build(EXECUTOR_RESOURCE.getExecutor());
+
+        final SubmissionBufferingTaskManagerGateway taskManagerGateway =
+                createSubmissionBufferingTaskManagerGateway(PARALLELISM, scheduler);
+
+        startJobWithSlotsMatchingParallelism(
+                scheduler, declarativeSlotPool, taskManagerGateway, PARALLELISM);
+        awaitJobReachingParallelism(taskManagerGateway, scheduler, PARALLELISM);
+
+        int scaledDownParallelism = PARALLELISM / 2;

Review Comment:
   Decreasing the parallelism by only a small amount would cover more cases, WDYT?
   
   e.g.
   ```suggestion
           int scaledDownParallelism = PARALLELISM - 1;
   ```


