mxm commented on code in PR #789:
URL: https://github.com/apache/flink-kubernetes-operator/pull/789#discussion_r1511294060


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java:
##########
@@ -97,10 +109,6 @@ public boolean isSource(JobVertexID jobVertexID) {
         return get(jobVertexID).getInputs().isEmpty();
     }
 
-    public void updateMaxParallelism(JobVertexID vertexID, int maxParallelism) {
-        get(vertexID).updateMaxParallelism(maxParallelism);

Review Comment:
   Why are we removing this logic? It prevents the max parallelism from moving beyond the original max parallelism.
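
   For reference, a minimal sketch of the guard this method provided (assuming `VertexInfo` exposes a matching `getMaxParallelism()` getter; treat the names as illustrative):

   ```java
   // Sketch: only ever lower the max parallelism (e.g. to the number of Kafka
   // partitions), never raise it above what the job originally reported.
   public void updateMaxParallelism(JobVertexID vertexID, int maxParallelism) {
       var vertex = get(vertexID);
       vertex.setMaxParallelism(Math.min(vertex.getMaxParallelism(), maxParallelism));
   }
   ```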



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java:
##########
@@ -199,6 +214,85 @@ protected static boolean allVerticesWithinUtilizationTarget(
         return true;
     }
 
+    protected static boolean resourceQuotaReached(
+            Configuration conf,
+            EvaluatedMetrics evaluatedMetrics,
+            Map<JobVertexID, ScalingSummary> scalingSummaries,
+            JobAutoScalerContext<?> ctx) {
+
+        if (evaluatedMetrics.getJobTopology() == null
+                || evaluatedMetrics.getJobTopology().getSlotSharingGroupMapping().isEmpty()) {
+            return false;
+        }
+
+        var cpuQuota = conf.getOptional(AutoScalerOptions.CPU_QUOTA);
+        var memoryQuota = conf.getOptional(AutoScalerOptions.MEMORY_QUOTA);
+        var tmMemory = ctx.getTaskManagerMemory();
+        var tmCpu = ctx.getTaskManagerCpu();
+
+        if (cpuQuota.isPresent() || memoryQuota.isPresent()) {
+            var currentSlotSharingGroupMaxParallelisms = new HashMap<SlotSharingGroupId, Integer>();
+            var newSlotSharingGroupMaxParallelisms = new HashMap<SlotSharingGroupId, Integer>();
+            for (var e :
+                    evaluatedMetrics.getJobTopology().getSlotSharingGroupMapping().entrySet()) {

Review Comment:
   What if the Flink version doesn't support returning the slot sharing group information via the REST API?



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java:
##########
@@ -199,6 +214,85 @@ protected static boolean allVerticesWithinUtilizationTarget(
         return true;
     }
 
+    protected static boolean resourceQuotaReached(

Review Comment:
   There is already a similar resource check in this file (`scalingWouldExceedClusterResources`). I think we could unify the two approaches where they share the same inputs (e.g. task slot usage, number of TMs).
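
   For example, both could feed their projected resource usage into one shared helper; a sketch only, the helper name and parameters are assumptions rather than existing API:

   ```java
   // Hypothetical shared helper both checks could delegate to, once each has
   // derived the projected number of TMs from its own inputs (task slot
   // usage, number of TMs, per-TM resources).
   private static boolean exceedsBudget(
           int projectedNumTms,
           double tmCpu,
           long tmMemoryBytes,
           double cpuBudget,
           long memoryBudgetBytes) {
       return projectedNumTms * tmCpu > cpuBudget
               || projectedNumTms * tmMemoryBytes > memoryBudgetBytes;
   }
   ```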



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java:
##########
@@ -254,7 +263,7 @@ private void updateKafkaSourceMaxParallelisms(Context ctx, JobID jobId, JobTopol
                                 "Updating source {} max parallelism based on 
available partitions to {}",
                                 sourceVertex,
                                 numPartitions);
-                        topology.updateMaxParallelism(sourceVertex, (int) numPartitions);
+                        topology.get(sourceVertex).setMaxParallelism((int) numPartitions);

Review Comment:
   I'm not sure that is correct. That means some source partitions will be idle when the provided parallelism exceeds the original max parallelism.
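
   One way to keep the old semantics at the call site; a sketch, assuming the vertex info exposes a `getMaxParallelism()` getter:

   ```java
   // Sketch: cap at the originally reported max parallelism instead of
   // overwriting it with the raw partition count.
   var vertex = topology.get(sourceVertex);
   vertex.setMaxParallelism(Math.min(vertex.getMaxParallelism(), (int) numPartitions));
   ```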



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java:
##########
@@ -199,6 +214,85 @@ protected static boolean allVerticesWithinUtilizationTarget(
         return true;
     }
 
+    protected static boolean resourceQuotaReached(
+            Configuration conf,
+            EvaluatedMetrics evaluatedMetrics,
+            Map<JobVertexID, ScalingSummary> scalingSummaries,
+            JobAutoScalerContext<?> ctx) {
+
+        if (evaluatedMetrics.getJobTopology() == null
+                || evaluatedMetrics.getJobTopology().getSlotSharingGroupMapping().isEmpty()) {
+            return false;
+        }
+
+        var cpuQuota = conf.getOptional(AutoScalerOptions.CPU_QUOTA);
+        var memoryQuota = conf.getOptional(AutoScalerOptions.MEMORY_QUOTA);
+        var tmMemory = ctx.getTaskManagerMemory();
+        var tmCpu = ctx.getTaskManagerCpu();
+
+        if (cpuQuota.isPresent() || memoryQuota.isPresent()) {
+            var currentSlotSharingGroupMaxParallelisms = new HashMap<SlotSharingGroupId, Integer>();
+            var newSlotSharingGroupMaxParallelisms = new HashMap<SlotSharingGroupId, Integer>();
+            for (var e :
+                    evaluatedMetrics.getJobTopology().getSlotSharingGroupMapping().entrySet()) {

Review Comment:
   We should have a workaround similar to the one in the `scalingWouldExceedClusterResources` method, which checks whether the maximum parallelism found is identical to the total number of task slots to determine that slot sharing is enabled, and otherwise assumes it isn't.
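
   Roughly along these lines (a sketch; the helper name and inputs are illustrative):

   ```java
   // Illustrative heuristic, mirroring scalingWouldExceedClusterResources:
   // if the largest vertex parallelism equals the total number of task
   // slots, all vertices fit into one shared slot group; otherwise assume
   // slot sharing is disabled.
   private static boolean slotSharingEnabled(
           Map<JobVertexID, Integer> parallelisms, int totalTaskSlots) {
       int maxParallelism =
               parallelisms.values().stream().mapToInt(Integer::intValue).max().orElse(0);
       return maxParallelism == totalTaskSlots;
   }
   ```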



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/EvaluatedMetrics.java:
##########
@@ -30,6 +31,7 @@
 @NoArgsConstructor
 @AllArgsConstructor
 public class EvaluatedMetrics {
+    private JobTopology jobTopology;

Review Comment:
   This is not required; `JobTopology` is already available as a parameter to the `scale` method.
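
   It could simply be threaded through; a sketch, the exact signature may differ:

   ```java
   // Sketch: take the JobTopology that scale(...) already receives as a
   // parameter instead of storing it on EvaluatedMetrics.
   protected static boolean resourceQuotaReached(
           Configuration conf,
           JobTopology jobTopology,
           EvaluatedMetrics evaluatedMetrics,
           Map<JobVertexID, ScalingSummary> scalingSummaries,
           JobAutoScalerContext<?> ctx) {
       if (jobTopology == null || jobTopology.getSlotSharingGroupMapping().isEmpty()) {
           return false;
       }
       // ... quota evaluation as in the current patch, unchanged.
       return false;
   }
   ```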



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java:
##########
@@ -115,6 +119,17 @@ public boolean scaleResource(
             return false;
         }
 
+        if (resourceQuotaReached(conf, evaluatedMetrics, scalingSummaries, context)) {
+            autoScalerEventHandler.handleEvent(
+                    context,
+                    AutoScalerEventHandler.Type.Warning,
+                    "ResourceQuotaReached",
+                    RESOURCE_QUOTA_REACHED_MESSAGE,
+                    null,
+                    conf.get(SCALING_EVENT_INTERVAL));
+            return false;
+        }

Review Comment:
   Is this the right place to call this method? I think we want to call it after updating the recommended parallelism and after the memory tuning (e.g. line 132). Memory tuning could potentially fix any quota issues.
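
   Sketched ordering; the two helper calls are placeholders for the existing steps, not real method names:

   ```java
   // Hypothetical ordering: let the parallelism updates and memory tuning
   // run first, then evaluate the quota against the tuned resource profile.
   applyParallelismOverrides(scalingSummaries); // placeholder step
   var tunedConf = applyMemoryTuning(conf, evaluatedMetrics); // placeholder step
   if (resourceQuotaReached(tunedConf, evaluatedMetrics, scalingSummaries, context)) {
       // emit the ResourceQuotaReached warning and return false, as above
       return false;
   }
   ```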



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
