1996fanrui commented on code in PR #22985:
URL: https://github.com/apache/flink/pull/22985#discussion_r1356383787


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##########
@@ -124,23 +157,74 @@ private void handleDeploymentFailure(ExecutionVertex executionVertex, JobExcepti
 
     @Override
     public void onNewResourcesAvailable() {
-        maybeRescale();
+        rescaleWhenCooldownPeriodIsOver();
     }
 
     @Override
     public void onNewResourceRequirements() {
-        maybeRescale();
+        rescaleWhenCooldownPeriodIsOver();
+    }
+
+    /** Force rescaling as long as the target parallelism is different from the current one. */
+    private void forceRescale() {
+        if (context.shouldRescale(getExecutionGraph(), true)) {
+            getLogger()
+                    .info(
+                            "Added resources are still there after {} time({}), force a rescale.",
+                            JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX.key(),
+                            scalingIntervalMax);
+            context.goToRestarting(
+                    getExecutionGraph(),
+                    getExecutionGraphHandler(),
+                    getOperatorCoordinatorHandler(),
+                    Duration.ofMillis(0L),
+                    getFailures());
+        }
     }
 
+    /**
+     * Rescale the job if added resource meets {@link JobManagerOptions#MIN_PARALLELISM_INCREASE}.
+     * Otherwise, force a rescale after {@link JobManagerOptions#SCHEDULER_SCALING_INTERVAL_MAX} if
+     * the resource is still there.
+     */
     private void maybeRescale() {
-        if (context.shouldRescale(getExecutionGraph())) {
-            getLogger().info("Can change the parallelism of job. Restarting job.");
+        rescaleScheduled = false;
+        if (context.shouldRescale(
+                getExecutionGraph(), false)) { // JobManagerOptions#MIN_PARALLELISM_INCREASE met
+            getLogger().info("Can change the parallelism of the job. Restarting the job.");
             context.goToRestarting(
                     getExecutionGraph(),
                     getExecutionGraphHandler(),
                     getOperatorCoordinatorHandler(),
                     Duration.ofMillis(0L),
                     getFailures());
+        } else if (scalingIntervalMax
+                != null) { // JobManagerOptions#MIN_PARALLELISM_INCREASE not met

Review Comment:
   In the Flink codebase, comments are not placed on the same line as the code. Please move these comments above the code, and reword them in terms of `forceRescale`.
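A minimal sketch of the layout requested above, assuming a simplified model of `maybeRescale`: each comment sits on its own line above the branch it describes and talks only about the `forceRescale` argument. `RescaleSketch`, `RescaleCheck` and `Decision` are hypothetical names, not Flink classes; the sketch returns a decision instead of calling `context.goToRestarting` or scheduling `forceRescale`.

```java
import java.time.Duration;

/** Hypothetical, simplified stand-in for Executing#maybeRescale; not Flink's actual class. */
final class RescaleSketch {

    /** Hypothetical stand-in for context.shouldRescale(executionGraph, forceRescale). */
    @FunctionalInterface
    interface RescaleCheck {
        boolean shouldRescale(boolean forceRescale);
    }

    /** What the sketch decides, instead of restarting the job or scheduling a task. */
    enum Decision {
        RESCALE_NOW,
        SCHEDULE_FORCED_RESCALE,
        DO_NOTHING
    }

    private final RescaleCheck check;
    // Mirrors scalingIntervalMax in the diff: null means a forced rescale is never scheduled.
    private final Duration scalingIntervalMax;

    RescaleSketch(RescaleCheck check, Duration scalingIntervalMax) {
        this.check = check;
        this.scalingIntervalMax = scalingIntervalMax;
    }

    Decision maybeRescale() {
        // Rescale right away if a non-forced rescale is already worthwhile.
        if (check.shouldRescale(false)) {
            return Decision.RESCALE_NOW;
        }
        // Otherwise, if a maximum interval is configured, run forceRescale() once it has elapsed.
        if (scalingIntervalMax != null) {
            return Decision.SCHEDULE_FORCED_RESCALE;
        }
        // No maximum interval configured: keep running with the current parallelism.
        return Decision.DO_NOTHING;
    }
}
```

Because the comments only describe the `forceRescale` flag, they stay accurate regardless of which threshold `shouldRescale` applies internally.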



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##########
@@ -124,23 +157,74 @@ private void handleDeploymentFailure(ExecutionVertex executionVertex, JobExcepti
 
     @Override
     public void onNewResourcesAvailable() {
-        maybeRescale();
+        rescaleWhenCooldownPeriodIsOver();
     }
 
     @Override
     public void onNewResourceRequirements() {
-        maybeRescale();
+        rescaleWhenCooldownPeriodIsOver();
+    }
+
+    /** Force rescaling as long as the target parallelism is different from the current one. */
+    private void forceRescale() {
+        if (context.shouldRescale(getExecutionGraph(), true)) {
+            getLogger()
+                    .info(
+                            "Added resources are still there after {} time({}), force a rescale.",
+                            JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX.key(),
+                            scalingIntervalMax);
+            context.goToRestarting(
+                    getExecutionGraph(),
+                    getExecutionGraphHandler(),
+                    getOperatorCoordinatorHandler(),
+                    Duration.ofMillis(0L),
+                    getFailures());
+        }
     }
 
+    /**
+     * Rescale the job if added resource meets {@link JobManagerOptions#MIN_PARALLELISM_INCREASE}.
+     * Otherwise, force a rescale after {@link JobManagerOptions#SCHEDULER_SCALING_INTERVAL_MAX} if
+     * the resource is still there.
+     */
     private void maybeRescale() {
-        if (context.shouldRescale(getExecutionGraph())) {
-            getLogger().info("Can change the parallelism of job. Restarting job.");
+        rescaleScheduled = false;
+        if (context.shouldRescale(
+                getExecutionGraph(), false)) { // JobManagerOptions#MIN_PARALLELISM_INCREASE met

Review Comment:
   `JobManagerOptions#MIN_PARALLELISM_INCREASE` is an implementation detail inside `context.shouldRescale`, and we should code against the interface rather than the implementation.
   
   So this comment shouldn't mention `MIN_PARALLELISM_INCREASE`; here we only care whether `forceRescale` is true or false. Does that make sense?
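To illustrate the interface-versus-implementation point, here is a sketch in which callers only choose a `forceRescale` flag, while the minimum-increase threshold lives entirely inside one implementation. `RescalePolicy` and `MinIncreasePolicy` are hypothetical; they play the roles of `context.shouldRescale` and `JobManagerOptions#MIN_PARALLELISM_INCREASE` without reproducing Flink's actual classes, and the scalar parallelism values are a simplification.

```java
/** Hypothetical interface: the only thing a caller decides is whether the rescale is forced. */
interface RescalePolicy {
    boolean shouldRescale(int currentParallelism, int targetParallelism, boolean forceRescale);
}

/** Hypothetical implementation; the minimum increase is an internal detail callers never see. */
final class MinIncreasePolicy implements RescalePolicy {
    private final int minParallelismIncrease;

    MinIncreasePolicy(int minParallelismIncrease) {
        if (minParallelismIncrease < 1) {
            throw new IllegalArgumentException("minParallelismIncrease must be at least 1");
        }
        this.minParallelismIncrease = minParallelismIncrease;
    }

    @Override
    public boolean shouldRescale(int current, int target, boolean forceRescale) {
        // Nothing to change if the parallelism is already at the target.
        if (target == current) {
            return false;
        }
        // A forced rescale happens for any difference, matching the forceRescale() Javadoc.
        if (forceRescale) {
            return true;
        }
        // A non-forced rescale requires the configured minimum increase.
        return target - current >= minParallelismIncrease;
    }
}
```

Swapping in a different `RescalePolicy` would leave call-site comments such as "not forced" correct, which is the benefit of not naming the option there.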



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##########
@@ -67,13 +77,36 @@ class Executing extends StateWithExecutionGraph implements ResourceListener {
         this.context = context;
         Preconditions.checkState(
                 executionGraph.getState() == JobStatus.RUNNING, "Assuming running execution graph");
+        this.scalingIntervalMin = scalingIntervalMin;
+        this.scalingIntervalMax = scalingIntervalMax;
+        this.lastRescale =
+                Instant.now(); // Executing is recreated with each restart (when we rescale)
+        // we consider the first execution of the pipeline as a rescale event
+        Preconditions.checkState(
+                !scalingIntervalMin.isNegative(),
+                "%s must be positive integer or 0",
+                JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN.key());
+        if (scalingIntervalMax != null) {
+            Preconditions.checkState(
+                    scalingIntervalMax.compareTo(scalingIntervalMin) > 0,
+                    "%s(%d) must be greater than %s(%d)",
+                    JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX.key(),
+                    scalingIntervalMax,
+                    JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN.key(),
+                    scalingIntervalMin);
+        }
 
         deploy();
 
         // check if new resources have come available in the meantime
         context.runIfState(this, this::maybeRescale, Duration.ZERO);
     }
 
+    @VisibleForTesting
+    void setLastRescale(Instant lastRescale) {

Review Comment:
   Can `lastRescale` be passed in through the constructor?
   
   If so, `lastRescale` can be `final`, which is safer. It looks like this would work, but please double-check. Thanks~
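A sketch of the suggestion above, under the assumption that the cooldown check only needs `lastRescale` and a minimum interval: the timestamp is injected through the constructor so the field can be `final`, and a test passes a fixed `Instant` instead of calling a `@VisibleForTesting` setter. `CooldownState`, `startingNow` and `isCooldownOver` are hypothetical names, not part of Flink's `Executing` class.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Objects;

/** Hypothetical state object illustrating constructor injection; not Flink code. */
final class CooldownState {
    // Assigned once in the constructor, so no setter is needed for tests.
    private final Instant lastRescale;
    private final Duration scalingIntervalMin;

    CooldownState(Instant lastRescale, Duration scalingIntervalMin) {
        this.lastRescale = Objects.requireNonNull(lastRescale, "lastRescale");
        this.scalingIntervalMin = Objects.requireNonNull(scalingIntervalMin, "scalingIntervalMin");
        if (scalingIntervalMin.isNegative()) {
            throw new IllegalArgumentException("scalingIntervalMin must not be negative");
        }
    }

    /** Production path: treat the moment of (re)creation as the last rescale event. */
    static CooldownState startingNow(Duration scalingIntervalMin) {
        return new CooldownState(Instant.now(), scalingIntervalMin);
    }

    /** True once at least scalingIntervalMin has elapsed between lastRescale and {@code now}. */
    boolean isCooldownOver(Instant now) {
        return !now.isBefore(lastRescale.plus(scalingIntervalMin));
    }
}
```

Production code would call `startingNow`, which keeps the original behavior of counting the first execution as a rescale event, while tests construct the object with a fixed timestamp.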



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
