mxm commented on code in PR #795: URL: https://github.com/apache/flink-kubernetes-operator/pull/795#discussion_r1519792431
########## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryTuning.java: ########## @@ -92,24 +99,51 @@ public static ConfigChanges tuneTaskManagerMemory( return EMPTY_CONFIG; } - MemorySize specHeapSize = memSpecs.getFlinkMemory().getJvmHeapMemorySize(); - MemorySize specManagedSize = memSpecs.getFlinkMemory().getManaged(); - MemorySize specNetworkSize = memSpecs.getFlinkMemory().getNetwork(); - MemorySize specMetaspaceSize = memSpecs.getJvmMetaspaceSize(); - LOG.info( - "Spec memory - heap: {}, managed: {}, network: {}, meta: {}", - specHeapSize.toHumanReadableString(), - specManagedSize.toHumanReadableString(), - specNetworkSize.toHumanReadableString(), - specMetaspaceSize.toHumanReadableString()); + final TuningSimpleMemorySpec originalSimpleSpec = new TuningSimpleMemorySpec(memSpecs); + LOG.info("The original memory spec : {}", originalSimpleSpec); - MemorySize maxMemoryBySpec = context.getTaskManagerMemory().orElse(MemorySize.ZERO); - if (maxMemoryBySpec.compareTo(MemorySize.ZERO) <= 0) { - LOG.warn("Spec TaskManager memory size could not be determined."); + MemoryBudget memBudget = new MemoryBudget(maxMemoryBySpec.getBytes()); + final TuningSimpleMemorySpec tunedSimpleSpec = + generateTunedMemorySpec( + memSpecs, + context, + evaluatedMetrics, + jobTopology, + scalingSummaries, + config, + originalSimpleSpec, + memBudget); + final long flinkMemoryDiffBytes = + calculateFlinkMemoryDiffBytes(originalSimpleSpec, tunedSimpleSpec); + + // Update total memory according to memory diffs + final MemorySize totalMemory = + new MemorySize(maxMemoryBySpec.getBytes() - memBudget.getRemaining()); + if (totalMemory.compareTo(MemorySize.ZERO) <= 0) { + LOG.warn("Invalid total memory configuration: {}", totalMemory); return EMPTY_CONFIG; } - MemoryBudget memBudget = new MemoryBudget(maxMemoryBySpec.getBytes()); + ConfigChanges tuningConfig = + generateTuningConfig(memSpecs, tunedSimpleSpec, flinkMemoryDiffBytes, totalMemory); + triggerMemoryTuningEvent(context, eventHandler, config, tuningConfig); + + if (!context.getConfiguration().get(AutoScalerOptions.MEMORY_TUNING_ENABLED)) { + return EMPTY_CONFIG; + } + return tuningConfig; + } + + @Nonnull + private static TuningSimpleMemorySpec generateTunedMemorySpec( + CommonProcessMemorySpec<TaskExecutorFlinkMemory> memSpecs, + JobAutoScalerContext<?> context, + EvaluatedMetrics evaluatedMetrics, + JobTopology jobTopology, + Map<JobVertexID, ScalingSummary> scalingSummaries, + UnmodifiableConfiguration config, + TuningSimpleMemorySpec originalSimpleSpec, + MemoryBudget memBudget) { Review Comment: I might be biased, but after this PR, the logic is much harder to understand. Slightly longer methods are not bad per-se. Using methods is actually tricky and the precise reason why the code is written the way it was written. I considered using methods but figured it would make the code harder to read because of the many dependencies which would go into the method signatures. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org