XComp commented on code in PR #24309:
URL: https://github.com/apache/flink/pull/24309#discussion_r1489646969


##########
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.java:
##########
@@ -318,11 +319,22 @@ public void triggerNonPeriodicScheduledTasks(Class<?> taskClazz) {
     public void triggerPeriodicScheduledTasks() {
         for (ScheduledTask<?> scheduledTask : periodicScheduledTasks) {
             if (!scheduledTask.isCancelled()) {
-                scheduledTask.execute();
+                executeScheduledTask(scheduledTask);
             }
         }
     }
 
+    private static void executeScheduledTask(ScheduledTask<?> scheduledTask) {
+        scheduledTask.execute();
+        try {
+            // try to retrieve result of scheduled task to avoid swallowing any exceptions that
+            // occurred
+            scheduledTask.get();

Review Comment:
   True, I missed that we're not retrieving the result in the case of periodic tasks in [ScheduledTask#execute](https://github.com/apache/flink/blob/5405239dec0884dff746129c73955c90f455c465/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ScheduledTask.java#L76).
   
   I changed the behavior: instead of collecting the error in the future, we now throw the error right away:
   > ScheduledTask only serves as a container for a Callable. The error handling should be done by an Executor (e.g. the main thread). Therefore, explicitly handling errors inside #execute is out of scope for ScheduledTask.
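
To illustrate the "throw right away" behavior described above, here is a minimal sketch. `SimpleTask` is a hypothetical stand-in written for this example, not Flink's actual `ScheduledTask`; it only assumes that the task wraps a `Callable` and that failures should reach whoever calls `execute()` instead of being stored in a future.

```java
import java.util.concurrent.Callable;

/** Hypothetical task container for illustration; not Flink's ScheduledTask. */
final class SimpleTask<T> {
    private final Callable<T> callable;
    private T lastResult;

    SimpleTask(Callable<T> callable) {
        this.callable = callable;
    }

    /** Runs the callable once; a failure is rethrown to the caller instead of being stored. */
    void execute() {
        try {
            lastResult = callable.call();
        } catch (RuntimeException e) {
            // Unchecked exceptions propagate unchanged to the executing thread.
            throw e;
        } catch (Exception e) {
            // Checked exceptions are wrapped so execute() needs no throws clause.
            throw new RuntimeException(e);
        }
    }

    /** Result of the most recent successful execution, or null if there was none. */
    T lastResult() {
        return lastResult;
    }
}
```

With this shape, the thread that triggers the task sees the failure immediately and decides how to handle it, so nothing is silently swallowed if nobody inspects a future afterwards.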



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java:
##########
@@ -594,18 +597,19 @@ private void checkResourceRequirementsWithDelay() {
         if (requirementsCheckDelay.toMillis() <= 0) {
             checkResourceRequirements();
         } else {
-            if (requirementsCheckFuture == null || requirementsCheckFuture.isDone()) {
-                requirementsCheckFuture = new CompletableFuture<>();
-                scheduledExecutor.schedule(
-                        () ->
-                                mainThreadExecutor.execute(
-                                        () -> {
-                                            checkResourceRequirements();
-                                            Preconditions.checkNotNull(requirementsCheckFuture)
-                                                    .complete(null);
-                                        }),
-                        requirementsCheckDelay.toMillis(),
-                        TimeUnit.MILLISECONDS);
+            if (requirementsCheckFuture.isDone()) {
+                requirementsCheckFuture =
+                        scheduledExecutor.schedule(
+                                () -> {
+                                    if (started) {

Review Comment:
   Yeah, I had to revisit the PR. I looked into the synchronization of `started` yesterday and was puzzled why we haven't had to synchronize the field until now. The reason is that all methods that rely on the `started` field are called from within the `ResourceManager`'s main thread. In the end, I forgot to consider this when accessing the `SlotManager`'s state from another thread :facepalm:
   
   Anyway, I went over the PR again and added some assertions to make it clearer that the `FineGrainedSlotManager` should only be used from within the `ResourceManager`'s main thread. Please let me know if I drew the wrong conclusion or missed something here.
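
As a rough sketch of the kind of assertion meant here: the class below is hypothetical and not taken from the PR. It assumes the component is handed its owning thread up front and checks that thread on every access, which is why the `started` field needs no synchronization.

```java
/** Hypothetical component whose state may only be touched from one owning thread. */
final class MainThreadBoundState {
    private final Thread mainThread;

    // Intentionally neither volatile nor synchronized: only mainThread reads or writes it.
    private boolean started;

    MainThreadBoundState(Thread mainThread) {
        this.mainThread = mainThread;
    }

    /** Fails fast if the caller is not the owning thread. */
    private void assertRunningInMainThread() {
        Thread current = Thread.currentThread();
        if (current != mainThread) {
            throw new IllegalStateException(
                    "Expected thread " + mainThread.getName() + " but was " + current.getName());
        }
    }

    void start() {
        assertRunningInMainThread();
        started = true;
    }

    boolean isStarted() {
        assertRunningInMainThread();
        return started;
    }
}
```

A callback scheduled on a different executor would trip the check as soon as it reads `started`, which makes the threading contract explicit instead of implicit.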


