1996fanrui commented on code in PR #726:
URL: https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1426576655
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##########
@@ -219,6 +219,22 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
                     .withDescription(
                             "Processing rate increase threshold for detecting ineffective scaling threshold. 0.1 means if we do not accomplish at least 10% of the desired capacity increase with scaling, the action is marked ineffective.");

+    public static final ConfigOption<Double> GC_PRESSURE_THRESHOLD =
+            autoScalerConfig("memory.gc-pressure.threshold")
+                    .doubleType()
+                    .defaultValue(0.3)
+                    .withFallbackKeys(oldOperatorConfigKey("memory.gc-pressure.threshold"))
+                    .withDescription(
+                            "Max allowed GC pressure (percentage spent garbage collecting) during scaling operations. Autoscaling will be paused if the GC pressure exceeds this limit.");
+
+    public static final ConfigOption<Double> HEAP_USAGE_THRESHOLD =
+            autoScalerConfig("memory.heap-usage.threshold")
+                    .doubleType()
+                    .defaultValue(1.)
+                    .withFallbackKeys(oldOperatorConfigKey("memory.heap-usage.threshold"))

Review Comment:
   `oldOperatorConfigKey` is not needed here, right? The same applies to `GC_PRESSURE_THRESHOLD`.

##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java:
##########
@@ -65,15 +69,35 @@ public enum ScalingMetric {
     SCALE_DOWN_RATE_THRESHOLD(false),

     /** Expected true processing rate after scale up. */
-    EXPECTED_PROCESSING_RATE(false);
+    EXPECTED_PROCESSING_RATE(false),

-    private final boolean calculateAverage;
+    /**
+     * Maximum GC pressure across taskmanagers. Percentage of time spent garbage collecting between
+     * 0 (no time in GC) and 1 (100% time in GC).
+     */
+    GC_PRESSURE(false),
+
+    /** Percentage of max heap used (between 0 and 1). */
+    HEAP_USAGE(true);
+
+    @Getter private final boolean calculateAverage;
+
+    /** List of {@link ScalingMetric}s to be reported as per vertex Flink metrics. */
+    public static final List<ScalingMetric> REPORTED_VERTEX_METRICS =
+            List.of(
+                    LOAD,
+                    TRUE_PROCESSING_RATE,
+                    TARGET_DATA_RATE,
+                    CATCH_UP_DATA_RATE,
+                    LAG,
+                    PARALLELISM,
+                    RECOMMENDED_PARALLELISM,
+                    MAX_PARALLELISM,
+                    SCALE_UP_RATE_THRESHOLD,
+                    SCALE_DOWN_RATE_THRESHOLD,
+                    EXPECTED_PROCESSING_RATE);

Review Comment:
   > I have added a test to compare the list of evaluated metrics to this to help developers when adding new metrics.
   
   Sorry, could you please clarify which test will fail when a new metric should be added to the `REPORTED_VERTEX_METRICS` list but a developer forgets to add it? I couldn't find it.

##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/RestApiMetricsCollector.java:
##########
@@ -78,19 +99,89 @@ protected Map<FlinkMetric, AggregatedMetric> queryAggregatedVertexMetrics(
                             EmptyRequestBody.getInstance())
                     .get();

-            return responseBody.getMetrics().stream()
-                    .collect(
-                            Collectors.toMap(
-                                    m -> metrics.get(m.getId()),
-                                    m -> m,
-                                    (m1, m2) ->
-                                            new AggregatedMetric(
-                                                    m1.getId() + " merged with " + m2.getId(),
-                                                    Math.min(m1.getMin(), m2.getMin()),
-                                                    Math.max(m1.getMax(), m2.getMax()),
-                                                    // Average can't be computed
-                                                    Double.NaN,
-                                                    m1.getSum() + m2.getSum())));
+            return aggregateByFlinkMetric(metrics, responseBody);
         }
     }
+
+    protected Map<FlinkMetric, AggregatedMetric> queryTmMetrics(Context ctx) throws Exception {
+        try (var restClient = ctx.getRestClusterClient()) {
+            boolean hasGcMetrics =
+                    jobsWithGcMetrics.computeIfAbsent(
+                            ctx.getJobKey(),
+                            k -> {
+                                boolean gcMetricsFound =
+                                        !queryAggregatedTmMetrics(
+                                                        restClient, TM_METRIC_NAMES_WITH_GC)
+                                                .isEmpty();

Review Comment:
   Sorry, I don't understand this code logic. If `Heap.Max` and `Heap.Used` are found but `GarbageCollector.All.TimeMsPerSecond` isn't, then `gcMetricsFound` (and therefore `hasGcMetrics`) will still be true, and we will keep querying `GarbageCollector.All.TimeMsPerSecond` in the future, right? Please correct me if my understanding is wrong.
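For readers following the last comment: the subtlety the reviewer is probing comes from `ConcurrentHashMap.computeIfAbsent`, which runs the mapping function only on the first lookup for a key and then serves the cached value forever after. If the first probe misclassifies a job, later calls never re-evaluate it. A minimal self-contained sketch of that behavior (this is illustrative JDK usage only, not the operator's actual `RestApiMetricsCollector` code):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ComputeIfAbsentCaching {
    public static void main(String[] args) {
        // Stand-in for the per-job cache in the PR (key: job, value: "has GC metrics").
        Map<String, Boolean> jobsWithGcMetrics = new ConcurrentHashMap<>();

        // First lookup: the mapping function runs and its result is stored.
        // Imagine this probe returned true even though only heap metrics were found.
        boolean first = jobsWithGcMetrics.computeIfAbsent("job-1", k -> true);

        // Second lookup: the mapping function is NOT invoked again; the cached
        // value is returned, so an initial misclassification sticks around.
        boolean second = jobsWithGcMetrics.computeIfAbsent("job-1", k -> false);

        System.out.println(first + " " + second);
    }
}
```

Both lookups print `true`, even though the second mapping function would have returned `false`, which is why the correctness of the very first metric probe matters here.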
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org