Hi Alexis

In some watermark generators, such as BoundedOutOfOrdernessWatermarks,
the watermark timestamp is back at Long.MIN_VALUE after the subtask
is restarted, until it has processed an event from the source.
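To illustrate, here is a simplified sketch modeled on Flink's
BoundedOutOfOrdernessWatermarks (the class name BoundedOutOfOrdernessSketch
is hypothetical and this is not the library source). The generator keeps
its state in ordinary fields, and its initial value is chosen so that the
first periodic watermark, emitted before any event arrives, is exactly
Long.MIN_VALUE.

```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;

/** Hypothetical, simplified bounded-out-of-orderness generator. */
public class BoundedOutOfOrdernessSketch<T> implements WatermarkGenerator<T> {

    private final long outOfOrdernessMillis;

    // Plain field, not Flink managed state: nothing here is checkpointed,
    // so a freshly created generator always starts from the value below.
    private long maxTimestamp;

    public BoundedOutOfOrdernessSketch(Duration maxOutOfOrderness) {
        if (maxOutOfOrderness.isNegative()) {
            throw new IllegalArgumentException("maxOutOfOrderness cannot be negative");
        }
        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
        // Chosen so that maxTimestamp - outOfOrdernessMillis - 1 == Long.MIN_VALUE.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        // Only remember the largest timestamp; watermarks are emitted periodically.
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
    }
}
```

With a 5 s bound, the first periodic emit after a restart yields
Long.MIN_VALUE; once an event with timestamp 10000 has been seen, the
next one yields 4999.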

Best,
Shammon FY

On Tue, Mar 14, 2023 at 4:58 PM Alexis Sarda-Espinosa <sarda.espin...@gmail.com> wrote:

> Hi David, thanks for the answer. One follow-up question: will the
> watermark be reset to Long.MIN_VALUE every time I restart a job from a
> savepoint?
>
> On Tue, Mar 14, 2023 at 5:08 AM David Anderson <dander...@apache.org> wrote:
>
>> Watermarks always follow the corresponding event(s). I'm not sure why
>> they were designed that way, but that is how they are implemented.
>> Windows maintain this contract by emitting all of their results before
>> forwarding the watermark that triggered the results.
>>
>> David
>>
>> On Mon, Mar 13, 2023 at 5:28 PM Shammon FY <zjur...@gmail.com> wrote:
>> >
>> > Hi Alexis
>> >
>> > Do you use both an event-time watermark generator and the TimerService
>> > for processing time in your job? Maybe you can try using event time
>> > only first.
>> >
>> > Best,
>> > Shammon.FY
>> >
>> > On Sat, Mar 11, 2023 at 7:47 AM Alexis Sarda-Espinosa <sarda.espin...@gmail.com> wrote:
>> >>
>> >> Hello,
>> >>
>> >> I recently ran into a weird issue with a streaming job in Flink
>> >> 1.16.1. One of my functions (a KeyedProcessFunction) has been using
>> >> processing-time timers. I now want to execute the same job on a
>> >> historical data dump, so I had to adjust the logic to use event-time
>> >> timers in that case (I did not use BATCH execution mode). Since my data
>> >> has a timestamp field, I implemented a custom WatermarkGenerator that
>> >> always emits a watermark with that timestamp in the onEvent callback
>> >> and does nothing in the onPeriodicEmit callback.
>> >>
>> >> My problem is that, sometimes, the very first time my function calls
>> >> TimerService.currentWatermark, the value is Long.MIN_VALUE, which
>> >> causes some false triggers when the first watermark actually arrives.
>> >>
>> >> I would have expected that, if WatermarkGenerator.onEvent emits a
>> >> watermark, it would be sent before the corresponding event, but maybe
>> >> this is not always the case?
>> >>
>> >> In case it's relevant, a brief overview of my job's topology:
>> >>
>> >> Source1 -> Broadcast
>> >>
>> >> Source2 ->
>> >>   keyBy ->
>> >>   connect(Broadcast) ->
>> >>   process ->
>> >>   filter ->
>> >>   assignTimestampsAndWatermarks -> // newly added for historical data
>> >>   keyBy ->
>> >>   process // function that uses timers
>> >>
>> >> Regards,
>> >> Alexis.
>>
>
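
For reference, a generator along the lines described in the original
question (a watermark emitted from onEvent, nothing from onPeriodicEmit)
might look like the sketch below. The class name PerEventWatermarkGenerator
is hypothetical, and the check that stops the watermark from moving
backwards is an assumption added here, not something stated in the question.

```java
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;

/** Hypothetical punctuated generator: one watermark per (newer) event. */
public class PerEventWatermarkGenerator<T> implements WatermarkGenerator<T> {

    // Highest watermark emitted so far; used to skip timestamps that would
    // not advance event time (an assumption added in this sketch).
    private long lastEmitted = Long.MIN_VALUE;

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        if (eventTimestamp > lastEmitted) {
            lastEmitted = eventTimestamp;
            output.emitWatermark(new Watermark(eventTimestamp));
        }
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // Nothing to do: watermarks are emitted per event in onEvent.
    }
}
```

It could be plugged in with
WatermarkStrategy.<MyEvent>forGenerator(ctx -> new PerEventWatermarkGenerator<>())
.withTimestampAssigner((event, ts) -> event.getTimestamp()), where MyEvent
and getTimestamp() are placeholders. Even so, the downstream
KeyedProcessFunction can still observe Long.MIN_VALUE: the watermark
produced in onEvent is forwarded after the event that caused it, as David
notes in this thread, so the first element arrives before any watermark;
and after keyBy an operator's watermark is the minimum over all of its
input channels, so it stays at Long.MIN_VALUE until every upstream
subtask has emitted at least one watermark.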
