mxm commented on code in PR #966:
URL: https://github.com/apache/flink-kubernetes-operator/pull/966#discussion_r2066005803
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -236,6 +248,97 @@ public ParallelismChange computeScaleTargetParallelism(
                 delayedScaleDown);
     }
 
+    /**
+     * Calculates the scaling coefficient based on historical scaling data.
+     *
+     * <p>The scaling coefficient is computed using a weighted least squares approach, where more
+     * recent data points and those with higher parallelism are given higher weights. If there are
+     * not enough observations, or if the computed coefficient is invalid, a default value of
+     * {@code 1.0} is returned, assuming linear scaling.
+     *
+     * @param history A {@code SortedMap} of {@code Instant} timestamps to {@code ScalingSummary}
+     * @param minObservations The minimum number of observations required to compute the scaling
+     *     coefficient. If the number of historical entries is less than this threshold, a default
+     *     coefficient of {@code 1.0} is returned.
+     * @return The computed scaling coefficient.
+     */
+    @VisibleForTesting
+    protected static double calculateObservedScalingCoefficient(
+            SortedMap<Instant, ScalingSummary> history, int minObservations) {
+        /*
+         * The scaling coefficient is computed using a **weighted least squares** approach
+         * to fit a linear model:
+         *
+         *     R_i = β * α * P_i
+         *
+         * where:
+         *   - R_i = observed processing rate
+         *   - P_i = parallelism
+         *   - β   = baseline processing rate
+         *   - α   = scaling coefficient to optimize
+         *
+         * The optimization minimizes the **weighted sum of squared errors**:
+         *
+         *     Loss = ∑ w_i * (R_i - β * α * P_i)^2
+         *
+         * Differentiating w.r.t. α and solving for α:
+         *
+         *     α = ∑ (w_i * P_i * R_i) / (β * ∑ (w_i * P_i^2))
+         *
+         * We keep the system conservative in super-linear (higher-returns) scenarios by
+         * clamping the computed α at 1.0.
+         */
+
+        // Not enough data to compute the scaling coefficient; assume linear scaling.
+        if (history.isEmpty() || history.size() < minObservations) {
+            return 1.0;
+        }
+
+        var baselineProcessingRate = AutoScalerUtils.computeBaselineProcessingRate(history);
+
+        if (Double.isNaN(baselineProcessingRate)) {
+            return 1.0;
+        }
+
+        Instant latestTimestamp = history.lastKey();
+
+        List<Double> parallelismList = new ArrayList<>();
+        List<Double> processingRateList = new ArrayList<>();
+        List<Double> weightList = new ArrayList<>();
+
+        for (Map.Entry<Instant, ScalingSummary> entry : history.entrySet()) {
+            Instant timestamp = entry.getKey();
+            ScalingSummary summary = entry.getValue();
+            double parallelism = summary.getCurrentParallelism();
+            double processingRate = summary.getMetrics().get(TRUE_PROCESSING_RATE).getAverage();
+
+            if (Double.isNaN(processingRate)) {
+                LOG.warn(
+                        "True processing rate is not available in scaling history. Cannot compute scaling coefficient.");
+                return 1.0;
+            }
+
+            // Compute weight based on recency & parallelism.
+            double timeDiff =
+                    Duration.between(timestamp, latestTimestamp).getSeconds()
+                            + 1; // Avoid division by zero
+            double weight = parallelism / timeDiff;

Review Comment:
   I'm also wondering whether applying too much recency bias could hurt the model. Simply weighting by parallelism should already produce good results. I can see how the scalability of a pipeline might change over time due to factors like growing state, so some recency bias may be smart, as long as the recency influence isn't too strong.
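   To make the concern concrete, below is a minimal, self-contained Java sketch of the closed-form solve α = ∑(w_i * P_i * R_i) / (β * ∑(w_i * P_i^2)) with the two weighting schemes side by side. The class name, sample numbers, and the 600s/300s observation offsets are made up for illustration; this is not the patch's code.

   // Hypothetical standalone sketch, not the operator's implementation.
   public class ScalingCoefficientSketch {

       static double alpha(double[] p, double[] r, double[] w, double beta) {
           double num = 0.0;
           double den = 0.0;
           for (int i = 0; i < p.length; i++) {
               num += w[i] * p[i] * r[i];       // ∑ w_i * P_i * R_i
               den += w[i] * p[i] * p[i];       // ∑ w_i * P_i^2
           }
           // Clamp at 1.0 to stay conservative when scaling looks super-linear.
           return Math.min(num / (den * beta), 1.0);
       }

       public static void main(String[] args) {
           double[] p = {2, 4, 8};        // observed parallelism
           double[] r = {190, 360, 650};  // observed true processing rate
           double beta = 100.0;           // assumed baseline rate at parallelism 1

           // Weighting by parallelism only, as suggested in this review:
           double[] wPlain = {2, 4, 8};
           // The patch's weighting, parallelism / (seconds since observation + 1),
           // for observations taken 600s, 300s and 0s before the latest one:
           double[] wRecency = {2.0 / 601, 4.0 / 301, 8.0 / 1};

           System.out.printf("alpha, parallelism-only weights: %.4f%n", alpha(p, r, wPlain, beta));
           System.out.printf("alpha, recency-biased weights:   %.4f%n", alpha(p, r, wRecency, beta));
       }
   }

   With the 1/timeDiff bias, the latest observation's weight (8.0) dwarfs the older ones (~0.003 and ~0.013), so α is effectively decided by a single data point, while the parallelism-only weights keep all observations in play. If recency should still matter, a gentler decay than 1/t (e.g. an exponential with a long half-life) might be a middle ground.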