[ https://issues.apache.org/jira/browse/FLINK-33109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17767128#comment-17767128 ]
Rui Fan commented on FLINK-33109:
---------------------------------

Could you try this image? It's the official image based on the release-1.17 branch.

[https://github.com/apache/flink-docker/pkgs/container/flink-docker/128728529?tag=1.17-SNAPSHOT-scala_2.12-java11-debian]

Also, would you mind sharing a Flink job demo that can reproduce your bug? If so, I can try it with the latest code.

> Watermark alignment not applied after recovery from checkpoint
> --------------------------------------------------------------
>
>                 Key: FLINK-33109
>                 URL: https://issues.apache.org/jira/browse/FLINK-33109
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.17.1
>            Reporter: Yordan Pavlov
>            Priority: Major
>         Attachments: image-2023-09-18-15-40-06-868.png,
> image-2023-09-18-15-46-16-106.png
>
>
> I am observing a problem where, after recovery from a checkpoint, the Kafka
> source watermarks start to diverge, not honoring the watermark alignment
> setting I have applied.
> I have a Kafka source which reads a topic with 32 partitions. I am applying
> the following watermark strategy:
> {code:java}
> new EventAwareWatermarkStrategy[KeyedKafkaSourceMessage](msg =>
>   msg.value.getTimestamp)
>   .withWatermarkAlignment("alignment-sources-group",
>     time.Duration.ofMillis(sourceWatermarkAlignmentBlocks)){code}
>
> This works great up until my job needs to recover from a checkpoint. Once the
> recovery takes place, no alignment happens any more. This can best be
> illustrated by looking at the watermark metrics for various operators in the
> image:
> !image-2023-09-18-15-40-06-868.png!
>
> You can see how the watermarks disperse after the recovery.
> Trying to debug the problem, I noticed that before the failure there would
> be calls in
>
> {code:java}
> SourceCoordinator::announceCombinedWatermark()
> {code}
> After the recovery, no calls get there, so no value for
> {code:java}
> watermarkAlignmentParams.getMaxAllowedWatermarkDrift(){code}
> is ever read. I can manually fix the problem if I stop the job, clear all
> state from ZooKeeper and then manually start Flink, providing the last
> checkpoint with the
> {code:java}
> '--fromSavepoint'{code}
> flag. This causes the SourceCoordinator to be constructed properly and
> the watermark drift to be checked. Once recovered manually, the watermarks
> again converge to the allowed drift, as seen in the metrics:
> !image-2023-09-18-15-46-16-106.png!
>
> Let me know if I can be helpful by providing any more information.
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)