gyfora commented on code in PR #966:
URL: https://github.com/apache/flink-kubernetes-operator/pull/966#discussion_r2065705970


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -236,6 +248,97 @@ public ParallelismChange computeScaleTargetParallelism(
                 delayedScaleDown);
     }
 
+    /**
+     * Calculates the scaling coefficient based on historical scaling data.
+     *
+     * <p>The scaling coefficient is computed using a weighted least squares approach, where more
+     * recent data points and those with higher parallelism are given higher weights. If there are
+     * not enough observations, or if the computed coefficient is invalid, a default value of {@code
+     * 1.0} is returned, assuming linear scaling.
+     *
+     * @param history A {@code SortedMap} of {@code Instant} timestamps to {@code ScalingSummary}
+     * @param minObservations The minimum number of observations required to compute the scaling
+     *     coefficient. If the number of historical entries is less than this threshold, a default
+     *     coefficient of {@code 1.0} is returned.
+     * @return The computed scaling coefficient.
+     */
+    @VisibleForTesting
+    protected static double calculateObservedScalingCoefficient(
+            SortedMap<Instant, ScalingSummary> history, int minObservations) {
+        /*
+         * The scaling coefficient is computed using a **weighted least squares** approach
+         * to fit a linear model:
+         *
+         *      R_i = β * P_i * α
+         *
+         * where:
+         * - R_i = observed processing rate
+         * - P_i = parallelism
+         * - β   = baseline processing rate
+         * - α   = scaling coefficient to optimize
+         *
+         * The optimization minimizes the **weighted sum of squared errors**:
+         *
+         *      Loss = ∑ w_i * (R_i - β * α * P_i)^2
+         *
+         * Differentiating w.r.t. α and solving for α:
+         *
+         *      α = ∑ (w_i * P_i * R_i) / (∑ (w_i * P_i^2) * β)
+         *
+         * We keep the system conservative in scenarios with higher-than-linear returns by clamping the computed α at a maximum of 1.0.
+         */
+
+        // Not enough data to compute the scaling coefficient; assume linear scaling.
+        if (history.isEmpty() || history.size() < minObservations) {
+            return 1.0;
+        }
+
+        var baselineProcessingRate = AutoScalerUtils.computeBaselineProcessingRate(history);
+
+        if (Double.isNaN(baselineProcessingRate)) {
+            return 1.0;
+        }
+
+        Instant latestTimestamp = history.lastKey();
+
+        List<Double> parallelismList = new ArrayList<>();
+        List<Double> processingRateList = new ArrayList<>();
+        List<Double> weightList = new ArrayList<>();
+
+        for (Map.Entry<Instant, ScalingSummary> entry : history.entrySet()) {
+            Instant timestamp = entry.getKey();
+            ScalingSummary summary = entry.getValue();
+            double parallelism = summary.getCurrentParallelism();
+            double processingRate = summary.getMetrics().get(TRUE_PROCESSING_RATE).getAverage();
+
+            if (Double.isNaN(processingRate)) {
+                LOG.warn(
+                        "True processing rate is not available in scaling history. Cannot compute scaling coefficient.");
+                return 1.0;
+            }
+
+            // Compute weight based on recency & parallelism
+            double timeDiff =
+                    Duration.between(timestamp, latestTimestamp).getSeconds()
+                            + 1; // Avoid division by zero
+            double weight = parallelism / timeDiff;

Review Comment:
   we could also add an enum configuration with some strategies here if we feel that would be required, but it may be overkill initially
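For illustration, a strategy enum along those lines could look roughly like the sketch below. Everything in it is hypothetical: the enum name, the strategy names, and the helper method do not exist in the PR; only the PARALLELISM_OVER_TIME case mirrors the weight formula already used in the diff above.

```java
/** Sketch only: a hypothetical weighting-strategy knob; nothing in this snippet exists in the PR. */
enum ObservedScalabilityWeighting {
    /** Every observation contributes equally. */
    UNWEIGHTED,
    /** The approach in the diff above: weight = parallelism / (seconds since observation + 1). */
    PARALLELISM_OVER_TIME,
    /** Favor observations whose parallelism is close to the current parallelism. */
    PARALLELISM_DISTANCE;

    double weight(double parallelism, double currentParallelism, long secondsSinceObservation) {
        switch (this) {
            case UNWEIGHTED:
                return 1.0;
            case PARALLELISM_OVER_TIME:
                // +1 avoids division by zero for the most recent observation.
                return parallelism / (secondsSinceObservation + 1);
            case PARALLELISM_DISTANCE:
                // Inverse distance to the current parallelism; +1 avoids division by zero.
                return 1.0 / (Math.abs(parallelism - currentParallelism) + 1);
            default:
                throw new IllegalStateException("Unknown strategy: " + this);
        }
    }
}
```

A single well-chosen weighting may make the extra configuration unnecessary, as the comment above already suggests.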



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -236,6 +248,97 @@ public ParallelismChange computeScaleTargetParallelism(
                 delayedScaleDown);
     }
 
+    /**
+     * Calculates the scaling coefficient based on historical scaling data.
+     *
+     * <p>The scaling coefficient is computed using a weighted least squares approach, where more
+     * recent data points and those with higher parallelism are given higher weights. If there are
+     * not enough observations, or if the computed coefficient is invalid, a default value of {@code
+     * 1.0} is returned, assuming linear scaling.
+     *
+     * @param history A {@code SortedMap} of {@code Instant} timestamps to {@code ScalingSummary}
+     * @param minObservations The minimum number of observations required to compute the scaling
+     *     coefficient. If the number of historical entries is less than this threshold, a default
+     *     coefficient of {@code 1.0} is returned.
+     * @return The computed scaling coefficient.
+     */
+    @VisibleForTesting
+    protected static double calculateObservedScalingCoefficient(
+            SortedMap<Instant, ScalingSummary> history, int minObservations) {
+        /*
+         * The scaling coefficient is computed using a **weighted least squares** approach
+         * to fit a linear model:
+         *
+         *      R_i = β * P_i * α
+         *
+         * where:
+         * - R_i = observed processing rate
+         * - P_i = parallelism
+         * - β   = baseline processing rate
+         * - α   = scaling coefficient to optimize
+         *
+         * The optimization minimizes the **weighted sum of squared errors**:
+         *
+         *      Loss = ∑ w_i * (R_i - β * α * P_i)^2
+         *
+         * Differentiating w.r.t. α and solving for α:
+         *
+         *      α = ∑ (w_i * P_i * R_i) / (∑ (w_i * P_i^2) * β)
+         *
+         * We keep the system conservative in scenarios with higher-than-linear returns by clamping the computed α at a maximum of 1.0.
+         */
+
+        // Not enough data to compute the scaling coefficient; assume linear scaling.
+        if (history.isEmpty() || history.size() < minObservations) {
+            return 1.0;
+        }
+
+        var baselineProcessingRate = AutoScalerUtils.computeBaselineProcessingRate(history);
+
+        if (Double.isNaN(baselineProcessingRate)) {
+            return 1.0;
+        }
+
+        Instant latestTimestamp = history.lastKey();
+
+        List<Double> parallelismList = new ArrayList<>();
+        List<Double> processingRateList = new ArrayList<>();
+        List<Double> weightList = new ArrayList<>();
+
+        for (Map.Entry<Instant, ScalingSummary> entry : history.entrySet()) {
+            Instant timestamp = entry.getKey();
+            ScalingSummary summary = entry.getValue();
+            double parallelism = summary.getCurrentParallelism();
+            double processingRate = summary.getMetrics().get(TRUE_PROCESSING_RATE).getAverage();
+
+            if (Double.isNaN(processingRate)) {
+                LOG.warn(
+                        "True processing rate is not available in scaling history. Cannot compute scaling coefficient.");
+                return 1.0;
+            }
+
+            // Compute weight based on recency & parallelism
+            double timeDiff =
+                    Duration.between(timestamp, latestTimestamp).getSeconds()
+                            + 1; // Avoid division by zero
+            double weight = parallelism / timeDiff;

Review Comment:
   Why did you decide on this particular weighting approach? To be specific, what's the benefit compared to:

    - Not weighting
    - Using weights based on the difference with the current parallelism (locally weighted regression)

    I think overall weighting makes sense, but maybe weighting based on the parallelism difference (and time) makes more sense than parallelism alone.
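As a rough illustration of the locally weighted idea, the weight could combine a kernel on the parallelism distance with a time decay. This is only a sketch: the class and method names are hypothetical, and `bandwidth` and `tauSeconds` are made-up tuning constants.

```java
/** Sketch only: a locally weighted regression weight; names and constants are illustrative. */
final class LocalWeightSketch {

    /**
     * Weight that decays both with the distance from the current parallelism (Gaussian kernel)
     * and with the age of the observation (exponential decay).
     */
    static double weight(
            double parallelism,
            double currentParallelism,
            long secondsSinceObservation,
            double bandwidth,
            double tauSeconds) {
        double distance = (parallelism - currentParallelism) / bandwidth;
        double parallelismWeight = Math.exp(-0.5 * distance * distance);
        double recencyWeight = Math.exp(-secondsSinceObservation / tauSeconds);
        return parallelismWeight * recencyWeight;
    }
}
```

Compared to the `parallelism / timeDiff` weight in the diff, this favours observations near the parallelism we are extrapolating from rather than simply the highest-parallelism ones.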



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -178,6 +184,12 @@ public ParallelismChange computeScaleTargetParallelism(
 
         LOG.debug("Target processing capacity for {} is {}", vertex, 
targetCapacity);
         double scaleFactor = targetCapacity / averageTrueProcessingRate;
+        if (conf.get(OBSERVED_SCALABILITY_ENABLED)) {
+            double scalingCoefficient =
+                    JobVertexScaler.calculateObservedScalingCoefficient(
+                            history, conf.get(OBSERVED_SCALABILITY_MIN_OBSERVATIONS));
+            scaleFactor = scaleFactor / scalingCoefficient;

Review Comment:
   We should probably set some limit here: if the scaling coefficient is very small, there is not really a point in scaling at all (similar to the ineffective scaling detection logic; we could probably reuse the threshold set there).
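One possible shape for such a guard, shown against the snippet above. The 0.1 floor is a placeholder, and whether to cap the coefficient or skip the adjustment entirely (and which existing threshold to reuse) is exactly the open question in the comment.

```java
double scalingCoefficient =
        JobVertexScaler.calculateObservedScalingCoefficient(
                history, conf.get(OBSERVED_SCALABILITY_MIN_OBSERVATIONS));
// Placeholder bound; could be derived from the ineffective-scaling detection threshold instead.
double minCoefficient = 0.1;
if (scalingCoefficient < minCoefficient) {
    LOG.info(
            "Observed scaling coefficient {} is below {}; capping it to keep the scale factor bounded.",
            scalingCoefficient,
            minCoefficient);
    scalingCoefficient = minCoefficient;
}
scaleFactor = scaleFactor / scalingCoefficient;
```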



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
