mxm commented on code in PR #795:
URL: https://github.com/apache/flink-kubernetes-operator/pull/795#discussion_r1521324931


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryTuning.java:
##########
@@ -92,24 +99,51 @@ public static ConfigChanges tuneTaskManagerMemory(
             return EMPTY_CONFIG;
         }
 
-        MemorySize specHeapSize = 
memSpecs.getFlinkMemory().getJvmHeapMemorySize();
-        MemorySize specManagedSize = memSpecs.getFlinkMemory().getManaged();
-        MemorySize specNetworkSize = memSpecs.getFlinkMemory().getNetwork();
-        MemorySize specMetaspaceSize = memSpecs.getJvmMetaspaceSize();
-        LOG.info(
-                "Spec memory - heap: {}, managed: {}, network: {}, meta: {}",
-                specHeapSize.toHumanReadableString(),
-                specManagedSize.toHumanReadableString(),
-                specNetworkSize.toHumanReadableString(),
-                specMetaspaceSize.toHumanReadableString());
+        final TuningSimpleMemorySpec originalSimpleSpec = new 
TuningSimpleMemorySpec(memSpecs);
+        LOG.info("The original memory spec : {}", originalSimpleSpec);
 
-        MemorySize maxMemoryBySpec = 
context.getTaskManagerMemory().orElse(MemorySize.ZERO);
-        if (maxMemoryBySpec.compareTo(MemorySize.ZERO) <= 0) {
-            LOG.warn("Spec TaskManager memory size could not be determined.");
+        MemoryBudget memBudget = new MemoryBudget(maxMemoryBySpec.getBytes());
+        final TuningSimpleMemorySpec tunedSimpleSpec =
+                generateTunedMemorySpec(
+                        memSpecs,
+                        context,
+                        evaluatedMetrics,
+                        jobTopology,
+                        scalingSummaries,
+                        config,
+                        originalSimpleSpec,
+                        memBudget);
+        final long flinkMemoryDiffBytes =
+                calculateFlinkMemoryDiffBytes(originalSimpleSpec, 
tunedSimpleSpec);
+
+        // Update total memory according to memory diffs
+        final MemorySize totalMemory =
+                new MemorySize(maxMemoryBySpec.getBytes() - 
memBudget.getRemaining());
+        if (totalMemory.compareTo(MemorySize.ZERO) <= 0) {
+            LOG.warn("Invalid total memory configuration: {}", totalMemory);
             return EMPTY_CONFIG;
         }
 
-        MemoryBudget memBudget = new MemoryBudget(maxMemoryBySpec.getBytes());
+        ConfigChanges tuningConfig =
+                generateTuningConfig(memSpecs, tunedSimpleSpec, 
flinkMemoryDiffBytes, totalMemory);
+        triggerMemoryTuningEvent(context, eventHandler, config, tuningConfig);
+
+        if 
(!context.getConfiguration().get(AutoScalerOptions.MEMORY_TUNING_ENABLED)) {
+            return EMPTY_CONFIG;
+        }
+        return tuningConfig;
+    }
+
+    @Nonnull
+    private static TuningSimpleMemorySpec generateTunedMemorySpec(
+            CommonProcessMemorySpec<TaskExecutorFlinkMemory> memSpecs,
+            JobAutoScalerContext<?> context,
+            EvaluatedMetrics evaluatedMetrics,
+            JobTopology jobTopology,
+            Map<JobVertexID, ScalingSummary> scalingSummaries,
+            UnmodifiableConfiguration config,
+            TuningSimpleMemorySpec originalSimpleSpec,
+            MemoryBudget memBudget) {

Review Comment:
   Thanks Rui! I'm not against refactoring the code. I just wasn't convinced 
this change made the code easier to understand. Thank you for listening to my 
concerns.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to