[ https://issues.apache.org/jira/browse/FLINK-2576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14908117#comment-14908117 ]
ASF GitHub Bot commented on FLINK-2576: --------------------------------------- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1138#discussion_r40439511 --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java --- @@ -281,233 +315,225 @@ protected boolean udfWithForwardedFieldsSecondAnnotation(Class<?> udfClass) { } @Override - protected JoinOperatorBase<?, ?, OUT, ?> translateToDataFlow(Operator<I1> input1, Operator<I2> input2) { + protected AbstractJoinOperatorBase<?, ?, OUT, ?> translateToDataFlow(Operator<I1> input1, Operator<I2> input2) { + String name = getName() != null ? getName() : "Join at " + joinLocationName; + + JoinOperatorBaseBuilder<OUT> builder = new JoinOperatorBaseBuilder<OUT>(name, joinType) + .withParallelism(getParallelism()) + .withPartitioner(getPartitioner()) + .withJoinHint(getJoinHint()) + .withResultType(getResultType()); + + final boolean requiresTupleUnwrapping = keys1 instanceof Keys.SelectorFunctionKeys || keys2 instanceof Keys.SelectorFunctionKeys; + if (requiresTupleUnwrapping) { + if (keys1 instanceof Keys.SelectorFunctionKeys && keys2 instanceof Keys.SelectorFunctionKeys) { + // Both join sides have a key selector function, so we need to do the + // tuple wrapping/unwrapping on both sides. 
+ + @SuppressWarnings("unchecked") + Keys.SelectorFunctionKeys<I1, ?> selectorKeys1 = (Keys.SelectorFunctionKeys<I1, ?>) keys1; + @SuppressWarnings("unchecked") + Keys.SelectorFunctionKeys<I2, ?> selectorKeys2 = (Keys.SelectorFunctionKeys<I2, ?>) keys2; + + builder = builder + .withUdf(new TupleUnwrappingJoiner<>(function)) + .withUnwrappingLeftInput(input1, selectorKeys1, getInput1Type()) + .withUnwrappingRightInput(input2, selectorKeys2, getInput2Type()); + } else if (keys2 instanceof Keys.SelectorFunctionKeys) { + // The right side of the join needs the tuple wrapping/unwrapping + + @SuppressWarnings("unchecked") + Keys.SelectorFunctionKeys<I2, ?> selectorKeys2 = (Keys.SelectorFunctionKeys<I2, ?>) keys2; + + builder = builder + .withUdf(new TupleRightUnwrappingJoiner<>(function)) + .withInput1(input1, getInput1Type(), keys1) + .withUnwrappingRightInput(input2, selectorKeys2, getInput2Type()); + } else { + // The left side of the join needs the tuple wrapping/unwrapping - String name = getName() != null ? getName() : "Join at "+joinLocationName; + @SuppressWarnings("unchecked") + Keys.SelectorFunctionKeys<I1, ?> selectorKeys1 = (Keys.SelectorFunctionKeys<I1, ?>) keys1; - final JoinOperatorBase<?, ?, OUT, ?> translated; - - if (keys1 instanceof Keys.SelectorFunctionKeys && keys2 instanceof Keys.SelectorFunctionKeys) { - // Both join sides have a key selector function, so we need to do the - // tuple wrapping/unwrapping on both sides. 
+ builder = builder + .withUdf(new TupleLeftUnwrappingJoiner<>(function)) + .withUnwrappingLeftInput(input1, selectorKeys1, getInput1Type()) + .withInput2(input2, getInput2Type(), keys2); + } + } else if (keys1 instanceof Keys.ExpressionKeys && keys2 instanceof Keys.ExpressionKeys) { + // Neither side needs the tuple wrapping/unwrapping - @SuppressWarnings("unchecked") - Keys.SelectorFunctionKeys<I1, ?> selectorKeys1 = (Keys.SelectorFunctionKeys<I1, ?>) keys1; - @SuppressWarnings("unchecked") - Keys.SelectorFunctionKeys<I2, ?> selectorKeys2 = (Keys.SelectorFunctionKeys<I2, ?>) keys2; - - PlanBothUnwrappingJoinOperator<I1, I2, OUT, ?> po = - translateSelectorFunctionJoin(selectorKeys1, selectorKeys2, function, - getInput1Type(), getInput2Type(), getResultType(), name, input1, input2); - - // set parallelism - po.setParallelism(this.getParallelism()); - - translated = po; + builder = builder + .withUdf(function) + .withInput1(input1, getInput1Type(), keys1) + .withInput2(input2, getInput2Type(), keys2); + } else { + throw new UnsupportedOperationException("Unrecognized or incompatible key types."); } - else if (keys2 instanceof Keys.SelectorFunctionKeys) { - // The right side of the join needs the tuple wrapping/unwrapping - int[] logicalKeyPositions1 = keys1.computeLogicalKeyPositions(); + return builder.build(); + } - @SuppressWarnings("unchecked") - Keys.SelectorFunctionKeys<I2, ?> selectorKeys2 = - (Keys.SelectorFunctionKeys<I2, ?>) keys2; - PlanRightUnwrappingJoinOperator<I1, I2, OUT, ?> po = - translateSelectorFunctionJoinRight(logicalKeyPositions1, selectorKeys2, - function, getInput1Type(), getInput2Type(), getResultType(), name, - input1, input2); + private static final class JoinOperatorBaseBuilder<OUT> { + + private final String name; + private final JoinType joinType; + + private int parallelism; + private FlatJoinFunction udf; + private TypeInformation<OUT> resultType; + + private Operator input1; + private TypeInformation input1Type; + private Keys<?> 
keys1; + + private Operator input2; + private TypeInformation input2Type; + private Keys<?> keys2; - // set parallelism - po.setParallelism(this.getParallelism()); + private Partitioner<?> partitioner; + private JoinHint joinHint; - translated = po; + public JoinOperatorBaseBuilder(String name, JoinType joinType) { + this.name = name; + this.joinType = joinType; } - else if (keys1 instanceof Keys.SelectorFunctionKeys) { - // The left side of the join needs the tuple wrapping/unwrapping + public JoinOperatorBaseBuilder<OUT> with() { + return this; + } - @SuppressWarnings("unchecked") - Keys.SelectorFunctionKeys<I1, ?> selectorKeys1 = - (Keys.SelectorFunctionKeys<I1, ?>) keys1; + public <I1, K> JoinOperatorBaseBuilder<OUT> withUnwrappingLeftInput( + Operator<I1> input1, + Keys.SelectorFunctionKeys<I1, ?> rawKeys1, + TypeInformation<I1> inputType1) { + TypeInformation<Tuple2<K, I1>> typeInfoWithKey1 = new TupleTypeInfo<>(rawKeys1.getKeyType(), inputType1); - int[] logicalKeyPositions2 = keys2.computeLogicalKeyPositions(); + MapOperatorBase<I1, Tuple2<K, I1>, MapFunction<I1, Tuple2<K, I1>>> keyMapper1 = + createKeyMapper(rawKeys1, inputType1, input1, "Key Extractor 1"); + + return this.withInput1(keyMapper1, typeInfoWithKey1, rawKeys1); + } - PlanLeftUnwrappingJoinOperator<I1, I2, OUT, ?> po = - translateSelectorFunctionJoinLeft(selectorKeys1, logicalKeyPositions2, function, - getInput1Type(), getInput2Type(), getResultType(), name, input1, input2); + public <I2, K> JoinOperatorBaseBuilder<OUT> withUnwrappingRightInput( --- End diff -- rename to `withWrappedInput2()` for consistency with `withInput2()`? 
> Add outer joins to API and Optimizer
> ------------------------------------
>
> Key: FLINK-2576
> URL: https://issues.apache.org/jira/browse/FLINK-2576
> Project: Flink
> Issue Type: Sub-task
> Components: Java API, Optimizer, Scala API
> Reporter: Ricky Pogalz
> Priority: Minor
> Fix For: pre-apache
>
> Add left/right/full outer join methods to the DataSet APIs (Java, Scala) and to the optimizer of Flink.
> Initially, the execution strategy should be a sort-merge outer join (FLINK-2105) but can later be extended to hash joins for left/right outer joins.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)