JunRuiLee commented on code in PR #26149:
URL: https://github.com/apache/flink/pull/26149#discussion_r1952405330


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/strategy/AdaptiveSkewedJoinOptimizationStrategy.java:
##########
@@ -301,22 +301,23 @@ private static boolean existBytesLargerThanThreshold(long[] inputBytes, long thr
     }
 
     private static boolean canPerformOptimizationAutomatic(ImmutableStreamNode adaptiveJoinNode) {
-        // In AUTO mode, we need to ensure that there are no intra-correlated out edge to ensure the
-        // application of this optimization wouldn't break data correctness or introduce additional
-        // performance overhead.
+        // In AUTO mode, we need to ensure that all pointwise out edges do not have
+        // intra-correlation to ensure the application of this optimization wouldn't break data
+        // correctness or introduce additional performance overhead.
         return adaptiveJoinNode.getOutEdges().stream()
-                .noneMatch(ImmutableStreamEdge::isIntraInputKeyCorrelated);
+                .noneMatch(edge -> edge.isPointwiseEdge() && edge.isIntraInputKeyCorrelated());

Review Comment:
   Could we introduce a whitelist of output partitioners that maintain key-group correlation by themselves, such as Hash, Custom, etc., and check the out edges against it?
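   To make the difference between the two checks concrete, here is a minimal, self-contained sketch. It is a hypothetical model, not Flink code: `OutEdge`, `PartitionerKind`, `KEY_PRESERVING`, and the `canOptimize*` methods are illustrative names, and the PR's `isPointwiseEdge()` / `isIntraInputKeyCorrelated()` are modeled as plain record fields. It assumes that a Hash or Custom partitioner re-routes every record by key on its own, so downstream key-group correlation survives even if the skewed join split a hot key across subtasks.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

// Hypothetical, self-contained model of the AUTO-mode out-edge check. OutEdge and
// PartitionerKind are illustrative stand-ins, NOT Flink's ImmutableStreamEdge or
// StreamPartitioner classes.
public class SkewedJoinAutoCheckSketch {

    // Hypothetical partitioner categories; HASH and CUSTOM follow the review comment.
    enum PartitionerKind { HASH, CUSTOM, FORWARD, REBALANCE }

    // Hypothetical edge: the partitioner plus the two properties the check reads.
    record OutEdge(PartitionerKind partitioner, boolean pointwise, boolean intraInputKeyCorrelated) {}

    // Assumed whitelist: partitioners that re-route every record by key on their own.
    static final Set<PartitionerKind> KEY_PRESERVING =
            EnumSet.of(PartitionerKind.HASH, PartitionerKind.CUSTOM);

    // Before the change: any intra-correlated out edge disables the optimization.
    static boolean canOptimizeBefore(List<OutEdge> outEdges) {
        return outEdges.stream().noneMatch(OutEdge::intraInputKeyCorrelated);
    }

    // After the change: only pointwise, intra-correlated out edges disable it.
    static boolean canOptimizeAfter(List<OutEdge> outEdges) {
        return outEdges.stream().noneMatch(e -> e.pointwise() && e.intraInputKeyCorrelated());
    }

    // Reviewer's idea: an intra-correlated edge is tolerated only if its
    // partitioner is on the whitelist.
    static boolean canOptimizeWithWhitelist(List<OutEdge> outEdges) {
        return outEdges.stream()
                .noneMatch(e -> e.intraInputKeyCorrelated()
                        && !KEY_PRESERVING.contains(e.partitioner()));
    }

    // Renders the three verdicts as "before after whitelist".
    static String verdicts(List<OutEdge> outEdges) {
        return canOptimizeBefore(outEdges) + " " + canOptimizeAfter(outEdges) + " "
                + canOptimizeWithWhitelist(outEdges);
    }

    public static void main(String[] args) {
        // Join output feeds a hash exchange (all-to-all, key-preserving).
        System.out.println(verdicts(List.of(new OutEdge(PartitionerKind.HASH, false, true))));
        // Join output is forwarded pointwise to a consumer that needs key correlation.
        System.out.println(verdicts(List.of(new OutEdge(PartitionerKind.FORWARD, true, true))));
        // Join output is rebalanced: all-to-all, but not key-preserving.
        System.out.println(verdicts(List.of(new OutEdge(PartitionerKind.REBALANCE, false, true))));
    }
}
```

   Running `main` prints `false true true`, `false false false`, and `false true false`. The last case is where the approaches diverge: the PR's check accepts any non-pointwise edge, while a whitelist would accept only partitioners known to re-establish key grouping.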



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/adaptive/AdaptiveSkewedJoinITCase.scala:
##########
@@ -87,6 +87,13 @@ class AdaptiveSkewedJoinITCase extends AdaptiveJoinITCase {
     checkResult(sql)
   }
 
+  @Test
+  def testJoinWithHashOutput(): Unit = {
+    val sql =
+      "SELECT * FROM (SELECT * FROM T1, T2 WHERE T1.a1 = T2.a2) as T4 LEFT JOIN T3 ON T3.b3 = T4.b1"
+    checkResult(sql)

Review Comment:
   Since this test only verifies the query result, how can we tell whether this change actually takes effect for AdaptiveSkewedJoin when the configuration is set to AUTO?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
