sharath1709 commented on code in PR #686:
URL: https://github.com/apache/flink-kubernetes-operator/pull/686#discussion_r2286286182
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java:
##########
@@ -78,6 +88,45 @@ public static void computeDataRateMetrics(
         }
     }
 
+    private static Optional<Double> getObservedTpr(
+            Map<FlinkMetric, AggregatedMetric> flinkMetrics,
+            Map<ScalingMetric, Double> scalingMetrics,
+            double numRecordsInPerSecond,
+            Configuration conf) {
+
+        // If there are no incoming records we return infinity to allow scale down
+        if (numRecordsInPerSecond == 0) {
+            return Optional.of(Double.POSITIVE_INFINITY);
+        }
+
+        // We only measure observed tpr when we are catching up, that is when the lag is beyond the
+        // configured observe threshold
+        boolean catchingUp =
+                scalingMetrics.getOrDefault(ScalingMetric.LAG, 0.)
+                        >= conf.get(AutoScalerOptions.OBSERVE_TRUE_PROCESSING_RATE_LAG_THRESHOLD)
+                                        .toSeconds()
+                                * numRecordsInPerSecond;
+        if (!catchingUp) {
+            return Optional.empty();
+        }
+
+        double observedTpr =
+                computeObservedTprWithBackpressure(
+                        numRecordsInPerSecond,
+                        flinkMetrics.get(FlinkMetric.BACKPRESSURE_TIME_PER_SEC).getAvg());
+
+        return Double.isNaN(observedTpr) ? Optional.empty() : Optional.of(observedTpr);
+    }
+
+    public static double computeObservedTprWithBackpressure(

Review Comment:
   I understand that job.autoscaler.catch-up.duration refers only to the time needed to catch up after the scaling operation. However, I find the assumption that backpressure can always be eliminated by the next scaling operation to be inaccurate. In practice, I've observed cases where a task remains backpressured even when the downstream task shows low utilization, depending on the task's business logic.
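To make the concern concrete, here is a minimal, self-contained sketch of the estimate under discussion. It is not the operator's code: the class name `ObservedTprSketch`, both method names, and the plain `double` parameters are hypothetical, and the formula (input rate divided by the non-backpressured fraction of each second) is an assumption, since the body of `computeObservedTprWithBackpressure` is not shown in the hunk above.

```java
import java.util.Optional;

/** Hypothetical, simplified stand-in for the logic in this thread; not the operator's code. */
final class ObservedTprSketch {

    /**
     * ASSUMED formula: divide the observed input rate by the fraction of each
     * second in which the task was not backpressured. Returns NaN when the task
     * is backpressured for the whole second, since no estimate is possible then.
     */
    static double observedTprWithBackpressure(
            double numRecordsInPerSecond, double backpressureMsPerSec) {
        if (backpressureMsPerSec >= 1000) {
            return Double.NaN;
        }
        return numRecordsInPerSecond / (1 - backpressureMsPerSec / 1000);
    }

    /** Mirrors the control flow of the hunk above, with plain doubles instead of metric maps. */
    static Optional<Double> observedTpr(
            double lag,
            double numRecordsInPerSecond,
            double backpressureMsPerSec,
            double lagThresholdSeconds) {
        // No incoming records: report infinity so that scale down stays possible.
        if (numRecordsInPerSecond == 0) {
            return Optional.of(Double.POSITIVE_INFINITY);
        }
        // Only measure while catching up, i.e. when lag exceeds the threshold worth of input.
        boolean catchingUp = lag >= lagThresholdSeconds * numRecordsInPerSecond;
        if (!catchingUp) {
            return Optional.empty();
        }
        double tpr = observedTprWithBackpressure(numRecordsInPerSecond, backpressureMsPerSec);
        return Double.isNaN(tpr) ? Optional.empty() : Optional.of(tpr);
    }

    public static void main(String[] args) {
        // 1000 records/s observed while backpressured for 500 ms of every second.
        System.out.println(observedTpr(60_000, 1000, 500, 30));
    }
}
```

With these illustrative numbers the estimate is 2000 records/s, twice the observed rate. That figure is only reachable if scaling actually removes the backpressure. If the task stays backpressured for reasons tied to its business logic, the real rate remains close to 1000 records/s and the estimate overstates capacity by the same factor.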