
ASF GitHub Bot commented on FLINK-2576:

Github user fhueske commented on a diff in the pull request:

    --- Diff: 
    @@ -0,0 +1,679 @@
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.test.javaApiOperators;
    +import org.apache.flink.api.common.InvalidProgramException;
    +import org.apache.flink.api.common.functions.FlatJoinFunction;
    +import org.apache.flink.api.common.functions.JoinFunction;
    +import org.apache.flink.api.common.functions.RichFlatJoinFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.api.java.tuple.Tuple5;
    +import org.apache.flink.api.java.tuple.Tuple6;
    +import org.apache.flink.api.java.tuple.Tuple7;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
    +import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.POJO;
    +import org.apache.flink.test.util.MultipleProgramsTestBase;
    +import org.apache.flink.util.Collector;
    +import org.junit.Test;
    +import org.junit.runner.RunWith;
    +import org.junit.runners.Parameterized;
    +import java.util.Collection;
    +import java.util.List;
    +public class OuterJoinITCase extends MultipleProgramsTestBase {
    +   public OuterJoinITCase(TestExecutionMode mode) {
    +           super(mode);
    +   }
    +   @Test
    +   public void testUDFLeftOuterJoinOnTuplesWithKeyFieldPositions() throws Exception {
    +           /*
    +            * UDF Join on tuples with key field positions
    +            */
    +           final ExecutionEnvironment env = 
    +           DataSet<Tuple3<Integer, Long, String>> ds1 = 
    +           DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
    +           DataSet<Tuple2<String, String>> joinDs =
    +                           ds1.leftOuterJoin(ds2)
    +                                           .where(0)
    +                                           .equalTo(0)
    +                                           .with(new T3T5FlatJoin());
    +           List<Tuple2<String, String>> result = joinDs.collect();
    +           String expected = "Hi,Hallo\n" +
    +                           "Hello,Hallo Welt\n" +
    +                           "Hello,Hallo Welt wie\n" +
    +                           "Hello world,null\n";
    +           compareResultAsTuples(result, expected);
    +   }
    +   @Test
    +   public void testUDFRightOuterJoinOnTuplesWithKeyFieldPositions() throws Exception {
    +           /*
    +            * UDF Join on tuples with key field positions
    +            */
    +           final ExecutionEnvironment env = 
    +           DataSet<Tuple3<Integer, Long, String>> ds1 = 
    +           DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
    +           DataSet<Tuple2<String, String>> joinDs =
    +                           ds1.rightOuterJoin(ds2)
    +                                           .where(1)
    +                                           .equalTo(1)
    +                                           .with(new T3T5FlatJoin());
    +           List<Tuple2<String, String>> result = joinDs.collect();
    +           String expected = "Hi,Hallo\n" +
    +                           "Hello,Hallo Welt\n" +
    +                           "null,Hallo Welt wie\n" +
    +                           "Hello world,Hallo Welt\n";
    +           compareResultAsTuples(result, expected);
    +   }
    +   @Test
    +   public void testUDFFullOuterJoinOnTuplesWithKeyFieldPositions() throws Exception {
    +           /*
    +            * UDF Join on tuples with key field positions
    +            */
    +           final ExecutionEnvironment env = 
    +           DataSet<Tuple3<Integer, Long, String>> ds1 = 
    +           DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
    +           DataSet<Tuple2<String, String>> joinDs =
    +                           ds1.fullOuterJoin(ds2)
    +                                           .where(0)
    +                                           .equalTo(2)
    +                                           .with(new T3T5FlatJoin());
    +           List<Tuple2<String, String>> result = joinDs.collect();
    +           String expected = "null,Hallo\n" +
    +                           "Hi,Hallo Welt\n" +
    +                           "Hello,Hallo Welt wie\n" +
    +                           "Hello world,null\n";
    +           compareResultAsTuples(result, expected);
    +   }
    +   @Test
    +   public void testUDFJoinOnTuplesWithMultipleKeyFieldPositions() throws Exception {
    +           /*
    +            * UDF Join on tuples with multiple key field positions
    +            */
    +           final ExecutionEnvironment env = 
    +           DataSet<Tuple3<Integer, Long, String>> ds1 = 
    +           DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
    +           DataSet<Tuple2<String, String>> joinDs =
    +                           ds1.fullOuterJoin(ds2)
    +                                           .where(0, 1)
    +                                           .equalTo(0, 4)
    +                                           .with(new T3T5FlatJoin());
    +           List<Tuple2<String, String>> result = joinDs.collect();
    +           String expected = "Hi,Hallo\n" +
    +                           "Hello,Hallo Welt\n" +
    +                           "Hello world,null\n" +
    +                           "null,Hallo Welt wie\n";
    +           compareResultAsTuples(result, expected);
    +   }
    +   @Test(expected = InvalidProgramException.class)
    +   public void testDefaultJoin() throws Exception {
    --- End diff --
    This check should be done as a unit test (as mentioned in my other comment).

> Add outer joins to API and Optimizer
> ------------------------------------
>                 Key: FLINK-2576
>                 URL: https://issues.apache.org/jira/browse/FLINK-2576
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Java API, Optimizer, Scala API
>            Reporter: Ricky Pogalz
>            Priority: Minor
>             Fix For: pre-apache
> Add left/right/full outer join methods to the DataSet APIs (Java, Scala) and 
> to the optimizer of Flink.
> Initially, the execution strategy should be a sort-merge outer join 
> (FLINK-2105) but can later be extended to hash joins for left/right outer 
> joins.

This message was sent by Atlassian JIRA

Reply via email to