[ https://issues.apache.org/jira/browse/FLINK-24466?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17449708#comment-17449708 ]
Shen Zhu commented on FLINK-24466:
----------------------------------

Hey [~slinkydeveloper], thanks for writing the patch, it's very useful!

However, I have a question about filtering late events in {*}KeyedCoProcessOperatorWithWatermarkDelayAndFilterLateEvents{*}. Based on the tests in {*}RowTimeIntervalJoinTest{*}, it seems that late events are expected to be cached:
{code:java}
testHarness.processWatermark1(new Watermark(1));
testHarness.processWatermark2(new Watermark(1));

// Test late data.
testHarness.processElement1(insertRecord(1L, "k1"));
// Though (1L, "k1") is actually late, it will also be cached.
assertEquals(1, testHarness.numEventTimeTimers());
{code}
If we filter the events in {*}KeyedCoProcessOperatorWithWatermarkDelayAndFilterLateEvents{*}, those events won't take part in the join, which will cause the following test case to fail:
{code:java}
List<Object> expectedOutput = new ArrayList<>();
expectedOutput.add(new Watermark(-19));
// This result is produced by the late row (1, "k1").
expectedOutput.add(insertRecord(1L, "k1", 2L, "k1"));
expectedOutput.add(insertRecord(2L, "k1", 2L, "k1"));
expectedOutput.add(insertRecord(5L, "k1", 2L, "k1"));
expectedOutput.add(insertRecord(5L, "k1", 15L, "k1"));
expectedOutput.add(new Watermark(0));
expectedOutput.add(insertRecord(35L, "k1", 15L, "k1"));
expectedOutput.add(new Watermark(18));
expectedOutput.add(insertRecord(40L, "k2", 39L, "k2"));
expectedOutput.add(new Watermark(41));
{code}
Do you think we should update the patch to respect this behavior?

Best,
Shen Zhu

> Interval Join late events handling behaviour is not consistent
> --------------------------------------------------------------
>
> Key: FLINK-24466
> URL: https://issues.apache.org/jira/browse/FLINK-24466
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Reporter: Francesco Guardiani
> Priority: Major
> Attachments: Fix_late_events_filtering_for_interval_join.patch
>
>
> Interval Join handles late events emitting them in the output, as a padded
> row. This behavior is also tested extensively in {{RowTimeIntervalJoinTest}}.
> The problem with this behavior is the way an event is considered "late" or
> not: in order to distinguish between the two, {{RowTimeIntervalJoin}} uses
> the {{ctx.timerService().currentWatermark()}} to find out if an event is
> later than the last received watermark or not. But that method returns the
> "combined" watermark across all the keys, partitions and *input streams*,
> that is if one of the two streams goes "slower" than the other one, the
> returned watermark is going to be the minimum among the two.
> This means that our late events handling effectively works only if the two
> streams run "at the same pace", otherwise we'll just see what we consider
> _late events_ for one of the two streams as joined.
> To observe this behavior, just run the test
> {{IntervalJoinITCase#testRowTimeInnerJoinWithEquiTimeAttrs}} in this revision
> https://github.com/apache/flink/commit/7033cbfe404bea1519d3342a611e2f92768d70f9
> several times and you'll see that after a couple of runs it fails, joining
> one of the {{"should-be-discarded"}} records. Those records are way behind
> the watermark - 1 second, as defined.
> You'll find attached in the issue a small patch to show how this could be
> fixed.

--
This message was sent by Atlassian Jira
(v8.20.1#820001)