[ https://issues.apache.org/jira/browse/FLINK-35886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868098#comment-17868098 ]
Piotr Nowojski edited comment on FLINK-35886 at 7/23/24 3:45 PM:
-----------------------------------------------------------------

I think this problem requires a public API change, so I have published a FLIP for this:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-471%3A++Fixing+watermark+idleness+timeout+accounting

was (Author: pnowojski):
I think this problem requires a public API change, so I will publish a FLIP shortly.

> Incorrect watermark idleness timeout accounting when subtask is backpressured/blocked
> -------------------------------------------------------------------------------------
>
>                 Key: FLINK-35886
>                 URL: https://issues.apache.org/jira/browse/FLINK-35886
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream, Runtime / Task
>    Affects Versions: 1.18.1, 1.20.0, 1.19.1
>            Reporter: Piotr Nowojski
>            Assignee: Piotr Nowojski
>            Priority: Critical
>
> Currently, when using watermarks with idleness in Flink, idleness can be
> incorrectly detected when reading records from a source that is blocked by
> the runtime. For example, this can easily happen when the source is either
> backpressured or blocked by watermark alignment. In those cases, even though
> there are more records to be read from the source (or the source's split),
> the runtime decides not to poll those records (or is unable to). The
> idleness timeout can then kick in and mark the source/source split as idle,
> which can lead to incorrect combined watermark calculations and to records
> being incorrectly marked as late and dropped.
>
> h4. Watermark alignment
> Suppose there are two source splits (partitions), A and B, and
> maxAllowedWatermarkDrift is set to 30s.
> # Partition A has emitted watermark 1042s, while partition B sits at
> watermark 1000s.
> # {{1042s - 1000s > maxAllowedWatermarkDrift}}, so partition A is blocked by
> watermark alignment.
> # For the duration of idleTimeout, partition B emits a large batch of records
> that do not advance that partition's watermark by much. For example, the
> watermark of partition B either stays at 1000s or advances only slightly,
> say to 1005s.
> # idleTimeout kicks in, marking partition A as idle.
> # Partition B finishes emitting this large batch of older records, and now
> there is a gap in rowtimes: partition B was previously emitting records with
> rowtime ~1000s, and now it jumps to, for example, ~5000s.
> # As partition A is idle, the combined watermark jumps to ~5000s as well.
> # Watermark alignment unblocks partition A, and it continues emitting records
> with rowtime ~1042s. But now all of those records are dropped for being late.
>
> h4. Backpressure
> Suppose there are two SourceOperators, A and B. Due to, for example, some data
> skew, it can happen that either only A gets backpressured, or A gets
> backpressured sooner. Either way, while A is backpressured and B is not, B can
> bump the combined watermark high enough that, once the backpressure recedes,
> fresh records from A are considered late, leading to incorrect results.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
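The watermark alignment scenario (and, by treating "blocked" as "backpressured", the backpressure scenario) can be replayed with a small, Flink-independent simulation. This is a sketch under stated assumptions, not Flink code and not the FLIP-471 design: `IdlenessTracker`, `combinedWatermark` and `runScenario` are hypothetical names, time advances in simulated 1s ticks, idleTimeout is assumed to be 60s, and the combined watermark is modeled as the minimum over non-idle splits. It compares counting blocked time towards the idle timeout (the reported behavior) against excluding time during which the runtime refuses to poll the split.

```java
/**
 * Flink-independent simulation of the watermark-alignment scenario.
 * IdlenessTracker, combinedWatermark and runScenario are hypothetical names,
 * not Flink APIs.
 */
public class IdlenessDemo {

    /** Hypothetical per-split idleness tracker (not a Flink class). */
    static final class IdlenessTracker {
        private final long idleTimeoutMs;
        private final boolean pauseWhileBlocked;
        private long inactiveMs = 0; // simulated time spent without emitting a record

        IdlenessTracker(long idleTimeoutMs, boolean pauseWhileBlocked) {
            this.idleTimeoutMs = idleTimeoutMs;
            this.pauseWhileBlocked = pauseWhileBlocked;
        }

        /**
         * Advances simulated time by deltaMs. 'emitted' means the split produced a
         * record; 'blocked' means the runtime refused to poll it (alignment or backpressure).
         */
        void tick(long deltaMs, boolean emitted, boolean blocked) {
            if (emitted) {
                inactiveMs = 0;
            } else if (!(blocked && pauseWhileBlocked)) {
                // Count inactive time, unless the split is blocked and blocked time is excluded.
                inactiveMs += deltaMs;
            }
        }

        boolean isIdle() {
            return inactiveMs >= idleTimeoutMs;
        }
    }

    /** Combined watermark = minimum over non-idle splits (Long.MAX_VALUE if all are idle). */
    static long combinedWatermark(long[] watermarksMs, IdlenessTracker[] trackers) {
        long min = Long.MAX_VALUE;
        for (int i = 0; i < watermarksMs.length; i++) {
            if (!trackers[i].isIdle()) {
                min = Math.min(min, watermarksMs[i]);
            }
        }
        return min;
    }

    /** Replays steps 1-6 and returns the combined watermark after B jumps to ~5000s. */
    static long runScenario(boolean pauseWhileBlocked) {
        long idleTimeoutMs = 60_000; // assumed idleTimeout of 60s
        long[] watermarksMs = {1_042_000L, 1_000_000L}; // index 0 = split A, 1 = split B
        IdlenessTracker[] trackers = {
            new IdlenessTracker(idleTimeoutMs, pauseWhileBlocked),
            new IdlenessTracker(idleTimeoutMs, pauseWhileBlocked)
        };
        // Steps 2-4: A is blocked by alignment while B emits old records for 70s.
        for (int second = 0; second < 70; second++) {
            trackers[0].tick(1_000, false, true); // A: blocked, emits nothing
            trackers[1].tick(1_000, true, false); // B: keeps emitting, watermark stays ~1000s
        }
        // Step 5: B's rowtimes (and watermark) jump to ~5000s.
        watermarksMs[1] = 5_000_000L;
        return combinedWatermark(watermarksMs, trackers);
    }

    public static void main(String[] args) {
        // A has been marked idle, so the combined watermark follows B: prints 5000000
        System.out.println("blocked time counted:  " + runScenario(false));
        // A stays active and holds the combined watermark back: prints 1042000
        System.out.println("blocked time excluded: " + runScenario(true));
    }
}
```

With blocked time counted, A's later records at ~1042s fall behind a ~5000s combined watermark (step 7); with blocked time excluded, the combined watermark stays at A's 1042s. One caveat of this simplification: a split that stays blocked indefinitely would never be marked idle.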