afedulov commented on code in PR #735:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/735#discussion_r1446480705

##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java:
##########
@@ -182,22 +182,22 @@ private void runScalingLogic(Context ctx, 
AutoscalerFlinkMetrics autoscalerMetri
                 jobTopology.getVerticesInTopologicalOrder(),
                 () -> lastEvaluatedMetrics.get(ctx.getJobKey()));
 
-        if (!collectedMetrics.isFullyCollected()) {
-            // We have done an upfront evaluation, but we are not ready for 
scaling.
-            resetRecommendedParallelism(evaluatedMetrics);
-            return;
-        }
-
         var scalingHistory = getTrimmedScalingHistory(stateStore, ctx, now);
         // A scaling tracking without an end time gets created whenever a 
scaling decision is
         // applied. Here, when the job transitions to RUNNING, we record the 
time for it.
         if (ctx.getJobStatus() == JobStatus.RUNNING) {
-            if (scalingTracking.setEndTimeIfTrackedAndParallelismMatches(
+            if 
(scalingTracking.recordRestartDurationIfTrackedAndParallelismMatches(
                     now, jobTopology, scalingHistory)) {
                 stateStore.storeScalingTracking(ctx, scalingTracking);
             }
         }

Review Comment:
   > It could be part of stateStore.storeScalingTracking(ctx, scalingTracking); 
even
    I removed the redundant RUNNING check, as Max recommended, so it looks more 
straightforward now. Pushing this call down into the `storeScalingTracking` 
would make it harder to reason, since it is key that `runRescaleLogic` is only 
executed when the job is in the RUNNING state and hence the transition is 
considered complete. It also does not seem right to bundle the logic specific 
to this concrete situation into `KubernetesAutoScalerStateStore` which acts 
more as a simple persistence layer. Hope this is fine by you.



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java:
##########
@@ -182,22 +182,22 @@ private void runScalingLogic(Context ctx, 
AutoscalerFlinkMetrics autoscalerMetri
                 jobTopology.getVerticesInTopologicalOrder(),
                 () -> lastEvaluatedMetrics.get(ctx.getJobKey()));
 
-        if (!collectedMetrics.isFullyCollected()) {
-            // We have done an upfront evaluation, but we are not ready for 
scaling.
-            resetRecommendedParallelism(evaluatedMetrics);
-            return;
-        }
-
         var scalingHistory = getTrimmedScalingHistory(stateStore, ctx, now);
         // A scaling tracking without an end time gets created whenever a 
scaling decision is
         // applied. Here, when the job transitions to RUNNING, we record the 
time for it.
         if (ctx.getJobStatus() == JobStatus.RUNNING) {
-            if (scalingTracking.setEndTimeIfTrackedAndParallelismMatches(
+            if 
(scalingTracking.recordRestartDurationIfTrackedAndParallelismMatches(
                     now, jobTopology, scalingHistory)) {
                 stateStore.storeScalingTracking(ctx, scalingTracking);
             }
         }

Review Comment:
   Good catch, thanks.



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java:
##########
@@ -182,22 +182,22 @@ private void runScalingLogic(Context ctx, 
AutoscalerFlinkMetrics autoscalerMetri
                 jobTopology.getVerticesInTopologicalOrder(),
                 () -> lastEvaluatedMetrics.get(ctx.getJobKey()));
 
-        if (!collectedMetrics.isFullyCollected()) {
-            // We have done an upfront evaluation, but we are not ready for 
scaling.
-            resetRecommendedParallelism(evaluatedMetrics);
-            return;
-        }
-
         var scalingHistory = getTrimmedScalingHistory(stateStore, ctx, now);
         // A scaling tracking without an end time gets created whenever a 
scaling decision is
         // applied. Here, when the job transitions to RUNNING, we record the 
time for it.
         if (ctx.getJobStatus() == JobStatus.RUNNING) {
-            if (scalingTracking.setEndTimeIfTrackedAndParallelismMatches(
+            if 
(scalingTracking.recordRestartDurationIfTrackedAndParallelismMatches(
                     now, jobTopology, scalingHistory)) {
                 stateStore.storeScalingTracking(ctx, scalingTracking);
             }
         }

Review Comment:
   > It could be part of stateStore.storeScalingTracking(ctx, scalingTracking); 
even
   
    I removed the redundant RUNNING check, as Max recommended, so it looks more 
straightforward now. Pushing this call down into the `storeScalingTracking` 
would make it harder to reason, since it is key that `runRescaleLogic` is only 
executed when the job is in the RUNNING state and hence the transition is 
considered complete. It also does not seem right to bundle the logic specific 
to this concrete situation into `KubernetesAutoScalerStateStore` which acts 
more as a simple persistence layer. Hope this is fine by you.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to