Hi Reem,

My thinking is that this might be related to the recently reported https://issues.apache.org/jira/browse/FLINK-31632.

Best regards,

Martijn

On Wed, Mar 29, 2023 at 7:07 PM Reem Razak via user <user@flink.apache.org> wrote:

> Hey Martijn,
>
> The version is 1.16.0.
>
> On Wed, Mar 29, 2023 at 5:43 PM Martijn Visser <martijnvis...@apache.org> wrote:
>
>> Hi Reem,
>>
>> What's the Flink version where you're encountering this issue?
>>
>> Best regards,
>>
>> Martijn
>>
>> On Wed, Mar 29, 2023 at 5:18 PM Reem Razak via user <user@flink.apache.org> wrote:
>>
>>> Hey there!
>>>
>>> We are seeing a second Flink pipeline run into similar issues when
>>> configuring both `withWatermarkAlignment` and `withIdleness`. The
>>> unexpected behaviour is triggered after a Kafka cluster failover. Any
>>> thoughts on whether there is an incompatibility between the two?
>>>
>>> Thanks!
>>>
>>> On Wed, Nov 9, 2022 at 6:42 PM Reem Razak <reem.ra...@shopify.com> wrote:
>>>
>>>> Hi there,
>>>>
>>>> We are integrating the watermark alignment feature into a pipeline with
>>>> a Kafka source during a "backfill", i.e. replaying from an earlier Kafka
>>>> offset. While testing, we noticed some unexpected behaviour in the
>>>> watermark advancement, which was resolved by removing `withIdleness` from
>>>> our watermark strategy.
>>>>
>>>>     import java.time.Duration
>>>>     import org.apache.flink.api.common.eventtime.WatermarkStrategy
>>>>
>>>>     // The explicit [Event] type parameter lets Scala type-check the chained calls.
>>>>     val watermarkStrategy = WatermarkStrategy
>>>>       .forBoundedOutOfOrderness[Event](Duration.ofMinutes(1))
>>>>       .withTimestampAssigner(new TimestampedEventTimestampAssigner[Event])
>>>>       .withWatermarkAlignment("alignment-group-1", Duration.ofMinutes(1))
>>>>       .withIdleness(Duration.ofMinutes(5))
>>>>
>>>> I have attached a couple of screenshots of the watermarkAlignmentDrift
>>>> metric. As you can see, the behaviour seems normal until the value
>>>> suddenly drops to ~Long.MIN_VALUE, which causes the pipeline to stop
>>>> emitting records from the source completely.
>>>> Furthermore, the logs originating from
>>>> https://github.com/dawidwys/flink/blob/888162083fbb5f62f809e5270ad968db58cc9c5b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L174
>>>> also indicate that `maxAllowedWatermark` switches to ~Long.MIN_VALUE.
>>>>
>>>> We found that the `updateInterval` passed into the alignment
>>>> parameters seemed to correlate with how long the pipeline would run
>>>> before stopping: with a larger interval of 20 minutes the issue appeared
>>>> later than with an interval of 1 second.
>>>>
>>>> We are wondering whether a bug exists when using both `withIdleness` and
>>>> `withWatermarkAlignment`. Might it be related to
>>>> https://issues.apache.org/jira/browse/FLINK-28975, or is there
>>>> possibly a race condition in the watermark emission? We do not necessarily
>>>> need to have both configured at the same time, but we were surprised
>>>> by the behaviour of the application. Has anyone run into a similar issue,
>>>> or does anyone have further insight?
>>>>
>>>> Much appreciated,
>>>> - Reem
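One possible explanation for `maxAllowedWatermark` dropping to roughly Long.MIN_VALUE is signed 64-bit overflow. The sketch below is a hypothetical, simplified model, not Flink's actual SourceCoordinator code. It assumes (1) the maximum allowed watermark is the minimum watermark reported by the readers in an alignment group plus the configured max drift, and (2) a group whose readers are all idle reports Long.MAX_VALUE. Under those assumptions, adding a 1-minute drift (60000 ms) wraps around to Long.MIN_VALUE + 59999, which would hold back every reader. A saturating addition avoids the wrap. The object and method names are made up for illustration.

```scala
// Hypothetical, simplified model of watermark alignment; NOT Flink's actual
// SourceCoordinator implementation. All names here are illustrative.
object AlignmentOverflowSketch {

  // Assumption: max allowed watermark = min reported watermark + max drift.
  // Plain Long addition silently wraps around on overflow.
  def naiveMaxAllowed(reported: Seq[Long], maxDriftMillis: Long): Long = {
    require(reported.nonEmpty && maxDriftMillis >= 0)
    reported.min + maxDriftMillis
  }

  // Same computation, but clamped at Long.MaxValue instead of wrapping.
  def saturatingMaxAllowed(reported: Seq[Long], maxDriftMillis: Long): Long = {
    require(reported.nonEmpty && maxDriftMillis >= 0)
    val combined = reported.min
    if (combined > Long.MaxValue - maxDriftMillis) Long.MaxValue
    else combined + maxDriftMillis
  }

  def main(args: Array[String]): Unit = {
    val maxDrift = 60000L // Duration.ofMinutes(1) expressed in milliseconds
    // Assumption: every reader in the group is idle and reports Long.MaxValue.
    val allIdle = Seq(Long.MaxValue, Long.MaxValue)
    println(naiveMaxAllowed(allIdle, maxDrift) == Long.MinValue + 59999L) // true
    println(saturatingMaxAllowed(allIdle, maxDrift) == Long.MaxValue)     // true
  }
}
```

While at least one reader reports an ordinary event-time watermark, both functions return the same value. They only diverge when the combined watermark is within `maxDriftMillis` of Long.MaxValue, which under the stated assumptions is exactly the all-idle case.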