Piotr Nowojski created FLINK-35886:
--------------------------------------

             Summary: Incorrect watermark idleness timeout accounting when subtask is backpressured/blocked
                 Key: FLINK-35886
                 URL: https://issues.apache.org/jira/browse/FLINK-35886
             Project: Flink
          Issue Type: Bug
          Components: API / DataStream, Runtime / Task
    Affects Versions: 1.19.1, 1.18.1, 1.20.0
            Reporter: Piotr Nowojski


Currently, when using watermarks with idleness in Flink, idleness can be
incorrectly detected when reading records from a source that is blocked by the
runtime. For example, this can easily happen when the source is either
backpressured or blocked by watermark alignment. In those cases, even though
there are more records to be read from the source (or the source's split), the
runtime decides not to poll those records (or is unable to). The idleness
timeout can then kick in and mark the source/source split as idle, which can
lead to an incorrect combined watermark and to records being incorrectly marked
as late and dropped.
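
To illustrate the accounting problem, here is a minimal, hypothetical Java sketch of a wall-clock idleness timer. The class and method names are assumptions for illustration, not Flink internals (in user code the timeout is configured via {{WatermarkStrategy#withIdleness(Duration)}}). Because the timer only tracks the time since the last emitted record, a split that genuinely has no data and a split whose pending records the runtime is not polling end up in the same idle state.

```java
import java.time.Duration;

// Hypothetical, simplified model of wall-clock idleness detection.
// Names are illustrative assumptions, not Flink's internal API.
public class IdlenessTimerSketch {

    static final class IdlenessTimer {
        private final long idleTimeoutMs;
        private long lastActivityMs;

        IdlenessTimer(Duration idleTimeout, long nowMs) {
            this.idleTimeoutMs = idleTimeout.toMillis();
            this.lastActivityMs = nowMs;
        }

        // Called whenever a record is actually emitted by the source/split.
        void onRecord(long nowMs) {
            lastActivityMs = nowMs;
        }

        // Only elapsed time is considered; the timer cannot know whether the
        // runtime stopped polling this split (backpressure, alignment).
        boolean isIdle(long nowMs) {
            return nowMs - lastActivityMs >= idleTimeoutMs;
        }
    }

    public static void main(String[] args) {
        Duration idleTimeout = Duration.ofSeconds(10);

        // Split X genuinely has no more data.
        IdlenessTimer empty = new IdlenessTimer(idleTimeout, 0L);

        // Split Y has pending records, but the runtime stops polling it at t=1s.
        IdlenessTimer blocked = new IdlenessTimer(idleTimeout, 0L);
        blocked.onRecord(1_000L);

        long now = 12_000L;
        // Both are reported idle: the timer cannot tell the two cases apart.
        System.out.println("empty idle=" + empty.isIdle(now)
                + ", blocked idle=" + blocked.isIdle(now));
    }
}
```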

h4. Watermark alignment

Suppose there are two source splits (partitions), A and B, and
maxAllowedWatermarkDrift is set to 30s.

# Partition A has emitted watermark 1042s, while partition B sits at watermark 1000s.
# {{1042s - 1000s > maxAllowedWatermarkDrift}}, so partition A is blocked by watermark alignment.
# For the duration of idleTimeout, partition B emits a large batch of records that do not advance its watermark by much. For example, partition B's watermark either stays at 1000s or advances only slightly, e.g. to 1005s.
# Since partition A emits nothing while blocked, idleTimeout kicks in and marks partition A as idle.
# Partition B finishes emitting the large batch of older records, and suppose there is now a gap in rowtimes: previously partition B was emitting records with rowtime ~1000s, and now it jumps to, for example, ~5000s.
# As partition A is idle, the combined watermark jumps to ~5000s as well.
# Watermark alignment unblocks partition A, and it continues emitting records with rowtime ~1042s. But now all of those records are dropped as late.
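
The steps above can be replayed with a simplified, hypothetical model. It assumes that the combined watermark is the minimum over non-idle splits, that alignment pauses a split whose watermark exceeds that minimum by more than maxAllowedWatermarkDrift, that the downstream watermark never decreases, and that a record is late if its timestamp is below the downstream watermark. None of the names below are Flink APIs.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified, hypothetical model of the watermark alignment scenario; not Flink code.
public class AlignmentIdlenessSketch {

    static final long MAX_DRIFT = 30; // maxAllowedWatermarkDrift, in seconds

    static final class Split {
        long watermark; // event time in seconds
        boolean idle;

        Split(long watermark) {
            this.watermark = watermark;
        }
    }

    final Map<String, Split> splits = new LinkedHashMap<>();
    long emittedWatermark = Long.MIN_VALUE; // downstream watermark, never decreases

    // Minimum watermark over non-idle splits; Long.MAX_VALUE if all are idle.
    long combinedWatermark() {
        return splits.values().stream()
                .filter(s -> !s.idle)
                .mapToLong(s -> s.watermark)
                .min()
                .orElse(Long.MAX_VALUE);
    }

    // Forward the combined watermark downstream, keeping it monotonic.
    void emitWatermark() {
        long combined = combinedWatermark();
        if (combined != Long.MAX_VALUE) {
            emittedWatermark = Math.max(emittedWatermark, combined);
        }
    }

    // Assumed alignment rule: pause a split too far ahead of the combined watermark.
    boolean isPaused(String name) {
        long combined = combinedWatermark();
        return combined != Long.MAX_VALUE
                && splits.get(name).watermark > combined + MAX_DRIFT;
    }

    boolean isLate(long timestamp) {
        return timestamp < emittedWatermark;
    }

    public static void main(String[] args) {
        AlignmentIdlenessSketch s = new AlignmentIdlenessSketch();
        s.splits.put("A", new Split(1042));
        s.splits.put("B", new Split(1000));
        s.emitWatermark();

        // Steps 1-2: 1042 > 1000 + 30, so A is paused.
        System.out.println("A paused: " + s.isPaused("A")); // true

        // Step 3: B creeps to 1005s; A stays paused because 1042 > 1035.
        s.splits.get("B").watermark = 1005;
        s.emitWatermark();
        System.out.println("A paused: " + s.isPaused("A")); // true

        // Step 4: A emits nothing while paused, so idleTimeout marks it idle.
        s.splits.get("A").idle = true;

        // Steps 5-6: B jumps to ~5000s and, with A idle, so does the combined watermark.
        s.splits.get("B").watermark = 5000;
        s.emitWatermark();
        System.out.println("downstream watermark: " + s.emittedWatermark); // 5000

        // Step 7: A is unpaused, becomes active again and emits a ~1042s record.
        System.out.println("A paused: " + s.isPaused("A")); // false
        s.splits.get("A").idle = false;
        s.emitWatermark();
        System.out.println("record@1042 late: " + s.isLate(1042)); // true
    }
}
```

Re-activating A in step 7 does not help: the downstream watermark has already reached 5000s and cannot move back.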

h4. Backpressure

Suppose there are two SourceOperators, A and B. Due to, for example, some data
skew, it can happen that either only A gets backpressured, or A gets
backpressured sooner than B. Either way, while A is backpressured and B is not,
A emits no records, so its idleness timeout can expire. B can then bump the
combined watermark high enough that, once the backpressure recedes, fresh
records from A are considered late, leading to incorrect results.
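
A similarly hedged, tick-based sketch of the backpressure case follows. All names, the 5-tick idleness timeout and B's event-time rate are illustrative assumptions; A is modelled as having pending records that are simply not emitted while it is backpressured. A short backpressure period leaves the downstream watermark held back by A, while one longer than the idleness timeout lets B drag it forward.

```java
// Hypothetical tick-based simulation of the backpressure scenario; not Flink code.
// Assumptions: idleness is detected purely from ticks since the last emitted
// record, and the downstream watermark is the monotonic minimum over non-idle inputs.
public class BackpressureIdlenessSketch {

    static final long IDLE_TIMEOUT = 5; // ticks without records before "idle"

    static final class Input {
        long watermark;      // event time in seconds
        long lastRecordTick; // both inputs last emitted at tick 0
        boolean idle;

        Input(long watermark) {
            this.watermark = watermark;
        }

        void emit(long timestamp, long tick) {
            watermark = Math.max(watermark, timestamp);
            lastRecordTick = tick;
            idle = false;
        }

        void checkIdle(long tick) {
            if (tick - lastRecordTick >= IDLE_TIMEOUT) {
                idle = true;
            }
        }
    }

    static long combined(Input a, Input b, long previous) {
        long min = Long.MAX_VALUE;
        if (!a.idle) min = Math.min(min, a.watermark);
        if (!b.idle) min = Math.min(min, b.watermark);
        // The downstream watermark never moves backwards.
        return min == Long.MAX_VALUE ? previous : Math.max(previous, min);
    }

    /** Returns true if A's first record after the backpressure ends is late. */
    static boolean run(int backpressuredTicks) {
        Input a = new Input(1000);
        Input b = new Input(1000);
        long downstream = Long.MIN_VALUE;

        for (long tick = 1; tick <= backpressuredTicks; tick++) {
            // A has pending records but is backpressured, so it emits nothing.
            // B is not backpressured; due to skew its event time advances quickly.
            b.emit(1000 + tick * 100, tick);
            a.checkIdle(tick);
            b.checkIdle(tick);
            downstream = combined(a, b, downstream);
        }

        // Backpressure recedes: A emits its next record, close to its old watermark.
        long resumeTick = backpressuredTicks + 1;
        long recordTs = 1001;
        boolean late = recordTs < downstream;
        a.emit(recordTs, resumeTick);
        return late;
    }

    public static void main(String[] args) {
        System.out.println("short backpressure, late: " + run(3));  // false
        System.out.println("long backpressure, late: " + run(10));  // true
    }
}
```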



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
