Hi Shammon, thanks for the info. I was hoping the savepoint would include
the watermark, but I'm not sure that would make sense in every scenario.

Regards,
Alexis.

On Tue, Mar 14, 2023 at 12:59 PM Shammon FY <zjur...@gmail.com> wrote:

> Hi Alexis
>
> In some watermark generators, such as BoundedOutOfOrderTimestamps, the
> watermark timestamp is reset to Long.MIN_VALUE if the subtask is restarted
> and no event from the source has been processed yet.
>
> Best,
> Shammon FY
>
> On Tue, Mar 14, 2023 at 4:58 PM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hi David, thanks for the answer. One follow-up question: will the
>> watermark be reset to Long.MIN_VALUE every time I restart a job from a
>> savepoint?
>>
>> On Tue, Mar 14, 2023 at 5:08 AM David Anderson <
>> dander...@apache.org> wrote:
>>
>>> Watermarks always follow the corresponding event(s). I'm not sure why
>>> they were designed that way, but that is how they are implemented.
>>> Windows maintain this contract by emitting all of their results before
>>> forwarding the watermark that triggered the results.
>>>
>>> David
>>>
>>> On Mon, Mar 13, 2023 at 5:28 PM Shammon FY <zjur...@gmail.com> wrote:
>>> >
>>> > Hi Alexis
>>> >
>>> > Do you use both an event-time watermark generator and the TimerService
>>> > with processing time in your job? Maybe you can try using event-time
>>> > watermarks first.
>>> >
>>> > Best,
>>> > Shammon.FY
>>> >
>>> > On Sat, Mar 11, 2023 at 7:47 AM Alexis Sarda-Espinosa <
>>> > sarda.espin...@gmail.com> wrote:
>>> >>
>>> >> Hello,
>>> >>
>>> >> I recently ran into a weird issue with a streaming job in Flink
>>> >> 1.16.1. One of my functions (KeyedProcessFunction) has been using
>>> >> processing time timers. I now want to execute the same job based on a
>>> >> historical data dump, so I had to adjust the logic to use event time
>>> >> timers in that case (and did not use BATCH execution mode). Since my
>>> >> data has a timestamp field, I implemented a custom WatermarkGenerator
>>> >> that always emits a watermark with that timestamp in the onEvent
>>> >> callback, and does nothing in the onPeriodicEmit callback.
>>> >>
>>> >> My problem is that, sometimes, the very first time my function calls
>>> >> TimerService.currentWatermark, the value is Long.MIN_VALUE, which
>>> >> causes some false triggers when the first watermark actually arrives.
>>> >>
>>> >> I would have expected that, if WatermarkGenerator.onEvent emits a
>>> >> watermark, it would be sent before the corresponding event, but maybe
>>> >> this is not always the case?
>>> >>
>>> >> In case it's relevant, a brief overview of my job's topology:
>>> >>
>>> >> Source1 -> Broadcast
>>> >>
>>> >> Source2 ->
>>> >>   keyBy ->
>>> >>   connect(Broadcast) ->
>>> >>   process ->
>>> >>   filter ->
>>> >>   assignTimestampsAndWatermarks -> // newly added for historical data
>>> >>   keyBy ->
>>> >>   process // function that uses timers
>>> >>
>>> >> Regards,
>>> >> Alexis.
>>>
>>
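To make David's point in the thread concrete (a watermark always follows the event that produced it), here is a minimal plain-Java sketch of that ordering. It does not use the Flink API: WatermarkOrderSketch, DownstreamOperator, processElement and processWatermark are hypothetical stand-ins, and the only assumption taken from the thread is that the record is delivered first and the watermark second.

```java
import java.util.ArrayList;
import java.util.List;

public class WatermarkOrderSketch {

    /** Hypothetical stand-in for the downstream function that uses timers; not a Flink class. */
    static final class DownstreamOperator {
        // Before any watermark has arrived, the operator only knows Long.MIN_VALUE.
        private long currentWatermark = Long.MIN_VALUE;
        // Records the watermark that was visible while each event was being processed.
        final List<Long> watermarkSeenPerEvent = new ArrayList<>();

        void processElement(long eventTimestamp) {
            watermarkSeenPerEvent.add(currentWatermark);
        }

        void processWatermark(long watermark) {
            // Watermarks never move backwards.
            currentWatermark = Math.max(currentWatermark, watermark);
        }
    }

    /** Models a generator that emits one watermark per event: the event goes first, its watermark follows. */
    static List<Long> run(long[] eventTimestamps) {
        DownstreamOperator operator = new DownstreamOperator();
        for (long timestamp : eventTimestamps) {
            operator.processElement(timestamp);
            operator.processWatermark(timestamp);
        }
        return operator.watermarkSeenPerEvent;
    }

    public static void main(String[] args) {
        // The first event sees Long.MIN_VALUE; every later event sees the previous event's watermark.
        System.out.println(run(new long[] {1000L, 2000L, 3000L}));
    }
}
```

Under that assumption, the first call to read the current watermark inside the function returns Long.MIN_VALUE even though the generator emits a watermark for every event, which matches the behavior described in the original question.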

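Regarding Shammon's note about the reset after a restart, the following plain-Java sketch shows why a freshly created bounded-out-of-orderness style generator reports Long.MIN_VALUE until it has seen an event. BoundedOutOfOrdernessSketch is a hypothetical class, not Flink's implementation; it assumes the generator keeps its maximum timestamp only in memory and that this field is not restored from a savepoint.

```java
public class BoundedOutOfOrdernessSketch {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    BoundedOutOfOrdernessSketch(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        // Offset chosen so the first computed watermark is exactly Long.MIN_VALUE without underflow.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    /** Tracks the largest event timestamp seen so far. */
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** The watermark trails the largest timestamp by the allowed out-of-orderness. */
    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch beforeRestart = new BoundedOutOfOrdernessSketch(5_000L);
        beforeRestart.onEvent(60_000L);
        System.out.println(beforeRestart.currentWatermark()); // 54999

        // A restarted subtask builds a fresh generator, so the in-memory maximum is gone.
        BoundedOutOfOrdernessSketch afterRestart = new BoundedOutOfOrdernessSketch(5_000L);
        System.out.println(afterRestart.currentWatermark() == Long.MIN_VALUE); // true
    }
}
```

If that assumption holds for the generator in use, the watermark only advances again once the restarted subtask has processed its first event.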