Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4625#discussion_r139401824 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/JoinHarnessTest.scala --- @@ -383,13 +384,158 @@ class JoinHarnessTest extends HarnessTestBase{ val expectedOutput = new ConcurrentLinkedQueue[Object]() expectedOutput.add(new StreamRecord( - CRow(Row.of(2: JInt, "aaa2", 2: JInt, "bbb7"), true), 7)) + CRow(Row.of(2L: JLong, "aaa2", 2L: JLong, "bbb7"), true), 7)) expectedOutput.add(new StreamRecord( - CRow(Row.of(1: JInt, "aaa3", 1: JInt, "bbb12"), true), 12)) + CRow(Row.of(1L: JLong, "aaa3", 1L: JLong, "bbb12"), true), 12)) verify(expectedOutput, result, new RowResultSortComparator()) testHarness.close() } + /** a.c1 >= b.rowtime - 10 and a.rowtime <= b.rowtime + 20 **/ + @Test + def testCommonRowTimeJoin() { + + val joinProcessFunc = new RowTimeBoundedStreamInnerJoin( + -10, 20, 0, rT, rT, "TestJoinFunction", funcCode, 0, 0) + + val operator: KeyedCoProcessOperator[String, CRow, CRow, CRow] = + new KeyedCoProcessOperator[String, CRow, CRow, CRow](joinProcessFunc) + val testHarness: KeyedTwoInputStreamOperatorTestHarness[String, CRow, CRow, CRow] = + new KeyedTwoInputStreamOperatorTestHarness[String, CRow, CRow, CRow]( + operator, + new TupleRowKeySelector[String](1), + new TupleRowKeySelector[String](1), + BasicTypeInfo.STRING_TYPE_INFO, + 1, 1, 0) + + testHarness.open() + + // Advance + testHarness.processWatermark1(new Watermark(1)) + testHarness.processWatermark2(new Watermark(1)) + + // Test late data + testHarness.processElement1(new StreamRecord[CRow]( + CRow(Row.of(1L: JLong, "k1"), true), 0)) + + assertEquals(0, testHarness.numEventTimeTimers()) + + testHarness.processElement1(new StreamRecord[CRow]( + CRow(Row.of(2L: JLong, "k1"), true), 0)) + testHarness.processElement2(new StreamRecord[CRow]( + CRow(Row.of(2L: JLong, "k1"), true), 0)) + + assertEquals(2, testHarness.numEventTimeTimers()) + assertEquals(4, testHarness.numKeyedStateEntries()) + + testHarness.processElement1(new StreamRecord[CRow]( + CRow(Row.of(5L: JLong, "k1"), true), 0)) + testHarness.processElement2(new StreamRecord[CRow]( + CRow(Row.of(15L: JLong, "k1"), true), 0)) + + testHarness.processWatermark1(new Watermark(20)) + testHarness.processWatermark2(new Watermark(20)) + + assertEquals(4, testHarness.numKeyedStateEntries()) + + testHarness.processElement1(new StreamRecord[CRow]( + CRow(Row.of(35L: JLong, "k1"), true), 0)) + + testHarness.processWatermark1(new Watermark(38)) + testHarness.processWatermark2(new Watermark(38)) + + testHarness.processElement1(new StreamRecord[CRow]( + CRow(Row.of(40L: JLong, "k2"), true), 0)) + testHarness.processElement2(new StreamRecord[CRow]( + CRow(Row.of(39L: JLong, "k2"), true), 0)) + + assertEquals(6, testHarness.numKeyedStateEntries()) + + testHarness.processWatermark1(new Watermark(61)) + testHarness.processWatermark2(new Watermark(61)) + + assertEquals(4, testHarness.numKeyedStateEntries()) + + val expectedOutput = new ConcurrentLinkedQueue[Object]() --- End diff -- Add multiple rows for the same key and time to validate that this case is correctly handled. It might make sense to add another string field to the data with a unique value ("left1", ...) to make the input and output records easier to compare.
---