[ https://issues.apache.org/jira/browse/FLINK-35157 ]
elon_X deleted comment on FLINK-35157:
--------------------------------

    was (Author: JIRAUSER303028):
    [~fanrui] Sure, I will backport this fix to 1.17, 1.18, and 1.19. Thank you :D

> Sources with watermark alignment get stuck once some subtasks finish
> --------------------------------------------------------------------
>
>                 Key: FLINK-35157
>                 URL: https://issues.apache.org/jira/browse/FLINK-35157
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.17.2, 1.19.0, 1.18.1
>            Reporter: Gyula Fora
>            Assignee: elon_X
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.20.0
>
>         Attachments: image-2024-04-24-21-36-16-146.png
>
>
> The current watermark alignment logic can easily get stuck if some subtasks
> finish while others are still running.
> The reason is that once a source subtask finishes, it is not excluded from
> alignment, which effectively blocks the rest of the job from making progress
> beyond the last watermark + alignment time of the finished sources.
> This can be easily reproduced with the following simple pipeline:
> {noformat}
> StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(2);
> DataStream<Long> s = env.fromSource(
>                 new NumberSequenceSource(0, 100),
>                 WatermarkStrategy.<Long>forMonotonousTimestamps()
>                         .withTimestampAssigner(
>                                 (SerializableTimestampAssigner<Long>) (aLong, l) -> aLong)
>                         // alignment group "g1", max drift 10 ms, update interval 2 s
>                         .withWatermarkAlignment(
>                                 "g1", Duration.ofMillis(10), Duration.ofSeconds(2)),
>                 "Sequence Source")
>         .filter((FilterFunction<Long>) aLong -> {
>             // slow down processing so the subtasks finish at different times
>             Thread.sleep(200);
>             return true;
>         });
> s.print();
> env.execute();
> {noformat}
> The solution could be to send out a max watermark event once the sources
> finish, or to exclude them from the source coordinator.
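
The stuck state and the two proposed remedies can be illustrated with a toy model. This is only a sketch under stated assumptions, not Flink's source coordinator: the class `AlignmentSketch` and all of its methods are hypothetical names, and the model assumes the allowed watermark is simply the minimum watermark reported by any tracked subtask plus the maximum drift.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of watermark alignment (hypothetical; not Flink's actual coordinator code). */
public class AlignmentSketch {
    // Last watermark reported by each tracked subtask.
    private final Map<Integer, Long> reported = new HashMap<>();
    private final long maxDriftMillis;

    public AlignmentSketch(long maxDriftMillis) {
        this.maxDriftMillis = maxDriftMillis;
    }

    /** A running subtask reports its current watermark. */
    public void report(int subtask, long watermark) {
        reported.put(subtask, watermark);
    }

    /** Remedy 1: a finished subtask reports the maximum watermark, so it no longer holds back the minimum. */
    public void finishWithMaxWatermark(int subtask) {
        reported.put(subtask, Long.MAX_VALUE);
    }

    /** Remedy 2: a finished subtask is dropped from the alignment bookkeeping entirely. */
    public void finishByExclusion(int subtask) {
        reported.remove(subtask);
    }

    /** Upper bound for any subtask's watermark: minimum reported watermark plus the allowed drift. */
    public long maxAllowedWatermark() {
        long min = Long.MAX_VALUE;
        for (long wm : reported.values()) {
            min = Math.min(min, wm);
        }
        // Guard against overflow when the minimum is (close to) Long.MAX_VALUE.
        return min > Long.MAX_VALUE - maxDriftMillis ? Long.MAX_VALUE : min + maxDriftMillis;
    }
}
```

In this model, with a drift of 10, subtask 0 last reporting 50 and subtask 1 at 60, the bound is 60. If subtask 0 finishes without either remedy, its stale value 50 stays in the map and subtask 1 can never advance past 60. Calling either `finishWithMaxWatermark(0)` or `finishByExclusion(0)` raises the bound to 70 and lets it keep following subtask 1.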