snuyanzin commented on code in PR #27166:
URL: https://github.com/apache/flink/pull/27166#discussion_r2524071927
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/JoinToMultiJoinRule.java:
##########
@@ -442,134 +439,45 @@ private boolean canCombine(RelNode input, Join origJoin) {
/**
* Checks if original join and child multi-join have common join keys to decide if we can merge
- * them into a single MultiJoin with one more input.
+ * them into a single MultiJoin with one more input. The method uses {@link
+ * AttributeBasedJoinKeyExtractor} to try to create valid common join key extractors.
*
* @param origJoin original Join
* @param otherJoin child MultiJoin
* @return true if original Join and child multi-join have at least one common JoinKey
*/
private boolean haveCommonJoinKey(Join origJoin, MultiJoin otherJoin) {
- Set<String> origJoinKeys = getJoinKeys(origJoin);
- Set<String> otherJoinKeys = getJoinKeys(otherJoin);
-
- origJoinKeys.retainAll(otherJoinKeys);
-
- return !origJoinKeys.isEmpty();
- }
-
- /**
- * Returns a set of join keys as strings following this format [table_name.field_name].
- *
- * @param join Join or MultiJoin node
- * @return set of all the join keys (keys from join conditions)
- */
- public Set<String> getJoinKeys(RelNode join) {
- Set<String> joinKeys = new HashSet<>();
- List<RexCall> conditions = Collections.emptyList();
- List<RelNode> inputs = join.getInputs();
-
- if (join instanceof Join) {
- conditions = collectConjunctions(((Join) join).getCondition());
- } else if (join instanceof MultiJoin) {
- conditions =
- ((MultiJoin) join)
- .getOuterJoinConditions().stream()
- .flatMap(cond -> collectConjunctions(cond).stream())
- .collect(Collectors.toList());
+ final List<RelNode> combinedJoinInputs =
+ Stream.concat(otherJoin.getInputs().stream(), Stream.of(origJoin.getRight()))
+ .collect(Collectors.toUnmodifiableList());
+
+ final List<RowType> combinedInputTypes =
+ combinedJoinInputs.stream()
+ .map(i -> FlinkTypeFactory.toLogicalRowType(i.getRowType()))
+ .collect(Collectors.toUnmodifiableList());
+
+ final List<RexNode> combinedJoinConditions =
+ Stream.concat(
+ otherJoin.getOuterJoinConditions().stream(),
+ List.of(origJoin.getCondition()).stream())
+ .collect(Collectors.toUnmodifiableList());
+
+ final Map<Integer, List<AttributeBasedJoinKeyExtractor.ConditionAttributeRef>>
+ joinAttributeMap =
+ createJoinAttributeMap(combinedJoinInputs, combinedJoinConditions);
+
+ boolean haveCommonJoinKey = false;
+ try {
+ // we probe to instantiate AttributeBasedJoinKeyExtractor's constructor to check whether
+ // it's possible to initialize common join key structures
+ final JoinKeyExtractor keyExtractor =
+ new AttributeBasedJoinKeyExtractor(joinAttributeMap, combinedInputTypes);
+ haveCommonJoinKey = keyExtractor.getCommonJoinKeyIndices(0).length > 0;
+ } catch (NoCommonJoinKeyException ignored) {
+ // failed to instantiate common join key structures => haveCommonJoinKey is false
Review Comment:
Out of curiosity: if `NoCommonJoinKeyException` is used only to answer the
question of whether we have a common join key, can this be done without
throwing an exception?
Could we end up with many joins that have no common join key and, as a
result, a high exception rate?
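To make the question concrete, here is a rough sketch of a non-throwing probe. It is not Flink code: `CommonKeyProbe`, `findCommonKeys`, and the idea of representing each input's join keys as a set of already normalized key labels are all assumptions made for illustration. The real `AttributeBasedJoinKeyExtractor` resolves keys from join conditions and would need its own equivalent, for example a static check or factory that returns an empty result instead of throwing from the constructor.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

/** Hypothetical stand-in for illustration only; not part of Flink. */
final class CommonKeyProbe {

    private CommonKeyProbe() {}

    /**
     * Returns the key labels shared by every input, or an empty Optional if
     * there are none. Assumes each input's join keys were already normalized
     * to comparable labels (a simplification of the real extractor logic).
     */
    static Optional<Set<String>> findCommonKeys(List<Set<String>> keysPerInput) {
        if (keysPerInput.isEmpty()) {
            return Optional.empty();
        }
        // Start from the first input's keys and intersect with the rest.
        Set<String> common = new HashSet<>(keysPerInput.get(0));
        for (Set<String> keys : keysPerInput.subList(1, keysPerInput.size())) {
            common.retainAll(keys);
        }
        return common.isEmpty() ? Optional.empty() : Optional.of(common);
    }

    /** The yes/no question is answered without any exception on the "no" path. */
    static boolean haveCommonJoinKey(List<Set<String>> keysPerInput) {
        return findCommonKeys(keysPerInput).isPresent();
    }
}
```

With something along these lines, a join without a common key would cost one intersection rather than an exception with a stack trace, which may matter if the rule is probed many times during planning.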
Review Comment:
```suggestion
// failed to instantiate common join key structures => no common join key
```