tillrohrmann commented on a change in pull request #15159:
URL: https://github.com/apache/flink/pull/15159#discussion_r593157108



##########
File path: 
flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
##########
@@ -411,6 +411,25 @@
                                     .text(
                                             "Increasing this value will make the cluster more resilient against temporary resources shortages (e.g., there is more time for a failed TaskManager to be restarted), "
                                                     + "while decreasing this value reduces downtime of a job (provided that enough slots are available to still run the job).")
+                                    .linebreak()
+                                    .text(
+                                            "Setting a negative duration will disable the resource timeout: The JobManager will wait indefinitely for resources to appear.")
+                                    .build());
+
+    @Documentation.Section({
+        Documentation.Sections.EXPERT_SCHEDULING,
+        Documentation.Sections.ALL_JOB_MANAGER
+    })
+    public static final ConfigOption<Duration> RESOURCE_STABILIZATION_TIMEOUT =

Review comment:
       I think we need to document that this option has a different default value when reactive mode is used. The same applies to `RESOURCE_WAIT_TIMEOUT`.
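The documentation would need to explain what the reactive-mode values mean. The sketch below illustrates that meaning using a hypothetical `ResourceTimeoutSemanticsSketch.deadline` helper, which is not Flink code. A value of `-1` for `RESOURCE_WAIT_TIMEOUT` removes the deadline entirely, as the option text states for negative durations. A value of `0` for `RESOURCE_STABILIZATION_TIMEOUT` makes the deadline coincide with the moment waiting starts.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

// Hypothetical helper illustrating how the timeout values discussed here could be interpreted.
class ResourceTimeoutSemanticsSketch {

    /**
     * Returns the instant at which waiting ends, or empty if there is no deadline.
     * Following the RESOURCE_WAIT_TIMEOUT option text, a negative duration disables the timeout.
     */
    static Optional<Instant> deadline(Duration timeout, Instant waitingSince) {
        if (timeout.isNegative()) {
            return Optional.empty(); // wait indefinitely for resources to appear
        }
        return Optional.of(waitingSince.plus(timeout));
    }

    public static void main(String[] args) {
        Instant start = Instant.EPOCH;
        // Reactive-mode override from this PR: RESOURCE_WAIT_TIMEOUT = -1 ms.
        System.out.println("reactive wait deadline: " + deadline(Duration.ofMillis(-1), start));
        // Reactive-mode override from this PR: RESOURCE_STABILIZATION_TIMEOUT = 0 ms,
        // i.e. the deadline equals the start time and no extra waiting happens.
        System.out.println("reactive stabilization deadline: " + deadline(Duration.ZERO, start));
    }
}
```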

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/ReactiveModeUtils.java
##########
@@ -47,5 +51,20 @@ public static void configureJobGraphForReactiveMode(JobGraph jobGraph) {
         }
     }
 
+    public static void configureClusterForReactiveMode(Configuration configuration) {
+        LOG.info("Modifying Cluster configuration for reactive mode");
+
+        if (!configuration.contains(JobManagerOptions.RESOURCE_STABILIZATION_TIMEOUT)) {
+            // Configure adaptive scheduler to schedule job even if desired resources are not
+            // available (but sufficient resources)
+            configuration.set(
+                    JobManagerOptions.RESOURCE_STABILIZATION_TIMEOUT, Duration.ofMillis(0));
+        }
+        if (!configuration.contains(JobManagerOptions.RESOURCE_WAIT_TIMEOUT)) {
+            // configure adaptive scheduler to wait forever for TaskManagers to register
+            configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(-1));
+        }
+    }

Review comment:
       I am not saying that we have to change it, but I wanted to make an observation about this kind of pattern (changing the `Configuration` so that the right configuration values are picked up later):
   
   The problem I see with this pattern is that it decouples the instantiation of the required components from their configuration. Consequently, we as developers have to remember to call `ReactiveModeUtils.configureClusterForReactiveMode` at every entrypoint that might use the `AdaptiveScheduler` (here, the `ClusterEntrypoint` and the `MiniCluster`). If we forget it, reactive mode might not work as intended.
   
   If these context-sensitive overrides of configuration options were moved closer to where the components are instantiated, it would be easier to reuse "reactive mode" in another setting. For example, when the `DefaultJobManagerRunnerFactory` sees that a job runs in reactive mode, it could read `RESOURCE_STABILIZATION_TIMEOUT` and `RESOURCE_WAIT_TIMEOUT` with different default values (`0` and `-1`, respectively), which are then passed to the factory of the `AdaptiveScheduler`.
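The following is a minimal sketch of the alternative described above. It assumes a hypothetical `AdaptiveSchedulerSettingsSketch` standing in for the code next to `DefaultJobManagerRunnerFactory`. The user's configuration is modelled as an `EnumMap` rather than Flink's `Configuration`, and it is only read, never modified. The reactive-mode defaults (`0` and `-1`) are applied where the scheduler settings are resolved. `STANDARD_WAIT_TIMEOUT` is a placeholder, not Flink's actual default.

```java
import java.time.Duration;
import java.util.EnumMap;
import java.util.Map;

// Hypothetical sketch: resolve mode-dependent defaults where the scheduler is created,
// instead of mutating the cluster-wide configuration at each entrypoint.
class AdaptiveSchedulerSettingsSketch {

    // Stand-ins for JobManagerOptions.RESOURCE_WAIT_TIMEOUT / RESOURCE_STABILIZATION_TIMEOUT.
    enum Option { RESOURCE_WAIT_TIMEOUT, RESOURCE_STABILIZATION_TIMEOUT }

    // Placeholder for the regular RESOURCE_WAIT_TIMEOUT default; not Flink's actual value.
    static final Duration STANDARD_WAIT_TIMEOUT = Duration.ofMinutes(5);

    /** Resolved timeouts that would be handed to the scheduler factory. */
    static final class Settings {
        final Duration resourceWaitTimeout;
        final Duration stabilizationTimeout;

        Settings(Duration resourceWaitTimeout, Duration stabilizationTimeout) {
            this.resourceWaitTimeout = resourceWaitTimeout;
            this.stabilizationTimeout = stabilizationTimeout;
        }

        @Override
        public String toString() {
            return "wait=" + resourceWaitTimeout + ", stabilization=" + stabilizationTimeout;
        }
    }

    /** Reads both options; explicit user values always win, only the defaults depend on the mode. */
    static Settings resolve(Map<Option, Duration> userConfig, boolean reactiveMode) {
        // Reactive mode: wait forever for TaskManagers (-1) and schedule on sufficient resources (0).
        Duration waitDefault = reactiveMode ? Duration.ofMillis(-1) : STANDARD_WAIT_TIMEOUT;
        // Outside reactive mode the stabilization default mirrors the wait-timeout default.
        Duration stabilizationDefault = reactiveMode ? Duration.ZERO : STANDARD_WAIT_TIMEOUT;
        return new Settings(
                userConfig.getOrDefault(Option.RESOURCE_WAIT_TIMEOUT, waitDefault),
                userConfig.getOrDefault(Option.RESOURCE_STABILIZATION_TIMEOUT, stabilizationDefault));
    }

    public static void main(String[] args) {
        Map<Option, Duration> userConfig = new EnumMap<>(Option.class);
        userConfig.put(Option.RESOURCE_WAIT_TIMEOUT, Duration.ofSeconds(30));
        // The same, unmodified user configuration yields different settings per mode.
        System.out.println("reactive: " + resolve(userConfig, true));
        System.out.println("default:  " + resolve(userConfig, false));
    }
}
```

Because the defaults live next to the code that consumes them, an entrypoint such as the `MiniCluster` has no separate step that it could forget.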

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java
##########
@@ -110,41 +115,164 @@ public void testNotEnoughResources() throws Exception {
     @Test
     public void testNotifyNewResourcesAvailable() throws Exception {
         try (MockContext ctx = new MockContext()) {
-            ctx.setHasEnoughResources(() -> false); // initially, not enough resources
+            ctx.setHasDesiredResources(() -> false); // initially, not enough resources
             WaitingForResources wfr =
-                    new WaitingForResources(ctx, log, RESOURCE_COUNTER, Duration.ZERO);
-            ctx.setHasEnoughResources(() -> true); // make resources available
+                    new WaitingForResources(
+                            ctx, log, RESOURCE_COUNTER, Duration.ZERO, STABILIZATION_TIMEOUT);
+            ctx.setHasDesiredResources(() -> true); // make resources available
             ctx.setExpectExecuting(assertNonNull());
             wfr.notifyNewResourcesAvailable(); // .. and notify
         }
     }
 
     @Test
-    public void testResourceTimeout() throws Exception {
+    public void testSchedulingWithSufficientResourcesAndNoStabilizationTimeout() throws Exception {
+        try (MockContext ctx = new MockContext()) {
+
+            Duration noStabilizationTimeout = Duration.ofMillis(0);
+            WaitingForResources wfr =
+                    new WaitingForResources(
+                            ctx,
+                            log,
+                            RESOURCE_COUNTER,
+                            Duration.ofSeconds(1000),
+                            noStabilizationTimeout);
+
+            ctx.setHasDesiredResources(() -> false);
+            ctx.setHasSufficientResources(() -> true);
+            ctx.setExpectExecuting(assertNonNull());
+            wfr.notifyNewResourcesAvailable();
+        }
+    }
+
+    @Test
+    public void testNoSchedulingIfStabilizationTimeoutIsConfigured() throws Exception {
         try (MockContext ctx = new MockContext()) {
-            ctx.setHasEnoughResources(() -> false);
+
+            Duration stabilizationTimeout = Duration.ofMillis(50000);
+
             WaitingForResources wfr =
-                    new WaitingForResources(ctx, log, RESOURCE_COUNTER, Duration.ZERO);
+                    new WaitingForResources(
+                            ctx,
+                            log,
+                            RESOURCE_COUNTER,
+                            Duration.ofSeconds(1000),
+                            stabilizationTimeout);
+
+            ctx.setHasDesiredResources(() -> false);
+            ctx.setHasSufficientResources(() -> true);
+            wfr.notifyNewResourcesAvailable();
+            // we are not triggering the scheduled tasks, to simulate a long stabilization timeout
+
+            assertThat(ctx.hasStateTransition(), is(false));
+        }
+    }
+
+    @Test
+    public void testSchedulingIfStabilizationTimeoutIsConfigured() throws Exception {
+        try (MockContext ctx = new MockContext()) {
+
+            Duration initialResourceTimeout = Duration.ofMillis(120948);
+            Duration stabilizationTimeout = Duration.ofMillis(50000);
+
+            TestingWaitingForResources wfr =
+                    new TestingWaitingForResources(
+                            ctx,
+                            log,
+                            RESOURCE_COUNTER,
+                            initialResourceTimeout,
+                            stabilizationTimeout);
+
+            ctx.setHasDesiredResources(() -> false);
+            ctx.setHasSufficientResources(() -> true);
+            wfr.setStabilizationTimeoutOver(false);
 
             ctx.setExpectExecuting(assertNonNull());
+            wfr.notifyNewResourcesAvailable();
+
+            wfr.setStabilizationTimeoutOver(true);
 
-            // immediately execute all scheduled runnables
-            assertThat(ctx.getScheduledRunnables().size(), greaterThan(0));
+            // we are executing all scheduled runnables, except those with the
+            // initialResourceTimeout or a ZERO timeout to make sure we are testing the
+            // stabilization timeout
             for (ScheduledRunnable scheduledRunnable : ctx.getScheduledRunnables()) {
-                if (scheduledRunnable.getExpectedState() == wfr) {
+                if (scheduledRunnable.getExpectedState() == wfr
+                        && !scheduledRunnable.getDelay().equals(initialResourceTimeout)
+                        && !scheduledRunnable.getDelay().equals(Duration.ZERO)) {
                     scheduledRunnable.runAction();

Review comment:
       This looks a bit hacky. Would it make sense to add something like an "advance time" method that runs all runnables that are eligible to run? Right now we effectively test a scenario that cannot happen, because we leave out the runnables with a 0 delay.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java
##########
@@ -90,13 +102,35 @@ public Logger getLogger() {
 
     @Override
     public void notifyNewResourcesAvailable() {
-        if (context.hasEnoughResources(desiredResources)) {
+        checkDesiredOrSufficientResourcesAvailable();
+    }
+
+    private void checkDesiredOrSufficientResourcesAvailable() {
+        if (context.hasDesiredResources(desiredResources)) {
             createExecutionGraphWithAvailableResources();
+            return;
+        }
+
+        if (context.hasSufficientResources()) {
+            if (isResourceStabilizationDeadlineOverdue()) {
+                createExecutionGraphWithAvailableResources();
+            } else {
+                // schedule next resource check
+                context.runIfState(
+                        this,
+                        this::checkDesiredOrSufficientResourcesAvailable,
+                        resourceStabilizationDeadline.timeLeft());
+            }
         }
     }

Review comment:
       I think the implementation of the resource stabilization timeout differs from the description of the option. According to the description, I would have expected the timeout to start only after we have acquired sufficient resources to run the job. With the current implementation, we already start the stabilization timeout when we enter the state.
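The following is a minimal sketch of the semantics described by the option. It uses a hypothetical `StabilizationSketch` class that models only the decision logic; the clock and the resource checks are injected as `LongSupplier` and `BooleanSupplier` rather than through Flink's `Context`. The stabilization deadline is set the first time sufficient resources are observed, not on state entry. Resetting the timer when resources drop below the sufficient level again is an extra assumption of this sketch.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;

// Hypothetical, simplified model of the decision in WaitingForResources; not Flink code.
// The stabilization timer starts when sufficient resources are first observed,
// rather than when the state is entered.
class StabilizationSketch {

    private final LongSupplier clockMillis;
    private final Duration stabilizationTimeout;
    private final BooleanSupplier hasDesiredResources;
    private final BooleanSupplier hasSufficientResources;
    private Long stabilizationDeadlineMillis; // null until sufficient resources are seen

    StabilizationSketch(
            LongSupplier clockMillis,
            Duration stabilizationTimeout,
            BooleanSupplier hasDesiredResources,
            BooleanSupplier hasSufficientResources) {
        this.clockMillis = clockMillis;
        this.stabilizationTimeout = stabilizationTimeout;
        this.hasDesiredResources = hasDesiredResources;
        this.hasSufficientResources = hasSufficientResources;
    }

    /** Returns true if the job should be scheduled now. */
    boolean shouldSchedule() {
        if (hasDesiredResources.getAsBoolean()) {
            return true; // desired resources available: schedule without waiting
        }
        if (!hasSufficientResources.getAsBoolean()) {
            // Assumption of this sketch: losing sufficient resources resets the timer.
            stabilizationDeadlineMillis = null;
            return false;
        }
        long now = clockMillis.getAsLong();
        if (stabilizationDeadlineMillis == null) {
            // First time sufficient resources are available: start the stabilization timer here.
            stabilizationDeadlineMillis = now + stabilizationTimeout.toMillis();
        }
        return now >= stabilizationDeadlineMillis;
    }

    public static void main(String[] args) {
        long[] now = {0};
        StabilizationSketch sketch = new StabilizationSketch(
                () -> now[0], Duration.ofMillis(50000), () -> false, () -> now[0] >= 10000);
        for (long t : new long[] {0, 10000, 50000, 59999, 60000}) {
            now[0] = t;
            System.out.println("t=" + t + "ms schedule=" + sketch.shouldSchedule());
        }
    }
}
```

Assume the state is entered at `t=0` and sufficient resources appear at `t=10000`. Under the state-entry semantics of the current diff, the deadline would already be reached at `t=50000`. In this sketch, scheduling happens only at `t=60000`, 50 seconds after sufficient resources appeared. A zero timeout still schedules on the first check with sufficient resources, matching the reactive-mode override.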

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/ReactiveModeUtils.java
##########

Review comment:
       I think the key observation is that keeping related things close together makes them easier to reuse and easier to maintain, because they are not spread all over the place.

##########
File path: 
flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
##########
@@ -411,6 +411,25 @@
                                     .text(
                                             "Increasing this value will make the cluster more resilient against temporary resources shortages (e.g., there is more time for a failed TaskManager to be restarted), "
                                                     + "while decreasing this value reduces downtime of a job (provided that enough slots are available to still run the job).")
+                                    .linebreak()
+                                    .text(
+                                            "Setting a negative duration will disable the resource timeout: The JobManager will wait indefinitely for resources to appear.")
+                                    .build());
+
+    @Documentation.Section({
+        Documentation.Sections.EXPERT_SCHEDULING,
+        Documentation.Sections.ALL_JOB_MANAGER
+    })
+    public static final ConfigOption<Duration> RESOURCE_STABILIZATION_TIMEOUT =
+            key("jobmanager.adaptive-scheduler.resource-stabilization-timeout")
+                    .durationType()
+                    .defaultValue(RESOURCE_WAIT_TIMEOUT.defaultValue())
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "The resource stabilization timeout defines the time the JobManager will wait "
+                                                    + "if fewer than the required resources are available, but sufficient resources for execution are there."

Review comment:
       ```suggestion
                                                       + "if fewer than the required resources are available, but sufficient resources for execution are there. "
   ```



