Tan Kim created FLINK-31977:
-------------------------------

             Summary: If scaling.effectiveness.detection.enabled is false, the call to the detectIneffectiveScaleUp() function is unnecessary
                 Key: FLINK-31977
                 URL: https://issues.apache.org/jira/browse/FLINK-31977
             Project: Flink
          Issue Type: Improvement
          Components: Autoscaler
    Affects Versions: 1.17.0
            Reporter: Tan Kim

The function below detects ineffective scale-ups. It checks SCALING_EFFECTIVENESS_DETECTION_ENABLED (scaling.effectiveness.detection.enabled) only at the very end, after performing all the computations needed for detection. When the option is false, those computations are unnecessary.

{code:java}
// JobVertexScaler.java #175
private boolean detectIneffectiveScaleUp(
        AbstractFlinkResource<?, ?> resource,
        JobVertexID vertex,
        Configuration conf,
        Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
        ScalingSummary lastSummary) {

    double lastProcRate =
            lastSummary.getMetrics().get(TRUE_PROCESSING_RATE).getAverage(); // 22569.315633422066
    double lastExpectedProcRate =
            lastSummary.getMetrics().get(EXPECTED_PROCESSING_RATE).getCurrent(); // 37340.0
    var currentProcRate = evaluatedMetrics.get(TRUE_PROCESSING_RATE).getAverage();

    // To judge the effectiveness of the scale up operation we compute how much of the expected
    // increase actually happened. For example if we expect a 100 increase in proc rate and only
    // got an increase of 10 we only accomplished 10% of the desired increase. If this number is
    // below the threshold, we mark the scaling ineffective.
    double expectedIncrease = lastExpectedProcRate - lastProcRate;
    double actualIncrease = currentProcRate - lastProcRate;

    boolean withinEffectiveThreshold =
            (actualIncrease / expectedIncrease)
                    >= conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_THRESHOLD);
    if (withinEffectiveThreshold) {
        return false;
    }

    var message = String.format(INNEFFECTIVE_MESSAGE_FORMAT, vertex);

    eventRecorder.triggerEvent(
            resource,
            EventRecorder.Type.Normal,
            EventRecorder.Reason.IneffectiveScaling,
            EventRecorder.Component.Operator,
            message);

    if (conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED)) {
        LOG.info(message);
        return true;
    } else {
        return false;
    }
}
{code}

It would be better to check SCALING_EFFECTIVENESS_DETECTION_ENABLED first and call the function only when detection is enabled, as in the if statement below, which is the caller of this function.

{code:java}
// JobVertexScaler.java #150
if (currentParallelism == lastSummary.getNewParallelism() && lastSummary.isScaledUp()) {
    if (scaledUp) {
        if (conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED)) {
            return detectIneffectiveScaleUp(resource, vertex, conf, evaluatedMetrics, lastSummary);
        } else {
            // Detection disabled: same result the function returns today in this case.
            return false;
        }
    } else {
        return detectImmediateScaleDownAfterScaleUp(vertex, conf, lastScalingTs);
    }
}
{code}
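
The proposed guard can be sketched outside Flink as follows. This is a minimal, self-contained illustration and not the operator's code: the class `EffectivenessGuardSketch`, its method names, the `detectionRuns` counter, and the 0.5 threshold are all hypothetical stand-ins chosen for the example. It uses the same ratio as the function above (actual increase divided by expected increase) and shows that, with the flag checked first, the computation is skipped entirely when detection is disabled.

```java
/** Hypothetical stand-in for the autoscaler logic; none of these names exist in Flink. */
final class EffectivenessGuardSketch {

    /** Counts how often the detection computation actually ran (for illustration only). */
    static int detectionRuns = 0;

    /** Same ratio test as in the issue: ineffective if actual/expected increase is below the threshold. */
    static boolean isIneffective(
            double lastProcRate,
            double lastExpectedProcRate,
            double currentProcRate,
            double threshold) {
        detectionRuns++;
        double expectedIncrease = lastExpectedProcRate - lastProcRate;
        double actualIncrease = currentProcRate - lastProcRate;
        return (actualIncrease / expectedIncrease) < threshold;
    }

    /** Checks the flag first, so the computation above only runs when detection is enabled. */
    static boolean shouldBlockScaleUp(
            boolean detectionEnabled,
            double lastProcRate,
            double lastExpectedProcRate,
            double currentProcRate,
            double threshold) {
        if (!detectionEnabled) {
            return false; // nothing computed, nothing blocked
        }
        return isIneffective(lastProcRate, lastExpectedProcRate, currentProcRate, threshold);
    }

    public static void main(String[] args) {
        // Expected increase 100, actual increase 10, example threshold 0.5 (assumed value).
        System.out.println(shouldBlockScaleUp(false, 100.0, 200.0, 110.0, 0.5));
        System.out.println(shouldBlockScaleUp(true, 100.0, 200.0, 110.0, 0.5));
        System.out.println("detection runs: " + detectionRuns);
    }
}
```

One design point to weigh: in the quoted function the IneffectiveScaling event is triggered before the flag is consulted, so a guard placed ahead of the call would presumably also suppress that event when detection is disabled. Whether that is acceptable is a decision for the change itself.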