JunRuiLee commented on code in PR #26180: URL: https://github.com/apache/flink/pull/26180#discussion_r1967055974
########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/util/VertexParallelismAndInputInfosDeciderUtils.java: ########## @@ -658,4 +661,57 @@ public static long calculateDataVolumePerTaskForInput( long globalDataVolumePerTask, long inputsGroupBytes, long totalDataBytes) { return (long) ((double) inputsGroupBytes / totalDataBytes * globalDataVolumePerTask); } + + /** + * Logs the data distribution optimization info when a balanced data distribution algorithm is + * effectively optimized compared to the num-based data distribution algorithm. + * + * @param logger The logger instance used for logging output. + * @param jobVertexId The id for the job vertex. + * @param inputInfo The original input info + * @param optimizedJobVertexInputInfo The optimized job vertex input info. + */ + public static void logBalancedDataDistributionOptimizationResult( + Logger logger, + JobVertexID jobVertexId, + BlockingInputInfo inputInfo, + JobVertexInputInfo optimizedJobVertexInputInfo) { + // Currently, we will not optimize inputs that have two types of correlations at the same + // time, skip it. + List<ExecutionVertexInputInfo> optimizedExecutionVertexInputInfos = + optimizedJobVertexInputInfo.getExecutionVertexInputInfos(); + int parallelism = optimizedExecutionVertexInputInfos.size(); + List<ExecutionVertexInputInfo> nonOptimizedExecutionVertexInputInfos = + computeNumBasedJobVertexInputInfo(parallelism, inputInfo) + .getExecutionVertexInputInfos(); + // When the execution vertex input infos of the two are inconsistent, we consider that + // data balanced distribution optimization has performed. 
+ if (!optimizedExecutionVertexInputInfos.equals(nonOptimizedExecutionVertexInputInfos)) { + logger.info( + "Optimized balanced data distribution for vertex[ID:{}] - Details: " Review Comment: Optimized the balanced data distribution for vertex {}, which reads from result {} with type number {} ########## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/strategy/AdaptiveBroadcastJoinOptimizationStrategy.java: ########## @@ -172,8 +171,20 @@ protected void tryOptimizeAdaptiveJoin( } private boolean canPerformOptimization( - ImmutableStreamNode adaptiveJoinNode, ReadableConfig config) { - return !isBroadcastJoinDisabled(config) && !isBroadcastJoin(adaptiveJoinNode); + ImmutableStreamNode adaptiveJoinNode, StreamGraphContext context) { + if (isBroadcastJoinDisabled(context.getStreamGraph().getConfiguration()) + || isBroadcastJoin(adaptiveJoinNode)) { + return false; + } + return adaptiveJoinNode.getOutEdges().stream() + .noneMatch( + edge -> + context.getOutputPartitioner( + edge.getEdgeId(), + edge.getSourceId(), + edge.getTargetId()) + instanceof ForwardPartitioner + && edge.isIntraInputKeyCorrelated()); Review Comment: Could we reuse the logic in AdaptiveSkewedJoinOptimizationStrategy#canPerformOptimizationAutomatic? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org