Hi Sweta,

Implementing a dedicated process function for this purpose, as David suggests, 
is a sound idea, especially if waiting for a valid natural watermark after a 
restore from savepoint is not sufficient.

We had a situation with input streams of different update frequencies (one was 
only updated once a day, and hence only generated a watermark once a day).

This is how you can approach the two parts of the task:

  *   Storing the watermark:
     *   Create a process function.
     *   Keep a map that stores the latest watermark per sub-partition (for 
example, there are 128 sub-partitions in a job with a max-parallelism of 128).
     *   Store this map in operator state with each checkpoint.
     *   Register a repeating processing-time timer (with a high frequency, 
according to your needs), so that a watermark can be yielded after a savepoint 
restore.
  *   Restoring the watermark:
     *   When restoring from operator state, determine the lowest watermark 
among all sub-partitions that belong to the respective subtask. This has to be 
recomputed on restore because the parallelism might have changed.
     *   Yield this watermark (once) in the processing-time handler of the 
timer above.
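
The restore step above can be sketched in plain Java, without any Flink 
classes. This is not our production implementation, only an illustration. It 
assumes that each subtask owns a contiguous range of sub-partitions and that 
the restored map contains the entries of all sub-partitions; the class and 
method names are hypothetical, and you should check how your Flink version 
actually assigns sub-partitions to subtasks.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical helper: picks the watermark a subtask may safely emit after restore.
final class WatermarkRestoreSketch {

    // First sub-partition owned by the subtask (assumption: contiguous ranges).
    static int rangeStart(int maxParallelism, int parallelism, int subtaskIndex) {
        return (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
    }

    // Last sub-partition (inclusive) owned by the subtask.
    static int rangeEnd(int maxParallelism, int parallelism, int subtaskIndex) {
        return ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
    }

    // Lowest stored watermark among the sub-partitions of this subtask,
    // or empty if none of them has a stored watermark.
    static OptionalLong lowestWatermarkForSubtask(
            Map<Integer, Long> restoredWatermarks,
            int maxParallelism,
            int parallelism,
            int subtaskIndex) {
        int start = rangeStart(maxParallelism, parallelism, subtaskIndex);
        int end = rangeEnd(maxParallelism, parallelism, subtaskIndex);
        return restoredWatermarks.entrySet().stream()
                .filter(e -> e.getKey() >= start && e.getKey() <= end)
                .mapToLong(e -> e.getValue())
                .min();
    }

    public static void main(String[] args) {
        // Watermarks per sub-partition as they might come out of operator state.
        Map<Integer, Long> restored = new HashMap<>();
        restored.put(0, 1000L);
        restored.put(63, 900L);
        restored.put(64, 500L);
        restored.put(127, 700L);

        // The same state restored with parallelism 2 and with parallelism 4.
        for (int parallelism : new int[] {2, 4}) {
            for (int subtask = 0; subtask < parallelism; subtask++) {
                System.out.println("parallelism=" + parallelism
                        + " subtask=" + subtask + " -> "
                        + lowestWatermarkForSubtask(restored, 128, parallelism, subtask));
            }
        }
    }
}
```

Taking the minimum is the conservative choice: the subtask never announces a 
watermark that is ahead of any of the sub-partitions it now owns.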

Feel free to ask for details; I hope this helps. I need to ask my folks whether 
I can share our implementation (20-odd lines of code).

What do you think?

Thias


From: David Anderson <dander...@apache.org>
Sent: Thursday, June 9, 2022 11:35 AM
To: User-Flink <user@flink.apache.org>
Subject: Re: Recover watermark from savepoint

Sweta,

Flink does not include watermarks in savepoints, nor are they included in 
aligned checkpoints. For what it's worth, I believe that with unaligned 
checkpoints in-flight watermarks are included in checkpoints, but I don't 
believe that would solve the problem, since the watermark strategy's state is 
still lost during a restart.

I can't think of any way to guarantee that all possibly late events will be 
deterministically identified as late. The commonly used 
bounded-out-of-orderness watermark strategy doesn't guarantee this either, even 
without a restart (because watermarks are delayed by the auto watermark 
interval, rather than being produced at every conceivable opportunity).

If this is a strong requirement, you could decide not to rely on watermarks for 
dropping late events, and implement the logic yourself in a process function.
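
One possible reading of this suggestion, sketched in plain Java without Flink 
classes: track the highest event timestamp seen so far and drop anything older 
than that by more than an allowed lateness. The class name, the method names 
and the lateness parameter are hypothetical. In a real job the tracked 
timestamp would have to be kept in checkpointed state inside the process 
function so that it survives a savepoint restore.

```java
// Hypothetical sketch of lateness filtering that does not rely on watermarks.
final class LateEventFilterSketch {

    private final long allowedLatenessMs;

    // Highest event timestamp seen so far. Assumption: in a Flink job this
    // value would be stored in checkpointed state rather than a plain field.
    private long maxTimestampSeen;

    LateEventFilterSketch(long allowedLatenessMs) {
        this(allowedLatenessMs, Long.MIN_VALUE);
    }

    // Used after a restore, with the timestamp read back from saved state.
    LateEventFilterSketch(long allowedLatenessMs, long restoredMaxTimestamp) {
        this.allowedLatenessMs = allowedLatenessMs;
        this.maxTimestampSeen = restoredMaxTimestamp;
    }

    // Returns true if the event should be kept, false if it is late.
    boolean accept(long eventTimestampMs) {
        boolean seenAnything = maxTimestampSeen != Long.MIN_VALUE;
        if (seenAnything && eventTimestampMs < maxTimestampSeen - allowedLatenessMs) {
            return false; // older than the lateness bound: drop
        }
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestampMs);
        return true;
    }

    long maxTimestampSeen() {
        return maxTimestampSeen;
    }

    public static void main(String[] args) {
        LateEventFilterSketch filter = new LateEventFilterSketch(5_000L);
        long[] timestamps = {10_000L, 6_000L, 4_999L, 20_000L, 14_999L};
        for (long ts : timestamps) {
            System.out.println(ts + " kept=" + filter.accept(ts));
        }

        // Simulated restart: the decision stays the same because the
        // highest timestamp seen is carried over.
        LateEventFilterSketch restored =
                new LateEventFilterSketch(5_000L, filter.maxTimestampSeen());
        System.out.println("after restore 14999 kept=" + restored.accept(14_999L));
    }
}
```

Because the decision depends only on the stored timestamp and not on the 
current watermark, an event that was late before the restart is still late 
afterwards.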

Best,
David

On Wed, Jun 8, 2022 at 6:10 PM Sweta Kalakuntla <skalakun...@bandwidth.com> 
wrote:
Hi,

I want to understand whether Flink saves the watermark in a savepoint and, if 
not, how we can achieve this.

We are seeing an issue where, on recovery, the job processes some late events 
that would have been discarded if the job had been running without any 
downtime.

Thank you,
Sweta