Hi Matthias, thanks for looking at this. Would you then say this comment in the source code is not really valid? https://github.com/apache/flink/blob/release-1.18/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L181
That's where the log I was looking at is created.

Regards,
Alexis.

On Tue, Feb 6, 2024 at 08:54, Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote:

> Good morning Alexis,
>
> withIdleness(…) is easily misunderstood: it actually means that the stream
> configured this way is exempt from watermark processing after 5 seconds (in
> your case).
>
> Hence watermark alignment is also turned off for the stream until a new
> event arrives.
>
> .withIdleness(…) is good for situations where you prefer low latency over
> correctness (causality with respect to time order).
>
> Downstream operators can choose a manual implementation of watermark
> behavior in order to compensate for the missing watermarks.
>
> IMHO, because I see so many people make the same mistake, I would rather
> rename .withIdleness(…) to something like .idleWatermarkExcemption(…) to
> make it more obvious.
>
> Hope this helps
>
> Thias
>
> From: Alexis Sarda-Espinosa <sarda.espin...@gmail.com>
> Sent: Monday, February 5, 2024 6:04 PM
> To: user <user@flink.apache.org>
> Subject: Re: Idleness not working if watermark alignment is used
>
> Ah and I forgot to mention, this is with Flink 1.18.1
>
> On Mon, Feb 5, 2024 at 18:00, Alexis Sarda-Espinosa
> <sarda.espin...@gmail.com> wrote:
>
> Hello,
>
> I have 2 Kafka sources that are configured with a watermark strategy
> instantiated like this:
>
>     WatermarkStrategy.<T>forBoundedOutOfOrderness(maxAllowedWatermarkDrift)
>         .withIdleness(idleTimeout) // 5 seconds currently
>         .withWatermarkAlignment(alignmentGroup,
>             maxAllowedWatermarkDrift, Duration.ofSeconds(1L))
>
> The alignment group is the same for both, but each one consumes from a
> different topic. During a test, I ensured that one of the topics didn't
> receive any messages, but when I check the logs I see multiple entries like
> this:
>
>     Distributing maxAllowedWatermark=1707149933770 of group=dispatcher to
>     subTaskIds=[0] for source Source: GenericChangeMessageDeserializer.
>
> where maxAllowedWatermark grows all the time.
>
> Maybe my understanding is wrong, but I think this means the source is
> never marked as idle even though it didn't receive any new messages in the
> Kafka topic?
>
> Regards,
> Alexis.
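
For reference, the behaviour Matthias describes for withIdleness(…) in the quoted reply can be illustrated with a small toy model. This is only a sketch under stated assumptions: the class IdleAwareWatermarks and all of its methods are hypothetical names made up for this example, it is not Flink's implementation, and it models just one idea, namely that an input which has seen no event for the idle timeout is skipped when the minimum watermark across inputs is taken.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

/** Toy model only (hypothetical names); this is NOT how Flink implements idleness. */
final class IdleAwareWatermarks {
    private final long idleTimeoutMs;
    // Highest watermark seen so far per input.
    private final Map<String, Long> lastWatermark = new HashMap<>();
    // Wall-clock time (ms) at which each input last produced an event.
    private final Map<String, Long> lastEventSeenAtMs = new HashMap<>();

    IdleAwareWatermarks(long idleTimeoutMs) {
        this.idleTimeoutMs = idleTimeoutMs;
    }

    /** Registers an input at wall-clock time nowMs; it has no watermark yet. */
    void register(String input, long nowMs) {
        lastWatermark.put(input, Long.MIN_VALUE);
        lastEventSeenAtMs.put(input, nowMs);
    }

    /** Records an event on the input; watermarks never move backwards per input. */
    void onEvent(String input, long watermark, long nowMs) {
        lastWatermark.merge(input, watermark, Long::max);
        lastEventSeenAtMs.put(input, nowMs);
    }

    /** An input counts as idle once it has been silent for at least the timeout. */
    boolean isIdle(String input, long nowMs) {
        return nowMs - lastEventSeenAtMs.get(input) >= idleTimeoutMs;
    }

    /** Minimum watermark over non-idle inputs; empty if every input is idle. */
    OptionalLong combinedWatermark(long nowMs) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (Map.Entry<String, Long> e : lastWatermark.entrySet()) {
            if (isIdle(e.getKey(), nowMs)) {
                continue; // idle inputs are exempt and do not hold the minimum back
            }
            anyActive = true;
            min = Math.min(min, e.getValue());
        }
        return anyActive ? OptionalLong.of(min) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        IdleAwareWatermarks w = new IdleAwareWatermarks(5_000L); // 5 s, as in the thread
        w.register("topicA", 0L);
        w.register("topicB", 0L);
        w.onEvent("topicA", 100L, 1_000L);
        w.onEvent("topicB", 50L, 1_000L);
        System.out.println(w.combinedWatermark(2_000L)); // both inputs still active
        w.onEvent("topicA", 9_000L, 7_000L);             // topicB silent for 6 s by now
        System.out.println(w.combinedWatermark(7_000L)); // only topicA is considered
    }
}
```

In this toy model, once topicB has been silent for the idle timeout, the combined value follows topicA alone and keeps advancing with it. Whether that is the mechanism behind the maxAllowedWatermark log lines discussed in this thread is not confirmed here; the sketch only illustrates the "exempt from watermark processing" reading of withIdleness(…).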