mxm commented on code in PR #966:
URL: https://github.com/apache/flink-kubernetes-operator/pull/966#discussion_r2066005803


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -236,6 +248,97 @@ public ParallelismChange computeScaleTargetParallelism(
                 delayedScaleDown);
     }
 
+    /**
+     * Calculates the scaling coefficient based on historical scaling data.
+     *
+     * <p>The scaling coefficient is computed using a weighted least squares approach, where more
+     * recent data points and those with higher parallelism are given higher weights. If there are
+     * not enough observations, or if the computed coefficient is invalid, a default value of
+     * {@code 1.0} is returned, assuming linear scaling.
+     *
+     * @param history A {@code SortedMap} of {@code Instant} timestamps to {@code ScalingSummary}
+     * @param minObservations The minimum number of observations required to compute the scaling
+     *     coefficient. If the number of historical entries is less than this threshold, a default
+     *     coefficient of {@code 1.0} is returned.
+     * @return The computed scaling coefficient.
+     */
+    @VisibleForTesting
+    protected static double calculateObservedScalingCoefficient(
+            SortedMap<Instant, ScalingSummary> history, int minObservations) {
+        /*
+         * The scaling coefficient is computed using a **weighted least squares** approach
+         * to fit a linear model:
+         *
+         *      R_i = β * α * P_i
+         *
+         * where:
+         * - R_i = observed processing rate
+         * - P_i = parallelism
+         * - β   = baseline processing rate
+         * - α   = scaling coefficient to optimize
+         *
+         * The optimization minimizes the **weighted sum of squared errors**:
+         *
+         *      Loss = ∑ w_i * (R_i - β * α * P_i)^2
+         *
+         * Differentiating w.r.t. α and solving for α:
+         *
+         *      α = ∑ (w_i * P_i * R_i) / (β * ∑ (w_i * P_i^2))
+         *
+         * We keep the system conservative in increasing-returns scenarios by clamping the
+         * computed α to at most 1.0.
+         */
+
+        // Not enough data to compute the scaling coefficient; assume linear scaling.
+        if (history.isEmpty() || history.size() < minObservations) {
+            return 1.0;
+        }
+
+        var baselineProcessingRate =
+                AutoScalerUtils.computeBaselineProcessingRate(history);
+
+        if (Double.isNaN(baselineProcessingRate)) {
+            return 1.0;
+        }
+
+        Instant latestTimestamp = history.lastKey();
+
+        List<Double> parallelismList = new ArrayList<>();
+        List<Double> processingRateList = new ArrayList<>();
+        List<Double> weightList = new ArrayList<>();
+
+        for (Map.Entry<Instant, ScalingSummary> entry : history.entrySet()) {
+            Instant timestamp = entry.getKey();
+            ScalingSummary summary = entry.getValue();
+            double parallelism = summary.getCurrentParallelism();
+            double processingRate =
+                    summary.getMetrics().get(TRUE_PROCESSING_RATE).getAverage();
+
+            if (Double.isNaN(processingRate)) {
+                LOG.warn(
+                        "True processing rate is not available in scaling 
history. Cannot compute scaling coefficient.");
+                return 1.0;
+            }
+
+            // Compute weight based on recency & parallelism
+            double timeDiff =
+                    Duration.between(timestamp, latestTimestamp).getSeconds()
+                            + 1; // Avoid division by zero
+            double weight = parallelism / timeDiff;
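
The diff excerpt is truncated above. As a minimal sketch of the aggregation step the formula in the comment implies (variable names are illustrative; this is not the PR's actual continuation):

```java
// Sketch only, assuming the parallelismList/processingRateList/weightList
// built above; not the PR's actual code.
double weightedSumPR = 0.0; // ∑ w_i * P_i * R_i
double weightedSumP2 = 0.0; // ∑ w_i * P_i^2
for (int i = 0; i < parallelismList.size(); i++) {
    double p = parallelismList.get(i);
    double r = processingRateList.get(i);
    double w = weightList.get(i);
    weightedSumPR += w * p * r;
    weightedSumP2 += w * p * p;
}
// α = ∑(w_i * P_i * R_i) / (β * ∑(w_i * P_i^2)), clamped to keep scaling conservative
double alpha = weightedSumPR / (baselineProcessingRate * weightedSumP2);
return Math.min(alpha, 1.0);
```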

Review Comment:
   I'm also wondering whether applying too much recency bias could hurt the
model. Weighting by parallelism alone should already produce good results.
I do see how the scalability of a pipeline might change over time due to
factors like growing state, so some recency bias may be smart, as long as
the recency influence isn't too strong (see the sketch below).
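
For illustration, one way to bound the recency influence would be an exponential decay with a tunable half-life, so parallelism stays the dominant factor. A sketch (`HALF_LIFE_SECONDS` is a hypothetical knob, not an existing config):

```java
// Sketch: damped recency weighting; HALF_LIFE_SECONDS is hypothetical.
final double HALF_LIFE_SECONDS = Duration.ofHours(24).getSeconds();

double ageSeconds = Duration.between(timestamp, latestTimestamp).getSeconds();
double decay = Math.pow(0.5, ageSeconds / HALF_LIFE_SECONDS); // in (0, 1]
double weight = parallelism * decay; // recency only damps, never dominates
```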


