rkhachatryan commented on code in PR #22169: URL: https://github.com/apache/flink/pull/22169#discussion_r1146928351
########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java: ##########
@@ -1051,35 +1109,25 @@ private ExecutionGraph createExecutionGraphAndRestoreState(
     }

     @Override
-    public boolean canScaleUp(ExecutionGraph executionGraph) {
-        int availableSlots = declarativeSlotPool.getFreeSlotsInformation().size();
-
-        if (availableSlots > 0) {
-            final Optional<VertexParallelism> potentialNewParallelism =
-                    slotAllocator.determineParallelism(
-                            jobInformation, declarativeSlotPool.getAllSlotsInformation());
-
-            if (potentialNewParallelism.isPresent()) {
-                int currentCumulativeParallelism = getCurrentCumulativeParallelism(executionGraph);
-                int newCumulativeParallelism =
-                        potentialNewParallelism.get().getCumulativeParallelism();
-                if (newCumulativeParallelism > currentCumulativeParallelism) {
-                    LOG.debug(
-                            "Offering scale up to scale up controller with currentCumulativeParallelism={}, newCumulativeParallelism={}",
-                            currentCumulativeParallelism,
-                            newCumulativeParallelism);
-                    return scaleUpController.canScaleUp(
-                            currentCumulativeParallelism, newCumulativeParallelism);
-                }
-            }
-        }
-        return false;
+    public boolean shouldRescale(ExecutionGraph executionGraph) {
+        final Optional<VertexParallelism> maybeNewParallelism =
+                slotAllocator.determineParallelism(
+                        jobInformation, declarativeSlotPool.getAllSlotsInformation());
+        return maybeNewParallelism
+                .filter(
+                        vertexParallelism ->
+                                rescalingController.shouldRescale(
+                                        getCurrentParallelism(executionGraph), vertexParallelism))
+                .isPresent();
     }

-    private static int getCurrentCumulativeParallelism(ExecutionGraph executionGraph) {
-        return executionGraph.getAllVertices().values().stream()
-                .map(ExecutionJobVertex::getParallelism)
-                .reduce(0, Integer::sum);

Review Comment:
We can also remove `VertexParallelism.getCumulativeParallelism`.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org