Hi Alexis

Currently, I think checkpoints and savepoints do not store watermarks. How
to handle watermarks in checkpoints/savepoints is a good question; we can
discuss it on the dev mailing list.
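Here is a minimal, self-contained Java model of what this means in practice. It is not Flink code; OperatorModel and its methods are hypothetical names. It assumes, as described above, that the watermark is not part of the snapshot, while pending event-time timers are restored along with keyed state. After a restore the timers come back, but the watermark starts again at Long.MIN_VALUE, so nothing fires until a new watermark arrives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical model of a keyed operator, NOT Flink's API.
// Assumption: pending timers are snapshotted, the current watermark is not.
class OperatorModel {
    final PriorityQueue<Long> eventTimeTimers = new PriorityQueue<>();
    long currentWatermark = Long.MIN_VALUE; // initial value on (re)start
    final List<Long> fired = new ArrayList<>();

    void registerEventTimeTimer(long timestamp) {
        eventTimeTimers.add(timestamp);
    }

    // Advance the watermark and fire every timer whose timestamp is <= it.
    void processWatermark(long watermark) {
        if (watermark <= currentWatermark) {
            return; // watermarks only move forward
        }
        currentWatermark = watermark;
        while (!eventTimeTimers.isEmpty() && eventTimeTimers.peek() <= watermark) {
            fired.add(eventTimeTimers.poll());
        }
    }

    // The snapshot holds only the pending timers, not the watermark.
    List<Long> snapshot() {
        return new ArrayList<>(eventTimeTimers);
    }

    static OperatorModel restore(List<Long> snapshot) {
        OperatorModel restored = new OperatorModel();
        restored.eventTimeTimers.addAll(snapshot);
        return restored; // currentWatermark is back at Long.MIN_VALUE
    }
}
```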

Best,
Shammon FY


On Wed, Mar 15, 2023 at 4:22 PM Alexis Sarda-Espinosa <sarda.espin...@gmail.com> wrote:

> Hi Shammon, thanks for the info. I was hoping the savepoint would include
> the watermark, but I'm not sure that would make sense in every scenario.
>
> Regards,
> Alexis.
>
> On Tue, Mar 14, 2023 at 12:59 PM Shammon FY <zjur...@gmail.com> wrote:
>
>> Hi Alexis
>>
>> In some watermark generators, such as BoundedOutOfOrdernessWatermarks,
>> the watermark timestamp is reset to Long.MIN_VALUE when the subtask
>> restarts and has not yet processed any event from the source.
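For illustration, a standalone Java model of the bounded-out-of-orderness arithmetic, written from my reading of Flink's BoundedOutOfOrdernessWatermarks. The class name BoundedOutOfOrdernessModel and its simplified method signatures are hypothetical. The initial maximum timestamp is chosen so that a periodic emit before any event yields exactly Long.MIN_VALUE, and a freshly constructed generator after a restart starts from that value again.

```java
import java.time.Duration;

// Standalone model of a bounded-out-of-orderness generator (not the Flink class).
class BoundedOutOfOrdernessModel {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    BoundedOutOfOrdernessModel(Duration maxOutOfOrderness) {
        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
        // Chosen so that, before any event, the emitted watermark is exactly Long.MIN_VALUE.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    // Track the highest event timestamp seen so far.
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // The watermark value a periodic emit would send downstream.
    long onPeriodicEmit() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }
}
```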
>>
>> Best,
>> Shammon FY
>>
>> On Tue, Mar 14, 2023 at 4:58 PM Alexis Sarda-Espinosa <sarda.espin...@gmail.com> wrote:
>>
>>> Hi David, thanks for the answer. One follow-up question: will the
>>> watermark be reset to Long.MIN_VALUE every time I restart a job from a
>>> savepoint?
>>>
>>> On Tue, Mar 14, 2023 at 5:08 AM David Anderson <dander...@apache.org> wrote:
>>>
>>>> Watermarks always follow the corresponding event(s). I'm not sure why
>>>> they were designed that way, but that is how they are implemented.
>>>> Windows maintain this contract by emitting all of their results before
>>>> forwarding the watermark that triggered the results.
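A simplified Java model of the window contract described above. TumblingWindowModel is a hypothetical class, not Flink's WindowOperator: when a watermark arrives, every window it completes is emitted first, and only then is the watermark itself forwarded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical tumbling event-time window counter; NOT Flink's WindowOperator.
class TumblingWindowModel {
    private final long size;
    // window start -> number of elements assigned to that window
    private final TreeMap<Long, Integer> counts = new TreeMap<>();
    // Everything sent downstream, in order.
    final List<String> output = new ArrayList<>();

    TumblingWindowModel(long size) {
        this.size = size;
    }

    void processElement(long timestamp) {
        long start = timestamp - Math.floorMod(timestamp, size);
        counts.merge(start, 1, Integer::sum);
    }

    void processWatermark(long watermark) {
        // Window [start, start + size) is complete once watermark >= start + size - 1.
        while (!counts.isEmpty() && counts.firstKey() + size - 1 <= watermark) {
            Map.Entry<Long, Integer> window = counts.pollFirstEntry();
            output.add("result[" + window.getKey() + "]=" + window.getValue());
        }
        // Only after all triggered results are out is the watermark forwarded.
        output.add("watermark(" + watermark + ")");
    }
}
```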
>>>>
>>>> David
>>>>
>>>> On Mon, Mar 13, 2023 at 5:28 PM Shammon FY <zjur...@gmail.com> wrote:
>>>> >
>>>> > Hi Alexis
>>>> >
>>>> > Do you use both an event-time watermark generator and TimerService
>>>> > processing-time timers in your job? Maybe you can try using event-time
>>>> > watermarks first.
>>>> >
>>>> > Best,
>>>> > Shammon.FY
>>>> >
>>>> > On Sat, Mar 11, 2023 at 7:47 AM Alexis Sarda-Espinosa <sarda.espin...@gmail.com> wrote:
>>>> >>
>>>> >> Hello,
>>>> >>
>>>> >> I recently ran into a weird issue with a streaming job in Flink
>>>> >> 1.16.1. One of my functions (a KeyedProcessFunction) has been using
>>>> >> processing-time timers. I now want to run the same job on a
>>>> >> historical data dump, so I had to adjust the logic to use event-time
>>>> >> timers in that case (I did not use BATCH execution mode). Since my
>>>> >> data has a timestamp field, I implemented a custom WatermarkGenerator
>>>> >> that always emits a watermark with that timestamp in the onEvent
>>>> >> callback and does nothing in the onPeriodicEmit callback.
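Roughly, the generator described here could look like the sketch below. To keep it compilable without Flink on the classpath, WatermarkOutputModel and WatermarkGeneratorModel are hypothetical stand-ins for Flink's WatermarkOutput and WatermarkGenerator interfaces; in real Flink code, onEvent would call output.emitWatermark(new Watermark(eventTimestamp)).

```java
// Hypothetical stand-in for Flink's WatermarkOutput (simplified to a long timestamp).
interface WatermarkOutputModel {
    void emitWatermark(long timestamp);
}

// Hypothetical stand-in for Flink's WatermarkGenerator<T>.
interface WatermarkGeneratorModel<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutputModel output);

    void onPeriodicEmit(WatermarkOutputModel output);
}

// The generator as described: one watermark per event, nothing periodic.
class PerEventWatermarkGenerator<T> implements WatermarkGeneratorModel<T> {
    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutputModel output) {
        // Emit a watermark equal to the event's own timestamp.
        output.emitWatermark(eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutputModel output) {
        // Intentionally empty: watermarks come only from onEvent.
    }
}
```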
>>>> >>
>>>> >> My problem is that, sometimes, the very first time my function calls
>>>> >> TimerService.currentWatermark(), the value is Long.MIN_VALUE, which
>>>> >> causes some false triggers when the first watermark actually arrives.
>>>> >>
>>>> >> I would have expected that, if WatermarkGenerator.onEvent emits a
>>>> >> watermark, it would be sent before the corresponding event, but maybe
>>>> >> this is not always the case?
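The ordering asked about here can be modeled as follows. This is a hypothetical sketch based on my reading of the assignTimestampsAndWatermarks operator (the element is forwarded before the generator's onEvent is called), not the actual operator code. The downstream stand-in records what currentWatermark() would return for each element; for the very first element it is still Long.MIN_VALUE.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the assignTimestampsAndWatermarks step:
// forward the element first, then let the per-event watermark follow it.
class TimestampsAndWatermarksModel {
    private final DownstreamFunctionModel downstream;
    private long lastEmitted = Long.MIN_VALUE;

    TimestampsAndWatermarksModel(DownstreamFunctionModel downstream) {
        this.downstream = downstream;
    }

    void processElement(long eventTimestamp) {
        downstream.processElement(eventTimestamp); // 1. the event goes out first
        emitWatermarkIfNewer(eventTimestamp);      // 2. then the watermark from onEvent
    }

    // Only strictly increasing watermarks are forwarded.
    private void emitWatermarkIfNewer(long timestamp) {
        if (timestamp > lastEmitted) {
            lastEmitted = timestamp;
            downstream.processWatermark(timestamp);
        }
    }
}

// Stand-in for the downstream function: records the watermark visible per element.
class DownstreamFunctionModel {
    private long currentWatermark = Long.MIN_VALUE;
    final List<Long> watermarkSeenPerElement = new ArrayList<>();

    void processElement(long timestamp) {
        watermarkSeenPerElement.add(currentWatermark);
    }

    void processWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
    }
}
```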
>>>> >>
>>>> >> In case it's relevant, a brief overview of my job's topology:
>>>> >>
>>>> >> Source1 -> Broadcast
>>>> >>
>>>> >> Source2 ->
>>>> >>   keyBy ->
>>>> >>   connect(Broadcast) ->
>>>> >>   process ->
>>>> >>   filter ->
>>>> >>   assignTimestampsAndWatermarks -> // newly added for historical data
>>>> >>   keyBy ->
>>>> >>   process // function that uses timers
>>>> >>
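One detail of this topology that may matter (an assumption on my part, not something confirmed in the thread): after the second keyBy, each downstream subtask receives input from every upstream subtask of assignTimestampsAndWatermarks, and its watermark is the minimum across those input channels. Until every upstream subtask has emitted at least one watermark, the downstream watermark can stay at Long.MIN_VALUE. MultiInputWatermarkModel below is a hypothetical, simplified model of that rule; idleness handling is omitted.

```java
import java.util.Arrays;

// Simplified model: an operator's watermark is the minimum of the latest
// watermark seen on each input channel (idle channels are not modeled).
class MultiInputWatermarkModel {
    private final long[] channelWatermarks;
    private long currentWatermark = Long.MIN_VALUE;

    MultiInputWatermarkModel(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // Record a watermark from one channel; return the operator's watermark afterwards.
    long onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        if (min > currentWatermark) {
            currentWatermark = min; // only advance, never go back
        }
        return currentWatermark;
    }
}
```

In this model, a single upstream subtask emitting watermarks is not enough; the slowest channel holds the operator's watermark back.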
>>>> >> Regards,
>>>> >> Alexis.
>>>>
>>>
