zentol commented on code in PR #24309:
URL: https://github.com/apache/flink/pull/24309#discussion_r1499111780
##########
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java:
##########
@@ -594,6 +601,68 @@ private void testNotificationAboutNotEnoughResources(boolean withNotificationGra
};
}
+ @Test
+ void testCloseWithRequirementsCheckBeingScheduled() throws Exception {
+ testCallingCloseWhileHavingTasksScheduled(
+ FineGrainedSlotManager::triggerResourceRequirementsCheck, 2);
+ }
+
+ @Test
+ void testCloseWithNeededResourcesDeclarationBeingScheduled() throws Exception {
+ testCallingCloseWhileHavingTasksScheduled(
+ FineGrainedSlotManager::declareNeededResourcesWithDelay, 2);
+ }
+
+ @Test
+ void testCloseWithClusterReconciliationCheckBeingScheduled() throws Exception {
+ testCallingCloseWhileHavingTasksScheduled(
+ slotManager -> {
+ // nothing to do - resource allocation is scheduled during startup
+ },
+ 1);
+ }
+
+ private void testCallingCloseWhileHavingTasksScheduled(
+ Consumer<FineGrainedSlotManager> triggerScheduledTask, int expectedScheduledTasks)
+ throws Exception {
+ new Context() {
+ {
+ final ManuallyTriggeredScheduledExecutorService testScheduledExecutorService =
+ new ManuallyTriggeredScheduledExecutorService();
+ final ScheduledExecutorService testMainThreadExecutorService =
+ Executors.newSingleThreadScheduledExecutor();
+
+ scheduledExecutor =
+ new ScheduledExecutorServiceAdapter(testScheduledExecutorService);
+ mainThreadExecutor =
+ new TestingComponentMainThreadExecutor(
+ ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
+ testMainThreadExecutorService));
+
+ runTest(() -> runInMainThread(() -> triggerScheduledTask.accept(getSlotManager())));
+
+ // waiting for the close call to complete ensures that the task was scheduled
+ // because close is called on the main thread after scheduling the task
+ FlinkAssertions.assertThatFuture(closeFuture)
+ .as(
+ "The test run should have been terminated before proceeding with the checks.")
+ .eventuallySucceeds();
+
+ assertThat(testScheduledExecutorService.getAllScheduledTasks())
+ .as("The expected number of tasks should have been scheduled.")
+ .hasSize(expectedScheduledTasks);
+
+ // simulate shutting down the RpcEndpoint
+ testMainThreadExecutorService.shutdown();
+
+ assertThatNoException()
+ .as(
+ "There shouldn't occur any error due to the shutdown of the MainThreadExecutorService.")
+ .isThrownBy(testScheduledExecutorService::triggerScheduledTasks);
Review Comment:
I don't think you can use this to verify the fix.
It's too optimistic: the test forces the task to still be in the queue when
the future gets cancelled, so it succeeds. In real life, though, the executor
may already be executing a task while the shutdown is happening, and in that
case, afaik, the cancel doesn't do anything.
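
To illustrate the concern, here is a minimal standalone sketch that uses only
`java.util.concurrent`, not Flink's executors or the slot manager. The class
and method names are hypothetical. It assumes the cancellation is issued as
`cancel(false)` and shows that a task which has already started running is not
stopped by it:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; not part of Flink or of the PR under review.
public class CancelWhileRunningDemo {

    // Returns true if the task body ran to completion even though its
    // future was cancelled while the task was already executing.
    static boolean taskFinishedDespiteCancel() throws Exception {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch cancelIssued = new CountDownLatch(1);
        AtomicBoolean finished = new AtomicBoolean(false);
        try {
            ScheduledFuture<?> future =
                    executor.schedule(
                            () -> {
                                started.countDown();
                                try {
                                    // Stay "in flight" until cancel has been called.
                                    cancelIssued.await();
                                } catch (InterruptedException e) {
                                    Thread.currentThread().interrupt();
                                    return;
                                }
                                finished.set(true);
                            },
                            0,
                            TimeUnit.MILLISECONDS);

            // Make sure the task has left the queue and is executing.
            started.await();
            // Assumption: cancellation without interruption, i.e. cancel(false).
            future.cancel(false);
            cancelIssued.countDown();
        } finally {
            executor.shutdown();
            executor.awaitTermination(5, TimeUnit.SECONDS);
        }
        return finished.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(
                "task ran to completion despite cancel: " + taskFinishedDespiteCancel());
    }
}
```

A test that only triggers tasks still sitting in the queue never exercises
this path, which is why it may pass without covering the in-flight case.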