tillrohrmann commented on a change in pull request #15159: URL: https://github.com/apache/flink/pull/15159#discussion_r593157108
########## File path: flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java ########## @@ -411,6 +411,25 @@ .text( "Increasing this value will make the cluster more resilient against temporary resources shortages (e.g., there is more time for a failed TaskManager to be restarted), " + "while decreasing this value reduces downtime of a job (provided that enough slots are available to still run the job).") + .linebreak() + .text( + "Setting a negative duration will disable the resource timeout: The JobManager will wait indefinitely for resources to appear.") + .build()); + + @Documentation.Section({ + Documentation.Sections.EXPERT_SCHEDULING, + Documentation.Sections.ALL_JOB_MANAGER + }) + public static final ConfigOption<Duration> RESOURCE_STABILIZATION_TIMEOUT =
Review comment: I think we need to document the different default values that apply in reactive mode. The same holds for `RESOURCE_WAIT_TIMEOUT`.
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/ReactiveModeUtils.java ########## @@ -47,5 +51,20 @@ public static void configureJobGraphForReactiveMode(JobGraph jobGraph) { } } + public static void configureClusterForReactiveMode(Configuration configuration) { + LOG.info("Modifying Cluster configuration for reactive mode"); + + if (!configuration.contains(JobManagerOptions.RESOURCE_STABILIZATION_TIMEOUT)) { + // Configure adaptive scheduler to schedule job even if desired resources are not + // available (but sufficient resources) + configuration.set( + JobManagerOptions.RESOURCE_STABILIZATION_TIMEOUT, Duration.ofMillis(0)); + } + if (!configuration.contains(JobManagerOptions.RESOURCE_WAIT_TIMEOUT)) { + // configure adaptive scheduler to wait forever for TaskManagers to register + configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(-1)); + } + }
Review comment: I am not saying that we have to change it, but I wanted to make an observation about this kind of pattern (changing the
`Configuration` so that the right configuration values are picked up later): The problem I see with this pattern is that it decouples the instantiation of the required components from their configuration. Consequently, we as developers have to remember to call `ReactiveModeUtils.configureClusterForReactiveMode` at every entrypoint that might use the `AdaptiveScheduler` (in this PR, the `ClusterEntrypoint` and the `MiniCluster`). If we forget it, reactive mode might not work correctly. If these context-sensitive configuration overrides were moved closer to where the components are instantiated, it would be easier to reuse reactive mode in another setting. For example, when the `DefaultJobManagerRunnerFactory` sees that a job runs in reactive mode, it could read `RESOURCE_STABILIZATION_TIMEOUT` and `RESOURCE_WAIT_TIMEOUT` with different default values (`0` and `-1`, respectively) and pass them to the factory of the `AdaptiveScheduler`.
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResourcesTest.java ########## @@ -110,41 +115,164 @@ public void testNotEnoughResources() throws Exception { @Test public void testNotifyNewResourcesAvailable() throws Exception { try (MockContext ctx = new MockContext()) { - ctx.setHasEnoughResources(() -> false); // initially, not enough resources + ctx.setHasDesiredResources(() -> false); // initially, not enough resources WaitingForResources wfr = - new WaitingForResources(ctx, log, RESOURCE_COUNTER, Duration.ZERO); + new WaitingForResources( + ctx, log, RESOURCE_COUNTER, Duration.ZERO, STABILIZATION_TIMEOUT); + ctx.setHasDesiredResources(() -> true); // make resources available ctx.setExpectExecuting(assertNonNull()); wfr.notifyNewResourcesAvailable(); // ..
and notify } } @Test - public void testResourceTimeout() throws Exception { + public void testSchedulingWithSufficientResourcesAndNoStabilizationTimeout() throws Exception { + try (MockContext ctx = new MockContext()) { + + Duration noStabilizationTimeout = Duration.ofMillis(0); + WaitingForResources wfr = + new WaitingForResources( + ctx, + log, + RESOURCE_COUNTER, + Duration.ofSeconds(1000), + noStabilizationTimeout); + + ctx.setHasDesiredResources(() -> false); + ctx.setHasSufficientResources(() -> true); + ctx.setExpectExecuting(assertNonNull()); + wfr.notifyNewResourcesAvailable(); + } + } + + @Test + public void testNoSchedulingIfStabilizationTimeoutIsConfigured() throws Exception { try (MockContext ctx = new MockContext()) { - ctx.setHasEnoughResources(() -> false); + + Duration stabilizationTimeout = Duration.ofMillis(50000); + WaitingForResources wfr = - new WaitingForResources(ctx, log, RESOURCE_COUNTER, Duration.ZERO); + new WaitingForResources( + ctx, + log, + RESOURCE_COUNTER, + Duration.ofSeconds(1000), + stabilizationTimeout); + + ctx.setHasDesiredResources(() -> false); + ctx.setHasSufficientResources(() -> true); + wfr.notifyNewResourcesAvailable(); + // we are not triggering the scheduled tasks, to simulate a long stabilization timeout + + assertThat(ctx.hasStateTransition(), is(false)); + } + } + + @Test + public void testSchedulingIfStabilizationTimeoutIsConfigured() throws Exception { + try (MockContext ctx = new MockContext()) { + + Duration initialResourceTimeout = Duration.ofMillis(120948); + Duration stabilizationTimeout = Duration.ofMillis(50000); + + TestingWaitingForResources wfr = + new TestingWaitingForResources( + ctx, + log, + RESOURCE_COUNTER, + initialResourceTimeout, + stabilizationTimeout); + + ctx.setHasDesiredResources(() -> false); + ctx.setHasSufficientResources(() -> true); + wfr.setStabilizationTimeoutOver(false); ctx.setExpectExecuting(assertNonNull()); + wfr.notifyNewResourcesAvailable(); + + 
wfr.setStabilizationTimeoutOver(true); - // immediately execute all scheduled runnables - assertThat(ctx.getScheduledRunnables().size(), greaterThan(0)); + // we are executing all scheduled runnables, except those with the + // initialResourceTimeout or a ZERO timeout to make sure we are testing the + // stabilization timeout for (ScheduledRunnable scheduledRunnable : ctx.getScheduledRunnables()) { - if (scheduledRunnable.getExpectedState() == wfr) { + if (scheduledRunnable.getExpectedState() == wfr + && !scheduledRunnable.getDelay().equals(initialResourceTimeout) + && !scheduledRunnable.getDelay().equals(Duration.ZERO)) { scheduledRunnable.runAction();
Review comment: This looks a bit hacky. Would it make sense to add something like an "advance time" operation that runs all runnables that are eligible to run? Right now we effectively test a scenario that cannot happen, because we leave out the runnables with a zero delay.
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/WaitingForResources.java ########## @@ -90,13 +102,35 @@ public Logger getLogger() { @Override public void notifyNewResourcesAvailable() { - if (context.hasEnoughResources(desiredResources)) { + checkDesiredOrSufficientResourcesAvailable(); + } + + private void checkDesiredOrSufficientResourcesAvailable() { + if (context.hasDesiredResources(desiredResources)) { createExecutionGraphWithAvailableResources(); + return; + } + + if (context.hasSufficientResources()) { + if (isResourceStabilizationDeadlineOverdue()) { + createExecutionGraphWithAvailableResources(); + } else { + // schedule next resource check + context.runIfState( + this, + this::checkDesiredOrSufficientResourcesAvailable, + resourceStabilizationDeadline.timeLeft()); + } } }
Review comment: I think the implementation of the resource stabilization timeout differs from the description of the option.
According to the description of the option, I would have expected the timeout to start only after we have acquired sufficient resources to run the job. With the current implementation, we already start the stabilization timeout when we enter the state.
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/ReactiveModeUtils.java ########## @@ -47,5 +51,20 @@ public static void configureJobGraphForReactiveMode(JobGraph jobGraph) { } } + public static void configureClusterForReactiveMode(Configuration configuration) { + LOG.info("Modifying Cluster configuration for reactive mode"); + + if (!configuration.contains(JobManagerOptions.RESOURCE_STABILIZATION_TIMEOUT)) { + // Configure adaptive scheduler to schedule job even if desired resources are not + // available (but sufficient resources) + configuration.set( + JobManagerOptions.RESOURCE_STABILIZATION_TIMEOUT, Duration.ofMillis(0)); + } + if (!configuration.contains(JobManagerOptions.RESOURCE_WAIT_TIMEOUT)) { + // configure adaptive scheduler to wait forever for TaskManagers to register + configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(-1)); + } + }
Review comment: I think the key observation is that if we keep related things close together, they are easier to reuse and easier to maintain, because they are not spread all over the place.
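To illustrate the alternative suggested above, here is a minimal sketch, under stated assumptions, of resolving the two timeouts where the scheduler is created instead of rewriting the cluster `Configuration` in `ReactiveModeUtils.configureClusterForReactiveMode`. `AdaptiveSchedulerTimeouts`, its `resolve` method, and the `Map<String, Duration>` stand-in for Flink's `Configuration` are hypothetical. The key of `RESOURCE_WAIT_TIMEOUT` is an assumption, because the diff does not show it; the reactive-mode defaults (`0` and `-1`) are the ones from the diff.

```java
import java.time.Duration;
import java.util.Map;

/**
 * Hypothetical sketch: the adaptive scheduler timeouts are resolved where the scheduler is
 * created, so no entrypoint has to rewrite the cluster configuration beforehand.
 */
final class AdaptiveSchedulerTimeouts {
    // Key of RESOURCE_STABILIZATION_TIMEOUT, as shown in the diff.
    static final String STABILIZATION_TIMEOUT_KEY =
            "jobmanager.adaptive-scheduler.resource-stabilization-timeout";
    // Assumed key of RESOURCE_WAIT_TIMEOUT; the diff does not show it.
    static final String WAIT_TIMEOUT_KEY = "jobmanager.adaptive-scheduler.resource-wait-timeout";

    final Duration resourceWaitTimeout;
    final Duration resourceStabilizationTimeout;

    private AdaptiveSchedulerTimeouts(Duration waitTimeout, Duration stabilizationTimeout) {
        this.resourceWaitTimeout = waitTimeout;
        this.resourceStabilizationTimeout = stabilizationTimeout;
    }

    /**
     * Explicitly configured values always win; only the defaults depend on reactive mode.
     *
     * @param userConfig stand-in for Flink's Configuration (an assumption of this sketch)
     * @param reactiveMode whether the job runs in reactive mode
     * @param defaultWaitTimeout the non-reactive default of RESOURCE_WAIT_TIMEOUT
     */
    static AdaptiveSchedulerTimeouts resolve(
            Map<String, Duration> userConfig, boolean reactiveMode, Duration defaultWaitTimeout) {
        // Reactive mode: wait indefinitely (-1) for TaskManagers to register ...
        Duration waitDefault = reactiveMode ? Duration.ofMillis(-1) : defaultWaitTimeout;
        // ... and schedule as soon as sufficient resources are there (0). Outside reactive
        // mode, the diff defaults the stabilization timeout to RESOURCE_WAIT_TIMEOUT's default.
        Duration stabilizationDefault = reactiveMode ? Duration.ZERO : defaultWaitTimeout;
        return new AdaptiveSchedulerTimeouts(
                userConfig.getOrDefault(WAIT_TIMEOUT_KEY, waitDefault),
                userConfig.getOrDefault(STABILIZATION_TIMEOUT_KEY, stabilizationDefault));
    }
}
```

Because the reactive-mode defaults live next to the code that consumes them, a new entrypoint only needs to pass the `reactiveMode` flag. With Flink's own `Configuration`, each `getOrDefault` call would presumably become `configuration.getOptional(option).orElse(reactiveDefault)`.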
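The two `WaitingForResources` comments above (the "advance time" idea for the test and the point that the stabilization timeout should only start once sufficient resources are available) could be addressed together, roughly as in the following sketch. Everything in it is hypothetical: `ManualTimeScheduler` stands in for the scheduling in `MockContext` and offers an `advanceTime` that runs every runnable whose delay has elapsed, zero-delay ones included. `StabilizingWaiter` is a simplified variant of `checkDesiredOrSufficientResourcesAvailable` in which the stabilization deadline starts when sufficient resources are first observed. Resetting the deadline when resources drop below the sufficient level is an extra assumption of this sketch.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.BooleanSupplier;

/** Hypothetical test helper: runs scheduled actions once virtual time reaches their due time. */
final class ManualTimeScheduler {
    private static final class Task {
        final long dueMillis;
        final Runnable action;

        Task(long dueMillis, Runnable action) {
            this.dueMillis = dueMillis;
            this.action = action;
        }
    }

    private final List<Task> pending = new ArrayList<>();
    private long nowMillis = 0;

    long now() {
        return nowMillis;
    }

    void schedule(Runnable action, Duration delay) {
        pending.add(new Task(nowMillis + delay.toMillis(), action));
    }

    /** Advances virtual time and runs every due task in order, zero-delay tasks included. */
    void advanceTime(Duration duration) {
        long target = nowMillis + duration.toMillis();
        while (true) {
            Task next = pending.stream()
                    .filter(t -> t.dueMillis <= target)
                    .min(Comparator.comparingLong(t -> t.dueMillis))
                    .orElse(null);
            if (next == null) {
                break;
            }
            pending.remove(next);
            nowMillis = Math.max(nowMillis, next.dueMillis);
            next.action.run(); // may schedule follow-up tasks; the loop picks up due ones
        }
        nowMillis = target;
    }
}

/** Hypothetical waiter whose stabilization deadline starts with sufficient resources. */
final class StabilizingWaiter {
    private final ManualTimeScheduler scheduler;
    private final BooleanSupplier hasDesiredResources;
    private final BooleanSupplier hasSufficientResources;
    private final Duration stabilizationTimeout;
    private Long stabilizationDeadline = null; // null until sufficient resources are observed
    boolean executing = false; // stands in for the transition to the executing state

    StabilizingWaiter(ManualTimeScheduler scheduler, BooleanSupplier hasDesiredResources,
            BooleanSupplier hasSufficientResources, Duration stabilizationTimeout) {
        this.scheduler = scheduler;
        this.hasDesiredResources = hasDesiredResources;
        this.hasSufficientResources = hasSufficientResources;
        this.stabilizationTimeout = stabilizationTimeout;
    }

    void notifyNewResourcesAvailable() {
        checkResources();
    }

    private void checkResources() {
        if (executing) {
            return;
        }
        if (hasDesiredResources.getAsBoolean()) {
            executing = true; // desired resources: schedule immediately
            return;
        }
        if (!hasSufficientResources.getAsBoolean()) {
            stabilizationDeadline = null; // assumption: losing resources restarts the timeout
            return;
        }
        if (stabilizationDeadline == null) {
            // The timeout starts now, not when the state was entered.
            stabilizationDeadline = scheduler.now() + stabilizationTimeout.toMillis();
        }
        long timeLeft = stabilizationDeadline - scheduler.now();
        if (timeLeft <= 0) {
            executing = true;
        } else {
            scheduler.schedule(this::checkResources, Duration.ofMillis(timeLeft));
        }
    }
}
```

With such a helper, a test could simply advance time past the stabilization timeout instead of filtering scheduled runnables by their delay, so zero-delay runnables run just as they would outside the test.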
########## File path: flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java ########## @@ -411,6 +411,25 @@ .text( "Increasing this value will make the cluster more resilient against temporary resources shortages (e.g., there is more time for a failed TaskManager to be restarted), " + "while decreasing this value reduces downtime of a job (provided that enough slots are available to still run the job).") + .linebreak() + .text( + "Setting a negative duration will disable the resource timeout: The JobManager will wait indefinitely for resources to appear.") + .build()); + + @Documentation.Section({ + Documentation.Sections.EXPERT_SCHEDULING, + Documentation.Sections.ALL_JOB_MANAGER + }) + public static final ConfigOption<Duration> RESOURCE_STABILIZATION_TIMEOUT = + key("jobmanager.adaptive-scheduler.resource-stabilization-timeout") + .durationType() + .defaultValue(RESOURCE_WAIT_TIMEOUT.defaultValue()) + .withDescription( + Description.builder() + .text( + "The resource stabilization timeout defines the time the JobManager will wait " + + "if fewer than the required resources are available, but sufficient resources for execution are there."
Review comment:
```suggestion
+ "if fewer than the required resources are available, but sufficient resources for execution are there. "
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org