zentol commented on code in PR #24309: URL: https://github.com/apache/flink/pull/24309#discussion_r1499111780
########## flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java: ########## @@ -594,6 +601,68 @@ private void testNotificationAboutNotEnoughResources(boolean withNotificationGra }; } + @Test + void testCloseWithRequirementsCheckBeingScheduled() throws Exception { + testCallingCloseWhileHavingTasksScheduled( + FineGrainedSlotManager::triggerResourceRequirementsCheck, 2); + } + + @Test + void testCloseWithNeededResourcesDeclarationBeingScheduled() throws Exception { + testCallingCloseWhileHavingTasksScheduled( + FineGrainedSlotManager::declareNeededResourcesWithDelay, 2); + } + + @Test + void testCloseWithClusterReconciliationCheckBeingScheduled() throws Exception { + testCallingCloseWhileHavingTasksScheduled( + slotManager -> { + // nothing to do - resource allocation is scheduled during startup + }, + 1); + } + + private void testCallingCloseWhileHavingTasksScheduled( + Consumer<FineGrainedSlotManager> triggerScheduledTask, int expectedScheduledTasks) + throws Exception { + new Context() { + { + final ManuallyTriggeredScheduledExecutorService testScheduledExecutorService = + new ManuallyTriggeredScheduledExecutorService(); + final ScheduledExecutorService testMainThreadExecutorService = + Executors.newSingleThreadScheduledExecutor(); + + scheduledExecutor = + new ScheduledExecutorServiceAdapter(testScheduledExecutorService); + mainThreadExecutor = + new TestingComponentMainThreadExecutor( + ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor( + testMainThreadExecutorService)); + + runTest(() -> runInMainThread(() -> triggerScheduledTask.accept(getSlotManager()))); + + // waiting for the close call to complete ensures that the task was scheduled + // because close is called on the main thread after scheduling the task + FlinkAssertions.assertThatFuture(closeFuture) + .as( + "The test run should have been terminated before proceeding with the checks.") + .eventuallySucceeds(); + + assertThat(testScheduledExecutorService.getAllScheduledTasks()) + .as("The expected number of tasks should have been scheduled.") + .hasSize(expectedScheduledTasks); + + // simulate shutting down the RpcEndpoint + testMainThreadExecutorService.shutdown(); + + assertThatNoException() + .as( + "There shouldn't occur any error due to the shutdown of the MainThreadExecutorService.") + .isThrownBy(testScheduledExecutorService::triggerScheduledTasks); Review Comment: I don't think you can use this to verify the fix. It's too optimistic; since we force the task to be in the queue when the future gets cancelled this succeeds, but in real life it can also happen that the executor is already executing something while the shutdown is happening, where afaik the cancel doesn't do anything. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org