Hi Matthias,

Thanks for looking at this. Would you say, then, that this comment in the
source code is not really valid?
https://github.com/apache/flink/blob/release-1.18/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L181

That's where the log I was looking at is created.
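
In case it helps with reading that log: the maxAllowedWatermark value appears to be a timestamp in milliseconds since the Unix epoch. That is an assumption on my side, based on the usual convention for Kafka record timestamps. Below is a minimal sketch, independent of Flink, that decodes the value from the log line quoted further down; the class and method names are made up for illustration.

```java
import java.time.Instant;

// Hypothetical helper, not part of Flink: turns a logged watermark value
// into a human-readable instant, assuming it is epoch milliseconds.
final class MaxAllowedWatermarkDecoder {

    // Interprets the given value as milliseconds since 1970-01-01T00:00:00Z.
    static Instant decode(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis);
    }

    public static void main(String[] args) {
        // Value copied from the "Distributing maxAllowedWatermark=..." log entry.
        long logged = 1707149933770L;
        System.out.println(decode(logged)); // prints 2024-02-05T16:18:53.770Z
    }
}
```

The decoded instant falls on 5 Feb 2024, the day of my first message, so the logged value looks like a regular event-time timestamp.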

Regards,
Alexis.

On Tue, 6 Feb 2024 at 08:54, Schwalbe Matthias <
matthias.schwa...@viseca.ch> wrote:

> Good morning Alexis,
>
>
>
> withIdleness(…) is easily misunderstood. It actually means that a stream
> configured this way is exempt from watermark processing once it has gone
> without events for the idle timeout (5 seconds in your case).
>
> Hence watermark alignment is also turned off for the stream until a new
> event arrives.
>
>
>
> .withIdleness(…) is good for situations where you prefer low latency over
> correctness (causality with respect to time order).
>
> Downstream operators can choose a manual implementation of watermark
> behavior in order to compensate for the missing watermarks.
>
>
>
> IMHO, because I see so many people make the same mistake, I would rather
> rename .withIdleness(…) to something like .idleWatermarkExemption(…) to
> make it more obvious.
>
>
>
> Hope this helps
>
> Thias
>
> *From:* Alexis Sarda-Espinosa <sarda.espin...@gmail.com>
> *Sent:* Monday, February 5, 2024 6:04 PM
> *To:* user <user@flink.apache.org>
> *Subject:* Re: Idleness not working if watermark alignment is used
>
>
>
> Ah, and I forgot to mention: this is with Flink 1.18.1.
>
>
>
> On Mon, 5 Feb 2024 at 18:00, Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
> Hello,
>
>
>
> I have 2 Kafka sources that are configured with a watermark strategy
> instantiated like this:
>
>
>
> WatermarkStrategy.<T>forBoundedOutOfOrderness(maxAllowedWatermarkDrift)
>             .withIdleness(idleTimeout) // 5 seconds currently
>             .withWatermarkAlignment(alignmentGroup, maxAllowedWatermarkDrift,
>                     Duration.ofSeconds(1L))
>
>
>
> The alignment group is the same for both, but each one consumes from a
> different topic. During a test, I ensured that one of the topics didn't
> receive any messages, but when I check the logs I see multiple entries like
> this:
>
>
>
> Distributing maxAllowedWatermark=1707149933770 of group=dispatcher to
> subTaskIds=[0] for source Source: GenericChangeMessageDeserializer.
>
>
>
> where maxAllowedWatermark grows all the time.
>
>
>
> Maybe my understanding is wrong, but I think this means the source is
> never marked as idle even though it didn't receive any new messages in the
> Kafka topic?
>
>
>
> Regards,
>
> Alexis.
