mxm commented on code in PR #789:
URL: https://github.com/apache/flink-kubernetes-operator/pull/789#discussion_r1511294060

##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java:
##########

@@ -97,10 +109,6 @@ public boolean isSource(JobVertexID jobVertexID) {
         return get(jobVertexID).getInputs().isEmpty();
     }
 
-    public void updateMaxParallelism(JobVertexID vertexID, int maxParallelism) {
-        get(vertexID).updateMaxParallelism(maxParallelism);

Review Comment:
   Why are we removing this logic? It prevents moving beyond the original max parallelism.


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java:
##########

@@ -199,6 +214,85 @@ protected static boolean allVerticesWithinUtilizationTarget(
         return true;
     }
 
+    protected static boolean resourceQuotaReached(
+            Configuration conf,
+            EvaluatedMetrics evaluatedMetrics,
+            Map<JobVertexID, ScalingSummary> scalingSummaries,
+            JobAutoScalerContext<?> ctx) {
+
+        if (evaluatedMetrics.getJobTopology() == null
+                || evaluatedMetrics.getJobTopology().getSlotSharingGroupMapping().isEmpty()) {
+            return false;
+        }
+
+        var cpuQuota = conf.getOptional(AutoScalerOptions.CPU_QUOTA);
+        var memoryQuota = conf.getOptional(AutoScalerOptions.MEMORY_QUOTA);
+        var tmMemory = ctx.getTaskManagerMemory();
+        var tmCpu = ctx.getTaskManagerCpu();
+
+        if (cpuQuota.isPresent() || memoryQuota.isPresent()) {
+            var currentSlotSharingGroupMaxParallelisms = new HashMap<SlotSharingGroupId, Integer>();
+            var newSlotSharingGroupMaxParallelisms = new HashMap<SlotSharingGroupId, Integer>();
+            for (var e :
+                    evaluatedMetrics.getJobTopology().getSlotSharingGroupMapping().entrySet()) {

Review Comment:
   What if the Flink version doesn't support returning the slot sharing group information via the REST API?


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java:
##########

@@ -199,6 +214,85 @@ protected static boolean allVerticesWithinUtilizationTarget(
         return true;
     }
 
+    protected static boolean resourceQuotaReached(

Review Comment:
   There is already a similar resource check in this file (`scalingWouldExceedClusterResources`). I think we could unify the two approaches where they share the same input (e.g. task slot usage, number of TMs).


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java:
##########

@@ -254,7 +263,7 @@ private void updateKafkaSourceMaxParallelisms(Context ctx, JobID jobId, JobTopol
                 "Updating source {} max parallelism based on available partitions to {}",
                 sourceVertex,
                 numPartitions);
-        topology.updateMaxParallelism(sourceVertex, (int) numPartitions);
+        topology.get(sourceVertex).setMaxParallelism((int) numPartitions);

Review Comment:
   I'm not sure that is correct. That means some source partitions will be idle when the provided parallelism exceeds the original max parallelism.
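   To make the concern in this comment and in the `JobTopology` comment above concrete, here is a minimal, self-contained sketch of the behavioural difference between the removed clamping call and a plain setter. The field and method bodies are my reading of the intent, not the actual operator code:

   ```java
   // Hypothetical illustration only; names mirror the PR, bodies are assumed.
   public class MaxParallelismExample {

       private int maxParallelism = 128; // the vertex's original max parallelism

       // Plain setter (new code path): overwrites unconditionally, so a Kafka
       // partition count above 128 raises the ceiling past the original value.
       void setMaxParallelism(int maxParallelism) {
           this.maxParallelism = maxParallelism;
       }

       // The removed clamping logic, as I read the old updateMaxParallelism:
       // the vertex can never move beyond its original max parallelism.
       void updateMaxParallelism(int newMaxParallelism) {
           this.maxParallelism = Math.min(this.maxParallelism, newMaxParallelism);
       }

       public static void main(String[] args) {
           var vertex = new MaxParallelismExample();
           vertex.updateMaxParallelism(200); // old behaviour: stays at 128
           System.out.println(vertex.maxParallelism);
           vertex.setMaxParallelism(200);    // new behaviour: jumps to 200
           System.out.println(vertex.maxParallelism);
       }
   }
   ```

   With the clamping variant, a partition count above the configured max parallelism could never push the recommendation past the original ceiling.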

##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java:
##########

@@ -199,6 +214,85 @@ protected static boolean allVerticesWithinUtilizationTarget(
         return true;
     }
 
+    protected static boolean resourceQuotaReached(
+            Configuration conf,
+            EvaluatedMetrics evaluatedMetrics,
+            Map<JobVertexID, ScalingSummary> scalingSummaries,
+            JobAutoScalerContext<?> ctx) {
+
+        if (evaluatedMetrics.getJobTopology() == null
+                || evaluatedMetrics.getJobTopology().getSlotSharingGroupMapping().isEmpty()) {
+            return false;
+        }
+
+        var cpuQuota = conf.getOptional(AutoScalerOptions.CPU_QUOTA);
+        var memoryQuota = conf.getOptional(AutoScalerOptions.MEMORY_QUOTA);
+        var tmMemory = ctx.getTaskManagerMemory();
+        var tmCpu = ctx.getTaskManagerCpu();
+
+        if (cpuQuota.isPresent() || memoryQuota.isPresent()) {
+            var currentSlotSharingGroupMaxParallelisms = new HashMap<SlotSharingGroupId, Integer>();
+            var newSlotSharingGroupMaxParallelisms = new HashMap<SlotSharingGroupId, Integer>();
+            for (var e :
+                    evaluatedMetrics.getJobTopology().getSlotSharingGroupMapping().entrySet()) {

Review Comment:
   We should have a workaround similar to the `scalingWouldExceedClusterResources` method, which checks whether the maximum parallelism found is identical to the total number of task slots to determine that slot sharing is enabled, and otherwise assumes it isn't (see the sketch at the end of this review).


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/EvaluatedMetrics.java:
##########

@@ -30,6 +31,7 @@
 @NoArgsConstructor
 @AllArgsConstructor
 public class EvaluatedMetrics {
+    private JobTopology jobTopology;

Review Comment:
   This is not required; `JobTopology` is already available as a parameter to the `scale` method.


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java:
##########

@@ -115,6 +119,17 @@ public boolean scaleResource(
             return false;
         }
 
+        if (resourceQuotaReached(conf, evaluatedMetrics, scalingSummaries, context)) {
+            autoScalerEventHandler.handleEvent(
+                    context,
+                    AutoScalerEventHandler.Type.Warning,
+                    "ResourceQuotaReached",
+                    RESOURCE_QUOTA_REACHED_MESSAGE,
+                    null,
+                    conf.get(SCALING_EVENT_INTERVAL));
+            return false;
+        }

Review Comment:
   Is this the right place to call this method? I think we want to call it after updating the recommended parallelism and after the memory tuning (e.g. line 132). Memory tuning could potentially fix any quota issues.
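   Regarding the fallback for Flink versions that don't expose the slot sharing group mapping via the REST API (first comment above), here is a rough, self-contained sketch of the heuristic I have in mind. The class and method names are placeholders modeled on what `scalingWouldExceedClusterResources` already does, not existing operator code:

   ```java
   import java.util.Collection;
   import java.util.List;

   // Hypothetical sketch only; not part of the operator code base.
   public class SlotSharingFallback {

       // If the largest vertex parallelism equals the total number of task
       // slots, the job most likely runs with the default slot sharing group;
       // otherwise assume slot sharing is effectively disabled and count the
       // required slots per vertex instead.
       static boolean slotSharingLikelyEnabled(
               Collection<Integer> vertexParallelisms, int totalTaskSlots) {
           int maxParallelism =
                   vertexParallelisms.stream().mapToInt(Integer::intValue).max().orElse(0);
           return maxParallelism == totalTaskSlots;
       }

       public static void main(String[] args) {
           // 3 vertices, largest parallelism 8, 8 task slots -> assume sharing.
           System.out.println(slotSharingLikelyEnabled(List.of(4, 8, 2), 8));
           // 12 task slots for the same job -> fall back to the no-sharing assumption.
           System.out.println(slotSharingLikelyEnabled(List.of(4, 8, 2), 12));
       }
   }
   ```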