mxm commented on code in PR #795: URL: https://github.com/apache/flink-kubernetes-operator/pull/795#discussion_r1521324931
########## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryTuning.java: ########## @@ -92,24 +99,51 @@ public static ConfigChanges tuneTaskManagerMemory( return EMPTY_CONFIG; } - MemorySize specHeapSize = memSpecs.getFlinkMemory().getJvmHeapMemorySize(); - MemorySize specManagedSize = memSpecs.getFlinkMemory().getManaged(); - MemorySize specNetworkSize = memSpecs.getFlinkMemory().getNetwork(); - MemorySize specMetaspaceSize = memSpecs.getJvmMetaspaceSize(); - LOG.info( - "Spec memory - heap: {}, managed: {}, network: {}, meta: {}", - specHeapSize.toHumanReadableString(), - specManagedSize.toHumanReadableString(), - specNetworkSize.toHumanReadableString(), - specMetaspaceSize.toHumanReadableString()); + final TuningSimpleMemorySpec originalSimpleSpec = new TuningSimpleMemorySpec(memSpecs); + LOG.info("The original memory spec : {}", originalSimpleSpec); - MemorySize maxMemoryBySpec = context.getTaskManagerMemory().orElse(MemorySize.ZERO); - if (maxMemoryBySpec.compareTo(MemorySize.ZERO) <= 0) { - LOG.warn("Spec TaskManager memory size could not be determined."); + MemoryBudget memBudget = new MemoryBudget(maxMemoryBySpec.getBytes()); + final TuningSimpleMemorySpec tunedSimpleSpec = + generateTunedMemorySpec( + memSpecs, + context, + evaluatedMetrics, + jobTopology, + scalingSummaries, + config, + originalSimpleSpec, + memBudget); + final long flinkMemoryDiffBytes = + calculateFlinkMemoryDiffBytes(originalSimpleSpec, tunedSimpleSpec); + + // Update total memory according to memory diffs + final MemorySize totalMemory = + new MemorySize(maxMemoryBySpec.getBytes() - memBudget.getRemaining()); + if (totalMemory.compareTo(MemorySize.ZERO) <= 0) { + LOG.warn("Invalid total memory configuration: {}", totalMemory); return EMPTY_CONFIG; } - MemoryBudget memBudget = new MemoryBudget(maxMemoryBySpec.getBytes()); + ConfigChanges tuningConfig = + generateTuningConfig(memSpecs, tunedSimpleSpec, flinkMemoryDiffBytes, totalMemory); + triggerMemoryTuningEvent(context, eventHandler, config, tuningConfig); + + if (!context.getConfiguration().get(AutoScalerOptions.MEMORY_TUNING_ENABLED)) { + return EMPTY_CONFIG; + } + return tuningConfig; + } + + @Nonnull + private static TuningSimpleMemorySpec generateTunedMemorySpec( + CommonProcessMemorySpec<TaskExecutorFlinkMemory> memSpecs, + JobAutoScalerContext<?> context, + EvaluatedMetrics evaluatedMetrics, + JobTopology jobTopology, + Map<JobVertexID, ScalingSummary> scalingSummaries, + UnmodifiableConfiguration config, + TuningSimpleMemorySpec originalSimpleSpec, + MemoryBudget memBudget) { Review Comment: Thanks Rui! I'm not against refactoring the code. I just wasn't convinced this change made the code easier to understand. Thank you for listening to my concerns. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org