Hi Matthias,

I think I understand the implications of idleness. In my case, I really do
need it, since even in the production environment one of the Kafka topics
will receive messages only sporadically.

With regard to the code, I have very limited understanding of Flink
internals, but the part I linked seems to indicate that, if a stream is
idle, the log should show a hard-coded maxAllowedWatermark equal to
Long.MAX_VALUE. That's why I thought the source isn't really considered
idle.
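
To make the two readings concrete, here is a toy model in plain Java (not Flink code) of how an alignment group might turn the watermarks reported by its sources into maxAllowedWatermark. It assumes, for illustration only, that an idle source reports Long.MAX_VALUE, that the combined watermark is the minimum over all reports, and that maxAllowedWatermark is the combined watermark plus maxAllowedWatermarkDrift. Under those assumptions the hard-coded Long.MAX_VALUE only shows up when every source in the group is idle; with one active source the value tracks that source and keeps growing, which would be consistent with Matthias's explanation quoted below.

```java
import java.util.List;

/**
 * Toy model, not Flink code: derives maxAllowedWatermark for one alignment group.
 * Assumption: an idle source reports Long.MAX_VALUE, so it never wins the minimum.
 */
public final class AlignmentModel {

    /** Assumed sentinel reported by an idle source. */
    public static final long IDLE = Long.MAX_VALUE;

    /** Combined watermark: the minimum over all reports, so idle reports drop out. */
    public static long combinedWatermark(List<Long> reported) {
        long min = Long.MAX_VALUE;
        for (long wm : reported) {
            min = Math.min(min, wm);
        }
        return min; // still Long.MAX_VALUE only if every source is idle (or none reported)
    }

    /**
     * Assumption: maxAllowedWatermark = combined + drift, saturating at Long.MAX_VALUE.
     * maxDriftMillis is assumed to be non-negative.
     */
    public static long maxAllowedWatermark(List<Long> reported, long maxDriftMillis) {
        long combined = combinedWatermark(reported);
        if (combined > Long.MAX_VALUE - maxDriftMillis) {
            return Long.MAX_VALUE; // avoid overflow when all sources are idle
        }
        return combined + maxDriftMillis;
    }

    public static void main(String[] args) {
        long drift = 5_000L; // hypothetical maxAllowedWatermarkDrift of 5 s
        // One active source at watermark 10_000, one idle source:
        System.out.println(maxAllowedWatermark(List.of(10_000L, IDLE), drift)); // prints 15000
        // Both sources idle:
        System.out.println(maxAllowedWatermark(List.of(IDLE, IDLE), drift)); // prints 9223372036854775807
    }
}
```

The min-based design means no special case is needed to "skip" idle sources; they simply never lower the minimum.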

Regards,
Alexis.

On Tue, 6 Feb 2024 at 11:46, Schwalbe Matthias <
matthias.schwa...@viseca.ch> wrote:

> Hi Alexis,
>
> Yes, I guess so, although I am not very familiar with that part of the code.
>
> Apparently the SourceCoordinator cannot come up with a proper watermark
> time if watermarking is turned off (idle mode of the stream), so it
> derives the watermark time from the remaining non-idle sources.
>
> It’s consistent with how idling-state of data streams is designed.
>
> However, the point remains that one needs to compensate for
> .withIdleness(…) if correctness is a concern at all.
>
> Using .withIdleness(…) is IMHO only justified in rare cases where the
> implications are fully understood.
>
> If a source is not configured with .withIdleness(…) and actually becomes
> idle, all window aggregations and stateful stream joins stall until that
> source becomes active again (= added latency).
>
> Thias
>
> *From:* Alexis Sarda-Espinosa <sarda.espin...@gmail.com>
> *Sent:* Tuesday, February 6, 2024 9:48 AM
> *To:* Schwalbe Matthias <matthias.schwa...@viseca.ch>
> *Cc:* user <user@flink.apache.org>
> *Subject:* Re: Idleness not working if watermark alignment is used
>
> Hi Matthias,
>
> Thanks for looking at this. Would you then say this comment in the source
> code is not really valid?
>
> https://github.com/apache/flink/blob/release-1.18/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L181
>
> That's where the log I was looking at is created.
>
> Regards,
> Alexis.
>
> On Tue, 6 Feb 2024 at 08:54, Schwalbe Matthias <
> matthias.schwa...@viseca.ch> wrote:
>
> Good morning Alexis,
>
> withIdleness(…) is easily misunderstood: it actually means that a stream
> configured this way is exempt from watermark processing after 5 seconds
> without events (in your case).
>
> Hence also watermark alignment is turned off for the stream until a new
> event arrives.
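
The semantics described above can be sketched as a simplified, hypothetical model (not Flink's WatermarksWithIdleness implementation): the source is checked periodically, is flagged idle once no event has arrived for idleTimeout of processing time, and becomes active again with its next event. The class and method names are illustrative only.

```java
import java.time.Duration;

/**
 * Simplified, hypothetical model of withIdleness(...) semantics (not Flink's code).
 * Times are processing-time milliseconds.
 */
public final class IdlenessModel {

    private final long idleTimeoutMillis;
    private long lastEventTime; // processing time of the most recent event
    private boolean idle;

    public IdlenessModel(Duration idleTimeout, long startTime) {
        this.idleTimeoutMillis = idleTimeout.toMillis();
        this.lastEventTime = startTime;
    }

    /** Any new event makes the source active again. */
    public void onEvent(long processingTime) {
        lastEventTime = processingTime;
        idle = false;
    }

    /** Periodic check, e.g. once per watermark emission interval. */
    public boolean checkIdle(long processingTime) {
        if (processingTime - lastEventTime >= idleTimeoutMillis) {
            idle = true; // from now on excluded from watermarking and alignment
        }
        return idle;
    }

    public static void main(String[] args) {
        IdlenessModel source = new IdlenessModel(Duration.ofSeconds(5), 0L);
        System.out.println(source.checkIdle(4_000L)); // prints false: quiet for only 4 s
        System.out.println(source.checkIdle(5_000L)); // prints true: quiet for 5 s
        source.onEvent(6_000L);
        System.out.println(source.checkIdle(7_000L)); // prints false: active again
    }
}
```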
>
> .withIdleness(…) is good for situations where you prefer low latency over
> correctness (causality with respect to time order).
>
> Downstream operators can implement the watermark behavior manually in
> order to compensate for the missing watermarks.
>
> IMHO, because I see so many people make the same mistake, I would rather
> rename .withIdleness(…) to something like .idleWatermarkExemption(…) to
> make it more obvious.
>
> Hope this helps.
>
> Thias
>
> *From:* Alexis Sarda-Espinosa <sarda.espin...@gmail.com>
> *Sent:* Monday, February 5, 2024 6:04 PM
> *To:* user <user@flink.apache.org>
> *Subject:* Re: Idleness not working if watermark alignment is used
>
> Ah, and I forgot to mention: this is with Flink 1.18.1.
>
> On Mon, 5 Feb 2024 at 18:00, Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
> Hello,
>
> I have two Kafka sources that are configured with a watermark strategy
> instantiated like this:
>
> WatermarkStrategy.<T>forBoundedOutOfOrderness(maxAllowedWatermarkDrift)
>             .withIdleness(idleTimeout) // 5 seconds currently
>             .withWatermarkAlignment(alignmentGroup, maxAllowedWatermarkDrift,
>                     Duration.ofSeconds(1L))
>
> The alignment group is the same for both, but each one consumes from a
> different topic. During a test, I ensured that one of the topics didn't
> receive any messages, but when I check the logs I see multiple entries like
> this:
>
> Distributing maxAllowedWatermark=1707149933770 of group=dispatcher to
> subTaskIds=[0] for source Source: GenericChangeMessageDeserializer.
>
> where maxAllowedWatermark grows all the time.
>
> Maybe my understanding is wrong, but I think this means the source is
> never marked as idle even though it didn't receive any new messages in the
> Kafka topic?
>
> Regards,
> Alexis.
>
> This message is intended only for the named recipient and may contain
> confidential or privileged information. As the confidentiality of email
> communication cannot be guaranteed, we do not accept any responsibility for
> the confidentiality and the intactness of this message. If you have
> received it in error, please advise the sender by return e-mail and delete
> this message and any attachments. Any unauthorised use or dissemination of
> this information is strictly prohibited.
>
