Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r142769397 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala --- @@ -102,5 +117,154 @@ class JoinITCase extends StreamingWithStateTestBase { env.execute() } + /** test rowtime inner join **/ + @Test + def testRowTimeInnerJoin(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + env.setStateBackend(getStateBackend) + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + StreamITCase.clear + env.setParallelism(1) + + val sqlQuery = + """ + |SELECT t2.a, t2.c, t1.c + |FROM T1 as t1 join T2 as t2 ON + | t1.a = t2.a AND + | t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND + | t2.rt + INTERVAL '6' SECOND + |""".stripMargin + + val data1 = new mutable.MutableList[(Int, Long, String, Long)] --- End diff -- Add two rows with null keys on both sides, within the join window boundaries, to test that join predicates on null values do not evaluate to true. For this to work, we also need to fix the `keyBy()` calls to support partitioning of null keys (see #4732).
---