[ https://issues.apache.org/jira/browse/FLINK-18996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17182623#comment-17182623 ]

Benchao Li commented on FLINK-18996:
------------------------------------

After some more deep-diving into the code with [~nilerzhou], I think we have 
found the root cause.

The allowed lateness is already taken into account when the output watermark 
is delayed, see: 
[https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecIntervalJoin.scala#L335]
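
A minimal sketch of what delaying the watermark amounts to, assuming the join operator subtracts a fixed delay of {{Math.max(leftRelativeSize, rightRelativeSize) + allowedLateness}} from every incoming watermark before forwarding it. The class and method names are hypothetical and are not Flink APIs; the sizes in {{main}} are made-up values in milliseconds.

```java
// Hypothetical sketch, not Flink code: deriving the delayed output watermark
// of a row-time interval join under the assumption stated above.
final class WatermarkDelaySketch {

    /** Assumed delay: the larger relative size plus the allowed lateness. */
    static long maxOutputDelay(long leftRelativeSize, long rightRelativeSize, long allowedLateness) {
        return Math.max(leftRelativeSize, rightRelativeSize) + allowedLateness;
    }

    /** Watermark forwarded downstream, assumed to be a plain subtraction. */
    static long outputWatermark(long inputWatermark, long delay) {
        return inputWatermark - delay;
    }

    public static void main(String[] args) {
        long delay = maxOutputDelay(10_000L, 5_000L, 0L);
        System.out.println("delay = " + delay);                                      // delay = 10000
        System.out.println("output watermark = " + outputWatermark(50_000L, delay)); // output watermark = 40000
    }
}
```

Subtracting a constant keeps the output watermark monotonic as long as the input watermark is.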
 

However, the state cleanup timer is registered later than the watermark delay 
accounts for; see its logic here: 
[https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/interval/TimeIntervalJoin.java#L368]
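
The cleanup-time arithmetic for a left row can be sketched as follows. The left-row formula is the one given in this comment; computing {{minCleanUpInterval}} as half of the two relative sizes combined is an assumption made for illustration, and all names are hypothetical. The {{fires}} helper models the usual event-time timer rule: a timer fires once the input watermark reaches its timestamp.

```java
// Hypothetical sketch, not Flink code: when the cleanup timer of a left row fires.
final class CleanUpTimerSketch {

    /** Assumption: the cleanup interval is half of the combined relative sizes. */
    static long minCleanUpInterval(long leftRelativeSize, long rightRelativeSize) {
        return (leftRelativeSize + rightRelativeSize) / 2;
    }

    /** Cleanup time for a left row, per the formula in this comment. */
    static long leftCleanUpTime(long rowTime, long leftRelativeSize, long rightRelativeSize,
                                long allowedLateness) {
        return rowTime + leftRelativeSize
                + minCleanUpInterval(leftRelativeSize, rightRelativeSize)
                + allowedLateness + 1;
    }

    /** An event-time timer fires once the input watermark reaches its timestamp. */
    static boolean fires(long timerTime, long inputWatermark) {
        return inputWatermark >= timerTime;
    }

    public static void main(String[] args) {
        long t = leftCleanUpTime(100_000L, 10_000L, 5_000L, 0L);
        System.out.println("cleanup time = " + t); // cleanup time = 117501
        System.out.println(fires(t, 117_500L));    // false
        System.out.println(fires(t, 117_501L));    // true
    }
}
```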

For example, take a {{left outer join}} with one record in the left stream and 
no record in the right stream. The unmatched record is emitted (padded with 
nulls) when its cleanup timer fires, at 
{{rowTime + leftRelativeSize + minCleanUpInterval + allowedLateness + 1}}. 
However, the watermark is only delayed by 
{{Math.max(leftRelativeSize, rightRelativeSize) + allowedLateness}}, so by the 
time the record is emitted, the downstream watermark has already passed its 
rowtime and the record is late for downstream operators.
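
To make the example concrete, here is a small calculation with hypothetical sizes (leftRelativeSize = 10 s, rightRelativeSize = 5 s, allowedLateness = 0). It assumes {{minCleanUpInterval}} is half the sum of the two relative sizes and that the forwarded watermark is the input watermark minus the delay; neither detail is stated in this comment, and the names are illustrative only. It returns the gap between the record's rowtime and the lowest watermark the join can forward right after the timer fires.

```java
// Hypothetical end-to-end check of the left outer join example; not Flink code.
final class LateRowExample {

    /** Gap (ms) between the forwarded watermark and the row's rowtime; > 0 means the row is behind it. */
    static long lateness(long rowTime, long leftRelativeSize, long rightRelativeSize,
                         long allowedLateness) {
        // Assumed cleanup interval: half of the combined relative sizes.
        long minCleanUpInterval = (leftRelativeSize + rightRelativeSize) / 2;
        // Cleanup timer of the unmatched left row (formula from the comment).
        long cleanUpTime = rowTime + leftRelativeSize + minCleanUpInterval + allowedLateness + 1;
        // Delay applied to the output watermark (formula from the comment).
        long delay = Math.max(leftRelativeSize, rightRelativeSize) + allowedLateness;
        // The timer fires once the input watermark reaches cleanUpTime, so the
        // watermark forwarded right afterwards is at least cleanUpTime - delay.
        long forwardedWatermark = cleanUpTime - delay;
        return forwardedWatermark - rowTime;
    }

    public static void main(String[] args) {
        long gap = lateness(100_000L, 10_000L, 5_000L, 0L);
        System.out.println("row is behind the watermark by " + gap + " ms"); // row is behind the watermark by 7501 ms
    }
}
```

Because {{allowedLateness}} appears in both the timer and the delay, it cancels out; under these assumptions the row ends up behind the watermark whenever {{leftRelativeSize + minCleanUpInterval + 1 > Math.max(leftRelativeSize, rightRelativeSize)}}.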

> Avoid disorder for time interval join
> -------------------------------------
>
>                 Key: FLINK-18996
>                 URL: https://issues.apache.org/jira/browse/FLINK-18996
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / API, Table SQL / Runtime
>            Reporter: Benchao Li
>            Priority: Major
>
> Currently, the time interval join can produce records that are late with 
> respect to the watermark (their rowtime is already behind it). If the rowtime 
> is used again downstream, e.g. in a window aggregation, some data is lost.
>
> Reported on the user-zh mailing list: 
> [http://apache-flink.147419.n8.nabble.com/Re-flink-interval-join-tc4458.html#none]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
