gustavodemorais commented on code in PR #27166:
URL: https://github.com/apache/flink/pull/27166#discussion_r2524295689


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/JoinToMultiJoinRule.java:
##########
@@ -442,134 +439,45 @@ private boolean canCombine(RelNode input, Join origJoin) {
 
     /**
      * Checks if original join and child multi-join have common join keys to decide if we can merge
-     * them into a single MultiJoin with one more input.
+     * them into a single MultiJoin with one more input. The method uses {@link
+     * AttributeBasedJoinKeyExtractor} to try to create valid common join key extractors.
      *
      * @param origJoin original Join
      * @param otherJoin child MultiJoin
      * @return true if original Join and child multi-join have at least one common JoinKey
      */
     private boolean haveCommonJoinKey(Join origJoin, MultiJoin otherJoin) {
-        Set<String> origJoinKeys = getJoinKeys(origJoin);
-        Set<String> otherJoinKeys = getJoinKeys(otherJoin);
-
-        origJoinKeys.retainAll(otherJoinKeys);
-
-        return !origJoinKeys.isEmpty();
-    }
-
-    /**
-     * Returns a set of join keys as strings following this format [table_name.field_name].
-     *
-     * @param join Join or MultiJoin node
-     * @return set of all the join keys (keys from join conditions)
-     */
-    public Set<String> getJoinKeys(RelNode join) {
-        Set<String> joinKeys = new HashSet<>();
-        List<RexCall> conditions = Collections.emptyList();
-        List<RelNode> inputs = join.getInputs();
-
-        if (join instanceof Join) {
-            conditions = collectConjunctions(((Join) join).getCondition());
-        } else if (join instanceof MultiJoin) {
-            conditions =
-                    ((MultiJoin) join)
-                            .getOuterJoinConditions().stream()
-                                    .flatMap(cond -> collectConjunctions(cond).stream())
-                                    .collect(Collectors.toList());
+        final List<RelNode> combinedJoinInputs =
+                Stream.concat(otherJoin.getInputs().stream(), Stream.of(origJoin.getRight()))
+                        .collect(Collectors.toUnmodifiableList());
+
+        final List<RowType> combinedInputTypes =
+                combinedJoinInputs.stream()
+                        .map(i -> FlinkTypeFactory.toLogicalRowType(i.getRowType()))
+                        .collect(Collectors.toUnmodifiableList());
+
+        final List<RexNode> combinedJoinConditions =
+                Stream.concat(
+                                otherJoin.getOuterJoinConditions().stream(),
+                                List.of(origJoin.getCondition()).stream())
+                        .collect(Collectors.toUnmodifiableList());
+
+        final Map<Integer, List<AttributeBasedJoinKeyExtractor.ConditionAttributeRef>>
+                joinAttributeMap =
+                        createJoinAttributeMap(combinedJoinInputs, combinedJoinConditions);
+
+        boolean haveCommonJoinKey = false;
+        try {
+            // we probe to instantiate AttributeBasedJoinKeyExtractor's constructor to check whether
+            // it's possible to initialize common join key structures
+            final JoinKeyExtractor keyExtractor =
+                    new AttributeBasedJoinKeyExtractor(joinAttributeMap, combinedInputTypes);
+            haveCommonJoinKey = keyExtractor.getCommonJoinKeyIndices(0).length > 0;
+        } catch (NoCommonJoinKeyException ignored) {
+            // failed to instantiate common join key structures => haveCommonJoinKey is false

Review Comment:
   > Out of curiosity: if NoCommonJoinKeyException is used only to answer the question of whether we have a common join key, can it be done without throwing an exception?
   
   It certainly can, but it would require more refactoring. This rule is only used when users enable the multi-join option, and in that case we expect the joins to usually have a common join key, so the exception path should be rare. Do you think it's a no-go here? I also had doubts about this approach, so I looked around, and I _think_ I found other places where we do similar checks.
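
   For reference, the probe-by-construction pattern in the diff boils down to something like the following minimal sketch. It is a hypothetical simplification, not Flink's `AttributeBasedJoinKeyExtractor`: `ToyKeyExtractor`, `NoCommonKeyException`, `Attr`, `EquiCondition` and `eq` are illustrative names, and the sketch assumes a common join key exists when some equivalence class of equi-joined attributes (built with union-find) touches every input.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical simplification of the probe-by-construction pattern; not Flink's actual classes.
final class ProbeByConstruction {

    /** Hypothetical stand-in for NoCommonJoinKeyException. */
    static final class NoCommonKeyException extends RuntimeException {
        NoCommonKeyException(String message) {
            super(message);
        }
    }

    /** An attribute is (input index, field index); an equi-condition links two attributes. */
    record Attr(int input, int field) {}

    record EquiCondition(Attr left, Attr right) {}

    static EquiCondition eq(int leftInput, int leftField, int rightInput, int rightField) {
        return new EquiCondition(new Attr(leftInput, leftField), new Attr(rightInput, rightField));
    }

    /** Hypothetical extractor: its constructor throws if no attribute class spans all inputs. */
    static final class ToyKeyExtractor {
        private final Set<Attr> commonKeyClass;

        ToyKeyExtractor(int numInputs, List<EquiCondition> conditions) {
            // Union-find: every equi-condition merges the classes of its two attributes.
            Map<Attr, Attr> parent = new HashMap<>();
            for (EquiCondition c : conditions) {
                Attr leftRoot = find(parent, c.left());
                Attr rightRoot = find(parent, c.right());
                if (!leftRoot.equals(rightRoot)) {
                    parent.put(leftRoot, rightRoot);
                }
            }
            // Group attributes by root to materialize the equivalence classes.
            Map<Attr, Set<Attr>> classes = new HashMap<>();
            for (Attr a : new ArrayList<>(parent.keySet())) {
                classes.computeIfAbsent(find(parent, a), k -> new HashSet<>()).add(a);
            }
            // Pick a class that contains at least one attribute of every input.
            Set<Attr> found = null;
            for (Set<Attr> cls : classes.values()) {
                if (cls.stream().map(Attr::input).distinct().count() == numInputs) {
                    found = cls;
                    break;
                }
            }
            if (found == null) {
                throw new NoCommonKeyException("no attribute class spans all " + numInputs + " inputs");
            }
            this.commonKeyClass = found;
        }

        /** Common key fields of one input (loosely analogous to getCommonJoinKeyIndices). */
        int[] commonKeyFields(int input) {
            return commonKeyClass.stream().filter(a -> a.input() == input).mapToInt(Attr::field).sorted().toArray();
        }
    }

    private static Attr find(Map<Attr, Attr> parent, Attr a) {
        parent.putIfAbsent(a, a);
        Attr p = parent.get(a);
        if (p.equals(a)) {
            return a;
        }
        Attr root = find(parent, p);
        parent.put(a, root); // path compression
        return root;
    }

    /** The probe: construction succeeding means a common key exists; the exception means it does not. */
    static boolean haveCommonJoinKey(int numInputs, List<EquiCondition> conditions) {
        try {
            return new ToyKeyExtractor(numInputs, conditions).commonKeyFields(0).length > 0;
        } catch (NoCommonKeyException ignored) {
            return false;
        }
    }

    public static void main(String[] args) {
        // A.id = B.a_id AND B.a_id = C.a_id: one attribute class touches A, B and C.
        System.out.println(haveCommonJoinKey(3, List.of(eq(0, 0, 1, 1), eq(1, 1, 2, 1)))); // true
        // A.id = B.a_id AND B.id = C.b_id: a chain of joins without a shared key.
        System.out.println(haveCommonJoinKey(3, List.of(eq(0, 0, 1, 1), eq(1, 0, 2, 1)))); // false
    }
}
```

   The exception is only thrown on the "no common key" path, which, given that the rule only runs when the multi-join option is enabled, is expected to be the uncommon case.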
   
   Some context: the same code is used in StreamExecMultiJoin and StreamPhysicalMultiJoinRule, and there we do need to throw an exception if a MultiJoin has no common join key. That was the idea of the PR: reuse the same code everywhere, since the different pieces of logic computing the common join key had become inconsistent.
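
   If the refactoring is worth doing later, one option that keeps a single shared code path is to move the computation into a method that returns `Optional` and let each caller decide what an empty result means. The sketch below is hypothetical: `CommonKeySketch`, `tryFindCommonKeyClass`, `haveCommonJoinKey` and `requireCommonKeyClass` are illustrative names, not existing Flink APIs, and the common-key criterion is simplified to "some equivalence class of equi-joined attributes touches every input".

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical refactoring sketch: one shared, non-throwing computation, two callers.
final class CommonKeySketch {

    record Attr(int input, int field) {}

    record EquiCondition(Attr left, Attr right) {}

    static EquiCondition eq(int leftInput, int leftField, int rightInput, int rightField) {
        return new EquiCondition(new Attr(leftInput, leftField), new Attr(rightInput, rightField));
    }

    /** Shared core: the attribute class that touches every input, or empty if there is none. */
    static Optional<Set<Attr>> tryFindCommonKeyClass(int numInputs, List<EquiCondition> conditions) {
        // Union-find over attributes; each equi-condition merges two classes.
        Map<Attr, Attr> parent = new HashMap<>();
        for (EquiCondition c : conditions) {
            Attr leftRoot = find(parent, c.left());
            Attr rightRoot = find(parent, c.right());
            if (!leftRoot.equals(rightRoot)) {
                parent.put(leftRoot, rightRoot);
            }
        }
        Map<Attr, Set<Attr>> classes = new HashMap<>();
        for (Attr a : new ArrayList<>(parent.keySet())) {
            classes.computeIfAbsent(find(parent, a), k -> new HashSet<>()).add(a);
        }
        return classes.values().stream()
                .filter(cls -> cls.stream().map(Attr::input).distinct().count() == numInputs)
                .findFirst();
    }

    /** Logical rule side: a plain boolean, no exception used for control flow. */
    static boolean haveCommonJoinKey(int numInputs, List<EquiCondition> conditions) {
        return tryFindCommonKeyClass(numInputs, conditions).isPresent();
    }

    /** Physical/exec side: a MultiJoin without a common key is an error, so throw here. */
    static Set<Attr> requireCommonKeyClass(int numInputs, List<EquiCondition> conditions) {
        return tryFindCommonKeyClass(numInputs, conditions)
                .orElseThrow(() -> new IllegalStateException("MultiJoin without a common join key"));
    }

    private static Attr find(Map<Attr, Attr> parent, Attr a) {
        parent.putIfAbsent(a, a);
        Attr p = parent.get(a);
        if (p.equals(a)) {
            return a;
        }
        Attr root = find(parent, p);
        parent.put(a, root); // path compression
        return root;
    }

    public static void main(String[] args) {
        List<EquiCondition> shared = List.of(eq(0, 0, 1, 1), eq(1, 1, 2, 1));
        List<EquiCondition> chain = List.of(eq(0, 0, 1, 1), eq(1, 0, 2, 1));
        System.out.println(haveCommonJoinKey(3, shared)); // true
        System.out.println(haveCommonJoinKey(3, chain)); // false
        System.out.println(requireCommonKeyClass(3, shared).size()); // 3
    }
}
```

   The design choice is that only the caller decides whether a missing key is an error, so the rule and the physical/exec code (StreamPhysicalMultiJoinRule, StreamExecMultiJoin) would still share one computation, which preserves the consistency this PR is after.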



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
