Hi community,

I don't understand why that KeyedProcessFunction.onTimer() is implemented
here [1] is different from here [2]. Both are KeyedProcessFunction and they
aim to fire a window on event time. At [1] the events are emitted at if
(timestamp == result.lastModified + 60000) and the time is scheduled from
the ctx.timestamp().

public void processElement()..... {
current.lastModified = ctx.timestamp();
ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
}
public void onTimer().... {
        // get the state for the key that scheduled the timer
        CountWithTimestamp result = state.value();
        // check if this is an outdated timer or the latest timer
        if (timestamp == result.lastModified + 60000) {
            // emit the state on timeout
            out.collect(new Tuple2<String, Long>(result.key, result.count));
        }

At [2] there is no comparison of time on the onTimer() method. Plus the
events are scheduled using a formula (eventTime - (eventTime %
durationMsec) + durationMsec - 1) and only if they are not late of the
watermark (eventTime <= timerService.currentWatermark()).

public void processElement()..... {
if (eventTime <= timerService.currentWatermark()) {
        // This event is late; its window has already been triggered.
    } else {
        // Round up eventTime to the end of the window containing this
event.
        long endOfWindow = (eventTime - (eventTime % durationMsec) +
durationMsec - 1);
}
public void onTimer().... {
Float sumOfTips = this.sumOfTips.get(timestamp);


My use case uses a CoProcessFunction and I am saving the states
on ListState. It works fine with the approach [1]. When I used the approach
[2] some of the events are late because of the watermark.

What is the correct to be used? Or what is the best?

Afterwards I have to make this function fault tolerant. So, my next
question is. Do I have to implement CheckpointedFunction and
CheckpointedRestoring. Or because I am using ListState it already recovers
from failures?

Thanks,
Felipe


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/operators/process_function/#example
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/learn-flink/event_driven/#the-ontimer-method

*--*
*-- Felipe Gutierrez*
*-- skype: felipe.o.gutierrez*

Reply via email to