[ https://issues.apache.org/jira/browse/FLINK-5498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15940804#comment-15940804 ]

Fabian Hueske commented on FLINK-5498:
--------------------------------------

Hi [~lincoln.86xy], I thought about this problem and I think I found a
memory-safe way to address it, i.e., without a {{CoGroupFunction}}. The idea is
to filter out invalid {{null}} join results in a {{GroupReduceFunction}}. The
overhead for this is another sort, but the operator becomes memory-safe.

I think we should prefer a less-efficient memory-safe implementation if 
possible.
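The group-level filtering rule can be tried out without a cluster. The following is a minimal plain-Java sketch, not Flink code: it assumes each join candidate carries a flag marking null-padded rows (as {{JoinFunc}} does in the prototype below) and that one group holds all candidates with the same outer-row values. {{NullFilterSketch}}, {{Candidate}}, and {{filterGroup}} are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class NullFilterSketch {

  // A join candidate: 'isNull' marks rows padded with nulls because the inner
  // side was missing or failed the non-equality predicate.
  static final class Candidate {
    final boolean isNull;
    final String outer; // outer-row fields, identical for all candidates of a group
    final String inner; // inner-row fields, or null for null-padded candidates

    Candidate(boolean isNull, String outer, String inner) {
      this.isNull = isNull;
      this.outer = outer;
      this.inner = inner;
    }
  }

  // Mirrors the prototype's rule for one group: forward valid join results,
  // only count null-padded candidates, and emit the counted null rows only
  // if no valid join result was seen.
  static List<String> filterGroup(List<Candidate> group) {
    List<String> out = new ArrayList<>();
    boolean needsNull = true;
    int nullCnt = 0;
    String outer = null;
    for (Candidate c : group) {
      outer = c.outer;
      if (!c.isNull) {
        out.add(c.outer + "," + c.inner);
        needsNull = false;
      } else {
        nullCnt++;
      }
    }
    if (needsNull) {
      for (int i = 0; i < nullCnt; i++) {
        out.add(outer + ",null,null");
      }
    }
    return out;
  }

  public static void main(String[] args) {
    // Outer row (1,100): inner (1,10) passes 100 > 10, inner (1,110) fails 100 > 110.
    System.out.println(filterGroup(Arrays.asList(
        new Candidate(false, "1,100", "1,10"),
        new Candidate(true, "1,100", null)))); // [1,100,1,10]
    // Outer row (2,200): its only inner row (2,220) fails 200 > 220.
    System.out.println(filterGroup(Arrays.asList(
        new Candidate(true, "2,200", null)))); // [2,200,null,null]
  }
}
```

Counting instead of buffering the null-padded rows is what keeps the per-group state constant, in line with the memory-safety argument above.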

I made a prototype implementation for a LEFT OUTER JOIN (see below) but haven't 
thought about whether it would work for FULL OUTER JOINs as well.

What do you think?

Best, Fabian

{code}
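import java.util.Arrays;
import java.util.Iterator;

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

public class OuterJoin {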
  
  public static void main(String[] args) throws Exception {

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(2);

    Row[] dataOuter = new Row[]{
      Row.of(1, 100), Row.of(1, 100), Row.of(2, 200), Row.of(3, 300),
      Row.of(4, 400), Row.of(5, 500), Row.of(6, 600), Row.of(6, 600)};
    Row[] dataInner = new Row[]{
      Row.of(1, 10), Row.of(1, 110), Row.of(2, 220),
      Row.of(3, 30), Row.of(4, 40), Row.of(4, 41)};

    RowTypeInfo rowType = new RowTypeInfo(
      BasicTypeInfo.INT_TYPE_INFO,
      BasicTypeInfo.INT_TYPE_INFO
    );

    DataSet<Row> outer = env.fromCollection(Arrays.asList(dataOuter), rowType);
    DataSet<Row> inner = env.fromCollection(Arrays.asList(dataInner), rowType);

    DataSet<Row> joined = outer
      .leftOuterJoin(inner)
        .where(0).equalTo(0)  // define join keys
        .with(new JoinFunc()) // join function adds a flag marking null-padded rows
      .groupBy(1, 2)          // group by all fields of the outer table (partitioning is reused)
      .reduceGroup(new NullFilter()); // drop null-padded rows if the group has a valid join result

    joined.print();
  }

  @FunctionAnnotation.ForwardedFieldsFirst({"f0->f1; f1->f2"})
  @FunctionAnnotation.ForwardedFieldsSecond({"f0->f3"})
  public static class JoinFunc implements JoinFunction<Row, Row, Row>, ResultTypeQueryable<Row> {

    @Override
    public Row join(Row outer, Row inner) throws Exception {
      if (inner == null) {
        // no inner row with the same key: emit a null-padded candidate
        return Row.of(true, outer.getField(0), outer.getField(1), null, null);
      } else if ((int) outer.getField(1) > (int) inner.getField(1)) {
        // non-equality predicate holds: emit the joined row
        return Row.of(false, outer.getField(0), outer.getField(1),
          inner.getField(0), inner.getField(1));
      } else {
        // predicate fails: emit a null-padded candidate that NullFilter may discard
        return Row.of(true, outer.getField(0), outer.getField(1), null, null);
      }
    }

    @Override
    public TypeInformation<Row> getProducedType() {
      return new RowTypeInfo(
        BasicTypeInfo.BOOLEAN_TYPE_INFO, // flag to indicate null
        BasicTypeInfo.INT_TYPE_INFO,     // first field of outer table
        BasicTypeInfo.INT_TYPE_INFO,     // second field of outer table
        BasicTypeInfo.INT_TYPE_INFO,     // first field of inner table
        BasicTypeInfo.INT_TYPE_INFO      // second field of inner table
      );
    }
  }

  @FunctionAnnotation.ForwardedFields({"f1->f0; f2->f1"})
  public static class NullFilter implements GroupReduceFunction<Row, Row>, ResultTypeQueryable<Row> {

    @Override
    public void reduce(Iterable<Row> rows, Collector<Row> out) throws Exception {

      boolean needsNull = true;
      int nullCnt = 0;
      Row r = null;

      Iterator<Row> rowsIt = rows.iterator();
      while (rowsIt.hasNext()) {

        r = rowsIt.next();
        boolean isNull = (Boolean) r.getField(0);

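        if (!isNull) {
          // valid join results are forwarded directly (without the flag)
          out.collect(Row.of(r.getField(1), r.getField(2), r.getField(3), r.getField(4)));
          needsNull = false;
        } else {
          // null-padded rows are not forwarded but counted until we know
          // whether the group contains any valid join result
          nullCnt++;
        }
      }

      if (needsNull) {
        // no valid join result in this group: forward the null-padded rows.
        // Caveat: nullCnt counts one candidate per (outer row, failed inner row) pair,
        // so an outer row that fails the predicate against several inner rows
        // is emitted more than once.
        for (int i = 0; i < nullCnt; i++) {
          out.collect(Row.of(r.getField(1), r.getField(2), null, null));
        }
      }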
    }

    @Override
    public TypeInformation<Row> getProducedType() {
      return new RowTypeInfo(
        BasicTypeInfo.INT_TYPE_INFO,
        BasicTypeInfo.INT_TYPE_INFO,
        BasicTypeInfo.INT_TYPE_INFO,
        BasicTypeInfo.INT_TYPE_INFO
      );
    }
  }
}
{code}
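The whole pipeline can also be replayed in plain Java to see what the prototype should print for the sample data. This is a minimal sketch, not Flink code: it assumes a left outer join calls the join function once per matching key pair, or once with a null inner row when the key has no partner, and it uses a nested loop only to reproduce those semantics. {{OuterJoinReplay}} and {{run}} are hypothetical names. The {{tagOuter}} switch groups by the row's position instead of its values; this is a swapped-in idea that is not part of the prototype (in Flink, such ids could perhaps come from {{DataSetUtils.zipWithUniqueId}}). It shows that emitting {{nullCnt}} rows over-counts when one outer row fails the predicate against more than one inner row, while emitting one null-padded row per tagged outer row does not.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class OuterJoinReplay {

  // Replays JoinFunc + groupBy + NullFilter for
  //   outer LEFT OUTER JOIN inner ON outer.f0 = inner.f0 AND outer.f1 > inner.f1
  // and returns result rows as "o0,o1,i0,i1" strings.
  static List<String> run(int[][] outer, int[][] inner, boolean tagOuter) {
    Map<String, int[]> outerVals = new LinkedHashMap<>(); // group key -> outer values
    Map<String, List<String>> matches = new HashMap<>();  // group key -> valid join results
    Map<String, Integer> nullCnt = new HashMap<>();       // group key -> null-padded candidates

    for (int i = 0; i < outer.length; i++) {
      int[] o = outer[i];
      // Prototype: group by the outer values. Variant: prefix the row position.
      String key = (tagOuter ? i + "#" : "") + o[0] + "," + o[1];
      outerVals.putIfAbsent(key, o);
      matches.putIfAbsent(key, new ArrayList<>());
      boolean partnerFound = false;
      for (int[] in : inner) {
        if (in[0] != o[0]) {
          continue; // equality predicate: outer.f0 == inner.f0
        }
        partnerFound = true;
        if (o[1] > in[1]) { // non-equality predicate: outer.f1 > inner.f1
          matches.get(key).add(o[0] + "," + o[1] + "," + in[0] + "," + in[1]);
        } else {
          nullCnt.merge(key, 1, Integer::sum);
        }
      }
      if (!partnerFound) {
        nullCnt.merge(key, 1, Integer::sum); // join function called with inner == null
      }
    }

    List<String> out = new ArrayList<>();
    for (Map.Entry<String, int[]> e : outerVals.entrySet()) {
      String key = e.getKey();
      int[] o = e.getValue();
      if (!matches.get(key).isEmpty()) {
        out.addAll(matches.get(key));
      } else {
        // Prototype: one output row per null candidate. Variant: the group holds
        // exactly one outer row, so exactly one null-padded row is emitted.
        int n = tagOuter ? 1 : nullCnt.getOrDefault(key, 0);
        for (int k = 0; k < n; k++) {
          out.add(o[0] + "," + o[1] + ",null,null");
        }
      }
    }
    return out;
  }

  public static void main(String[] args) {
    int[][] outer = {{1, 100}, {1, 100}, {2, 200}, {3, 300}, {4, 400}, {5, 500}, {6, 600}, {6, 600}};
    int[][] inner = {{1, 10}, {1, 110}, {2, 220}, {3, 30}, {4, 40}, {4, 41}};
    System.out.println(run(outer, inner, false).size()); // 9
    System.out.println(run(outer, inner, false).equals(run(outer, inner, true))); // true

    // Hypothetical extra case: one outer row fails the predicate against two inner rows.
    int[][] outer2 = {{7, 5}};
    int[][] inner2 = {{7, 10}, {7, 20}};
    System.out.println(run(outer2, inner2, false)); // [7,5,null,null, 7,5,null,null]
    System.out.println(run(outer2, inner2, true));  // [7,5,null,null]
  }
}
```

On the sample data both variants agree (nine rows); the difference only shows up when a key has several inner rows that all fail the predicate. The replay returns rows in a fixed order, whereas a Flink job makes no ordering guarantee.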

> Add support for left/right outer joins with non-equality predicates (and 1+ equality predicates)
> ------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-5498
>                 URL: https://issues.apache.org/jira/browse/FLINK-5498
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>    Affects Versions: 1.3.0
>            Reporter: lincoln.lee
>            Assignee: lincoln.lee
>            Priority: Minor
>
> I found that the expected result of a unit test case is incorrect compared to
> the result of an RDBMS, see
> flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala
> {code:title=JoinITCase.scala}
> def testRightJoinWithNotOnlyEquiJoin(): Unit = {
>      ...
>      val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
>      val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
>      val joinT = ds1.rightOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g)
>  
>      val expected = "Hello world,BCD\n"
>      val results = joinT.toDataSet[Row].collect()
>      TestBaseUtils.compareResultAsText(results.asJava, expected)
> }
> {code}
> Then I took some time to learn about outer joins in relational databases. The
> correct result of the above case should be the following (tested in SQL Server
> and MySQL, both return the same result):
> {code}
> > select c, g from tuple3 right outer join tuple5 on a=d and b<h;
> c                                g                               
> -------------------------------- --------------------------------
> NULL                             Hallo                           
> NULL                             Hallo Welt                      
> NULL                             Hallo Welt wie                  
> NULL                             Hallo Welt wie gehts?           
> NULL                             ABC                             
> Hello world                      BCD                             
> NULL                             CDE                             
> NULL                             DEF                             
> NULL                             EFG                             
> NULL                             FGH                             
> NULL                             GHI                             
> NULL                             HIJ                             
> NULL                             IJK                             
> NULL                             JKL                             
> NULL                             KLM   
> {code}
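The difference between the two formulations can be replayed in plain Java on the test data. This is a minimal sketch, not Flink code: it uses the Table API condition {{'a === 'd && 'b < 'h}}, and the arrays below are assumed to mirror the first five rows of {{get3TupleDataSet}} (columns a, b, c) and all fifteen rows of {{get5TupleDataSet}} (columns d, g, h) from {{CollectionDataSets}}. The remaining tuple3 rows are left out on the assumption that their keys (a > 5) match no d. {{RightOuterJoinSemantics}} and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

public class RightOuterJoinSemantics {

  // Assumed columns of tuple3 (a, b, c); only rows with a <= 5 can ever match.
  static final int[] A = {1, 2, 3, 4, 5};
  static final long[] B = {1, 2, 2, 3, 3};
  static final String[] C = {"Hi", "Hello", "Hello world", "Hello world, how are you?", "I am fine."};

  // Assumed columns of tuple5 (d, g, h).
  static final int[] D = {1, 2, 2, 3, 3, 3, 4, 4, 4, 4, 5, 5, 5, 5, 5};
  static final String[] G = {"Hallo", "Hallo Welt", "Hallo Welt wie", "Hallo Welt wie gehts?",
      "ABC", "BCD", "CDE", "DEF", "EFG", "FGH", "GHI", "HIJ", "IJK", "JKL", "KLM"};
  static final long[] H = {1, 2, 1, 2, 2, 3, 2, 1, 1, 2, 1, 3, 3, 2, 2};

  // Whole condition in the ON clause: a right row without a partner that
  // satisfies BOTH predicates is emitted once, padded with null.
  static List<String> onClause() {
    List<String> out = new ArrayList<>();
    for (int r = 0; r < D.length; r++) {
      boolean matched = false;
      for (int l = 0; l < A.length; l++) {
        if (A[l] == D[r] && B[l] < H[r]) {
          out.add(C[l] + "," + G[r]);
          matched = true;
        }
      }
      if (!matched) {
        out.add("null," + G[r]);
      }
    }
    return out;
  }

  // Join on a = d only, then filter on b < h. In this data every right row has
  // an a = d partner, so no null padding occurs; a padded row (b = null) would
  // be dropped by the filter anyway.
  static List<String> joinThenFilter() {
    List<String> out = new ArrayList<>();
    for (int r = 0; r < D.length; r++) {
      for (int l = 0; l < A.length; l++) {
        if (A[l] == D[r] && B[l] < H[r]) {
          out.add(C[l] + "," + G[r]);
        }
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> on = onClause();
    System.out.println(on.size() + " rows, e.g. " + on.get(5)); // 15 rows, e.g. Hello world,BCD
    System.out.println(joinThenFilter());                      // [Hello world,BCD]
  }
}
```

Putting {{b < h}} into the ON clause keeps all 15 right rows (14 of them null-padded), which matches the RDBMS output above; joining on {{a = d}} and filtering afterwards keeps only {{Hello world,BCD}}, which is what the current test expects.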
> The join condition {{rightOuterJoin('a === 'd && 'b < 'h)}} is not equivalent
> to {{rightOuterJoin('a === 'd).where('b < 'h)}}.
> The problem is rooted in the code-generated {{JoinFunction}} (see 
> {{DataSetJoin.translateToPlan()}}, line 188). If the join condition does not 
> match, we must emit the outer row padded with nulls instead of returning from 
> the function without emitting anything.
> The code-generated {{JoinFunction}} also includes the equality predicates.
> These should be removed before generating the code, e.g., in
> {{DataSetJoinRule}} when generating the {{DataSetJoin}}, with the help of
> {{JoinInfo.getRemaining()}}.
> More details: https://goo.gl/ngekca
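The split suggested in the issue (equality predicates become join keys, only the remainder goes into the generated function) can be sketched in plain Java. In the Table API the split would come from Calcite's {{JoinInfo}}; the {{Conjunct}} and {{Split}} classes below are a hypothetical, simplified stand-in for Calcite's condition expressions, not its API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class JoinConditionSplit {

  // One conjunct of an AND-ed join condition, e.g. "a = d" or "b < h".
  static final class Conjunct {
    final String lhs;
    final String op;
    final String rhs;

    Conjunct(String lhs, String op, String rhs) {
      this.lhs = lhs;
      this.op = op;
      this.rhs = rhs;
    }

    @Override
    public String toString() {
      return lhs + " " + op + " " + rhs;
    }
  }

  // Result of the split: key pairs (left field, right field) and the rest.
  static final class Split {
    final List<String[]> keys = new ArrayList<>();
    final List<Conjunct> remaining = new ArrayList<>();
  }

  static Split split(List<Conjunct> condition, Set<String> leftFields, Set<String> rightFields) {
    Split s = new Split();
    for (Conjunct c : condition) {
      boolean isEq = c.op.equals("=");
      if (isEq && leftFields.contains(c.lhs) && rightFields.contains(c.rhs)) {
        s.keys.add(new String[] {c.lhs, c.rhs});
      } else if (isEq && rightFields.contains(c.lhs) && leftFields.contains(c.rhs)) {
        s.keys.add(new String[] {c.rhs, c.lhs}); // normalize to (left, right)
      } else {
        s.remaining.add(c); // must be evaluated in the generated JoinFunction
      }
    }
    return s;
  }

  public static void main(String[] args) {
    Set<String> left = new HashSet<>(Arrays.asList("a", "b", "c"));
    Set<String> right = new HashSet<>(Arrays.asList("d", "e", "f", "g", "h"));
    Split s = split(Arrays.asList(new Conjunct("a", "=", "d"), new Conjunct("b", "<", "h")), left, right);
    System.out.println(Arrays.toString(s.keys.get(0)) + " / " + s.remaining); // [a, d] / [b < h]
  }
}
```

With this split, {{a = d}} only drives partitioning and sorting, and the generated function has to evaluate just {{b < h}}, emitting a null-padded row when it fails.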



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
