Hi Shammon, thanks for the info. I was hoping the savepoint would include the watermark, but I'm not sure that would make sense in every scenario.
Regards,
Alexis.

On Tue, Mar 14, 2023 at 12:59 PM Shammon FY <zjur...@gmail.com> wrote:

> Hi Alexis
>
> In some watermark generators such as BoundedOutOfOrderTimestamps,
> the timestamp of watermark will be reset to Long.MIN_VALUE if the subtask
> is restarted and no event from source is processed.
>
> Best,
> Shammon FY
>
> On Tue, Mar 14, 2023 at 4:58 PM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hi David, thanks for the answer. One follow-up question: will the
>> watermark be reset to Long.MIN_VALUE every time I restart a job with
>> savepoint?
>>
>> On Tue, Mar 14, 2023 at 5:08 AM David Anderson <
>> dander...@apache.org> wrote:
>>
>>> Watermarks always follow the corresponding event(s). I'm not sure why
>>> they were designed that way, but that is how they are implemented.
>>> Windows maintain this contract by emitting all of their results before
>>> forwarding the watermark that triggered the results.
>>>
>>> David
>>>
>>> On Mon, Mar 13, 2023 at 5:28 PM Shammon FY <zjur...@gmail.com> wrote:
>>> >
>>> > Hi Alexis
>>> >
>>> > Do you use both event-time watermark generator and TimerService for
>>> > processing time in your job? Maybe you can try using event-time watermark
>>> > first.
>>> >
>>> > Best,
>>> > Shammon.FY
>>> >
>>> > On Sat, Mar 11, 2023 at 7:47 AM Alexis Sarda-Espinosa <
>>> > sarda.espin...@gmail.com> wrote:
>>> >>
>>> >> Hello,
>>> >>
>>> >> I recently ran into a weird issue with a streaming job in Flink
>>> >> 1.16.1. One of my functions (KeyedProcessFunction) has been using
>>> >> processing time timers. I now want to execute the same job based on a
>>> >> historical data dump, so I had to adjust the logic to use event time timers
>>> >> in that case (and did not use BATCH execution mode). Since my data has a
>>> >> timestamp field, I implemented a custom WatermarkGenerator that always
>>> >> emits a watermark with that timestamp in the onEvent callback, and does
>>> >> nothing in the onPeriodicEmit callback.
>>> >>
>>> >> My problem is that, sometimes, the very first time my function calls
>>> >> TimerService.currentWatermark, the value is Long.MIN_VALUE, which causes
>>> >> some false triggers when the first watermark actually arrives.
>>> >>
>>> >> I would have expected that, if WatermarkGenerator.onEvent emits a
>>> >> watermark, it would be sent before the corresponding event, but maybe this
>>> >> is not always the case?
>>> >>
>>> >> In case it's relevant, a brief overview of my job's topology:
>>> >>
>>> >> Source1 -> Broadcast
>>> >>
>>> >> Source2 ->
>>> >>   keyBy ->
>>> >>   connect(Broadcast) ->
>>> >>   process ->
>>> >>   filter ->
>>> >>   assignTimestampsAndWatermarks -> // newly added for historical data
>>> >>   keyBy ->
>>> >>   process // function that uses timers
>>> >>
>>> >> Regards,
>>> >> Alexis.
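
The ordering David describes (a watermark follows the event that produced it) can be illustrated with a small simulation. This is only a plain-Java sketch with no Flink dependency: `TimerOperator`, `replay`, and the guard against `Long.MIN_VALUE` are hypothetical names and choices made for illustration, not Flink APIs and not the code of the job discussed in this thread.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java simulation (no Flink classes) of "the watermark follows the event". */
public class WatermarkOrderingSketch {

    /** Hypothetical stand-in for a downstream function that reads the current watermark. */
    static final class TimerOperator {
        // Before any watermark has arrived, the current watermark is Long.MIN_VALUE.
        private long currentWatermark = Long.MIN_VALUE;
        final List<Long> watermarkSeenPerEvent = new ArrayList<>();
        final List<Long> eventsHandledWithTimers = new ArrayList<>();

        void processElement(long eventTimestamp) {
            watermarkSeenPerEvent.add(currentWatermark);
            // Assumed guard: skip watermark-based logic until a real watermark exists.
            if (currentWatermark == Long.MIN_VALUE) {
                return;
            }
            eventsHandledWithTimers.add(eventTimestamp);
        }

        void processWatermark(long watermark) {
            // Watermarks never move backwards.
            currentWatermark = Math.max(currentWatermark, watermark);
        }
    }

    /** Replays timestamps the way an onEvent-style generator would: event first, watermark after. */
    static TimerOperator replay(long[] timestamps) {
        TimerOperator op = new TimerOperator();
        for (long ts : timestamps) {
            op.processElement(ts);   // the event is delivered first
            op.processWatermark(ts); // the watermark emitted for it arrives afterwards
        }
        return op;
    }

    public static void main(String[] args) {
        TimerOperator op = replay(new long[] {1_000L, 2_000L, 3_000L});
        System.out.println("watermark seen per event: " + op.watermarkSeenPerEvent);
        System.out.println("events handled with timers: " + op.eventsHandledWithTimers);
    }
}
```

In this simulation the first event always observes `Long.MIN_VALUE`, and every later event observes the watermark of its predecessor. Whether skipping the first event, as the guard does here, is acceptable depends on the job; buffering it until the first watermark arrives is another option.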