1996fanrui commented on code in PR #795:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/795#discussion_r1520737667


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryTuning.java:
##########
@@ -92,24 +99,51 @@ public static ConfigChanges tuneTaskManagerMemory(
             return EMPTY_CONFIG;
         }
 
-        MemorySize specHeapSize = 
memSpecs.getFlinkMemory().getJvmHeapMemorySize();
-        MemorySize specManagedSize = memSpecs.getFlinkMemory().getManaged();
-        MemorySize specNetworkSize = memSpecs.getFlinkMemory().getNetwork();
-        MemorySize specMetaspaceSize = memSpecs.getJvmMetaspaceSize();
-        LOG.info(
-                "Spec memory - heap: {}, managed: {}, network: {}, meta: {}",
-                specHeapSize.toHumanReadableString(),
-                specManagedSize.toHumanReadableString(),
-                specNetworkSize.toHumanReadableString(),
-                specMetaspaceSize.toHumanReadableString());
+        final TuningSimpleMemorySpec originalSimpleSpec = new 
TuningSimpleMemorySpec(memSpecs);
+        LOG.info("The original memory spec : {}", originalSimpleSpec);
 
-        MemorySize maxMemoryBySpec = 
context.getTaskManagerMemory().orElse(MemorySize.ZERO);
-        if (maxMemoryBySpec.compareTo(MemorySize.ZERO) <= 0) {
-            LOG.warn("Spec TaskManager memory size could not be determined.");
+        MemoryBudget memBudget = new MemoryBudget(maxMemoryBySpec.getBytes());
+        final TuningSimpleMemorySpec tunedSimpleSpec =
+                generateTunedMemorySpec(
+                        memSpecs,
+                        context,
+                        evaluatedMetrics,
+                        jobTopology,
+                        scalingSummaries,
+                        config,
+                        originalSimpleSpec,
+                        memBudget);
+        final long flinkMemoryDiffBytes =
+                calculateFlinkMemoryDiffBytes(originalSimpleSpec, 
tunedSimpleSpec);
+
+        // Update total memory according to memory diffs
+        final MemorySize totalMemory =
+                new MemorySize(maxMemoryBySpec.getBytes() - 
memBudget.getRemaining());
+        if (totalMemory.compareTo(MemorySize.ZERO) <= 0) {
+            LOG.warn("Invalid total memory configuration: {}", totalMemory);
             return EMPTY_CONFIG;
         }
 
-        MemoryBudget memBudget = new MemoryBudget(maxMemoryBySpec.getBytes());
+        ConfigChanges tuningConfig =
+                generateTuningConfig(memSpecs, tunedSimpleSpec, 
flinkMemoryDiffBytes, totalMemory);
+        triggerMemoryTuningEvent(context, eventHandler, config, tuningConfig);
+
+        if 
(!context.getConfiguration().get(AutoScalerOptions.MEMORY_TUNING_ENABLED)) {
+            return EMPTY_CONFIG;
+        }
+        return tuningConfig;
+    }
+
+    @Nonnull
+    private static TuningSimpleMemorySpec generateTunedMemorySpec(
+            CommonProcessMemorySpec<TaskExecutorFlinkMemory> memSpecs,
+            JobAutoScalerContext<?> context,
+            EvaluatedMetrics evaluatedMetrics,
+            JobTopology jobTopology,
+            Map<JobVertexID, ScalingSummary> scalingSummaries,
+            UnmodifiableConfiguration config,
+            TuningSimpleMemorySpec originalSimpleSpec,
+            MemoryBudget memBudget) {

Review Comment:
   Thanks for the clarification!
   
   I'm worried that this method will grow longer in the future and become hard
to read. I don't have a strong opinion on this refactoring. Let me close this PR,
thanks~



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to