gyfora commented on code in PR #774:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/774#discussion_r1493771147


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java:
##########
@@ -223,128 +203,65 @@ public static void computeLagMetrics(
         }
     }
 
-    // TODO: FLINK-34213: Consider using accumulated busy time instead of 
busyMsPerSecond
-    private static double getBusyTimeMsPerSecond(
+    private static double getNumRecordsInPerSecond(
             Map<FlinkMetric, AggregatedMetric> flinkMetrics,
-            Configuration conf,
-            JobVertexID jobVertexId) {
-        var busyTimeAggregator = 
conf.get(AutoScalerOptions.BUSY_TIME_AGGREGATOR);
-        var busyTimeMsPerSecond =
-                
busyTimeAggregator.get(flinkMetrics.get(FlinkMetric.BUSY_TIME_PER_SEC));
-        if (!Double.isFinite(busyTimeMsPerSecond)) {
-            if (AutoScalerUtils.excludeVertexFromScaling(conf, jobVertexId)) {
-                // We only want to log this once
-                LOG.warn(
-                        "No busyTimeMsPerSecond metric available for {}. No 
scaling will be performed for this vertex.",
-                        jobVertexId);
-            }
-            return Double.NaN;
-        }
-        return Math.max(0, busyTimeMsPerSecond);
+            IOMetrics ioMetrics,
+            JobVertexID jobVertexID,
+            boolean isSource) {
+        return getNumRecordsInternal(flinkMetrics, ioMetrics, jobVertexID, 
isSource, true);
     }
 
-    private static double getNumRecordsInPerSecond(
+    private static double getNumRecordsAccumulated(
             Map<FlinkMetric, AggregatedMetric> flinkMetrics,
+            IOMetrics ioMetrics,
             JobVertexID jobVertexID,
             boolean isSource) {
+        return getNumRecordsInternal(flinkMetrics, ioMetrics, jobVertexID, 
isSource, false);
+    }
+
+    private static double getNumRecordsInternal(

Review Comment:
   Added



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/VertexInfo.java:
##########
@@ -17,30 +17,68 @@
 
 package org.apache.flink.autoscaler.topology;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
-import lombok.RequiredArgsConstructor;
-import lombok.Value;
+import lombok.Data;
+
+import javax.annotation.Nullable;
 
 import java.util.Set;
 
 /** Job vertex information. */
-@Value
-@RequiredArgsConstructor
+@Data
 public class VertexInfo {
 
-    JobVertexID id;
+    private final JobVertexID id;
+
+    private final Set<JobVertexID> inputs;
+
+    private @Nullable Set<JobVertexID> outputs;

Review Comment:
   makes sense



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to