1996fanrui commented on code in PR #22985: URL: https://github.com/apache/flink/pull/22985#discussion_r1356383787
########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java: ########## @@ -124,23 +157,74 @@ private void handleDeploymentFailure(ExecutionVertex executionVertex, JobExcepti @Override public void onNewResourcesAvailable() { - maybeRescale(); + rescaleWhenCooldownPeriodIsOver(); } @Override public void onNewResourceRequirements() { - maybeRescale(); + rescaleWhenCooldownPeriodIsOver(); + } + + /** Force rescaling as long as the target parallelism is different from the current one. */ + private void forceRescale() { + if (context.shouldRescale(getExecutionGraph(), true)) { + getLogger() + .info( + "Added resources are still there after {} time({}), force a rescale.", + JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX.key(), + scalingIntervalMax); + context.goToRestarting( + getExecutionGraph(), + getExecutionGraphHandler(), + getOperatorCoordinatorHandler(), + Duration.ofMillis(0L), + getFailures()); + } } + /** + * Rescale the job if added resource meets {@link JobManagerOptions#MIN_PARALLELISM_INCREASE}. + * Otherwise, force a rescale after {@link JobManagerOptions#SCHEDULER_SCALING_INTERVAL_MAX} if + * the resource is still there. + */ private void maybeRescale() { - if (context.shouldRescale(getExecutionGraph())) { - getLogger().info("Can change the parallelism of job. Restarting job."); + rescaleScheduled = false; + if (context.shouldRescale( + getExecutionGraph(), false)) { // JobManagerOptions#MIN_PARALLELISM_INCREASE met + getLogger().info("Can change the parallelism of the job. Restarting the job."); context.goToRestarting( getExecutionGraph(), getExecutionGraphHandler(), getOperatorCoordinatorHandler(), Duration.ofMillis(0L), getFailures()); + } else if (scalingIntervalMax + != null) { // JobManagerOptions#MIN_PARALLELISM_INCREASE not met Review Comment: In the Flink repo, comments are not placed on the same line as code. Please move these comments onto their own lines before the code they describe, and update them to refer to `forceRescale`. 
########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java: ########## @@ -124,23 +157,74 @@ private void handleDeploymentFailure(ExecutionVertex executionVertex, JobExcepti @Override public void onNewResourcesAvailable() { - maybeRescale(); + rescaleWhenCooldownPeriodIsOver(); } @Override public void onNewResourceRequirements() { - maybeRescale(); + rescaleWhenCooldownPeriodIsOver(); + } + + /** Force rescaling as long as the target parallelism is different from the current one. */ + private void forceRescale() { + if (context.shouldRescale(getExecutionGraph(), true)) { + getLogger() + .info( + "Added resources are still there after {} time({}), force a rescale.", + JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX.key(), + scalingIntervalMax); + context.goToRestarting( + getExecutionGraph(), + getExecutionGraphHandler(), + getOperatorCoordinatorHandler(), + Duration.ofMillis(0L), + getFailures()); + } } + /** + * Rescale the job if added resource meets {@link JobManagerOptions#MIN_PARALLELISM_INCREASE}. + * Otherwise, force a rescale after {@link JobManagerOptions#SCHEDULER_SCALING_INTERVAL_MAX} if + * the resource is still there. + */ private void maybeRescale() { - if (context.shouldRescale(getExecutionGraph())) { - getLogger().info("Can change the parallelism of job. Restarting job."); + rescaleScheduled = false; + if (context.shouldRescale( + getExecutionGraph(), false)) { // JobManagerOptions#MIN_PARALLELISM_INCREASE met Review Comment: `JobManagerOptions#MIN_PARALLELISM_INCREASE` is an implementation detail inside `context.shouldRescale`, and we should be interface-oriented rather than implementation-oriented. So we shouldn't mention `MIN_PARALLELISM_INCREASE` here; at this call site we only care whether `forceRescale` is true or false. Does that make sense? 
########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java: ########## @@ -67,13 +77,36 @@ class Executing extends StateWithExecutionGraph implements ResourceListener { this.context = context; Preconditions.checkState( executionGraph.getState() == JobStatus.RUNNING, "Assuming running execution graph"); + this.scalingIntervalMin = scalingIntervalMin; + this.scalingIntervalMax = scalingIntervalMax; + this.lastRescale = + Instant.now(); // Executing is recreated with each restart (when we rescale) + // we consider the first execution of the pipeline as a rescale event + Preconditions.checkState( + !scalingIntervalMin.isNegative(), + "%s must be positive integer or 0", + JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN.key()); + if (scalingIntervalMax != null) { + Preconditions.checkState( + scalingIntervalMax.compareTo(scalingIntervalMin) > 0, + "%s(%d) must be greater than %s(%d)", + JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX.key(), + scalingIntervalMax, + JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN.key(), + scalingIntervalMin); + } deploy(); // check if new resources have come available in the meantime context.runIfState(this, this::maybeRescale, Duration.ZERO); } + @VisibleForTesting + void setLastRescale(Instant lastRescale) { Review Comment: Can `lastRescale` be passed in through the constructor? If so, `lastRescale` can be made final, which is safer. From what I can see this should work; could you please check? Thanks~ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org