[ 
https://issues.apache.org/jira/browse/FLINK-2576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14908116#comment-14908116
 ] 

ASF GitHub Bot commented on FLINK-2576:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1138#discussion_r40439495
  
    --- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java 
---
    @@ -281,233 +315,225 @@ protected boolean 
udfWithForwardedFieldsSecondAnnotation(Class<?> udfClass) {
                }
     
                @Override
    -           protected JoinOperatorBase<?, ?, OUT, ?> 
translateToDataFlow(Operator<I1> input1, Operator<I2> input2) {
    +           protected AbstractJoinOperatorBase<?, ?, OUT, ?> 
translateToDataFlow(Operator<I1> input1, Operator<I2> input2) {
    +                   String name = getName() != null ? getName() : "Join at 
" + joinLocationName;
    +
    +                   JoinOperatorBaseBuilder<OUT> builder = new 
JoinOperatorBaseBuilder<OUT>(name, joinType)
    +                                   .withParallelism(getParallelism())
    +                                   .withPartitioner(getPartitioner())
    +                                   .withJoinHint(getJoinHint())
    +                                   .withResultType(getResultType());
    +
    +                   final boolean requiresTupleUnwrapping = keys1 
instanceof Keys.SelectorFunctionKeys || keys2 instanceof 
Keys.SelectorFunctionKeys;
    +                   if (requiresTupleUnwrapping) {
    +                           if (keys1 instanceof Keys.SelectorFunctionKeys 
&& keys2 instanceof Keys.SelectorFunctionKeys) {
    +                                   // Both join sides have a key selector 
function, so we need to do the
    +                                   // tuple wrapping/unwrapping on both 
sides.
    +
    +                                   @SuppressWarnings("unchecked")
    +                                   Keys.SelectorFunctionKeys<I1, ?> 
selectorKeys1 = (Keys.SelectorFunctionKeys<I1, ?>) keys1;
    +                                   @SuppressWarnings("unchecked")
    +                                   Keys.SelectorFunctionKeys<I2, ?> 
selectorKeys2 = (Keys.SelectorFunctionKeys<I2, ?>) keys2;
    +
    +                                   builder = builder
    +                                                   .withUdf(new 
TupleUnwrappingJoiner<>(function))
    +                                                   
.withUnwrappingLeftInput(input1, selectorKeys1, getInput1Type())
    +                                                   
.withUnwrappingRightInput(input2, selectorKeys2, getInput2Type());
    +                           } else if (keys2 instanceof 
Keys.SelectorFunctionKeys) {
    +                                   // The right side of the join needs the 
tuple wrapping/unwrapping
    +
    +                                   @SuppressWarnings("unchecked")
    +                                   Keys.SelectorFunctionKeys<I2, ?> 
selectorKeys2 = (Keys.SelectorFunctionKeys<I2, ?>) keys2;
    +
    +                                   builder = builder
    +                                                   .withUdf(new 
TupleRightUnwrappingJoiner<>(function))
    +                                                   .withInput1(input1, 
getInput1Type(), keys1)
    +                                                   
.withUnwrappingRightInput(input2, selectorKeys2, getInput2Type());
    +                           } else {
    +                                   // The left side of the join needs the 
tuple wrapping/unwrapping
     
    -                   String name = getName() != null ? getName() : "Join at 
"+joinLocationName;
    +                                   @SuppressWarnings("unchecked")
    +                                   Keys.SelectorFunctionKeys<I1, ?> 
selectorKeys1 = (Keys.SelectorFunctionKeys<I1, ?>) keys1;
     
    -                   final JoinOperatorBase<?, ?, OUT, ?> translated;
    -                   
    -                   if (keys1 instanceof Keys.SelectorFunctionKeys && keys2 
instanceof Keys.SelectorFunctionKeys) {
    -                           // Both join sides have a key selector 
function, so we need to do the
    -                           // tuple wrapping/unwrapping on both sides.
    +                                   builder = builder
    +                                                   .withUdf(new 
TupleLeftUnwrappingJoiner<>(function))
    +                                                   
.withUnwrappingLeftInput(input1, selectorKeys1, getInput1Type())
    +                                                   .withInput2(input2, 
getInput2Type(), keys2);
    +                           }
    +                   } else if (keys1 instanceof Keys.ExpressionKeys && 
keys2 instanceof Keys.ExpressionKeys) {
    +                           // Neither side needs the tuple 
wrapping/unwrapping
     
    -                           @SuppressWarnings("unchecked")
    -                           Keys.SelectorFunctionKeys<I1, ?> selectorKeys1 
= (Keys.SelectorFunctionKeys<I1, ?>) keys1;
    -                           @SuppressWarnings("unchecked")
    -                           Keys.SelectorFunctionKeys<I2, ?> selectorKeys2 
= (Keys.SelectorFunctionKeys<I2, ?>) keys2;
    -                           
    -                           PlanBothUnwrappingJoinOperator<I1, I2, OUT, ?> 
po =
    -                                           
translateSelectorFunctionJoin(selectorKeys1, selectorKeys2, function, 
    -                                           getInput1Type(), 
getInput2Type(), getResultType(), name, input1, input2);
    -                           
    -                           // set parallelism
    -                           po.setParallelism(this.getParallelism());
    -                           
    -                           translated = po;
    +                           builder = builder
    +                                           .withUdf(function)
    +                                           .withInput1(input1, 
getInput1Type(), keys1)
    +                                           .withInput2(input2, 
getInput2Type(), keys2);
    +                   } else {
    +                           throw new 
UnsupportedOperationException("Unrecognized or incompatible key types.");
                        }
    -                   else if (keys2 instanceof Keys.SelectorFunctionKeys) {
    -                           // The right side of the join needs the tuple 
wrapping/unwrapping
     
    -                           int[] logicalKeyPositions1 = 
keys1.computeLogicalKeyPositions();
    +                   return builder.build();
    +           }
     
    -                           @SuppressWarnings("unchecked")
    -                           Keys.SelectorFunctionKeys<I2, ?> selectorKeys2 =
    -                                           (Keys.SelectorFunctionKeys<I2, 
?>) keys2;
     
    -                           PlanRightUnwrappingJoinOperator<I1, I2, OUT, ?> 
po =
    -                                           
translateSelectorFunctionJoinRight(logicalKeyPositions1, selectorKeys2,
    -                                                           function, 
getInput1Type(), getInput2Type(), getResultType(), name,
    -                                                           input1, input2);
    +           private static final class JoinOperatorBaseBuilder<OUT> {
    +
    +                   private final String name;
    +                   private final JoinType joinType;
    +
    +                   private int parallelism;
    +                   private FlatJoinFunction udf;
    +                   private TypeInformation<OUT> resultType;
    +
    +                   private Operator input1;
    +                   private TypeInformation input1Type;
    +                   private Keys<?> keys1;
    +
    +                   private Operator input2;
    +                   private TypeInformation input2Type;
    +                   private Keys<?> keys2;
     
    -                           // set parallelism
    -                           po.setParallelism(this.getParallelism());
    +                   private Partitioner<?> partitioner;
    +                   private JoinHint joinHint;
     
    -                           translated = po;
    +                   public JoinOperatorBaseBuilder(String name, JoinType 
joinType) {
    +                           this.name = name;
    +                           this.joinType = joinType;
                        }
    -                   else if (keys1 instanceof Keys.SelectorFunctionKeys) {
    -                           // The left side of the join needs the tuple 
wrapping/unwrapping
     
    +                   public JoinOperatorBaseBuilder<OUT> with() {
    +                           return this;
    +                   }
     
    -                           @SuppressWarnings("unchecked")
    -                           Keys.SelectorFunctionKeys<I1, ?> selectorKeys1 =
    -                                           (Keys.SelectorFunctionKeys<I1, 
?>) keys1;
    +                   public <I1, K> JoinOperatorBaseBuilder<OUT> 
withUnwrappingLeftInput(
    --- End diff --
    
    rename to `withWrappedInput1()` for consistency with `withInput1()`?


> Add outer joins to API and Optimizer
> ------------------------------------
>
>                 Key: FLINK-2576
>                 URL: https://issues.apache.org/jira/browse/FLINK-2576
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Java API, Optimizer, Scala API
>            Reporter: Ricky Pogalz
>            Priority: Minor
>             Fix For: pre-apache
>
>
> Add left/right/full outer join methods to the DataSet APIs (Java, Scala) and 
> to the optimizer of Flink.
> Initially, the execution strategy should be a sort-merge outer join 
> (FLINK-2105) but can later be extended to hash joins for left/right outer 
> joins.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to