Hi community, I don't understand why KeyedProcessFunction.onTimer() is implemented differently here [1] than here [2]. Both are KeyedProcessFunctions and both aim to fire a window on event time. In [1] the events are emitted only if (timestamp == result.lastModified + 60000), and the timer is scheduled from ctx.timestamp():
    public void processElement()..... {
        current.lastModified = ctx.timestamp();
        ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
    }

    public void onTimer().... {
        // get the state for the key that scheduled the timer
        CountWithTimestamp result = state.value();
        // check if this is an outdated timer or the latest timer
        if (timestamp == result.lastModified + 60000) {
            // emit the state on timeout
            out.collect(new Tuple2<String, Long>(result.key, result.count));
        }
    }

In [2] there is no time comparison in the onTimer() method. In addition, the timers are scheduled using the formula (eventTime - (eventTime % durationMsec) + durationMsec - 1), and only for events that are not late with respect to the watermark (an event is late when eventTime <= timerService.currentWatermark()):

    public void processElement()..... {
        if (eventTime <= timerService.currentWatermark()) {
            // This event is late; its window has already been triggered.
        } else {
            // Round up eventTime to the end of the window containing this event.
            long endOfWindow = (eventTime - (eventTime % durationMsec) + durationMsec - 1);
        }
    }

    public void onTimer().... {
        Float sumOfTips = this.sumOfTips.get(timestamp);
    }

My use case uses a CoProcessFunction, and I am keeping the state in ListState. It works fine with approach [1]. When I used approach [2], some of the events were treated as late because of the watermark. Which approach is the correct one to use? Or which is better?

Afterwards I have to make this function fault tolerant, so my next question is: do I have to implement CheckpointedFunction and CheckpointedRestoring, or does it already recover from failures because I am using ListState?

Thanks,
Felipe

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/operators/process_function/#example
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/learn-flink/event_driven/#the-ontimer-method

--
Felipe Gutierrez
skype: felipe.o.gutierrez
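
P.S. To make the arithmetic in [2] concrete, here is a minimal sketch of just the rounding formula and the lateness check, outside of Flink. The class name WindowMathSketch, the method names endOfWindow and isLate, the one-hour window size, and the sample timestamps are all assumptions made up for illustration; none of them come from the Flink API or from the linked pages.

```java
// Hypothetical helper, not part of Flink: it only reproduces the arithmetic quoted from [2].
final class WindowMathSketch {

    // Round eventTime up to the last millisecond of the window that contains it.
    // Assumes a non-negative eventTime and durationMsec > 0.
    static long endOfWindow(long eventTime, long durationMsec) {
        return eventTime - (eventTime % durationMsec) + durationMsec - 1;
    }

    // Lateness test as quoted from [2]: an event counts as late once the
    // watermark has reached (or passed) its timestamp.
    static boolean isLate(long eventTime, long currentWatermark) {
        return eventTime <= currentWatermark;
    }

    public static void main(String[] args) {
        long oneHour = 3_600_000L;   // assumed window size in milliseconds
        long eventTime = 3_725_000L; // made-up sample: 1 h 2 min 5 s after the epoch

        System.out.println(endOfWindow(eventTime, oneHour)); // prints 7199999
        System.out.println(isLate(eventTime, 3_724_999L));   // prints false
        System.out.println(isLate(eventTime, 3_725_000L));   // prints true
    }
}
```

If I read it correctly, every event that falls in the same window maps to the same timer timestamp under [2], so onTimer() can use that timestamp directly as the window key. Under [1] each element schedules its own timer at ctx.timestamp() + 60000, which seems to be why onTimer() there has to check whether the firing timer is the latest one.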