zentol commented on code in PR #24309:
URL: https://github.com/apache/flink/pull/24309#discussion_r1499111780


##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java:
##########
@@ -594,6 +601,68 @@ private void testNotificationAboutNotEnoughResources(boolean withNotificationGra
         };
     }
 
+    @Test
+    void testCloseWithRequirementsCheckBeingScheduled() throws Exception {
+        testCallingCloseWhileHavingTasksScheduled(
+                FineGrainedSlotManager::triggerResourceRequirementsCheck, 2);
+    }
+
+    @Test
+    void testCloseWithNeededResourcesDeclarationBeingScheduled() throws Exception {
+        testCallingCloseWhileHavingTasksScheduled(
+                FineGrainedSlotManager::declareNeededResourcesWithDelay, 2);
+    }
+
+    @Test
+    void testCloseWithClusterReconciliationCheckBeingScheduled() throws Exception {
+        testCallingCloseWhileHavingTasksScheduled(
+                slotManager -> {
+                    // nothing to do - resource allocation is scheduled during startup
+                },
+                1);
+    }
+
+    private void testCallingCloseWhileHavingTasksScheduled(
+            Consumer<FineGrainedSlotManager> triggerScheduledTask, int expectedScheduledTasks)
+            throws Exception {
+        new Context() {
+            {
+                final ManuallyTriggeredScheduledExecutorService testScheduledExecutorService =
+                        new ManuallyTriggeredScheduledExecutorService();
+                final ScheduledExecutorService testMainThreadExecutorService =
+                        Executors.newSingleThreadScheduledExecutor();
+
+                scheduledExecutor =
+                        new ScheduledExecutorServiceAdapter(testScheduledExecutorService);
+                mainThreadExecutor =
+                        new TestingComponentMainThreadExecutor(
+                                ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
+                                        testMainThreadExecutorService));
+
+                runTest(() -> runInMainThread(() -> triggerScheduledTask.accept(getSlotManager())));
+
+                // waiting for the close call to complete ensures that the task was scheduled
+                // because close is called on the main thread after scheduling the task
+                FlinkAssertions.assertThatFuture(closeFuture)
+                        .as(
+                                "The test run should have been terminated before proceeding with the checks.")
+                        .eventuallySucceeds();
+
+                assertThat(testScheduledExecutorService.getAllScheduledTasks())
+                        .as("The expected number of tasks should have been scheduled.")
+                        .hasSize(expectedScheduledTasks);
+
+                // simulate shutting down the RpcEndpoint
+                testMainThreadExecutorService.shutdown();
+
+                assertThatNoException()
+                        .as(
+                                "There shouldn't occur any error due to the shutdown of the MainThreadExecutorService.")
+                        .isThrownBy(testScheduledExecutorService::triggerScheduledTasks);

Review Comment:
   I don't think you can use this to verify the fix.
   It's too optimistic: because we force the task to still be in the queue when
   the future gets cancelled, the test succeeds. In real life, however, the
   executor may already be executing something while the shutdown is happening,
   and in that case, afaik, the cancel doesn't do anything.
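
The distinction between cancelling a queued task and cancelling a running one can be reproduced with plain `java.util.concurrent`. The following is only a minimal sketch, not the Flink code: it assumes the cancellation is a `Future.cancel(false)` (no interrupt), and the class name `CancelWhileRunningSketch` and its helper methods are hypothetical names made up for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration; none of these names exist in Flink.
public class CancelWhileRunningSketch {

    /** Waits for the latch and converts interruption or timeout into an unchecked exception. */
    private static void await(CountDownLatch latch) {
        try {
            if (!latch.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for latch");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /** Cancels a task that is still queued; returns true if its body never ran. */
    static boolean queuedTaskIsSkippedAfterCancel() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch drained = new CountDownLatch(1);
        AtomicBoolean ran = new AtomicBoolean(false);
        try {
            executor.submit(() -> await(release)); // occupies the only worker thread
            Future<?> queued = executor.submit(() -> ran.set(true));
            queued.cancel(false); // task is still in the queue at this point
            release.countDown();
            executor.submit(drained::countDown); // sentinel: runs after the queue is drained
            await(drained);
            return !ran.get();
        } finally {
            executor.shutdownNow();
        }
    }

    /** Cancels a task that is already running; returns true if its body still completed. */
    static boolean runningTaskSurvivesCancel() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean completed = new AtomicBoolean(false);
        try {
            Future<?> running =
                    executor.submit(
                            () -> {
                                started.countDown();
                                await(release);
                                completed.set(true);
                                finished.countDown();
                            });
            await(started); // the task body is executing now
            running.cancel(false); // no interrupt: the body keeps going
            release.countDown();
            await(finished);
            return completed.get();
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("queued task skipped: " + queuedTaskIsSkippedAfterCancel());
        System.out.println("running task survived cancel: " + runningTaskSurvivesCancel());
    }
}
```

A test that only covers the first method's scenario, where the task is guaranteed to be waiting in the queue, would pass even if the second scenario is the one that fails in production.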



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
