Hi Alexis,

There are a couple of recent Flink tickets on watermark alignment, specifically https://issues.apache.org/jira/browse/FLINK-32414 and https://issues.apache.org/jira/browse/FLINK-32420. Could the latter also apply in your case?
Best regards,
Martijn

On Wed, Jun 28, 2023 at 11:33 AM Alexis Sarda-Espinosa <[email protected]> wrote:

> Hello,
>
> Just for completeness: I don't see the problem if I assign a different
> alignment group to each source, i.e. when using only split-level watermark
> alignment.
>
> Regards,
> Alexis.
>
> On Wed, Jun 28, 2023 at 08:13, haishui <[email protected]> wrote:
>
>> Hi,
>> I have the same problem. This really is a bug.
>> `shouldWaitForAlignment` needs to be changed as well.
>>
>> By the way, a source will be marked as idle when it has been waiting
>> for alignment for a long time. Is this a bug?
>>
>> On 2023-06-27 23:25:38, "Alexis Sarda-Espinosa" <[email protected]>
>> wrote:
>>
>> Hello,
>>
>> I am currently evaluating idleness and alignment with Flink 1.17.1 and
>> the externalized Kafka connector. My job has 3 sources whose watermark
>> strategies are defined like this:
>>
>> WatermarkStrategy.<T>forBoundedOutOfOrderness(maxAllowedWatermarkDrift)
>>     .withIdleness(idleTimeout)
>>     .withWatermarkAlignment("group", maxAllowedWatermarkDrift,
>>         Duration.ofSeconds(1L))
>>
>> The max allowed drift is currently 5 seconds, and my sources have an
>> idleTimeout of 1, 1.5, and 5 seconds, respectively.
>>
>> What I observe is that, when I restart the job, all sources publish
>> messages, but then 2 of them are marked as idle and never resume.
>> I found https://issues.apache.org/jira/browse/FLINK-31632, which should be
>> fixed in 1.17.1, but I don't think it's the same issue, since my logs don't
>> show negative values:
>>
>> 2023-06-27 15:11:42,927 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New reported watermark=Watermark @ 1687878696690 (2023-06-27 15:11:36.690) from subTaskId=1
>> 2023-06-27 15:11:43,009 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from subTaskId=0
>> 2023-06-27 15:11:43,091 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from subTaskId=0
>> 2023-06-27 15:11:43,116 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator:609 - New reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from subTaskId=0
>> 2023-06-27 15:11:43,298 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 - Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[0, 1]
>> 2023-06-27 15:11:43,304 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 - Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[0]
>> 2023-06-27 15:11:43,306 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 - Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[0]
>> 2023-06-27 15:11:43,486 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 - Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[]
>> 2023-06-27 15:11:43,489 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 - Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[]
>> 2023-06-27 15:11:43,492 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator:194 - Distributing maxAllowedWatermark=1687878701690 to subTaskIds=[]
>>
>> Does anyone know whether I'm missing something or this is really a bug?
>>
>> Regards,
>> Alexis.
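The numbers in the log above are consistent with a simple rule: the smallest reported watermark (1687878696690) plus the 5-second drift gives the distributed 1687878701690. The following is a minimal, hypothetical model of that arithmetic, not Flink's `SourceCoordinator` code. It assumes that the bound is the minimum watermark reported across the alignment group plus `maxAllowedWatermarkDrift`, and that a reader pauses once its own watermark is ahead of that bound. `AlignmentModel`, `maxAllowedWatermark`, and `shouldPause` are illustrative names only.

```java
import java.util.List;

/**
 * Hypothetical, simplified model of group-level watermark alignment.
 * This is NOT Flink's SourceCoordinator code; all names are illustrative.
 */
class AlignmentModel {

    /**
     * Assumption: the bound is the smallest watermark reported by any member of
     * the alignment group plus the allowed drift. Members reporting
     * Long.MAX_VALUE (as subtask 0 does in the DEBUG lines) never lower the minimum.
     */
    static long maxAllowedWatermark(List<Long> reportedWatermarks, long maxDriftMillis) {
        long min = Long.MAX_VALUE;
        for (long watermark : reportedWatermarks) {
            min = Math.min(min, watermark);
        }
        // Saturate instead of overflowing when every member reports Long.MAX_VALUE
        // (Long.MAX_VALUE + drift would wrap around to a negative number).
        if (min > Long.MAX_VALUE - maxDriftMillis) {
            return Long.MAX_VALUE;
        }
        return min + maxDriftMillis;
    }

    /** Assumption: a reader pauses once its own watermark is ahead of the bound. */
    static boolean shouldPause(long currentWatermark, long maxAllowedWatermark) {
        return currentWatermark > maxAllowedWatermark;
    }

    public static void main(String[] args) {
        // Watermarks from the DEBUG lines in the thread, 5 s drift as in the config.
        List<Long> reported = List.of(1687878696690L, Long.MAX_VALUE);
        long bound = maxAllowedWatermark(reported, 5_000L);
        System.out.println("maxAllowedWatermark=" + bound); // maxAllowedWatermark=1687878701690

        // A reader at 1687878702000 is past the bound, so in this model it pauses.
        System.out.println(shouldPause(1687878702000L, bound)); // true

        // Every member at Long.MAX_VALUE: the bound saturates instead of going negative.
        System.out.println(maxAllowedWatermark(List.of(Long.MAX_VALUE, Long.MAX_VALUE), 5_000L)); // 9223372036854775807
    }
}
```

The saturating addition is a choice of this sketch; it only illustrates why adding the drift to `Long.MAX_VALUE` without a guard would produce a negative bound.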
