[ https://issues.apache.org/jira/browse/FLINK-35886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868098#comment-17868098 ]

Piotr Nowojski edited comment on FLINK-35886 at 7/23/24 3:45 PM:
-----------------------------------------------------------------

I think this problem requires a public API change, so I have published a FLIP 
for this: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-471%3A++Fixing+watermark+idleness+timeout+accounting


was (Author: pnowojski):
I think this problem requires a public API change, so I will publish a FLIP 
shortly.

> Incorrect watermark idleness timeout accounting when subtask is 
> backpressured/blocked
> -------------------------------------------------------------------------------------
>
>                 Key: FLINK-35886
>                 URL: https://issues.apache.org/jira/browse/FLINK-35886
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream, Runtime / Task
>    Affects Versions: 1.18.1, 1.20.0, 1.19.1
>            Reporter: Piotr Nowojski
>            Assignee: Piotr Nowojski
>            Priority: Critical
>
> Currently, when using watermarks with idleness in Flink, idleness can be 
> incorrectly detected when reading records from a source that is blocked by 
> the runtime. For example, this can easily happen when the source is either 
> backpressured or blocked by watermark alignment. In those cases, even though 
> there are more records to be read from the source (or the source’s split), 
> the runtime decides not to poll those records (or is unable to). In such a 
> case, the idleness timeout can kick in and mark the source/source split as 
> idle, which can lead to incorrect combined watermark calculations and to 
> records being incorrectly marked as late and dropped.
> h4. Watermark alignment
> Suppose there are two source splits, A and B, and maxAllowedWatermarkDrift 
> is set to 30s.
> # Partition A has emitted watermark 1042s, while partition B sits at 
> watermark 1000s.
> # {{1042s - 1000s > maxAllowedWatermarkDrift}}, so partition A is blocked by 
> the watermark alignment.
> # For the duration of idleTimeout, partition B emits a large batch of 
> records that do not advance that partition's watermark by much. For example, 
> partition B's watermark either stays at 1000s or is updated by only a small 
> amount, e.g. to 1005s.
> # idleTimeout expires and partition A is marked as idle, even though it still 
> has records to emit and is only blocked by watermark alignment.
> # Partition B finishes emitting that large batch of older records, and, let's 
> say, there is now a gap in rowtimes: partition B was previously emitting 
> records with rowtime ~1000s, and now it jumps to, for example, ~5000s.
> # As partition A is idle, it is ignored when computing the combined 
> watermark, which therefore jumps to ~5000s as well.
> # Watermark alignment unblocks partition A, and it continues emitting records 
> with rowtime ~1042s. But now all of those records are late with respect to 
> the ~5000s combined watermark and are dropped.
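The steps of the watermark alignment scenario can be replayed with a small Python model (hypothetical, not Flink API). It assumes that the combined watermark is the minimum over non-idle splits, that a split is blocked once its watermark is more than maxAllowedWatermarkDrift ahead of that minimum, and that a record is late when its rowtime is <= the downstream watermark. The next record from A is assumed to have rowtime 1043s. Passing `a_marked_idle=False` shows what would happen if step 4 did not occur.

```python
# Toy replay of the watermark alignment scenario. Hypothetical model, not
# Flink's API; watermarks and rowtimes are in seconds.

MAX_DRIFT = 30  # maxAllowedWatermarkDrift


def combined_watermark(wm: dict, idle: set) -> int:
    # Idle splits are left out of the minimum, which is what lets the
    # combined watermark run ahead of a split that was wrongly marked idle.
    return min(ts for split, ts in wm.items() if split not in idle)


def is_blocked(split: str, wm: dict, idle: set) -> bool:
    # A split is paused while it is more than MAX_DRIFT ahead of the minimum.
    return wm[split] - combined_watermark(wm, idle) > MAX_DRIFT


def replay(a_marked_idle: bool) -> tuple:
    wm = {"A": 1042, "B": 1000}
    idle = set()
    blocked_at_start = is_blocked("A", wm, idle)  # step 2: 1042 - 1000 > 30
    wm["B"] = 1005                                # step 3: B barely advances
    if a_marked_idle:
        idle.add("A")                             # step 4: idleness timeout
    wm["B"] = 5000                                # step 5: gap in B's rowtimes
    downstream = combined_watermark(wm, idle)     # step 6
    unblocked = not is_blocked("A", wm, idle)     # step 7: A may emit again
    next_a_rowtime = 1043
    # Watermark(t) means no more records with timestamp <= t are expected.
    late = next_a_rowtime <= downstream
    return blocked_at_start, downstream, unblocked, late


print(replay(a_marked_idle=True))   # (True, 5000, True, True)
print(replay(a_marked_idle=False))  # (True, 1042, True, False)
```

In this model the only difference between the two runs is whether A was marked idle while blocked, and that alone decides whether A's subsequent records are late.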
> h4. Backpressure
> Suppose there are two SourceOperators, A and B. Due to, for example, data 
> skew, it can happen that either only A gets backpressured, or A gets 
> backpressured sooner. Either way, while A is backpressured and B is not, A's 
> idleness timeout can expire and mark A as idle. B can then bump the combined 
> watermark high enough that, once the backpressure recedes, fresh records from 
> A are considered late, leading to incorrect results.
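The backpressure case can be sketched as a tick-based Python model (all names and numbers are hypothetical, not Flink API): B advances its watermark by 10 per tick while A is not polled, and the downstream watermark is the minimum over the non-idle inputs and never decreases. With naive accounting A is declared idle after `idle_timeout` ticks; with `pausable=True` (an assumed fix that does not count blocked time) it never is.

```python
# Tick-based toy model of the backpressure case. Hypothetical: not Flink's API.


def a_record_is_late(backpressured_ticks: int, idle_timeout: int,
                     pausable: bool) -> bool:
    a_wm, b_wm = 100, 100  # watermarks of SourceOperators A and B
    a_quiet = 0            # idleness time accounted for A
    downstream = 0         # combined watermark downstream, never decreases

    for _ in range(backpressured_ticks):
        b_wm += 10         # skew: B is not backpressured and keeps advancing
        if not pausable:
            a_quiet += 1   # naive accounting also counts time A is blocked
        a_idle = a_quiet >= idle_timeout
        inputs = [b_wm] if a_idle else [a_wm, b_wm]
        downstream = max(downstream, min(inputs))

    # Backpressure recedes and A emits its next, only slightly newer record.
    next_a_rowtime = a_wm + 1
    return next_a_rowtime <= downstream


print(a_record_is_late(10, idle_timeout=5, pausable=False))  # True
print(a_record_is_late(10, idle_timeout=5, pausable=True))   # False
```

Backpressure that ends before `idle_timeout` ticks, for example `a_record_is_late(3, idle_timeout=5, pausable=False)`, does not trigger the problem in this model.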



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
