Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5327#discussion_r182700177
  
    --- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
 ---
    @@ -977,6 +922,254 @@ class JoinITCase extends StreamingWithStateTestBase {
         expected.add("D,R-8,null")
         StreamITCase.compareWithList(expected)
       }
    +
    +  /** test non-window inner join **/
    +  @Test
    +  def testNonWindowInnerJoin(): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    env.setStateBackend(getStateBackend)
    +    StreamITCase.clear
    +
    +    val data1 = new mutable.MutableList[(Int, Long, String)]
    +    data1.+=((1, 1L, "Hi1"))
    +    data1.+=((1, 2L, "Hi2"))
    +    data1.+=((1, 2L, "Hi2"))
    +    data1.+=((1, 5L, "Hi3"))
    +    data1.+=((2, 7L, "Hi5"))
    +    data1.+=((1, 9L, "Hi6"))
    +    data1.+=((1, 8L, "Hi8"))
    +    data1.+=((3, 8L, "Hi9"))
    +
    +    val data2 = new mutable.MutableList[(Int, Long, String)]
    +    data2.+=((1, 1L, "HiHi"))
    +    data2.+=((2, 2L, "HeHe"))
    +    data2.+=((3, 2L, "HeHe"))
    +
    +    val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c)
    +      .select(('a === 3) ? (Null(Types.INT), 'a) as 'a, 'b, 'c)
    +    val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c)
    +      .select(('a === 3) ? (Null(Types.INT), 'a) as 'a, 'b, 'c)
    +
    +    tEnv.registerTable("T1", t1)
    +    tEnv.registerTable("T2", t2)
    +
    +    val sqlQuery =
    +      """
    +        |SELECT t2.a, t2.c, t1.c
    +        |FROM T1 as t1 JOIN T2 as t2 ON
    +        |  t1.a = t2.a AND
    +        |  t1.b > t2.b
    +        |""".stripMargin
    +
    +    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
    +    result.addSink(new StreamITCase.StringSink[Row])
    +    env.execute()
    +
    +    val expected = mutable.MutableList(
    +      "1,HiHi,Hi2",
    +      "1,HiHi,Hi2",
    +      "1,HiHi,Hi3",
    +      "1,HiHi,Hi6",
    +      "1,HiHi,Hi8",
    +      "2,HeHe,Hi5",
    +      "null,HeHe,Hi9")
    +
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +  @Test
    +  def testJoin(): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.clear
    +    env.setStateBackend(getStateBackend)
    +    val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e"
    +
    +    val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 
'a, 'b, 'c)
    +    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 
'e, 'f, 'g, 'h)
    +    tEnv.registerTable("Table3", ds1)
    +    tEnv.registerTable("Table5", ds2)
    +
    +    val result = tEnv.sqlQuery(sqlQuery)
    +
    +    val expected = Seq("Hi,Hallo", "Hello,Hallo Welt", "Hello world,Hallo 
Welt")
    +    val results = result.toRetractStream[Row]
    +    results.addSink(new StreamITCase.RetractingSink)
    +    env.execute()
    +    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
    +  }
    +
    +  @Test
    +  def testJoinWithFilter(): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.clear
    +    env.setStateBackend(getStateBackend)
    +
    +    val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND b < 2"
    +
    +    val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 
'a, 'b, 'c)
    +    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 
'e, 'f, 'g, 'h)
    +    tEnv.registerTable("Table3", ds1)
    +    tEnv.registerTable("Table5", ds2)
    +
    +    val result = tEnv.sqlQuery(sqlQuery)
    +
    +    val expected = Seq("Hi,Hallo")
    +    val results = result.toRetractStream[Row]
    +    results.addSink(new StreamITCase.RetractingSink)
    +    env.execute()
    +    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
    +  }
    +
    +  @Test
    +  def testInnerJoinWithNonEquiJoinPredicate(): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.clear
    +    env.setStateBackend(getStateBackend)
    +
    +    val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND a < 6 
AND h < b"
    +
    +    val ds1 = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 
'b, 'c)
    +    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 
'e, 'f, 'g, 'h)
    +    tEnv.registerTable("Table3", ds1)
    +    tEnv.registerTable("Table5", ds2)
    +
    +    val result = tEnv.sqlQuery(sqlQuery)
    +
    +    val expected = Seq("Hello world, how are you?,Hallo Welt wie", "I am 
fine.,Hallo Welt wie")
    +    val results = result.toRetractStream[Row]
    +    results.addSink(new StreamITCase.RetractingSink)
    +    env.execute()
    +    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
    +  }
    +
    +  @Test
    +  def testJoinWithMultipleKeys(): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.clear
    +    env.setStateBackend(getStateBackend)
    +
    +    val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE a = d AND b = h"
    +
    +    val ds1 = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 
'b, 'c)
    +    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 
'e, 'f, 'g, 'h)
    +    tEnv.registerTable("Table3", ds1)
    +    tEnv.registerTable("Table5", ds2)
    +
    +    val result = tEnv.sqlQuery(sqlQuery)
    +
    +    val expected = Seq(
    +      "Hi,Hallo", "Hello,Hallo Welt", "Hello world,Hallo Welt wie gehts?", 
"Hello world,ABC",
    +      "I am fine.,HIJ", "I am fine.,IJK")
    +    val results = result.toRetractStream[Row]
    +    results.addSink(new StreamITCase.RetractingSink)
    +    env.execute()
    +    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
    +  }
    +
    +  @Test
    +  def testJoinWithAlias(): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.clear
    +    env.setStateBackend(getStateBackend)
    +
    +    val sqlQuery =
    +      "SELECT Table5.c, T.`1-_./Ü` FROM (SELECT a, b, c AS `1-_./Ü` FROM 
Table3) AS T, Table5 " +
    +        "WHERE a = d AND a < 4"
    +
    +    val ds1 = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 
'b, 'c)
    +    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 
'e, 'f, 'g, 'c)
    +    tEnv.registerTable("Table3", ds1)
    +    tEnv.registerTable("Table5", ds2)
    +
    +    val result = tEnv.sqlQuery(sqlQuery)
    +
    +    val expected = Seq("1,Hi", "2,Hello", "1,Hello",
    +      "2,Hello world", "2,Hello world", "3,Hello world")
    +    val results = result.toRetractStream[Row]
    +    results.addSink(new StreamITCase.RetractingSink)
    +    env.execute()
    +    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
    +  }
    +
    +  @Test
    +  def testDataStreamJoinWithAggregation(): Unit = {
    --- End diff --
    
    Can you try to use a consistent naming scheme for the test methods you 
added? Remove `DataStream` or `Table` from the names, and mark `Inner` and `Outer` 
joins correctly.


---

Reply via email to