rkhachatryan commented on code in PR #22169:
URL: https://github.com/apache/flink/pull/22169#discussion_r1146928351


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java:
##########
@@ -1051,35 +1109,25 @@ private ExecutionGraph 
createExecutionGraphAndRestoreState(
     }
 
     @Override
-    public boolean canScaleUp(ExecutionGraph executionGraph) {
-        int availableSlots = 
declarativeSlotPool.getFreeSlotsInformation().size();
-
-        if (availableSlots > 0) {
-            final Optional<VertexParallelism> potentialNewParallelism =
-                    slotAllocator.determineParallelism(
-                            jobInformation, 
declarativeSlotPool.getAllSlotsInformation());
-
-            if (potentialNewParallelism.isPresent()) {
-                int currentCumulativeParallelism = 
getCurrentCumulativeParallelism(executionGraph);
-                int newCumulativeParallelism =
-                        
potentialNewParallelism.get().getCumulativeParallelism();
-                if (newCumulativeParallelism > currentCumulativeParallelism) {
-                    LOG.debug(
-                            "Offering scale up to scale up controller with 
currentCumulativeParallelism={}, newCumulativeParallelism={}",
-                            currentCumulativeParallelism,
-                            newCumulativeParallelism);
-                    return scaleUpController.canScaleUp(
-                            currentCumulativeParallelism, 
newCumulativeParallelism);
-                }
-            }
-        }
-        return false;
+    public boolean shouldRescale(ExecutionGraph executionGraph) {
+        final Optional<VertexParallelism> maybeNewParallelism =
+                slotAllocator.determineParallelism(
+                        jobInformation, 
declarativeSlotPool.getAllSlotsInformation());
+        return maybeNewParallelism
+                .filter(
+                        vertexParallelism ->
+                                rescalingController.shouldRescale(
+                                        getCurrentParallelism(executionGraph), 
vertexParallelism))
+                .isPresent();
     }
 
-    private static int getCurrentCumulativeParallelism(ExecutionGraph 
executionGraph) {
-        return executionGraph.getAllVertices().values().stream()
-                .map(ExecutionJobVertex::getParallelism)
-                .reduce(0, Integer::sum);

Review Comment:
   We can also remove `VertexParallelism.getCumulativeParallelism`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to