mxm commented on code in PR #672: URL: https://github.com/apache/flink-kubernetes-operator/pull/672#discussion_r1324308777
########## flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java: ########## @@ -74,6 +76,36 @@ public JobAutoScalerImpl( this.infoManager = new AutoscalerInfoManager(); } + @Override + public void scale(FlinkResourceContext<?> ctx) { + var conf = ctx.getObserveConfig(); + var resource = ctx.getResource(); + var resourceId = ResourceID.fromResource(resource); + var autoscalerMetrics = getOrInitAutoscalerFlinkMetrics(ctx, resourceId); + + try { + if (resource.getSpec().getJob() == null || !conf.getBoolean(AUTOSCALER_ENABLED)) { + LOG.debug("Autoscaler is disabled"); + return; + } + + // Initialize metrics only if autoscaler is enabled + var status = resource.getStatus(); + if (status.getLifecycleState() != ResourceLifecycleState.STABLE + || !status.getJobStatus().getState().equals(JobStatus.RUNNING.name())) { + LOG.info("Autoscaler is waiting for RUNNING job state"); + lastEvaluatedMetrics.remove(resourceId); + return; + } + + updateParallelismOverrides(ctx, conf, resource, resourceId, autoscalerMetrics); + } catch (Throwable e) { + onError(ctx, resource, autoscalerMetrics, e); + } finally { + applyParallelismOverrides(ctx); Review Comment: At first sight, this looks like the overrides will get applied even if the autoscaler is disabled. However, another check prevents this here: https://github.com/apache/flink-kubernetes-operator/pull/672/files?diff=unified&w=1#diff-7df0c6b50a32c0055e6a1dcfcf9ab25cddb2a245b2125119fd9b57d65918698dR128 (line 128). This is a bit confusing; see the other comment on line 88.
########## flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java: ########## @@ -74,6 +76,36 @@ public JobAutoScalerImpl( this.infoManager = new AutoscalerInfoManager(); } + @Override + public void scale(FlinkResourceContext<?> ctx) { + var conf = ctx.getObserveConfig(); + var resource = ctx.getResource(); + var resourceId = ResourceID.fromResource(resource); + var autoscalerMetrics = getOrInitAutoscalerFlinkMetrics(ctx, resourceId); + + try { + if (resource.getSpec().getJob() == null || !conf.getBoolean(AUTOSCALER_ENABLED)) { + LOG.debug("Autoscaler is disabled"); + return; + } + + // Initialize metrics only if autoscaler is enabled + var status = resource.getStatus(); + if (status.getLifecycleState() != ResourceLifecycleState.STABLE + || !status.getJobStatus().getState().equals(JobStatus.RUNNING.name())) { + LOG.info("Autoscaler is waiting for RUNNING job state"); + lastEvaluatedMetrics.remove(resourceId); + return; + } + + updateParallelismOverrides(ctx, conf, resource, resourceId, autoscalerMetrics); Review Comment: ```suggestion runScalingLogic(ctx, conf, resource, resourceId, autoscalerMetrics); ``` ########## flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java: ########## @@ -74,6 +76,36 @@ public JobAutoScalerImpl( this.infoManager = new AutoscalerInfoManager(); } + @Override + public void scale(FlinkResourceContext<?> ctx) { + var conf = ctx.getObserveConfig(); + var resource = ctx.getResource(); + var resourceId = ResourceID.fromResource(resource); + var autoscalerMetrics = getOrInitAutoscalerFlinkMetrics(ctx, resourceId); + + try { + if (resource.getSpec().getJob() == null || !conf.getBoolean(AUTOSCALER_ENABLED)) { + LOG.debug("Autoscaler is disabled"); Review Comment: I would reset the overrides here.
########## flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java: ########## @@ -74,6 +76,36 @@ public JobAutoScalerImpl( this.infoManager = new AutoscalerInfoManager(); } + @Override + public void scale(FlinkResourceContext<?> ctx) { + var conf = ctx.getObserveConfig(); + var resource = ctx.getResource(); + var resourceId = ResourceID.fromResource(resource); + var autoscalerMetrics = getOrInitAutoscalerFlinkMetrics(ctx, resourceId); + Review Comment: An alternative would be to apply the current overrides here and the new overrides after the scaling. That would get rid of the finally block. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org