gyfora commented on code in PR #966:
URL: https://github.com/apache/flink-kubernetes-operator/pull/966#discussion_r2065705970
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -236,6 +248,97 @@ public ParallelismChange computeScaleTargetParallelism(
                 delayedScaleDown);
     }
 
+    /**
+     * Calculates the scaling coefficient based on historical scaling data.
+     *
+     * <p>The scaling coefficient is computed using a weighted least squares approach, where more
+     * recent data points and those with higher parallelism are given higher weights. If there are
+     * not enough observations, or if the computed coefficient is invalid, a default value of {@code
+     * 1.0} is returned, assuming linear scaling.
+     *
+     * @param history A {@code SortedMap} of {@code Instant} timestamps to {@code ScalingSummary}
+     * @param minObservations The minimum number of observations required to compute the scaling
+     *     coefficient. If the number of historical entries is less than this threshold, a default
+     *     coefficient of {@code 1.0} is returned.
+     * @return The computed scaling coefficient.
+     */
+    @VisibleForTesting
+    protected static double calculateObservedScalingCoefficient(
+            SortedMap<Instant, ScalingSummary> history, int minObservations) {
+        /*
+         * The scaling coefficient is computed using a **weighted least squares** approach
+         * to fit a linear model:
+         *
+         *     R_i = β * P_i * α
+         *
+         * where:
+         *     - R_i = observed processing rate
+         *     - P_i = parallelism
+         *     - β = baseline processing rate
+         *     - α = scaling coefficient to optimize
+         *
+         * The optimization minimizes the **weighted sum of squared errors**:
+         *
+         *     Loss = ∑ w_i * (R_i - β * α * P_i)^2
+         *
+         * Differentiating w.r.t. α and solving for α:
+         *
+         *     α = ∑ (w_i * P_i * R_i) / (∑ (w_i * P_i^2) * β)
+         *
+         * We keep the system conservative for higher returns scenario by clamping computed α within 1.0.
+         */
+
+        // not enough data to compute scaling coefficient. we assume linear scaling.
+        if (history.isEmpty() || history.size() < minObservations) {
+            return 1.0;
+        }
+
+        var baselineProcessingRate = AutoScalerUtils.computeBaselineProcessingRate(history);
+
+        if (Double.isNaN(baselineProcessingRate)) {
+            return 1.0;
+        }
+
+        Instant latestTimestamp = history.lastKey();
+
+        List<Double> parallelismList = new ArrayList<>();
+        List<Double> processingRateList = new ArrayList<>();
+        List<Double> weightList = new ArrayList<>();
+
+        for (Map.Entry<Instant, ScalingSummary> entry : history.entrySet()) {
+            Instant timestamp = entry.getKey();
+            ScalingSummary summary = entry.getValue();
+            double parallelism = summary.getCurrentParallelism();
+            double processingRate = summary.getMetrics().get(TRUE_PROCESSING_RATE).getAverage();
+
+            if (Double.isNaN(processingRate)) {
+                LOG.warn(
+                        "True processing rate is not available in scaling history. Cannot compute scaling coefficient.");
+                return 1.0;
+            }
+
+            // Compute weight based on recency & parallelism
+            double timeDiff =
+                    Duration.between(timestamp, latestTimestamp).getSeconds()
+                            + 1; // Avoid division by zero
+            double weight = parallelism / timeDiff;

Review Comment:
   we could also add an enum configuration with some strategies here if we feel that would be required, but maybe an overkill initially
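For illustration only, a minimal sketch of what such a strategy enum could look like; the names WeightingStrategies, WeightingStrategy and computeWeight are hypothetical and not part of this PR:

    // Hypothetical sketch: a configurable choice of weighting strategies.
    public final class WeightingStrategies {

        /** Candidate weighting strategies; UNIFORM disables weighting entirely. */
        public enum WeightingStrategy {
            UNIFORM,
            RECENCY_AND_PARALLELISM
        }

        /** Returns the weight w_i for one historical observation under the chosen strategy. */
        public static double computeWeight(
                WeightingStrategy strategy, double parallelism, long secondsSinceObservation) {
            switch (strategy) {
                case UNIFORM:
                    return 1.0;
                case RECENCY_AND_PARALLELISM:
                default:
                    // Same formula as the patch: +1 avoids division by zero for the newest point.
                    return parallelism / (secondsSinceObservation + 1);
            }
        }
    }

A config option mapping to such an enum could select the weighting at runtime, though, as noted above, that may well be overkill for a first version.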
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -236,6 +248,97 @@ public ParallelismChange computeScaleTargetParallelism(
+            // Compute weight based on recency & parallelism
+            double timeDiff =
+                    Duration.between(timestamp, latestTimestamp).getSeconds()
+                            + 1; // Avoid division by zero
+            double weight = parallelism / timeDiff;

Review Comment:
   Why did you decide on this particular weighting approach? To be specific, what's the benefit compared to:
   - Not weighting
   - Using weights based on the difference with the current parallelism (locally weighted regression)

   I think overall weighting makes sense, but maybe weighting based on the parallelism difference (and time) makes more sense than simply parallelism.
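For comparison, a hedged sketch of the locally weighted alternative mentioned above: weights decay both with the distance between an observation's parallelism and the parallelism currently being evaluated, and with the age of the observation. The class name, the Gaussian/exponential kernels, and the bandwidth parameters are illustrative assumptions, not part of the PR:

    // Illustrative kernel-based weighting; bandwidths control how fast weights fall off.
    public final class LocallyWeightedRegressionSketch {

        /**
         * Weight for one observation: a Gaussian kernel on the parallelism distance,
         * multiplied by an exponential recency decay.
         */
        public static double weight(
                double observedParallelism,
                double evaluatedParallelism,
                long secondsSinceObservation,
                double parallelismBandwidth,
                double timeBandwidthSeconds) {
            double d = (observedParallelism - evaluatedParallelism) / parallelismBandwidth;
            double parallelismWeight = Math.exp(-0.5 * d * d);
            double recencyWeight = Math.exp(-secondsSinceObservation / timeBandwidthSeconds);
            return parallelismWeight * recencyWeight;
        }
    }

Compared to weighting by raw parallelism, this keeps observations taken far from the parallelism being evaluated from dominating the fit, while still favouring recent data.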
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -178,6 +184,12 @@ public ParallelismChange computeScaleTargetParallelism(
 
         LOG.debug("Target processing capacity for {} is {}", vertex, targetCapacity);
         double scaleFactor = targetCapacity / averageTrueProcessingRate;
+        if (conf.get(OBSERVED_SCALABILITY_ENABLED)) {
+            double scalingCoefficient =
+                    JobVertexScaler.calculateObservedScalingCoefficient(
+                            history, conf.get(OBSERVED_SCALABILITY_MIN_OBSERVATIONS));
+            scaleFactor = scaleFactor / scalingCoefficient;

Review Comment:
   We should probably set some limit here; if the scaling coefficient is very small, then there is not really a point in scaling at all (similar to the ineffective scaling detection logic, we could probably reuse the threshold set there).
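One possible shape for the limit suggested above, assuming the threshold is taken from (or aligned with) the existing ineffective-scaling configuration; the class and method names are illustrative, not what the PR implements:

    // Illustrative guard around the observed coefficient before dividing the scale factor by it.
    public final class ScalingCoefficientGuard {

        /**
         * Returns true if scaling is still worthwhile given the observed coefficient; below the
         * threshold, extra parallelism adds so little capacity that scaling up is pointless.
         */
        public static boolean isScalingWorthwhile(double scalingCoefficient, double threshold) {
            return scalingCoefficient >= threshold;
        }

        /** Alternatively, clamp the coefficient so that scaleFactor / coefficient cannot explode. */
        public static double boundedCoefficient(double scalingCoefficient, double lowerBound) {
            return Math.max(scalingCoefficient, lowerBound);
        }
    }

The first variant skips the scale-up entirely when the coefficient is too small, which is closest to the ineffective-scaling behaviour; the second merely bounds the division so the computed scale factor cannot blow up.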