[ https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16305857#comment-16305857 ]
ASF GitHub Bot commented on FLINK-7797: --------------------------------------- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5140#discussion_r159014072 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala --- @@ -461,6 +462,302 @@ class JoinITCase extends StreamingWithStateTestBase { StreamITCase.compareWithList(expected) } + // Tests for left outer join + @Test + def testProcTimeLeftOuterJoin(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + env.setStateBackend(getStateBackend) + StreamITCase.clear + env.setParallelism(1) + + val sqlQuery = + """ + |SELECT t2.a, t2.c, t1.c + |FROM T1 as t1 LEFT OUTER JOIN T2 as t2 ON + | t1.a = t2.a AND + | t1.proctime BETWEEN t2.proctime - INTERVAL '5' SECOND AND + | t2.proctime + INTERVAL '3' SECOND + |""".stripMargin + + val data1 = new mutable.MutableList[(Int, Long, String)] + data1.+=((1, 1L, "Hi1")) + data1.+=((1, 2L, "Hi2")) + data1.+=((1, 5L, "Hi3")) + data1.+=((2, 7L, "Hi5")) + + val data2 = new mutable.MutableList[(Int, Long, String)] + data2.+=((1, 1L, "HiHi")) + data2.+=((2, 2L, "HeHe")) + + val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime) + .select('a, 'b, 'c, 'proctime) + val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime) + .select('a, 'b, 'c, 'proctime) + + tEnv.registerTable("T1", t1) + tEnv.registerTable("T2", t2) + + val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] + result.addSink(new StreamITCase.StringSink[Row]) + env.execute() + } + + @Test + def testRowTimeLeftOuterJoin(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + env.setStateBackend(getStateBackend) + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + StreamITCase.clear + + val sqlQuery = + """ + 
|SELECT t2.key, t2.id, t1.id + |FROM T1 AS t1 LEFT OUTER JOIN T2 AS t2 ON + | t1.key = t2.key AND + | t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND + | t2.rt + INTERVAL '6' SECOND + |""".stripMargin + + val data1 = new mutable.MutableList[(String, String, Long)] + // for boundary test + data1.+=(("A", "L-1", 1000L)) + data1.+=(("A", "L-2", 2000L)) + data1.+=(("B", "L-4", 4000L)) + data1.+=(("A", "L-6", 6000L)) + data1.+=(("C", "L-7", 7000L)) + data1.+=(("A", "L-10", 10000L)) + data1.+=(("A", "L-12", 12000L)) + data1.+=(("A", "L-20", 20000L)) + + val data2 = new mutable.MutableList[(String, String, Long)] + data2.+=(("A", "R-6", 6000L)) + data2.+=(("B", "R-7", 7000L)) + data2.+=(("D", "R-8", 8000L)) + + val t1 = env.fromCollection(data1) + .assignTimestampsAndWatermarks(new Row3WatermarkExtractor2) + .toTable(tEnv, 'key, 'id, 'rt.rowtime) + val t2 = env.fromCollection(data2) + .assignTimestampsAndWatermarks(new Row3WatermarkExtractor2) + .toTable(tEnv, 'key, 'id, 'rt.rowtime) + + tEnv.registerTable("T1", t1) + tEnv.registerTable("T2", t2) + + val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] + result.addSink(new StreamITCase.StringSink[Row]) + env.execute() + val expected = new java.util.ArrayList[String] + expected.add("A,R-6,L-1") + expected.add("A,R-6,L-2") + expected.add("A,R-6,L-6") + expected.add("A,R-6,L-10") + expected.add("A,R-6,L-12") + expected.add("B,R-7,L-4") + expected.add("null,null,L-7") + expected.add("null,null,L-20") + StreamITCase.compareWithList(expected) + } + + // Tests for right outer join + @Test + def testProcTimeRightOuterJoin(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + env.setStateBackend(getStateBackend) + StreamITCase.clear + env.setParallelism(1) + + val sqlQuery = + """ + |SELECT t2.a, t2.c, t1.c + |FROM T1 as t1 RIGHT OUTER JOIN T2 as t2 ON + | t1.a = t2.a AND + | t1.proctime BETWEEN t2.proctime - INTERVAL '5' SECOND AND + | t2.proctime + 
INTERVAL '3' SECOND + |""".stripMargin + + val data1 = new mutable.MutableList[(Int, Long, String)] + data1.+=((1, 1L, "Hi1")) + data1.+=((1, 2L, "Hi2")) + data1.+=((1, 5L, "Hi3")) + data1.+=((2, 7L, "Hi5")) + + val data2 = new mutable.MutableList[(Int, Long, String)] + data2.+=((1, 1L, "HiHi")) + data2.+=((2, 2L, "HeHe")) + + val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime) + .select('a, 'b, 'c, 'proctime) + val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime) + .select('a, 'b, 'c, 'proctime) + + tEnv.registerTable("T1", t1) + tEnv.registerTable("T2", t2) + + val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] + result.addSink(new StreamITCase.StringSink[Row]) + env.execute() + } + + @Test + def testRowTimeRightOuterJoin(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + env.setStateBackend(getStateBackend) + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + StreamITCase.clear + + val sqlQuery = + """ + |SELECT t2.key, t2.id, t1.id + |FROM T1 AS t1 RIGHT OUTER JOIN T2 AS t2 ON + | t1.key = t2.key AND + | t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND + | t2.rt + INTERVAL '6' SECOND + |""".stripMargin + + val data1 = new mutable.MutableList[(String, String, Long)] + // for boundary test + data1.+=(("A", "L-1", 1000L)) + data1.+=(("A", "L-2", 2000L)) + data1.+=(("B", "L-4", 4000L)) + data1.+=(("A", "L-6", 6000L)) + data1.+=(("C", "L-7", 7000L)) + data1.+=(("A", "L-10", 10000L)) + data1.+=(("A", "L-12", 12000L)) + + val data2 = new mutable.MutableList[(String, String, Long)] --- End diff -- Add a row on key A that does not join with left. 
> Add support for windowed outer joins for streaming tables
> ---------------------------------------------------------
>
>                 Key: FLINK-7797
>                 URL: https://issues.apache.org/jira/browse/FLINK-7797
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>    Affects Versions: 1.4.0
>            Reporter: Fabian Hueske
>            Assignee: Xingcan Cui
>
> Currently, only windowed inner joins for streaming tables are supported.
> This issue is about adding support for windowed LEFT, RIGHT, and FULL OUTER
> joins.

--
This message was sent by Atlassian JIRA (v6.4.14#64029)