Hi all,

I am trying out watermark alignment in Flink 1.15 with the following strategy:

    watermarkStrategy = WatermarkStrategy.<~>forBoundedOutOfOrderness(
            Duration.ofMillis(outOfOrderness))
        .withWatermarkAlignment("wm-group", Duration.ofSeconds(10), Duration.ofSeconds(1))
        .withTimestampAssigner((element, timestamp) -> element.getTimestamp())
        .withIdleness(Duration.ofSeconds(1));

And I got the following in the DEBUG logs:

    2022-07-10 06:53:35,713 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Distributing maxAllowedWatermark=-9223372036854765808 to subTaskIds=[]
    2022-07-10 06:53:36,606 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ 1657436016036 (2022-07-10 06:53:36.036) from subTaskId=2
    2022-07-10 06:53:36,619 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ 1657435956048 (2022-07-10 06:52:36.048) from subTaskId=1
    2022-07-10 06:53:36,639 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ 1657436016034 (2022-07-10 06:53:36.034) from subTaskId=3
    2022-07-10 06:53:36,702 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ 1657436016053 (2022-07-10 06:53:36.053) from subTaskId=0
    2022-07-10 06:53:36,713 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Distributing maxAllowedWatermark=1657435966048 to subTaskIds=[0, 1, 2, 3]
    2022-07-10 06:53:37,229 DEBUG shaded.io.kubernetes.client.extended.leaderelection.LeaderElector [] - Update lock acquire time to keep lease
    2022-07-10 06:53:37,237 DEBUG shaded.io.kubernetes.client.extended.leaderelection.LeaderElector [] - TryAcquireOrRenew return success
    2022-07-10 06:53:37,237 DEBUG shaded.io.kubernetes.client.extended.leaderelection.LeaderElector [] - Successfully renewed lease
    2022-07-10 06:53:37,603 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from subTaskId=2
    2022-07-10 06:53:37,605 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from subTaskId=3
    2022-07-10 06:53:37,616 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from subTaskId=1
    2022-07-10 06:53:37,630 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from subTaskId=0
    2022-07-10 06:53:37,713 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Distributing maxAllowedWatermark=-9223372036854765809 to subTaskIds=[0, 1, 2, 3]
    2022-07-10 06:53:38,603 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from subTaskId=2
    2022-07-10 06:53:38,604 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from subTaskId=3
    2022-07-10 06:53:38,616 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from subTaskId=1
    2022-07-10 06:53:38,630 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from subTaskId=0
    2022-07-10 06:53:38,713 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Distributing maxAllowedWatermark=-9223372036854765809 to subTaskIds=[0, 1, 2, 3]

After that, maxAllowedWatermark stays at -9223372036854765809 indefinitely. The watermarks look correct at the beginning, but then every subtask starts reporting Long.MAX_VALUE (9223372036854775807) and the distributed value turns into a large negative number. It feels like a long overflow issue.
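
The numbers in the log are at least consistent with a wrap-around of a signed 64-bit long. Below is a minimal sketch of the arithmetic, assuming the coordinator computes maxAllowedWatermark as the smallest reported watermark plus the configured maximum drift of 10 seconds. That formula is inferred from the log values and is not confirmed against the Flink source. The class and method names are hypothetical, and the saturating variant only illustrates one possible guard.

```java
import java.time.Duration;

public class WatermarkOverflowDemo {

    // Assumption: max allowed watermark = min reported watermark + max drift,
    // computed with plain long arithmetic (which wraps silently on overflow).
    static long maxAllowedWatermark(long minReportedWatermark, Duration maxDrift) {
        return minReportedWatermark + maxDrift.toMillis();
    }

    // Hypothetical guard for a non-negative drift: clamp to Long.MAX_VALUE
    // instead of wrapping around to a negative value.
    static long saturatingMaxAllowedWatermark(long minReportedWatermark, Duration maxDrift) {
        try {
            return Math.addExact(minReportedWatermark, maxDrift.toMillis());
        } catch (ArithmeticException overflow) {
            return Long.MAX_VALUE;
        }
    }

    public static void main(String[] args) {
        Duration drift = Duration.ofSeconds(10);

        // Smallest watermark in the first round (subTaskId=1) plus 10 s.
        System.out.println(maxAllowedWatermark(1657435956048L, drift));   // 1657435966048

        // All subtasks report Long.MAX_VALUE: the sum wraps around.
        System.out.println(maxAllowedWatermark(Long.MAX_VALUE, drift));   // -9223372036854765809

        // No subtask has reported yet, modelled here as Long.MIN_VALUE.
        System.out.println(maxAllowedWatermark(Long.MIN_VALUE, drift));   // -9223372036854765808

        // With the guard, the value stays at Long.MAX_VALUE.
        System.out.println(saturatingMaxAllowedWatermark(Long.MAX_VALUE, drift));
    }
}
```

All three distributed values in the log (1657435966048, -9223372036854765809 and the initial -9223372036854765808) come out of this formula, which is why I suspect the addition overflows once every subtask reports Long.MAX_VALUE.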

If I remove the .withWatermarkAlignment() call, everything works fine. Any ideas?

Thanks,
Jun