rkhachatryan commented on code in PR #22169: URL: https://github.com/apache/flink/pull/22169#discussion_r1149689034
########## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java: ########## @@ -951,6 +949,138 @@ void testConsistentMaxParallelism() throws Exception { assertThat(resubmittedArchivedVertex.getMaxParallelism()).isEqualTo(expectedMaxParallelism); } + @Test + void testRequirementIncreaseTriggersScaleUp() throws Exception { + final JobGraph jobGraph = createJobGraph(); + + final DefaultDeclarativeSlotPool declarativeSlotPool = + createDeclarativeSlotPool(jobGraph.getJobID()); + + final Configuration configuration = new Configuration(); + configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L)); + + final AdaptiveScheduler scheduler = + new AdaptiveSchedulerBuilder(jobGraph, singleThreadMainThreadExecutor) + .setDeclarativeSlotPool(declarativeSlotPool) + .setJobMasterConfiguration(configuration) + .build(EXECUTOR_RESOURCE.getExecutor()); + + final int scaledUpParallelism = PARALLELISM * 2; + + final SubmissionBufferingTaskManagerGateway taskManagerGateway = + createSubmissionBufferingTaskManagerGateway(scaledUpParallelism, scheduler); + + startJobWithSlotsMatchingParallelism( + scheduler, declarativeSlotPool, taskManagerGateway, PARALLELISM); + awaitJobReachingParallelism(taskManagerGateway, scheduler, PARALLELISM); + + JobResourceRequirements newJobResourceRequirements = + createRequirementsWithUpperParallelism(scaledUpParallelism); + singleThreadMainThreadExecutor.execute( + () -> { + // first update requirements as otherwise slots are rejected! 
+ scheduler.updateJobResourceRequirements(newJobResourceRequirements); + offerSlots( + declarativeSlotPool, + createSlotOffersForResourceRequirements( + ResourceCounter.withResource( + ResourceProfile.UNKNOWN, PARALLELISM)), + taskManagerGateway); + }); + + awaitJobReachingParallelism(taskManagerGateway, scheduler, scaledUpParallelism); + } + + @Test + void testRequirementDecreaseTriggersScaleDown() throws Exception { Review Comment: This test duplicates most of `testRequirementIncreaseTriggersScaleUp` logic above. How about extracting common code from these two tests? ########## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java: ########## @@ -951,6 +949,138 @@ void testConsistentMaxParallelism() throws Exception { assertThat(resubmittedArchivedVertex.getMaxParallelism()).isEqualTo(expectedMaxParallelism); } + @Test + void testRequirementIncreaseTriggersScaleUp() throws Exception { + final JobGraph jobGraph = createJobGraph(); + + final DefaultDeclarativeSlotPool declarativeSlotPool = + createDeclarativeSlotPool(jobGraph.getJobID()); + + final Configuration configuration = new Configuration(); + configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L)); + + final AdaptiveScheduler scheduler = + new AdaptiveSchedulerBuilder(jobGraph, singleThreadMainThreadExecutor) + .setDeclarativeSlotPool(declarativeSlotPool) + .setJobMasterConfiguration(configuration) + .build(EXECUTOR_RESOURCE.getExecutor()); + + final int scaledUpParallelism = PARALLELISM * 2; + + final SubmissionBufferingTaskManagerGateway taskManagerGateway = + createSubmissionBufferingTaskManagerGateway(scaledUpParallelism, scheduler); + + startJobWithSlotsMatchingParallelism( + scheduler, declarativeSlotPool, taskManagerGateway, PARALLELISM); + awaitJobReachingParallelism(taskManagerGateway, scheduler, PARALLELISM); + + JobResourceRequirements newJobResourceRequirements = + createRequirementsWithUpperParallelism(scaledUpParallelism); 
+ singleThreadMainThreadExecutor.execute( + () -> { + // first update requirements as otherwise slots are rejected! + scheduler.updateJobResourceRequirements(newJobResourceRequirements); + offerSlots( + declarativeSlotPool, + createSlotOffersForResourceRequirements( + ResourceCounter.withResource( + ResourceProfile.UNKNOWN, PARALLELISM)), + taskManagerGateway); + }); + + awaitJobReachingParallelism(taskManagerGateway, scheduler, scaledUpParallelism); + } + + @Test + void testRequirementDecreaseTriggersScaleDown() throws Exception { + final JobGraph jobGraph = createJobGraph(); + + final DefaultDeclarativeSlotPool declarativeSlotPool = + createDeclarativeSlotPool(jobGraph.getJobID()); + + final Configuration configuration = new Configuration(); + configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L)); + + final AdaptiveScheduler scheduler = + new AdaptiveSchedulerBuilder(jobGraph, singleThreadMainThreadExecutor) + .setDeclarativeSlotPool(declarativeSlotPool) + .setJobMasterConfiguration(configuration) + .build(EXECUTOR_RESOURCE.getExecutor()); + + final SubmissionBufferingTaskManagerGateway taskManagerGateway = + createSubmissionBufferingTaskManagerGateway(PARALLELISM, scheduler); + + startJobWithSlotsMatchingParallelism( + scheduler, declarativeSlotPool, taskManagerGateway, PARALLELISM); + awaitJobReachingParallelism(taskManagerGateway, scheduler, PARALLELISM); + + int scaledDownParallelism = PARALLELISM / 2; Review Comment: Decreasing parallelism just a little bit would cover more cases, WDYT? i.e. ```suggestion int scaledDownParallelism = PARALLELISM - 1; ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org