sharath1709 commented on code in PR #686:
URL: https://github.com/apache/flink-kubernetes-operator/pull/686#discussion_r2286286182


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java:
##########
@@ -78,6 +88,45 @@ public static void computeDataRateMetrics(
         }
     }
 
+    private static Optional<Double> getObservedTpr(
+            Map<FlinkMetric, AggregatedMetric> flinkMetrics,
+            Map<ScalingMetric, Double> scalingMetrics,
+            double numRecordsInPerSecond,
+            Configuration conf) {
+
+        // If there are no incoming records we return infinity to allow scale down
+        if (numRecordsInPerSecond == 0) {
+            return Optional.of(Double.POSITIVE_INFINITY);
+        }
+
+        // We only measure observed tpr when we are catching up, that is when the lag is beyond the
+        // configured observe threshold
+        boolean catchingUp =
+                scalingMetrics.getOrDefault(ScalingMetric.LAG, 0.)
+                        >= conf.get(AutoScalerOptions.OBSERVE_TRUE_PROCESSING_RATE_LAG_THRESHOLD)
+                                        .toSeconds()
+                                * numRecordsInPerSecond;
+        if (!catchingUp) {
+            return Optional.empty();
+        }
+
+        double observedTpr =
+                computeObservedTprWithBackpressure(
+                        numRecordsInPerSecond,
+                        flinkMetrics.get(FlinkMetric.BACKPRESSURE_TIME_PER_SEC).getAvg());
+
+        return Double.isNaN(observedTpr) ? Optional.empty() : Optional.of(observedTpr);
+    }
+
+    public static double computeObservedTprWithBackpressure(

Review Comment:
   I understand that job.autoscaler.catch-up.duration covers only the time 
needed to catch up after a scaling operation. However, I think the assumption 
that the next scaling will always eliminate backpressure is inaccurate. In 
practice, I’ve seen a task remain backpressured even when its downstream task 
shows low utilization, depending on the task’s business logic.
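
   To make the concern concrete, here is a minimal sketch of how I read the 
calculation. The body of computeObservedTprWithBackpressure is not shown in 
this hunk, so the formula below (input rate divided by the fraction of time 
the task is not backpressured) is an assumption, and the class and method 
names are hypothetical stand-ins rather than the PR's code.

```java
// Hypothetical stand-in for the logic under review; not the PR's actual code.
class ObservedTprSketch {

    // Assumed formula: extrapolate the rate the task could sustain if the
    // time it currently spends backpressured were spent processing instead.
    static double observedTprWithBackpressure(
            double numRecordsInPerSecond, double backpressureMsPerSecond) {
        if (backpressureMsPerSecond >= 1000) {
            // Fully backpressured: nothing meaningful to extrapolate from.
            return Double.NaN;
        }
        double nonBackpressuredFraction = 1 - backpressureMsPerSecond / 1000;
        return numRecordsInPerSecond / nonBackpressuredFraction;
    }

    // Mirrors the catch-up check in the diff: the lag must be at least
    // thresholdSeconds worth of records at the current input rate.
    static boolean isCatchingUp(
            double lag, long thresholdSeconds, double numRecordsInPerSecond) {
        return lag >= thresholdSeconds * numRecordsInPerSecond;
    }

    public static void main(String[] args) {
        // 1000 records/s while backpressured half of the time.
        double tpr = observedTprWithBackpressure(1000, 500);
        System.out.println("extrapolated TPR: " + tpr);
        System.out.println("catching up: " + isCatchingUp(60_000, 30, 1000));
    }
}
```

   With 1000 records/s and 500 ms/s of backpressure, this extrapolates to 2000 
records/s. That figure is only reachable if the backpressure actually goes 
away after scaling. If it persists, the extrapolated rate overstates what the 
task can really process.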



