zentol commented on code in PR #22169:
URL: https://github.com/apache/flink/pull/22169#discussion_r1147365311


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java:
##########
@@ -730,15 +748,49 @@ public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoord
                                                         + " does not exist")));
     }
 
+    @Override
+    public JobResourceRequirements requestJobResourceRequirements() {
+        final JobResourceRequirements.Builder builder = JobResourceRequirements.newBuilder();
+        for (JobInformation.VertexInformation vertex : jobInformation.getVertices()) {
+            builder.setParallelismForJobVertex(vertex.getJobVertexID(), 1, vertex.getParallelism());
+        }
+        return builder.build();
+    }
+
+    @Override
+    public void updateJobResourceRequirements(JobResourceRequirements jobResourceRequirements) {
+        if (executionMode == SchedulerExecutionMode.REACTIVE) {
+            throw new UnsupportedOperationException(
+                    "Cannot change the parallelism of a job running in reactive mode.");
+        }
+        final Optional<VertexParallelismStore> maybeUpdateVertexParallelismStore =
+                DefaultVertexParallelismStore.applyJobResourceRequirements(
+                        jobInformation.getVertexParallelismStore(), jobResourceRequirements);
+        if (maybeUpdateVertexParallelismStore.isPresent()) {
+            this.jobInformation =
+                    new JobGraphJobInformation(jobGraph, maybeUpdateVertexParallelismStore.get());
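
Below is a minimal, hypothetical sketch (not part of the PR) of how the `JobResourceRequirements` builder used in the hunk above could be driven from caller code. The vertex id and the bounds 1/4 are invented for illustration; only `newBuilder`, `setParallelismForJobVertex`, and `build` are taken from the diff, and the import path for `JobResourceRequirements` is an assumption about flink-runtime packaging.

```java
import org.apache.flink.runtime.jobgraph.JobVertexID;
// Import path assumed from flink-runtime packaging; adjust to wherever the
// JobResourceRequirements class introduced by this PR actually lives.
import org.apache.flink.runtime.jobgraph.JobResourceRequirements;

public class ResourceRequirementsSketch {

    public static void main(String[] args) {
        // Hypothetical vertex id; in a real job this comes from the JobGraph.
        final JobVertexID vertexId = new JobVertexID();

        // Mirror the builder calls from the hunk: lower bound 1, upper bound 4.
        final JobResourceRequirements.Builder builder = JobResourceRequirements.newBuilder();
        builder.setParallelismForJobVertex(vertexId, 1, 4);
        final JobResourceRequirements requirements = builder.build();

        // A scheduler implementing the new interface methods would then be handed
        // these requirements, e.g. scheduler.updateJobResourceRequirements(requirements).
        System.out.println(requirements);
    }
}
```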

Review Comment:
   > To scale down, throwing away task managers or keeping them will not affect the speed of scale
   
   Of course it does. Imagine you have 2 TMs with 2 vertices each (p=4), and you scale down to p=2. If you use 2 TMs with 1 vertex each, you can locally recover _all_ state. If you throw away 1 TM, you need to download half the state from the checkpoint directory.
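
   To spell out the arithmetic behind that example, here is a small back-of-the-envelope sketch (my own simplification, not Flink code): each original subtask is modeled as holding one equal-sized unit of state, and we count how much of it has to be re-downloaded after scaling from p=4 to p=2.

```java
// Back-of-the-envelope model of the example above (hypothetical, not Flink code):
// 4 subtasks with equal state, rescaled to parallelism 2.
public class LocalRecoverySketch {

    /**
     * Fraction of the total state that must be downloaded from checkpoint storage,
     * given how many of the original subtask states are still present locally on
     * the TaskManagers that were kept.
     */
    static double remoteFraction(int originalParallelism, int locallyAvailableSubtaskStates) {
        return (originalParallelism - locallyAvailableSubtaskStates) / (double) originalParallelism;
    }

    public static void main(String[] args) {
        // Keep both TMs (1 vertex each after rescaling): all 4 old subtask states
        // are still local somewhere, so nothing needs to be downloaded.
        System.out.println(remoteFraction(4, 4)); // 0.0

        // Release one TM: the 2 subtask states it held are gone and must be fetched
        // from the checkpoint directory, i.e. half of the total state.
        System.out.println(remoteFraction(4, 2)); // 0.5
    }
}
```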



