JunRuiLee commented on code in PR #26180:
URL: https://github.com/apache/flink/pull/26180#discussion_r1963051617


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/util/VertexParallelismAndInputInfosDeciderUtils.java:
##########
@@ -658,4 +660,148 @@ public static long calculateDataVolumePerTaskForInput(
             long globalDataVolumePerTask, long inputsGroupBytes, long totalDataBytes) {
         return (long) ((double) inputsGroupBytes / totalDataBytes * globalDataVolumePerTask);
     }
+
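+    /**
+     * Builds a human-readable log message comparing the per-subtask input data sizes produced
+     * by the optimized subpartition assignment against the default number-based assignment.
+     * Returns an empty Optional when no optimization was applied to this input.
+     */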
+    public static Optional<String> constructOptimizationLog(
+            BlockingInputInfo inputInfo, JobVertexInputInfo jobVertexInputInfo) {
+        if (inputInfo.areInterInputsKeysCorrelated() && inputInfo.isIntraInputKeyCorrelated()) {
+            return Optional.empty();
+        }
+        boolean optimized = false;
+        List<ExecutionVertexInputInfo> executionVertexInputInfos =
+                jobVertexInputInfo.getExecutionVertexInputInfos();
+        int parallelism = executionVertexInputInfos.size();
+        long[] optimizedDataBytes = new long[parallelism];
+        long optimizedMin = Long.MAX_VALUE, optimizedMax = 0;
+        long[] nonOptimizedDataBytes = new long[parallelism];
+        long nonOptimizedMin = Long.MAX_VALUE, nonOptimizedMax = 0;
+        for (int i = 0; i < parallelism; ++i) {
+            Map<IndexRange, IndexRange> consumedSubpartitionGroups =
+                    executionVertexInputInfos.get(i).getConsumedSubpartitionGroups();
+            for (Map.Entry<IndexRange, IndexRange> entry : consumedSubpartitionGroups.entrySet()) {
+                IndexRange partitionRange = entry.getKey();
+                IndexRange subpartitionRange = entry.getValue();
+                optimizedDataBytes[i] +=
+                        inputInfo.getNumBytesProduced(partitionRange, subpartitionRange);
+            }
+            optimizedMin = Math.min(optimizedMin, optimizedDataBytes[i]);
+            optimizedMax = Math.max(optimizedMax, optimizedDataBytes[i]);
+
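+            // For comparison, recompute the consumed subpartition group this subtask would
+            // receive from the default number-based assignment.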
+            Map<IndexRange, IndexRange> nonOptimizedConsumedSubpartitionGroup =
+                    computeNumBasedConsumedSubpartitionGroup(parallelism, i, inputInfo);
+            checkState(nonOptimizedConsumedSubpartitionGroup.size() == 1);
+            Map.Entry<IndexRange, IndexRange> nonOptimizedGroup =
+                    nonOptimizedConsumedSubpartitionGroup.entrySet().iterator().next();
+            nonOptimizedDataBytes[i] +=
+                    inputInfo.getNumBytesProduced(
+                            nonOptimizedGroup.getKey(), nonOptimizedGroup.getValue());
+            nonOptimizedMin = Math.min(nonOptimizedMin, nonOptimizedDataBytes[i]);
+            nonOptimizedMax = Math.max(nonOptimizedMax, nonOptimizedDataBytes[i]);
+
+            if (!optimized
+                    && !consumedSubpartitionGroups.equals(nonOptimizedConsumedSubpartitionGroup)) {
+                optimized = true;
+            }
+        }
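+        // Only log when the optimized assignment actually differs from the number-based one.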
+        if (optimized) {
+            long optimizedMed = median(optimizedDataBytes);
+            long nonOptimizedMed = median(nonOptimizedDataBytes);
+            String logMessage =
+                    String.format(
+                            "Result id: %s, "
+                                    + "type number: %d, "
+                                    + "input data size: "
+                                    + "[ Before: {min: %s, median: %s, max: %s}, "
+                                    + "After: {min: %s, median: %s, max: %s} ]",
+                            inputInfo.getResultId(),
+                            inputInfo.getInputTypeNumber(),
+                            new MemorySize(nonOptimizedMin).toHumanReadableString(),
+                            new MemorySize(nonOptimizedMed).toHumanReadableString(),
+                            new MemorySize(nonOptimizedMax).toHumanReadableString(),
+                            new MemorySize(optimizedMin).toHumanReadableString(),
+                            new MemorySize(optimizedMed).toHumanReadableString(),
+                            new MemorySize(optimizedMax).toHumanReadableString());
+            return Optional.of(logMessage);
+        }
+        return Optional.empty();
+    }
+
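+    // Reconstructs the consumed subpartition group that the given subtask would receive
+    // from the default, number-based subpartition assignment.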
+    private static Map<IndexRange, IndexRange> computeNumBasedConsumedSubpartitionGroup(
+            int parallelism, int currentIndex, BlockingInputInfo inputInfo) {
+        int sourceParallelism = inputInfo.getNumPartitions();
+
+        if (inputInfo.isPointwise()) {
+            return computeNumBasedConsumedSubpartitionGroupForPointwise(
+                    sourceParallelism, parallelism, currentIndex, inputInfo::getNumSubpartitions);
+        } else {
+            return computeNumBasedConsumedSubpartitionGroupForAllToAll(
+                    sourceParallelism,
+                    parallelism,
+                    currentIndex,
+                    inputInfo::getNumSubpartitions,
+                    inputInfo.isBroadcast(),
+                    inputInfo.isSingleSubpartitionContainsAllData());
+        }
+    }
+
+    static Map<IndexRange, IndexRange> computeNumBasedConsumedSubpartitionGroupForPointwise(

Review Comment:
   Please annotate this method with `@VisibleForTesting`.
   Also, can we reuse the algorithm of `VertexInputInfoComputationUtils#computeVertexInputInfoForPointwise` in this method? The same applies to `computeNumBasedConsumedSubpartitionGroupForAllToAll`.
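   For illustration, a rough sketch of the reuse idea (hypothetical, not a drop-in suggestion: it assumes the existing `computeVertexInputInfoForPointwise` / `computeVertexInputInfoForAllToAll` utilities accept roughly the parameters below, and the exact signatures may need adjusting):

   ```java
   // Sketch only: delegate the number-based assignment to the existing utilities,
   // then pick out the consumed subpartition group of the requested subtask.
   private static Map<IndexRange, IndexRange> computeNumBasedConsumedSubpartitionGroup(
           int parallelism, int currentIndex, BlockingInputInfo inputInfo) {
       JobVertexInputInfo numBasedInputInfo =
               inputInfo.isPointwise()
                       ? VertexInputInfoComputationUtils.computeVertexInputInfoForPointwise(
                               inputInfo.getNumPartitions(),
                               parallelism,
                               inputInfo::getNumSubpartitions,
                               true /* isDynamicGraph, assumed */)
                       : VertexInputInfoComputationUtils.computeVertexInputInfoForAllToAll(
                               inputInfo.getNumPartitions(),
                               parallelism,
                               inputInfo::getNumSubpartitions,
                               true /* isDynamicGraph, assumed */,
                               inputInfo.isBroadcast(),
                               inputInfo.isSingleSubpartitionContainsAllData());
       return numBasedInputInfo
               .getExecutionVertexInputInfos()
               .get(currentIndex)
               .getConsumedSubpartitionGroups();
   }
   ```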



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
