xintongsong commented on a change in pull request #15047:
URL: https://github.com/apache/flink/pull/15047#discussion_r584508780



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
##########
@@ -100,6 +103,8 @@
 
     @Nullable private ScheduledFuture<?> taskManagerTimeoutsCheck;
 
+    @Nullable private ScheduledFuture<?> resourceRequirementsCheck;

Review comment:
       ```suggestion
       @Nullable private ScheduledFuture<?> lastResourceRequirementsCheck;
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
##########
@@ -485,6 +497,16 @@ private void checkResourceRequirements() {
         }
     }
 
+    private void triggerCheckResourceRequirements() {

Review comment:
       I would suggest calling this `checkResourceRequirementsWithDelay` and 
explaining its purpose in a comment.
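
   A minimal sketch of the behaviour the suggested name implies, assuming the method coalesces repeated calls into one pending check. Everything here is a hypothetical stand-in rather than the actual `FineGrainedSlotManager` code: `DelayedCheckSketch`, the `scheduled` queue (used in place of a real scheduled executor), and `runScheduledTasks` are illustration only.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical stand-in for a slot manager that coalesces requirement checks. */
class DelayedCheckSketch {
    /** Stand-in for a scheduled executor: tasks wait here until run manually. */
    final Queue<Runnable> scheduled = new ArrayDeque<>();

    /** True while a check has been scheduled but has not run yet. */
    private boolean checkPending = false;

    /** Counts how often the actual check ran. */
    int checkCount = 0;

    /**
     * Schedules a single delayed check. Calls made while a check is already
     * pending are no-ops, so a burst of events results in one check.
     */
    void checkResourceRequirementsWithDelay() {
        if (checkPending) {
            return;
        }
        checkPending = true;
        scheduled.add(
                () -> {
                    checkPending = false;
                    checkResourceRequirements();
                });
    }

    /** Should only be called from checkResourceRequirementsWithDelay. */
    private void checkResourceRequirements() {
        checkCount++;
    }

    /** Runs everything that is currently scheduled (assumed test helper). */
    void runScheduledTasks() {
        Runnable task;
        while ((task = scheduled.poll()) != null) {
            task.run();
        }
    }
}
```

   With this shape, the name tells callers that the check is deferred, and the Javadoc on the private method records that it should not be invoked directly.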

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java
##########
@@ -115,7 +114,8 @@ static TaskExecutorConnection createTaskExecutorConnection() {
         private final TaskManagerTracker taskManagerTracker = new FineGrainedTaskManagerTracker();
         private final SlotStatusSyncer slotStatusSyncer =
                 new DefaultSlotStatusSyncer(Time.seconds(10L));
-        private final ScheduledExecutor scheduledExecutor = TestingUtils.defaultScheduledExecutor();
+        private final ManuallyTriggeredScheduledExecutor scheduledExecutor =
+                new ManuallyTriggeredScheduledExecutor();

Review comment:
       Not sure we need to manually trigger the scheduled events for all the 
tests.
   I think we can keep the default `TestingUtils.defaultScheduledExecutor()` 
and allow test cases to provide a `ManuallyTriggeredScheduledExecutor` 
when it's needed.
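
   One way this could look, as a rough sketch: the context keeps a default executor and exposes a setter that individual tests use only when they need manual control. The names `ExecutorContextSketch`, `ManualExecutor`, `setExecutor` and `triggerAll` are hypothetical, and plain `java.util.concurrent.Executor` is used in place of Flink's `ScheduledExecutor` so the example stays self-contained.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;

/** Hypothetical test context: a default executor that individual tests may override. */
class ExecutorContextSketch {

    /** Hypothetical manual executor: queues tasks until triggerAll() is called. */
    static class ManualExecutor implements Executor {
        private final Queue<Runnable> tasks = new ArrayDeque<>();

        @Override
        public void execute(Runnable task) {
            tasks.add(task);
        }

        void triggerAll() {
            Runnable task;
            while ((task = tasks.poll()) != null) {
                task.run();
            }
        }
    }

    /** Default used by most tests: runs each task immediately on the calling thread. */
    private Executor executor = Runnable::run;

    /** Tests that need control over timing swap in their own executor before running. */
    void setExecutor(Executor executor) {
        this.executor = executor;
    }

    Executor getExecutor() {
        return executor;
    }
}
```

   Only the tests that assert on scheduling order would call `setExecutor`; the rest keep the default and need no manual triggering.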

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
##########
@@ -485,6 +497,16 @@ private void checkResourceRequirements() {
         }
     }
 
+    private void triggerCheckResourceRequirements() {

Review comment:
       And maybe add a comment on `checkResourceRequirements` saying that it should 
only be called from `checkResourceRequirementsWithDelay`.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
##########
@@ -470,6 +474,43 @@ private void testNotificationAboutNotEnoughResources(boolean withNotificationGra
         };
     }
 
+    /**
+     * Test that checkResourceRequirements will only be triggered once after multiple trigger
+     * function calls.
+     */
+    @Test
+    public void testRequirementCheckOnlyTriggeredOnce() throws Exception {
+        new Context() {
+            {
+                final AtomicInteger checkTimes = new AtomicInteger();
+                resourceAllocationStrategyBuilder.setTryFulfillRequirementsFunction(
+                        (ignored1, ignored2) -> {
+                            checkTimes.addAndGet(1);
+                            return ResourceAllocationResult.builder().build();
+                        });
+                runTest(
+                        () -> {
+                            final ResourceRequirements resourceRequirements1 =
+                                    createResourceRequirementsForSingleSlot();
+                            final ResourceRequirements resourceRequirements2 =
+                                    createResourceRequirementsForSingleSlot();
+                            final TaskExecutorConnection taskExecutionConnection =
+                                    createTaskExecutorConnection();
+                            getSlotManager().processResourceRequirements(resourceRequirements1);
+                            getSlotManager().processResourceRequirements(resourceRequirements2);
+                            getSlotManager()
+                                    .registerTaskManager(
+                                            taskExecutionConnection,
+                                            new SlotReport(),
+                                            DEFAULT_TOTAL_RESOURCE_PROFILE,
+                                            DEFAULT_SLOT_RESOURCE_PROFILE);
+                            getScheduledExecutor().triggerNonPeriodicScheduledTasks();
+                            assertThat(checkTimes.get(), is(1));
+                        });

Review comment:
       Maybe also verify that:
   * `checkTimes` increases again if there's another 
`processResourceRequirements` after the first `triggerNonPeriodicScheduledTasks`
   * `checkTimes` does not increase when there are no events between two 
`triggerNonPeriodicScheduledTasks` calls
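
   The two extra checks can be sketched against a toy model of the coalescing logic. This is not the Flink test itself: `CoalescedCheckSketch` and its methods are hypothetical stand-ins that borrow the names from the diff, and the model assumes each trigger runs at most one pending check.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical model: events schedule at most one pending check at a time. */
class CoalescedCheckSketch {
    private final Queue<Runnable> scheduled = new ArrayDeque<>();
    private boolean checkPending = false;
    private int checkTimes = 0;

    /** Stand-in for an event that requests a requirements check. */
    void processResourceRequirements() {
        if (checkPending) {
            return; // a check is already scheduled, so this event is coalesced
        }
        checkPending = true;
        scheduled.add(
                () -> {
                    checkPending = false;
                    checkTimes++;
                });
    }

    /** Stand-in for manually triggering the scheduled (non-periodic) tasks. */
    void triggerNonPeriodicScheduledTasks() {
        Runnable task;
        while ((task = scheduled.poll()) != null) {
            task.run();
        }
    }

    int getCheckTimes() {
        return checkTimes;
    }

    /** Returns checkTimes after a burst, after a further event, and after an idle trigger. */
    static int[] runScenario() {
        CoalescedCheckSketch s = new CoalescedCheckSketch();

        // Burst of events followed by one trigger: a single check is expected.
        s.processResourceRequirements();
        s.processResourceRequirements();
        s.triggerNonPeriodicScheduledTasks();
        int afterBurst = s.getCheckTimes();

        // Another event after the first trigger: the count should increase again.
        s.processResourceRequirements();
        s.triggerNonPeriodicScheduledTasks();
        int afterSecondEvent = s.getCheckTimes();

        // No events between two triggers: the count should stay the same.
        s.triggerNonPeriodicScheduledTasks();
        int afterIdleTrigger = s.getCheckTimes();

        return new int[] {afterBurst, afterSecondEvent, afterIdleTrigger};
    }
}
```

   In the real test, the same three phases would be expressed as assertions on `checkTimes.get()` after each `triggerNonPeriodicScheduledTasks` call.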



