zentol commented on code in PR #24309:
URL: https://github.com/apache/flink/pull/24309#discussion_r1499111780


##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java:
##########
@@ -594,6 +601,68 @@ private void 
testNotificationAboutNotEnoughResources(boolean withNotificationGra
         };
     }
 
+    @Test
+    void testCloseWithRequirementsCheckBeingScheduled() throws Exception {
+        testCallingCloseWhileHavingTasksScheduled(
+                FineGrainedSlotManager::triggerResourceRequirementsCheck, 2);
+    }
+
+    @Test
+    void testCloseWithNeededResourcesDeclarationBeingScheduled() throws 
Exception {
+        testCallingCloseWhileHavingTasksScheduled(
+                FineGrainedSlotManager::declareNeededResourcesWithDelay, 2);
+    }
+
+    @Test
+    void testCloseWithClusterReconciliationCheckBeingScheduled() throws 
Exception {
+        testCallingCloseWhileHavingTasksScheduled(
+                slotManager -> {
+                    // nothing to do - resource allocation is scheduled during 
startup
+                },
+                1);
+    }
+
+    private void testCallingCloseWhileHavingTasksScheduled(
+            Consumer<FineGrainedSlotManager> triggerScheduledTask, int 
expectedScheduledTasks)
+            throws Exception {
+        new Context() {
+            {
+                final ManuallyTriggeredScheduledExecutorService 
testScheduledExecutorService =
+                        new ManuallyTriggeredScheduledExecutorService();
+                final ScheduledExecutorService testMainThreadExecutorService =
+                        Executors.newSingleThreadScheduledExecutor();
+
+                scheduledExecutor =
+                        new 
ScheduledExecutorServiceAdapter(testScheduledExecutorService);
+                mainThreadExecutor =
+                        new TestingComponentMainThreadExecutor(
+                                
ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
+                                        testMainThreadExecutorService));
+
+                runTest(() -> runInMainThread(() -> 
triggerScheduledTask.accept(getSlotManager())));
+
+                // waiting for the close call to complete ensures that the 
task was scheduled
+                // because close is called on the main thread after scheduling 
the task
+                FlinkAssertions.assertThatFuture(closeFuture)
+                        .as(
+                                "The test run should have been terminated 
before proceeding with the checks.")
+                        .eventuallySucceeds();
+
+                assertThat(testScheduledExecutorService.getAllScheduledTasks())
+                        .as("The expected number of tasks should have been 
scheduled.")
+                        .hasSize(expectedScheduledTasks);
+
+                // simulate shutting down the RpcEndpoint
+                testMainThreadExecutorService.shutdown();
+
+                assertThatNoException()
+                        .as(
+                                "There shouldn't occur any error due to the 
shutdown of the MainThreadExecutorService.")
+                        
.isThrownBy(testScheduledExecutorService::triggerScheduledTasks);

Review Comment:
   I don't think you can use this to verify the fix.
   It's too _neat_; since we force the task to be in the queue when we close 
things you eliminate concurrency, but in real life it can also happen that the 
executor is already executing something while the shutdown is happening, where 
afaik the cancel doesn't do anything.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java:
##########
@@ -594,6 +601,68 @@ private void 
testNotificationAboutNotEnoughResources(boolean withNotificationGra
         };
     }
 
+    @Test
+    void testCloseWithRequirementsCheckBeingScheduled() throws Exception {
+        testCallingCloseWhileHavingTasksScheduled(
+                FineGrainedSlotManager::triggerResourceRequirementsCheck, 2);
+    }
+
+    @Test
+    void testCloseWithNeededResourcesDeclarationBeingScheduled() throws 
Exception {
+        testCallingCloseWhileHavingTasksScheduled(
+                FineGrainedSlotManager::declareNeededResourcesWithDelay, 2);
+    }
+
+    @Test
+    void testCloseWithClusterReconciliationCheckBeingScheduled() throws 
Exception {
+        testCallingCloseWhileHavingTasksScheduled(
+                slotManager -> {
+                    // nothing to do - resource allocation is scheduled during 
startup
+                },
+                1);
+    }
+
+    private void testCallingCloseWhileHavingTasksScheduled(
+            Consumer<FineGrainedSlotManager> triggerScheduledTask, int 
expectedScheduledTasks)
+            throws Exception {
+        new Context() {
+            {
+                final ManuallyTriggeredScheduledExecutorService 
testScheduledExecutorService =
+                        new ManuallyTriggeredScheduledExecutorService();
+                final ScheduledExecutorService testMainThreadExecutorService =
+                        Executors.newSingleThreadScheduledExecutor();
+
+                scheduledExecutor =
+                        new 
ScheduledExecutorServiceAdapter(testScheduledExecutorService);
+                mainThreadExecutor =
+                        new TestingComponentMainThreadExecutor(
+                                
ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
+                                        testMainThreadExecutorService));
+
+                runTest(() -> runInMainThread(() -> 
triggerScheduledTask.accept(getSlotManager())));
+
+                // waiting for the close call to complete ensures that the 
task was scheduled
+                // because close is called on the main thread after scheduling 
the task
+                FlinkAssertions.assertThatFuture(closeFuture)
+                        .as(
+                                "The test run should have been terminated 
before proceeding with the checks.")
+                        .eventuallySucceeds();
+
+                assertThat(testScheduledExecutorService.getAllScheduledTasks())
+                        .as("The expected number of tasks should have been 
scheduled.")
+                        .hasSize(expectedScheduledTasks);
+
+                // simulate shutting down the RpcEndpoint
+                testMainThreadExecutorService.shutdown();
+
+                assertThatNoException()
+                        .as(
+                                "There shouldn't occur any error due to the 
shutdown of the MainThreadExecutorService.")
+                        
.isThrownBy(testScheduledExecutorService::triggerScheduledTasks);

Review Comment:
   I don't think you can use this to verify the fix.
   It's too _neat_; since we force the task to be in the queue when we close 
things you eliminate concurrency, but in real life it can also happen that the 
executor is already executing something while the shutdown is happening.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to