[ https://issues.apache.org/jira/browse/FLINK-35886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17924419#comment-17924419 ]
Piotr Nowojski commented on FLINK-35886:
----------------------------------------

Hi [~afedulov], sorry, I was out of office and I'm now going back through the notifications. However, I noticed that you have already cherry-picked the japi exclusion for this bug fix from 1.19 to 1.20, so has everything already been fixed?

fed8c110558 [10 days ago] [FLINK-35886][source] Exclude japi checks for the API changes

> Incorrect watermark idleness timeout accounting when subtask is backpressured/blocked
> -------------------------------------------------------------------------------------
>
>                 Key: FLINK-35886
>                 URL: https://issues.apache.org/jira/browse/FLINK-35886
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream, Runtime / Task
>    Affects Versions: 1.18.1, 1.20.0, 1.19.1
>            Reporter: Piotr Nowojski
>            Assignee: Piotr Nowojski
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.19.2, 1.20.1, 2.0-preview
>
>
> Currently, when using watermarks with idleness in Flink, idleness can be incorrectly detected when reading records from a source that is blocked by the runtime. For example, this can easily happen when the source is either backpressured or blocked by watermark alignment. In those cases, although there are more records to be read from the source (or the source’s split), the runtime decides not to poll those records (or is unable to). In such a case, the idleness timeout can kick in and mark the source/source split as idle, which can lead to incorrect combined watermark calculations and to records being incorrectly marked as late and dropped.
> h4. Watermark alignment
> Suppose there are two source splits, A and B, and maxAllowedWatermarkDrift is set to 30s.
> # Partition A has emitted watermark 1042s, while partition B sits at watermark 1000s.
> # {{1042s - 1000s > maxAllowedWatermarkDrift}}, so partition A is blocked by the watermark alignment.
> # For the duration of idleTimeout, partition B emits a large batch of records that do not advance its watermark by much. For example, partition B's watermark either stays at 1000s or advances only slightly, say to 1005s.
> # idleTimeout kicks in, marking partition A as idle.
> # Partition B finishes emitting the large batch of older records, and now there is a gap in rowtimes: previously partition B was emitting records with rowtime ~1000s, and now it jumps to, for example, ~5000s.
> # As partition A is idle, the combined watermark jumps to ~5000s as well.
> # Watermark alignment unblocks partition A, and it continues emitting records with rowtime ~1042s. But now all of those records are dropped as late.
> h4. Backpressure
> Suppose there are two SourceOperators, A and B. Due to, for example, data skew, it can happen that either only A gets backpressured, or A gets backpressured sooner. Either way, while A is backpressured and B is not, B can advance the combined watermark far enough that, when the backpressure recedes, fresh records from A are considered late, leading to incorrect results.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
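A toy Java model of the watermark alignment scenario from the issue description, for illustration only (this is not Flink code). It assumes that the combined watermark is the minimum over non-idle splits, that alignment pauses a split that is more than maxAllowedWatermarkDrift ahead of that minimum, and that idleness is a plain "time without records" counter. The 60s idleTimeout, the 1s tick, and the names {{Split}}, {{run}} and {{pauseIdlenessWhileBlocked}} are all hypothetical. With {{pauseIdlenessWhileBlocked=false}}, the blocked split A is marked idle and the combined watermark jumps to 5000s, so A's record at 1043s is late. With {{true}} (one possible corrected accounting, assumed here rather than taken from the actual fix), A stays active and the combined watermark stays at 1042s.

```java
import java.util.List;

/**
 * Toy model (NOT Flink code) of the FLINK-35886 watermark alignment scenario.
 * All names and constants here are hypothetical simplifications.
 */
public class IdlenessAlignmentDemo {

    /** A source split: current watermark (seconds) plus idleness bookkeeping. */
    static final class Split {
        long watermarkSec;
        long idleMillis; // time counted towards the idleness timeout
        boolean idle;

        Split(long watermarkSec) {
            this.watermarkSec = watermarkSec;
        }
    }

    /** Combined watermark: the minimum over all splits that are not idle. */
    static long combined(List<Split> splits) {
        long min = Long.MAX_VALUE;
        for (Split s : splits) {
            if (!s.idle) {
                min = Math.min(min, s.watermarkSec);
            }
        }
        return min;
    }

    /**
     * Replays steps 1-6 and returns the combined watermark right after B jumps to 5000s.
     *
     * @param pauseIdlenessWhileBlocked if true, time during which A is blocked by
     *     alignment does not count towards A's idleness timeout (assumed fix)
     */
    static long run(boolean pauseIdlenessWhileBlocked) {
        final long maxDriftSec = 30;
        final long idleTimeoutMillis = 60_000; // hypothetical idleTimeout
        final long tickMillis = 1_000;
        Split a = new Split(1042);
        Split b = new Split(1000);
        List<Split> splits = List.of(a, b);

        // Steps 1-4: A is ahead by more than maxDrift, so it is blocked and emits nothing,
        // while B emits old records for longer than idleTimeout, reaching only 1005s.
        for (long t = 0; t < idleTimeoutMillis + tickMillis; t += tickMillis) {
            boolean aBlocked = a.watermarkSec - combined(splits) > maxDriftSec;
            if (!(aBlocked && pauseIdlenessWhileBlocked)) {
                a.idleMillis += tickMillis; // wall-clock time without records from A
            }
            if (a.idleMillis >= idleTimeoutMillis) {
                a.idle = true;
            }
            b.watermarkSec = Math.min(1005, b.watermarkSec + 1);
        }

        // Steps 5-6: B's rowtimes jump to ~5000s; an idle A no longer holds the minimum back.
        b.watermarkSec = 5000;
        return combined(splits);
    }

    /** A record is treated as late if its rowtime is not above the current watermark. */
    static boolean isLate(long rowtimeSec, long watermarkSec) {
        return rowtimeSec <= watermarkSec;
    }

    public static void main(String[] args) {
        for (boolean pause : new boolean[] {false, true}) {
            long wm = run(pause);
            // Step 7: A is unblocked and emits a record with rowtime 1043s.
            System.out.printf("pauseIdlenessWhileBlocked=%b combined=%ds record@1043s late=%b%n",
                    pause, wm, isLate(1043, wm));
        }
    }
}
```

The model ignores that a real combined watermark never moves backwards once emitted; it only shows why counting blocked time towards idleTimeout lets the combined watermark overtake split A.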