afedulov commented on code in PR #735:
URL: https://github.com/apache/flink-kubernetes-operator/pull/735#discussion_r1446480705
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java:
##########

@@ -182,22 +182,22 @@ private void runScalingLogic(Context ctx, AutoscalerFlinkMetrics autoscalerMetri
                 jobTopology.getVerticesInTopologicalOrder(),
                 () -> lastEvaluatedMetrics.get(ctx.getJobKey()));

-        if (!collectedMetrics.isFullyCollected()) {
-            // We have done an upfront evaluation, but we are not ready for scaling.
-            resetRecommendedParallelism(evaluatedMetrics);
-            return;
-        }
-
         var scalingHistory = getTrimmedScalingHistory(stateStore, ctx, now);

         // A scaling tracking without an end time gets created whenever a scaling decision is
         // applied. Here, when the job transitions to RUNNING, we record the time for it.
         if (ctx.getJobStatus() == JobStatus.RUNNING) {
-            if (scalingTracking.setEndTimeIfTrackedAndParallelismMatches(
+            if (scalingTracking.recordRestartDurationIfTrackedAndParallelismMatches(
                     now, jobTopology, scalingHistory)) {
                 stateStore.storeScalingTracking(ctx, scalingTracking);
             }
         }

Review Comment:
> It could be part of `stateStore.storeScalingTracking(ctx, scalingTracking);` even though I removed the redundant RUNNING check, as Max recommended, so it looks more straightforward now.

Pushing this call down into `storeScalingTracking` would make it harder to reason about, since it is key that `runRescaleLogic` is only executed when the job is in the RUNNING state, and hence the transition is considered complete. It also does not seem right to bundle logic specific to this concrete situation into `KubernetesAutoScalerStateStore`, which acts more as a simple persistence layer. Hope this is fine by you.
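For context, here is a minimal sketch of the contract behind the renamed method. The names `ScalingTracking`, `recordRestartDurationIfTrackedAndParallelismMatches`, and `storeScalingTracking` come from the diff above; the fields, the simplified signature (an observed-parallelism map instead of `jobTopology`/`scalingHistory`), and all internals are illustrative assumptions, not the actual Flink implementation:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

/** Simplified sketch; internals are assumptions for illustration only. */
class ScalingTrackingSketch {
    // Tracked scaling decisions: decision time -> target parallelism per vertex.
    private final TreeMap<Instant, Map<String, Integer>> trackedScalings = new TreeMap<>();
    private Duration lastRestartDuration;

    void addScaling(Instant decisionTime, Map<String, Integer> targetParallelism) {
        trackedScalings.put(decisionTime, targetParallelism);
    }

    /**
     * Called once the job is observed in RUNNING state. Records how long the
     * restart took, but only if (a) a scaling is tracked without a recorded
     * duration yet and (b) the observed parallelism matches what the scaling
     * decision targeted. Returns true if something was recorded, signaling the
     * caller to persist the tracking (e.g. via stateStore.storeScalingTracking).
     */
    boolean recordRestartDurationIfTrackedAndParallelismMatches(
            Instant now, Map<String, Integer> observedParallelism) {
        var latest = trackedScalings.lastEntry();
        if (latest == null || lastRestartDuration != null) {
            return false; // nothing tracked, or duration already recorded
        }
        if (!latest.getValue().equals(observedParallelism)) {
            return false; // job came back up with a different parallelism
        }
        lastRestartDuration = Duration.between(latest.getKey(), now);
        return true;
    }

    Optional<Duration> getLastRestartDuration() {
        return Optional.ofNullable(lastRestartDuration);
    }
}
```

The `return true` path is what makes the call-site shape in the diff work: persistence stays a caller decision, which is the separation the comment argues for.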
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java:
##########

@@ -182,22 +182,22 @@ private void runScalingLogic(Context ctx, AutoscalerFlinkMetrics autoscalerMetri
                 jobTopology.getVerticesInTopologicalOrder(),
                 () -> lastEvaluatedMetrics.get(ctx.getJobKey()));

-        if (!collectedMetrics.isFullyCollected()) {
-            // We have done an upfront evaluation, but we are not ready for scaling.
-            resetRecommendedParallelism(evaluatedMetrics);
-            return;
-        }
-
         var scalingHistory = getTrimmedScalingHistory(stateStore, ctx, now);

         // A scaling tracking without an end time gets created whenever a scaling decision is
         // applied. Here, when the job transitions to RUNNING, we record the time for it.
         if (ctx.getJobStatus() == JobStatus.RUNNING) {
-            if (scalingTracking.setEndTimeIfTrackedAndParallelismMatches(
+            if (scalingTracking.recordRestartDurationIfTrackedAndParallelismMatches(
                     now, jobTopology, scalingHistory)) {
                 stateStore.storeScalingTracking(ctx, scalingTracking);
             }
         }

Review Comment:
Good catch, thanks.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org