Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5327#discussion_r182700177 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala --- @@ -977,6 +922,254 @@ class JoinITCase extends StreamingWithStateTestBase { expected.add("D,R-8,null") StreamITCase.compareWithList(expected) } + + /** test non-window inner join **/ + @Test + def testNonWindowInnerJoin(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + env.setStateBackend(getStateBackend) + StreamITCase.clear + + val data1 = new mutable.MutableList[(Int, Long, String)] + data1.+=((1, 1L, "Hi1")) + data1.+=((1, 2L, "Hi2")) + data1.+=((1, 2L, "Hi2")) + data1.+=((1, 5L, "Hi3")) + data1.+=((2, 7L, "Hi5")) + data1.+=((1, 9L, "Hi6")) + data1.+=((1, 8L, "Hi8")) + data1.+=((3, 8L, "Hi9")) + + val data2 = new mutable.MutableList[(Int, Long, String)] + data2.+=((1, 1L, "HiHi")) + data2.+=((2, 2L, "HeHe")) + data2.+=((3, 2L, "HeHe")) + + val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c) + .select(('a === 3) ? (Null(Types.INT), 'a) as 'a, 'b, 'c) + val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c) + .select(('a === 3) ? 
(Null(Types.INT), 'a) as 'a, 'b, 'c) + + tEnv.registerTable("T1", t1) + tEnv.registerTable("T2", t2) + + val sqlQuery = + """ + |SELECT t2.a, t2.c, t1.c + |FROM T1 as t1 JOIN T2 as t2 ON + | t1.a = t2.a AND + | t1.b > t2.b + |""".stripMargin + + val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] + result.addSink(new StreamITCase.StringSink[Row]) + env.execute() + + val expected = mutable.MutableList( + "1,HiHi,Hi2", + "1,HiHi,Hi2", + "1,HiHi,Hi3", + "1,HiHi,Hi6", + "1,HiHi,Hi8", + "2,HeHe,Hi5", + "null,HeHe,Hi9") + + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testJoin(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.clear + env.setStateBackend(getStateBackend) + val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e" + + val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) + val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) + tEnv.registerTable("Table3", ds1) + tEnv.registerTable("Table5", ds2) + + val result = tEnv.sqlQuery(sqlQuery) + + val expected = Seq("Hi,Hallo", "Hello,Hallo Welt", "Hello world,Hallo Welt") + val results = result.toRetractStream[Row] + results.addSink(new StreamITCase.RetractingSink) + env.execute() + assertEquals(expected.sorted, StreamITCase.retractedResults.sorted) + } + + @Test + def testJoinWithFilter(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.clear + env.setStateBackend(getStateBackend) + + val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND b < 2" + + val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) + val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) + tEnv.registerTable("Table3", ds1) + tEnv.registerTable("Table5", ds2) + + val result = 
tEnv.sqlQuery(sqlQuery) + + val expected = Seq("Hi,Hallo") + val results = result.toRetractStream[Row] + results.addSink(new StreamITCase.RetractingSink) + env.execute() + assertEquals(expected.sorted, StreamITCase.retractedResults.sorted) + } + + @Test + def testInnerJoinWithNonEquiJoinPredicate(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.clear + env.setStateBackend(getStateBackend) + + val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND a < 6 AND h < b" + + val ds1 = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) + val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) + tEnv.registerTable("Table3", ds1) + tEnv.registerTable("Table5", ds2) + + val result = tEnv.sqlQuery(sqlQuery) + + val expected = Seq("Hello world, how are you?,Hallo Welt wie", "I am fine.,Hallo Welt wie") + val results = result.toRetractStream[Row] + results.addSink(new StreamITCase.RetractingSink) + env.execute() + assertEquals(expected.sorted, StreamITCase.retractedResults.sorted) + } + + @Test + def testJoinWithMultipleKeys(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.clear + env.setStateBackend(getStateBackend) + + val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE a = d AND b = h" + + val ds1 = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) + val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h) + tEnv.registerTable("Table3", ds1) + tEnv.registerTable("Table5", ds2) + + val result = tEnv.sqlQuery(sqlQuery) + + val expected = Seq( + "Hi,Hallo", "Hello,Hallo Welt", "Hello world,Hallo Welt wie gehts?", "Hello world,ABC", + "I am fine.,HIJ", "I am fine.,IJK") + val results = result.toRetractStream[Row] + results.addSink(new StreamITCase.RetractingSink) + env.execute() + 
assertEquals(expected.sorted, StreamITCase.retractedResults.sorted) + } + + @Test + def testJoinWithAlias(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.clear + env.setStateBackend(getStateBackend) + + val sqlQuery = + "SELECT Table5.c, T.`1-_./Ü` FROM (SELECT a, b, c AS `1-_./Ü` FROM Table3) AS T, Table5 " + + "WHERE a = d AND a < 4" + + val ds1 = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) + val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'c) + tEnv.registerTable("Table3", ds1) + tEnv.registerTable("Table5", ds2) + + val result = tEnv.sqlQuery(sqlQuery) + + val expected = Seq("1,Hi", "2,Hello", "1,Hello", + "2,Hello world", "2,Hello world", "3,Hello world") + val results = result.toRetractStream[Row] + results.addSink(new StreamITCase.RetractingSink) + env.execute() + assertEquals(expected.sorted, StreamITCase.retractedResults.sorted) + } + + @Test + def testDataStreamJoinWithAggregation(): Unit = { --- End diff -- Can you try to use a consistent naming scheme for the test methods you added? Remove `DataStream` or `Table` from the names, and mark `Inner` and `Outer` joins correctly.
---