gustavodemorais commented on code in PR #27166:
URL: https://github.com/apache/flink/pull/27166#discussion_r2524295689
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/JoinToMultiJoinRule.java:
##########
@@ -442,134 +439,45 @@ private boolean canCombine(RelNode input, Join origJoin) {
/**
* Checks if original join and child multi-join have common join keys to decide if we can merge
- * them into a single MultiJoin with one more input.
+ * them into a single MultiJoin with one more input. The method uses {@link
+ * AttributeBasedJoinKeyExtractor} to try to create valid common join key extractors.
*
* @param origJoin original Join
* @param otherJoin child MultiJoin
* @return true if original Join and child multi-join have at least one common JoinKey
*/
private boolean haveCommonJoinKey(Join origJoin, MultiJoin otherJoin) {
- Set<String> origJoinKeys = getJoinKeys(origJoin);
- Set<String> otherJoinKeys = getJoinKeys(otherJoin);
-
- origJoinKeys.retainAll(otherJoinKeys);
-
- return !origJoinKeys.isEmpty();
- }
-
- /**
- * Returns a set of join keys as strings following this format [table_name.field_name].
- *
- * @param join Join or MultiJoin node
- * @return set of all the join keys (keys from join conditions)
- */
- public Set<String> getJoinKeys(RelNode join) {
- Set<String> joinKeys = new HashSet<>();
- List<RexCall> conditions = Collections.emptyList();
- List<RelNode> inputs = join.getInputs();
-
- if (join instanceof Join) {
- conditions = collectConjunctions(((Join) join).getCondition());
- } else if (join instanceof MultiJoin) {
- conditions =
- ((MultiJoin) join)
- .getOuterJoinConditions().stream()
- .flatMap(cond -> collectConjunctions(cond).stream())
- .collect(Collectors.toList());
+ final List<RelNode> combinedJoinInputs =
+ Stream.concat(otherJoin.getInputs().stream(), Stream.of(origJoin.getRight()))
+ .collect(Collectors.toUnmodifiableList());
+
+ final List<RowType> combinedInputTypes =
+ combinedJoinInputs.stream()
+ .map(i -> FlinkTypeFactory.toLogicalRowType(i.getRowType()))
+ .collect(Collectors.toUnmodifiableList());
+
+ final List<RexNode> combinedJoinConditions =
+ Stream.concat(
+ otherJoin.getOuterJoinConditions().stream(),
+ List.of(origJoin.getCondition()).stream())
+ .collect(Collectors.toUnmodifiableList());
+
+ final Map<Integer, List<AttributeBasedJoinKeyExtractor.ConditionAttributeRef>>
+ joinAttributeMap =
+ createJoinAttributeMap(combinedJoinInputs, combinedJoinConditions);
+
+ boolean haveCommonJoinKey = false;
+ try {
+ // we probe to instantiate AttributeBasedJoinKeyExtractor's constructor to check whether
+ // it's possible to initialize common join key structures
+ final JoinKeyExtractor keyExtractor =
+ new AttributeBasedJoinKeyExtractor(joinAttributeMap, combinedInputTypes);
+ haveCommonJoinKey = keyExtractor.getCommonJoinKeyIndices(0).length > 0;
+ } catch (NoCommonJoinKeyException ignored) {
+ // failed to instantiate common join key structures => haveCommonJoinKey is false
Review Comment:
> Out of curiosity: if NoCommonJoinKeyException is used only to answer the question of whether we have a common join key or not, can it be done without throwing an exception?
It certainly can, but it would require more refactoring. This rule is only used when users enable the multi-join option, and in that case we expect the joins to usually have a common join key, so the exception should rarely be thrown. Do you think it's a no-go here? I also had doubts about this approach, so I looked around, and I _think_ I found other places where we do similar checks.
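To make the "more refactoring" concrete, here is a rough sketch of what an exception-free variant could look like. It is not Flink code: `ProbeSketch`, `KeyExtractor`, `NoCommonKeyException`, `create` and `tryCreate` are hypothetical names, and the "common key" is simplified to the intersection of per-input key-name sets (similar in spirit to the removed `retainAll` check). It contrasts probing the throwing entry point and catching the exception, as the diff does, with a factory that returns an `Optional`.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

/** Hypothetical illustration of "probe by exception" vs. an Optional factory; not Flink code. */
final class ProbeSketch {

    /** Hypothetical stand-in for NoCommonJoinKeyException. */
    static final class NoCommonKeyException extends RuntimeException {
        NoCommonKeyException(String message) {
            super(message);
        }
    }

    /** Hypothetical extractor; "common key" simplified to the intersection of key-name sets. */
    static final class KeyExtractor {
        private final Set<String> commonKeys;

        private KeyExtractor(Set<String> commonKeys) {
            this.commonKeys = commonKeys;
        }

        /** Strict entry point for callers that must reject inputs without a common key. */
        static KeyExtractor create(List<Set<String>> keysPerInput) {
            return tryCreate(keysPerInput)
                    .orElseThrow(() -> new NoCommonKeyException("No common join key"));
        }

        /** Lenient entry point: an empty Optional instead of an exception. */
        static Optional<KeyExtractor> tryCreate(List<Set<String>> keysPerInput) {
            Set<String> common = intersect(keysPerInput);
            return common.isEmpty() ? Optional.empty() : Optional.of(new KeyExtractor(common));
        }

        Set<String> commonKeys() {
            return commonKeys;
        }

        private static Set<String> intersect(List<Set<String>> keysPerInput) {
            Set<String> result = new HashSet<>();
            if (keysPerInput.isEmpty()) {
                return result;
            }
            result.addAll(keysPerInput.get(0));
            for (Set<String> keys : keysPerInput.subList(1, keysPerInput.size())) {
                result.retainAll(keys); // keep only keys shared with every input seen so far
            }
            return result;
        }
    }

    /** Mirrors the diff's pattern: call the throwing entry point, map the exception to false. */
    static boolean hasCommonKeyByException(List<Set<String>> keysPerInput) {
        try {
            return !KeyExtractor.create(keysPerInput).commonKeys().isEmpty();
        } catch (NoCommonKeyException ignored) {
            return false;
        }
    }

    /** Exception-free alternative built on tryCreate. */
    static boolean hasCommonKeyByOptional(List<Set<String>> keysPerInput) {
        return KeyExtractor.tryCreate(keysPerInput).isPresent();
    }

    public static void main(String[] args) {
        List<Set<String>> shared = List.of(Set.of("user_id", "ts"), Set.of("user_id"));
        List<Set<String>> disjoint = List.of(Set.of("user_id"), Set.of("order_id"));
        System.out.println(hasCommonKeyByException(shared)); // true
        System.out.println(hasCommonKeyByOptional(shared)); // true
        System.out.println(hasCommonKeyByException(disjoint)); // false
        System.out.println(hasCommonKeyByOptional(disjoint)); // false
    }
}
```

In this shape, callers that must fail could keep using the strict `create(...)`, while the rule would only call `tryCreate(...).isPresent()`. The price is touching every place that currently calls the constructor directly, which is the extra refactoring I had in mind.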
Some context: the same code is used in StreamExecMultiJoin and StreamPhysicalMultiJoinRule, and there we do need to throw an exception if a MultiJoin has no common join key. The idea of this PR was to reuse that same code here, since the separate implementations that calculated the common join key were inconsistent with each other.
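For anyone following along, a simplified model of what the merged check has to decide may help. The diff combines the child MultiJoin's inputs with the join's right input, and the MultiJoin's outer-join conditions with the join condition, before asking the extractor. The sketch below assumes every condition is a plain equality already expressed as (input, field) references, with the new right input at the last index, and treats "common join key" as one equivalence class of equated fields that reaches every input (found via union-find). `CommonKeySketch`, `Attr`, `Eq` and `canMerge` are hypothetical, and this is not the actual algorithm of AttributeBasedJoinKeyExtractor, which may differ in details.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

/** Hypothetical model of a "common join key" check; not Flink code. */
final class CommonKeySketch {

    /** Field {@code field} of input {@code input}; stand-in for an attribute reference. */
    static final class Attr {
        final int input;
        final int field;

        Attr(int input, int field) {
            this.input = input;
            this.field = field;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Attr && ((Attr) o).input == input && ((Attr) o).field == field;
        }

        @Override
        public int hashCode() {
            return Objects.hash(input, field);
        }
    }

    /** One equi-join conjunct: left = right. */
    static final class Eq {
        final Attr left;
        final Attr right;

        Eq(Attr left, Attr right) {
            this.left = left;
            this.right = right;
        }
    }

    /** Union-find lookup with path compression. */
    private static Attr find(Map<Attr, Attr> parent, Attr a) {
        parent.putIfAbsent(a, a);
        Attr root = parent.get(a);
        if (!root.equals(a)) {
            root = find(parent, root);
            parent.put(a, root);
        }
        return root;
    }

    /** True if one class of equated fields has a member in every input 0..numInputs-1. */
    static boolean hasCommonJoinKey(int numInputs, List<Eq> conditions) {
        Map<Attr, Attr> parent = new HashMap<>();
        for (Eq eq : conditions) {
            parent.put(find(parent, eq.left), find(parent, eq.right)); // merge the two classes
        }
        Map<Attr, Set<Integer>> inputsPerClass = new HashMap<>();
        for (Attr a : new ArrayList<>(parent.keySet())) {
            inputsPerClass.computeIfAbsent(find(parent, a), root -> new HashSet<>()).add(a.input);
        }
        return inputsPerClass.values().stream().anyMatch(inputs -> inputs.size() == numInputs);
    }

    /**
     * Same shape as the diff: child inputs plus the join's right input (assumed to be input
     * index childInputCount), child conditions plus the join condition.
     */
    static boolean canMerge(int childInputCount, List<Eq> childConditions, List<Eq> joinConditions) {
        List<Eq> combined = new ArrayList<>(childConditions);
        combined.addAll(joinConditions);
        return hasCommonJoinKey(childInputCount + 1, combined);
    }

    public static void main(String[] args) {
        // Child MultiJoin over A (0) and B (1) with A.f0 = B.f0.
        List<Eq> child = List.of(new Eq(new Attr(0, 0), new Attr(1, 0)));
        // Joining C (2) on B.f0 = C.f1 extends the same class: A.f0, B.f0, C.f1.
        List<Eq> sameKey = List.of(new Eq(new Attr(1, 0), new Attr(2, 1)));
        // Joining C (2) on B.f1 = C.f0 starts a class that never reaches A.
        List<Eq> otherKey = List.of(new Eq(new Attr(1, 1), new Attr(2, 0)));
        System.out.println(canMerge(2, child, sameKey)); // true
        System.out.println(canMerge(2, child, otherKey)); // false
    }
}
```

Because transitivity is resolved once over the combined conditions, `B.f0 = C.f1` extends the existing `A.f0 = B.f0` key while `B.f1 = C.f0` does not. Keeping a single implementation of this decision is what stops the logical rule, the physical rule and the exec node from disagreeing.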