Hi Matthias,

I think I understand the implications of idleness. In my case, I really do need it, since even in the production environment one of the Kafka topics will only receive messages sporadically.
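To make the latency-versus-correctness trade-off discussed in this thread concrete, here is a minimal, hypothetical Java model (not Flink code) of how an operator might combine the watermarks of two inputs when one of them has gone quiet. The class and method names (IdlenessModel, combinedWatermark) are invented for illustration. The rule that an input silent for at least the idle timeout is simply left out of the minimum is a simplifying assumption, based on the description of idleness as an exemption from watermark processing.

```java
import java.time.Duration;
import java.util.List;
import java.util.OptionalLong;

// Hypothetical, simplified model (not Flink code): an operator combines the
// watermarks of its inputs, and inputs that have been silent for at least
// idleTimeout are assumed to be excluded from the minimum.
public class IdlenessModel {

    // One input stream: its current watermark and when its last event arrived.
    static final class Input {
        final String name;
        final long watermark;
        final long lastEventTime;

        Input(String name, long watermark, long lastEventTime) {
            this.name = name;
            this.watermark = watermark;
            this.lastEventTime = lastEventTime;
        }
    }

    // An input counts as idle once no event arrived for at least idleTimeout.
    static boolean isIdle(Input in, long now, Duration idleTimeout) {
        return now - in.lastEventTime >= idleTimeout.toMillis();
    }

    // Minimum over the non-idle inputs; empty means every input is idle,
    // so this model emits no combined watermark at all.
    static OptionalLong combinedWatermark(List<Input> inputs, long now, Duration idleTimeout) {
        return inputs.stream()
                .filter(in -> !isIdle(in, now, idleTimeout))
                .mapToLong(in -> in.watermark)
                .min();
    }

    public static void main(String[] args) {
        long now = 100_000L;
        Input busy = new Input("busyTopic", 90_000L, 99_000L);   // last event 1 s ago
        Input quiet = new Input("quietTopic", 10_000L, 60_000L); // silent for 40 s
        List<Input> inputs = List.of(busy, quiet);

        // 5 s timeout: quietTopic is idle, so busyTopic alone drives the result.
        System.out.println(combinedWatermark(inputs, now, Duration.ofSeconds(5)).getAsLong()); // 90000
        // 1 h timeout: quietTopic still counts and holds the watermark back.
        System.out.println(combinedWatermark(inputs, now, Duration.ofHours(1)).getAsLong());   // 10000
    }
}
```

With the 5-second timeout the quiet input is excluded and the result follows the busy one (low latency, but events arriving late on the quiet topic may be dropped). With a long timeout the quiet input still counts and pins the result to its old watermark, which is the stall that occurs without `.withIdleness(…)`.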
With regard to the code, I have a very limited understanding of Flink internals, but the part I linked seems to indicate that, if a stream is idle, the log should show a hard-coded maxAllowedWatermark equal to Long.MAX_VALUE. That's why I thought the source isn't really considered idle.

Regards,
Alexis.

On Tue., Feb. 6, 2024 at 11:46, Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote:

> Hi Alexis,
>
> Yes, I guess so, although I am not thoroughly acquainted with that part of the code.
> Apparently the SourceCoordinator cannot come up with a proper watermark time if watermarking is turned off (idle mode of the stream), so it deduces the watermark time from the remaining non-idle sources.
> This is consistent with how the idle state of data streams is designed.
> However, one still needs to compensate for .withIdleness(…) if correctness is of any concern.
> IMHO, using .withIdleness(…) is only justified in rare cases where the implications are fully understood.
>
> If a source is not configured with .withIdleness(…) and becomes idle in practice, all window aggregations and stateful stream joins stall until that source becomes active again (= added latency).
>
> Thias
>
> *From:* Alexis Sarda-Espinosa <sarda.espin...@gmail.com>
> *Sent:* Tuesday, February 6, 2024 9:48 AM
> *To:* Schwalbe Matthias <matthias.schwa...@viseca.ch>
> *Cc:* user <user@flink.apache.org>
> *Subject:* Re: Idleness not working if watermark alignment is used
>
> Hi Matthias,
>
> Thanks for looking at this. Would you then say this comment in the source code is not really valid?
>
> https://github.com/apache/flink/blob/release-1.18/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L181
>
> That's where the log I was looking at is created.
>
> Regards,
> Alexis.
>
> On Tue., Feb. 6, 2024 at 08:54, Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote:
>
> > Good morning Alexis,
> >
> > withIdleness(…) is easily misunderstood: it actually means that a stream configured this way is exempted from watermark processing once it has received no events for 5 seconds (in your case).
> > Hence watermark alignment is also turned off for that stream until a new event arrives.
> >
> > .withIdleness(…) is good for situations where you prefer low latency over correctness (causality with respect to time order).
> > Downstream operators can implement custom watermark handling to compensate for the missing watermarks.
> >
> > IMHO, because I see so many people make the same mistake, I would rather rename .withIdleness(…) to something like .idleWatermarkExemption(…) to make it more obvious.
> >
> > Hope this helps
> >
> > Thias
> >
> > *From:* Alexis Sarda-Espinosa <sarda.espin...@gmail.com>
> > *Sent:* Monday, February 5, 2024 6:04 PM
> > *To:* user <user@flink.apache.org>
> > *Subject:* Re: Idleness not working if watermark alignment is used
> >
> > Ah, and I forgot to mention: this is with Flink 1.18.1.
> >
> > On Mon., Feb. 5, 2024 at 18:00, Alexis Sarda-Espinosa <sarda.espin...@gmail.com> wrote:
> >
> > > Hello,
> > >
> > > I have 2 Kafka sources that are configured with a watermark strategy instantiated like this:
> > >
> > >     WatermarkStrategy.<T>forBoundedOutOfOrderness(maxAllowedWatermarkDrift)
> > >             .withIdleness(idleTimeout) // 5 seconds currently
> > >             .withWatermarkAlignment(alignmentGroup, maxAllowedWatermarkDrift, Duration.ofSeconds(1L))
> > >
> > > The alignment group is the same for both, but each one consumes from a different topic. During a test, I ensured that one of the topics didn't receive any messages, but when I check the logs I see multiple entries like this:
> > >
> > >     Distributing maxAllowedWatermark=1707149933770 of group=dispatcher to subTaskIds=[0] for source Source: GenericChangeMessageDeserializer.
> > >
> > > where maxAllowedWatermark grows all the time.
> > >
> > > Maybe my understanding is wrong, but I think this means the source is never marked as idle even though it didn't receive any new messages in the Kafka topic?
> > >
> > > Regards,
> > > Alexis.
> >
> > This message is intended only for the named recipient and may contain confidential or privileged information. As the confidentiality of email communication cannot be guaranteed, we do not accept any responsibility for the confidentiality and the intactness of this message. If you have received it in error, please advise the sender by return e-mail and delete this message and any attachments. Any unauthorised use or dissemination of this information is strictly prohibited.
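The explanation of the SourceCoordinator behavior can likewise be sketched as a hypothetical, simplified Java model (not the actual Flink implementation). It assumes that an idle source reports Long.MAX_VALUE as its watermark, so the group minimum is taken over the active sources only, and that maxAllowedWatermark is that minimum plus the allowed drift. Under those assumptions, Long.MAX_VALUE only appears when every source in the alignment group is idle. With one active source in the group, the idle source is still handed a finite maxAllowedWatermark that grows with the active source, which would be consistent with the log entries described above. The class, method, and constant names are invented for illustration.

```java
import java.util.Map;

// Hypothetical, simplified model (not Flink's SourceCoordinator code) of how an
// alignment group could derive maxAllowedWatermark from reported watermarks.
// Assumption: an idle source reports Long.MAX_VALUE, so it never wins the minimum.
public class AlignmentModel {

    static final long IDLE = Long.MAX_VALUE;

    // Group watermark = minimum reported watermark; result = group watermark + drift,
    // saturating at Long.MAX_VALUE (also the result if all sources are idle).
    // Assumes maxDriftMillis is non-negative.
    static long maxAllowedWatermark(Map<String, Long> reported, long maxDriftMillis) {
        long groupWatermark = reported.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(IDLE);
        return groupWatermark > Long.MAX_VALUE - maxDriftMillis
                ? Long.MAX_VALUE
                : groupWatermark + maxDriftMillis;
    }

    public static void main(String[] args) {
        long drift = 5_000L; // hypothetical maxAllowedWatermarkDrift in milliseconds

        // One active source and one idle source in the same group.
        System.out.println(maxAllowedWatermark(Map.of("active", 1_000_000L, "quiet", IDLE), drift)); // 1005000
        // The active source advances, so the limit handed to every source grows too.
        System.out.println(maxAllowedWatermark(Map.of("active", 1_060_000L, "quiet", IDLE), drift)); // 1065000
        // Only when all sources are idle does the limit become Long.MAX_VALUE.
        System.out.println(maxAllowedWatermark(Map.of("active", IDLE, "quiet", IDLE), drift) == Long.MAX_VALUE); // true
    }
}
```

The saturating addition is a design choice of this sketch to avoid overflow when the group minimum is already Long.MAX_VALUE; whether Flink guards against that case in the same way is not established here.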