JunRuiLee commented on code in PR #26180:
URL: https://github.com/apache/flink/pull/26180#discussion_r1967055974


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/util/VertexParallelismAndInputInfosDeciderUtils.java:
##########
@@ -658,4 +661,57 @@ public static long calculateDataVolumePerTaskForInput(
             long globalDataVolumePerTask, long inputsGroupBytes, long totalDataBytes) {
         return (long) ((double) inputsGroupBytes / totalDataBytes * globalDataVolumePerTask);
     }
+
+    /**
+     * Logs the data distribution optimization info when the balanced data distribution algorithm
+     * is effectively optimized compared to the num-based data distribution algorithm.
+     *
+     * @param logger The logger instance used for logging output.
+     * @param jobVertexId The id of the job vertex.
+     * @param inputInfo The original input info.
+     * @param optimizedJobVertexInputInfo The optimized job vertex input info.
+     */
+    public static void logBalancedDataDistributionOptimizationResult(
+            Logger logger,
+            JobVertexID jobVertexId,
+            BlockingInputInfo inputInfo,
+            JobVertexInputInfo optimizedJobVertexInputInfo) {
+        // Currently, we will not optimize inputs that have two types of correlations at the
+        // same time, so we skip them.
+        List<ExecutionVertexInputInfo> optimizedExecutionVertexInputInfos =
+                optimizedJobVertexInputInfo.getExecutionVertexInputInfos();
+        int parallelism = optimizedExecutionVertexInputInfos.size();
+        List<ExecutionVertexInputInfo> nonOptimizedExecutionVertexInputInfos =
+                computeNumBasedJobVertexInputInfo(parallelism, inputInfo)
+                        .getExecutionVertexInputInfos();
+        // When the execution vertex input infos of the two are inconsistent, we consider that
+        // the balanced data distribution optimization has been performed.
+        if (!optimizedExecutionVertexInputInfos.equals(nonOptimizedExecutionVertexInputInfos)) {
+            logger.info(
+                    "Optimized balanced data distribution for vertex[ID:{}] - Details: "
Review Comment:
   Optimized the balanced data distribution for vertex {}, which reads from result {} with type number {}
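   The suggested wording can be sketched outside Flink as a plain `String.format` call; the parameter names (`vertexId`, `resultId`, `typeNumber`) are hypothetical stand-ins for the values available inside `logBalancedDataDistributionOptimizationResult`, and the real code would pass them to `logger.info` with `{}` placeholders instead:

   ```java
   // Minimal sketch of the reviewer's suggested, simpler log line.
   // vertexId/resultId/typeNumber are hypothetical stand-ins for the real
   // JobVertexID, result partition id, and input type number.
   public class LogMessageSketch {
       static String balancedDistributionMessage(String vertexId, String resultId, int typeNumber) {
           return String.format(
                   "Optimized the balanced data distribution for vertex %s, "
                           + "which reads from result %s with type number %d",
                   vertexId, resultId, typeNumber);
       }

       public static void main(String[] args) {
           System.out.println(balancedDistributionMessage("v1", "r1", 0));
       }
   }
   ```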



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/strategy/AdaptiveBroadcastJoinOptimizationStrategy.java:
##########
@@ -172,8 +171,20 @@ protected void tryOptimizeAdaptiveJoin(
     }
 
     private boolean canPerformOptimization(
-            ImmutableStreamNode adaptiveJoinNode, ReadableConfig config) {
-        return !isBroadcastJoinDisabled(config) && !isBroadcastJoin(adaptiveJoinNode);
+            ImmutableStreamNode adaptiveJoinNode, StreamGraphContext context) {
+        if (isBroadcastJoinDisabled(context.getStreamGraph().getConfiguration())
+                || isBroadcastJoin(adaptiveJoinNode)) {
+            return false;
+        }
+        return adaptiveJoinNode.getOutEdges().stream()
+                .noneMatch(
+                        edge ->
+                                context.getOutputPartitioner(
+                                                edge.getEdgeId(),
+                                                edge.getSourceId(),
+                                                edge.getTargetId())
+                                        instanceof ForwardPartitioner
+                                        && edge.isIntraInputKeyCorrelated());

Review Comment:
   Could we reuse the logic in AdaptiveSkewedJoinOptimizationStrategy#canPerformOptimizationAutomatic?
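   The edge check that both strategies would share can be sketched with simplified stand-in types; `Edge`, `Partitioner`, and `ForwardPartitioner` below are not the actual Flink classes, and the real refactoring would instead call into the existing logic in `AdaptiveSkewedJoinOptimizationStrategy#canPerformOptimizationAutomatic`:

   ```java
   import java.util.List;

   // Hedged sketch of the condition both join strategies test: optimization is
   // allowed only if no out-edge is both forward-partitioned and intra-input
   // key correlated. All types here are simplified stand-ins for Flink classes.
   public class SharedOptimizationCheck {
       interface Partitioner {}
       static class ForwardPartitioner implements Partitioner {}
       static class HashPartitioner implements Partitioner {}

       record Edge(Partitioner partitioner, boolean intraInputKeyCorrelated) {}

       // True iff no edge combines a ForwardPartitioner with intra-input
       // key correlation -- mirrors the noneMatch in the diff above.
       static boolean noForwardCorrelatedOutEdge(List<Edge> outEdges) {
           return outEdges.stream()
                   .noneMatch(e -> e.partitioner() instanceof ForwardPartitioner
                           && e.intraInputKeyCorrelated());
       }

       public static void main(String[] args) {
           List<Edge> edges = List.of(
                   new Edge(new HashPartitioner(), true),    // correlated but not forward
                   new Edge(new ForwardPartitioner(), false)); // forward but not correlated
           System.out.println(noForwardCorrelatedOutEdge(edges)); // prints "true"
       }
   }
   ```

   Extracting the predicate this way keeps the two strategies' eligibility rules from drifting apart, which appears to be the intent of the review comment.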
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
