[ https://issues.apache.org/jira/browse/FLINK-5498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15940804#comment-15940804 ]
Fabian Hueske commented on FLINK-5498:
--------------------------------------

Hi [~lincoln.86xy],

I thought about this problem and I think I found a memory-safe way to address it, i.e., without a {{CoGroupFunction}}.
The idea is to filter out invalid {{null}} join results in a {{GroupReduceFunction}}. The overhead for this is another sort, but the operator becomes memory-safe. I think we should prefer a less efficient but memory-safe implementation if possible.

I made a prototype implementation for a LEFT OUTER JOIN (see below) but haven't thought about whether it would work for FULL OUTER JOINs as well.

What do you think?

Best, Fabian

{code}
import java.util.Arrays;

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

public class OuterJoin {

  public static void main(String[] args) throws Exception {

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(2);

    Row[] dataOuter = new Row[]{Row.of(1, 100), Row.of(1, 100), Row.of(2, 200), Row.of(3, 300), Row.of(4, 400), Row.of(5, 500), Row.of(6, 600), Row.of(6, 600)};
    Row[] dataInner = new Row[]{Row.of(1, 10), Row.of(1, 110), Row.of(2, 220), Row.of(3, 30), Row.of(4, 40), Row.of(4, 41)};

    RowTypeInfo rowType = new RowTypeInfo(
      BasicTypeInfo.INT_TYPE_INFO,
      BasicTypeInfo.INT_TYPE_INFO
    );

    DataSet<Row> outer = env.fromCollection(Arrays.asList(dataOuter), rowType);
    DataSet<Row> inner = env.fromCollection(Arrays.asList(dataInner), rowType);

    DataSet<Row> joined = outer
      .leftOuterJoin(inner)
      .where(0).equalTo(0) // define join keys
      .with(new JoinFunc()) // join function adds a flag that marks null-padded results
      .groupBy(1, 2) // group by all fields of the outer table (partitioning is reused)
      .reduceGroup(new NullFilter()); // drop null-padded results of a group if it has any matched result

    joined.print();
  }

  // Only the outer fields are annotated as forwarded. The inner fields are replaced
  // by null when the predicate fails, so they are not forwarded unmodified.
  @FunctionAnnotation.ForwardedFieldsFirst({"f0->f1; f1->f2"})
  public static class JoinFunc implements JoinFunction<Row, Row, Row>, ResultTypeQueryable<Row> {

    @Override
    public Row join(Row outer, Row inner) throws Exception {
      if (inner == null) {
        // no row with the same key on the inner side
        return Row.of(true, outer.getField(0), outer.getField(1), null, null);
      } else if (((Integer) outer.getField(1)) > ((Integer) inner.getField(1))) {
        // non-equality predicate holds: the joined row remains
        return Row.of(false, outer.getField(0), outer.getField(1), inner.getField(0), inner.getField(1));
      } else {
        // non-equality predicate fails: emit the outer row padded with nulls
        return Row.of(true, outer.getField(0), outer.getField(1), null, null);
      }
    }

    @Override
    public TypeInformation<Row> getProducedType() {
      return new RowTypeInfo(
        BasicTypeInfo.BOOLEAN_TYPE_INFO, // flag to indicate a null-padded result
        BasicTypeInfo.INT_TYPE_INFO, // first field of outer table
        BasicTypeInfo.INT_TYPE_INFO, // second field of outer table
        BasicTypeInfo.INT_TYPE_INFO, // first field of inner table
        BasicTypeInfo.INT_TYPE_INFO // second field of inner table
      );
    }
  }

  @FunctionAnnotation.ForwardedFields({"f1->f0; f2->f1"})
  public static class NullFilter implements GroupReduceFunction<Row, Row>, ResultTypeQueryable<Row> {

    @Override
    public void reduce(Iterable<Row> rows, Collector<Row> out) throws Exception {

      boolean needsNull = true;
      int nullCnt = 0;
      Row last = null;

      for (Row r : rows) {
        last = r;
        boolean isNull = (Boolean) r.getField(0);
        if (!isNull) {
          // matched results are forwarded directly
          out.collect(Row.of(r.getField(1), r.getField(2), r.getField(3), r.getField(4)));
          needsNull = false;
        } else {
          // null-padded results are only counted until we know whether the group has a match
          nullCnt++;
        }
      }

      if (needsNull) {
        // no join match found in this group: forward the null-padded results
        for (int i = 0; i < nullCnt; i++) {
          out.collect(Row.of(last.getField(1), last.getField(2), null, null));
        }
      }
    }

    @Override
    public TypeInformation<Row> getProducedType() {
      return new RowTypeInfo(
        BasicTypeInfo.INT_TYPE_INFO,
        BasicTypeInfo.INT_TYPE_INFO,
        BasicTypeInfo.INT_TYPE_INFO,
        BasicTypeInfo.INT_TYPE_INFO
      );
    }
  }
}
{code}

> Add support for left/right outer joins with non-equality predicates (and 1+ equality predicates)
> ------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-5498
>                 URL: https://issues.apache.org/jira/browse/FLINK-5498
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>    Affects Versions: 1.3.0
>            Reporter: lincoln.lee
>            Assignee: lincoln.lee
>            Priority: Minor
>
> I found that the expected result of a unit test case is incorrect compared to that in an RDBMS, see
> flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala
> {code:title=JoinITCase.scala}
> def testRightJoinWithNotOnlyEquiJoin(): Unit = {
>   ...
>   val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
>   val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
>   val joinT = ds1.rightOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g)
>
>   val expected = "Hello world,BCD\n"
>   val results = joinT.toDataSet[Row].collect()
>   TestBaseUtils.compareResultAsText(results.asJava, expected)
> }
> {code}
> Then I took some time to learn about the ‘outer join’ in relational databases. The correct result of the above case should be (tested in SQL Server and MySQL, the results are the same):
> {code}
> > select c, g from tuple3 right outer join tuple5 on a=f and b<h;
> c                                g
> -------------------------------- --------------------------------
> NULL                             Hallo
> NULL                             Hallo Welt
> NULL                             Hallo Welt wie
> NULL                             Hallo Welt wie gehts?
> NULL                             ABC
> Hello world                      BCD
> NULL                             CDE
> NULL                             DEF
> NULL                             EFG
> NULL                             FGH
> NULL                             GHI
> NULL                             HIJ
> NULL                             IJK
> NULL                             JKL
> NULL                             KLM
> {code}
> the join condition {{rightOuterJoin('a === 'd && 'b < 'h)}} is not equivalent to {{rightOuterJoin('a === 'd).where('b < 'h)}}.
> The problem is rooted in the code-generated {{JoinFunction}} (see {{DataSetJoin.translateToPlan()}}, line 188). If the join condition does not match, we must emit the outer row padded with nulls instead of returning from the function without emitting anything.
> The code-generated {{JoinFunction}} does also include equality predicates. These should be removed before generating the code, e.g., in {{DataSetJoinRule}} when generating the {{DataSetJoin}} with help of {{JoinInfo.getRemaining()}}.
> More details: https://goo.gl/ngekca



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
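
As a cross-check for the semantics discussed in this issue, here is a minimal plain-Java sketch (no Flink dependency) of what a LEFT OUTER JOIN with one equality predicate and one non-equality predicate should return. It reuses the sample data of the prototype above. The class {{OuterJoinSemantics}}, its constants and its {{leftOuterJoin}} helper are hypothetical names used only for illustration. They are not part of Flink, and the nested loop is a reference for the expected result, not a proposal for the operator implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class OuterJoinSemantics {

    // Sample data copied from the prototype: (key, value) pairs.
    public static final List<int[]> OUTER = Arrays.asList(
        new int[]{1, 100}, new int[]{1, 100}, new int[]{2, 200}, new int[]{3, 300},
        new int[]{4, 400}, new int[]{5, 500}, new int[]{6, 600}, new int[]{6, 600});

    public static final List<int[]> INNER = Arrays.asList(
        new int[]{1, 10}, new int[]{1, 110}, new int[]{2, 220},
        new int[]{3, 30}, new int[]{4, 40}, new int[]{4, 41});

    /**
     * Nested-loop reference for
     * outer LEFT OUTER JOIN inner ON outer[0] = inner[0] AND outer[1] > inner[1].
     * Every outer row appears at least once; it is padded with nulls
     * only if no inner row satisfies the complete join condition.
     */
    public static List<Integer[]> leftOuterJoin(List<int[]> outer, List<int[]> inner) {
        List<Integer[]> result = new ArrayList<>();
        for (int[] o : outer) {
            boolean matched = false;
            for (int[] i : inner) {
                if (o[0] == i[0] && o[1] > i[1]) {
                    result.add(new Integer[]{o[0], o[1], i[0], i[1]});
                    matched = true;
                }
            }
            if (!matched) {
                // no inner row fulfilled the condition: emit the null-padded outer row once
                result.add(new Integer[]{o[0], o[1], null, null});
            }
        }
        return result;
    }

    public static void main(String[] args) {
        for (Integer[] row : leftOuterJoin(OUTER, INNER)) {
            System.out.println(Arrays.toString(row));
        }
    }
}
```

On this data the reference yields 9 rows, 4 of them null-padded (for the outer rows with keys 2, 5 and the two rows with key 6). The {{NullFilter}} approach should produce the same multiset of rows, possibly in a different order.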