JunRuiLee commented on code in PR #26149: URL: https://github.com/apache/flink/pull/26149#discussion_r1952405330

##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/strategy/AdaptiveSkewedJoinOptimizationStrategy.java:
##########
@@ -301,22 +301,23 @@ private static boolean existBytesLargerThanThreshold(long[] inputBytes, long thr
     }

     private static boolean canPerformOptimizationAutomatic(ImmutableStreamNode adaptiveJoinNode) {
-        // In AUTO mode, we need to ensure that there are no intra-correlated out edge to ensure the
-        // application of this optimization wouldn't break data correctness or introduce additional
-        // performance overhead.
+        // In AUTO mode, we need to ensure that all pointwise out edges do not have
+        // intra-correlation to ensure the application of this optimization wouldn't break data
+        // correctness or introduce additional performance overhead.
         return adaptiveJoinNode.getOutEdges().stream()
-                .noneMatch(ImmutableStreamEdge::isIntraInputKeyCorrelated);
+                .noneMatch(edge -> edge.isPointwiseEdge() && edge.isIntraInputKeyCorrelated());

Review Comment:
   Could we introduce a whitelist to check whether the output partitioner can maintain key group correlation by itself, such as with Hash, Custom, etc.?

##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/adaptive/AdaptiveSkewedJoinITCase.scala:
##########
@@ -87,6 +87,13 @@ class AdaptiveSkewedJoinITCase extends AdaptiveJoinITCase {
     checkResult(sql)
   }

+  @Test
+  def testJoinWithHashOutput(): Unit = {
+    val sql =
+      "SELECT * FROM (SELECT * FROM T1, T2 WHERE T1.a1 = T2.a2) as T4 LEFT JOIN T3 ON T3.b3 = T4.b1"
+    checkResult(sql)

Review Comment:
   Since this verification aims to check the result, how can we know whether this change will take effect for AdaptiveSkewedJoin when the configuration is set to AUTO?
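
For illustration only, the whitelist idea raised in the first review comment might be sketched as below. This is not Flink code: `PartitionerKind`, `OutEdge`, and the contents of the whitelist are hypothetical stand-ins, and the assumption that Hash and Custom partitioners can maintain key group correlation by themselves is taken from the comment, not verified against the runtime.

```java
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

public class PartitionerWhitelistSketch {

    /** Hypothetical partitioner kinds; these are not Flink's partitioner classes. */
    enum PartitionerKind { HASH, CUSTOM, FORWARD, RESCALE, REBALANCE }

    /** Hypothetical stand-in for an out edge of the adaptive join node. */
    static final class OutEdge {
        final PartitionerKind partitioner;
        final boolean intraInputKeyCorrelated;

        OutEdge(PartitionerKind partitioner, boolean intraInputKeyCorrelated) {
            this.partitioner = partitioner;
            this.intraInputKeyCorrelated = intraInputKeyCorrelated;
        }
    }

    // Assumed whitelist: partitioners that redistribute records by key on their own,
    // so splitting the skewed join output should not break key correlation downstream.
    private static final Set<PartitionerKind> KEY_CORRELATION_PRESERVING =
            EnumSet.of(PartitionerKind.HASH, PartitionerKind.CUSTOM);

    /** Allows the optimization unless a correlated edge uses a non-whitelisted partitioner. */
    static boolean canPerformOptimizationAutomatic(List<OutEdge> outEdges) {
        return outEdges.stream()
                .noneMatch(edge -> edge.intraInputKeyCorrelated
                        && !KEY_CORRELATION_PRESERVING.contains(edge.partitioner));
    }

    public static void main(String[] args) {
        List<OutEdge> hashOnly = Arrays.asList(new OutEdge(PartitionerKind.HASH, true));
        List<OutEdge> withForward = Arrays.asList(
                new OutEdge(PartitionerKind.HASH, true),
                new OutEdge(PartitionerKind.FORWARD, true));

        System.out.println(canPerformOptimizationAutomatic(hashOnly));    // prints "true"
        System.out.println(canPerformOptimizationAutomatic(withForward)); // prints "false"
    }
}
```

Compared with the `isPointwiseEdge()` check in the diff, a whitelist makes the set of safe partitioners explicit, so a newly added partitioner is treated as unsafe until someone lists it.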