Hi Alexis,

In some watermark generators, such as BoundedOutOfOrdernessWatermarks, the watermark timestamp is reset to Long.MIN_VALUE if the subtask is restarted and no event from the source has been processed yet.
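As an illustration, here is a minimal plain-Java model of such a generator. It is not Flink code, and the class and method names (`BoundedOutOfOrdernessModel`, `onEvent`, `currentWatermark`) are hypothetical. As far as we know, it uses the same arithmetic as Flink's bounded-out-of-orderness generator: the watermark is the largest timestamp seen, minus the allowed out-of-orderness, minus 1. It also assumes, as described above, that the generator's in-memory state is not restored after a restart.

```java
// Hypothetical plain-Java model (NOT Flink's class) of a bounded-out-of-orderness
// watermark generator. Assumption: its state lives only in memory, so a restarted
// subtask gets a fresh instance that has not seen any event yet.
final class BoundedOutOfOrdernessModel {
    private final long outOfOrdernessMillis;
    // Highest event timestamp seen so far. The initial value is chosen so that
    // currentWatermark() returns exactly Long.MIN_VALUE before the first event.
    private long maxTimestamp;

    BoundedOutOfOrdernessModel(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    // Called for every event: only remembers the largest timestamp.
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // The watermark a periodic emit would produce at this point.
    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessModel beforeRestart = new BoundedOutOfOrdernessModel(5_000L);
        beforeRestart.onEvent(1_000_000L);
        System.out.println(beforeRestart.currentWatermark()); // 994999

        // Simulated restart (e.g. from a savepoint): a new instance, no events yet.
        BoundedOutOfOrdernessModel afterRestart = new BoundedOutOfOrdernessModel(5_000L);
        System.out.println(afterRestart.currentWatermark() == Long.MIN_VALUE); // true
    }
}
```

Until the restarted instance sees its first event, any watermark it emits is Long.MIN_VALUE, regardless of how far the job had progressed before the restart.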
Best,
Shammon FY

On Tue, Mar 14, 2023 at 4:58 PM Alexis Sarda-Espinosa <sarda.espin...@gmail.com> wrote:
> Hi David, thanks for the answer. One follow-up question: will the watermark be reset to Long.MIN_VALUE every time I restart a job from a savepoint?
>
> On Tue, Mar 14, 2023 at 5:08 AM David Anderson <dander...@apache.org> wrote:
>
>> Watermarks always follow the corresponding event(s). I'm not sure why they were designed that way, but that is how they are implemented. Windows maintain this contract by emitting all of their results before forwarding the watermark that triggered the results.
>>
>> David
>>
>> On Mon, Mar 13, 2023 at 5:28 PM Shammon FY <zjur...@gmail.com> wrote:
>> >
>> > Hi Alexis
>> >
>> > Do you use both an event-time watermark generator and TimerService for processing time in your job? Maybe you can try using event-time watermarks first.
>> >
>> > Best,
>> > Shammon FY
>> >
>> > On Sat, Mar 11, 2023 at 7:47 AM Alexis Sarda-Espinosa <sarda.espin...@gmail.com> wrote:
>> >>
>> >> Hello,
>> >>
>> >> I recently ran into a weird issue with a streaming job in Flink 1.16.1. One of my functions (a KeyedProcessFunction) has been using processing-time timers. I now want to run the same job on a historical data dump, so I had to adjust the logic to use event-time timers in that case (and did not use BATCH execution mode). Since my data has a timestamp field, I implemented a custom WatermarkGenerator that always emits a watermark with that timestamp in the onEvent callback and does nothing in the onPeriodicEmit callback.
>> >>
>> >> My problem is that, sometimes, the very first time my function calls TimerService.currentWatermark, the value is Long.MIN_VALUE, which causes some false triggers when the first watermark actually arrives.
>> >>
>> >> I would have expected that, if WatermarkGenerator.onEvent emits a watermark, it would be sent before the corresponding event, but maybe this is not always the case?
>> >>
>> >> In case it's relevant, a brief overview of my job's topology:
>> >>
>> >> Source1 -> Broadcast
>> >>
>> >> Source2 ->
>> >> keyBy ->
>> >> connect(Broadcast) ->
>> >> process ->
>> >> filter ->
>> >> assignTimestampsAndWatermarks -> // newly added for historical data
>> >> keyBy ->
>> >> process // function that uses timers
>> >>
>> >> Regards,
>> >> Alexis.
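The ordering David describes also accounts for what Alexis observed. Below is a minimal plain-Java model, not Flink code. The names `WatermarkOrderingModel`, `Downstream`, `forward` and `hasWatermark` are hypothetical, and the model assumes a single upstream channel feeding the timer function. Because each record reaches the downstream function before the watermark derived from it, the first record sees Long.MIN_VALUE even though the generator emits a watermark in onEvent.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model (NOT Flink's API) of the ordering described in the
// thread: each record is forwarded first, and the watermark derived from it afterwards.
final class WatermarkOrderingModel {

    // Stand-in for the downstream KeyedProcessFunction that uses timers.
    static final class Downstream {
        long currentWatermark = Long.MIN_VALUE; // no watermark received yet
        final List<Long> watermarkSeenByRecord = new ArrayList<>();

        void processElement(long timestamp) {
            // Roughly what TimerService.currentWatermark() would report right now.
            watermarkSeenByRecord.add(currentWatermark);
        }

        void processWatermark(long watermark) {
            currentWatermark = Math.max(currentWatermark, watermark);
        }
    }

    // Models the generator from the original message: a watermark equal to each
    // event's timestamp is emitted from onEvent, nothing is emitted periodically.
    static void forward(long eventTimestamp, Downstream downstream) {
        downstream.processElement(eventTimestamp);   // the event goes first...
        downstream.processWatermark(eventTimestamp); // ...then its watermark follows
    }

    // One possible guard (an assumption, not suggested in the thread): treat
    // Long.MIN_VALUE as "no watermark yet" rather than as a real event time.
    static boolean hasWatermark(long currentWatermark) {
        return currentWatermark != Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        Downstream downstream = new Downstream();
        forward(1_000L, downstream);
        forward(2_000L, downstream);
        System.out.println(downstream.watermarkSeenByRecord); // [-9223372036854775808, 1000]
        System.out.println(hasWatermark(downstream.watermarkSeenByRecord.get(0))); // false
    }
}
```

Whether a guard like `hasWatermark` is appropriate depends on what the timer logic does with the watermark; it only avoids treating the initial Long.MIN_VALUE as a real point in event time.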