Hi community,
I don't understand why KeyedProcessFunction.onTimer() is implemented
differently in [1] and [2]. Both are KeyedProcessFunctions and both
aim to fire a window on event time. In [1], the result is emitted only if
(timestamp == result.lastModified + 60000), and the timer is registered
relative to ctx.timestamp().
public void processElement()..... {
    current.lastModified = ctx.timestamp();
    ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
}
public void onTimer().... {
    // get the state for the key that scheduled the timer
    CountWithTimestamp result = state.value();
    // check if this is an outdated timer or the latest timer
    if (timestamp == result.lastModified + 60000) {
        // emit the state on timeout
        out.collect(new Tuple2<String, Long>(result.key, result.count));
    }
}
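To make the check in [1] concrete, here is a minimal plain-Java sketch of how I understand it (no Flink dependency). The names StaleTimerDemo, emittingTimers and TIMEOUT_MS are hypothetical, and the sketch assumes that all events for one key arrive in timestamp order before any timer fires, which is a simplification of what the watermark would really do.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Plain-Java simulation (no Flink dependency) of the stale-timer check in [1].
class StaleTimerDemo {
    static final long TIMEOUT_MS = 60_000L;

    // Hypothetical stand-in for the per-key state (CountWithTimestamp in [1]).
    static final class CountWithTimestamp {
        long count;
        long lastModified;
    }

    // Feeds event timestamps for a single key, then fires every registered timer
    // in timestamp order and returns the timer timestamps that would emit a result.
    static List<Long> emittingTimers(long... eventTimestamps) {
        CountWithTimestamp state = new CountWithTimestamp();
        TreeSet<Long> timers = new TreeSet<>(); // one timer per distinct timestamp

        for (long ts : eventTimestamps) {
            // processElement(): update the state and register a timer 60 s ahead
            state.count++;
            state.lastModified = ts;
            timers.add(state.lastModified + TIMEOUT_MS);
        }

        List<Long> emitted = new ArrayList<>();
        for (long timestamp : timers) {
            // onTimer(): only the timer set by the latest event passes the check
            if (timestamp == state.lastModified + TIMEOUT_MS) {
                emitted.add(timestamp);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Timers are registered at 61000, 90000 and 105000; only the last one emits.
        System.out.println(emittingTimers(1_000L, 30_000L, 45_000L)); // [105000]
    }
}
```

So, as far as I can tell, the comparison in [1] exists because every new element registers another timer, and the timers registered by earlier elements have to be ignored when they fire.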
In [2] there is no time comparison in the onTimer() method. In addition, the
timers are scheduled using the formula (eventTime - (eventTime %
durationMsec) + durationMsec - 1), and only for events that are not late with
respect to the watermark; an event is treated as late when
(eventTime <= timerService.currentWatermark()).
public void processElement()..... {
    if (eventTime <= timerService.currentWatermark()) {
        // This event is late; its window has already been triggered.
    } else {
        // Round up eventTime to the end of the window containing this event.
        long endOfWindow = (eventTime - (eventTime % durationMsec) + durationMsec - 1);
    }
}
public void onTimer().... {
    Float sumOfTips = this.sumOfTips.get(timestamp);
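The arithmetic in [2] can be checked in isolation with a small plain-Java sketch (no Flink dependency). The class name WindowEndDemo, the helper names endOfWindow and isLate, and the one-minute window length are my own assumptions for illustration, not part of the example in [2].

```java
// Plain-Java check (no Flink dependency) of the window-end arithmetic in [2].
class WindowEndDemo {
    // Rounds eventTime up to the last millisecond of the window that contains it.
    static long endOfWindow(long eventTime, long durationMsec) {
        return eventTime - (eventTime % durationMsec) + durationMsec - 1;
    }

    // Mirrors the lateness test in [2]: an event at or before the watermark is late.
    static boolean isLate(long eventTime, long currentWatermark) {
        return eventTime <= currentWatermark;
    }

    public static void main(String[] args) {
        long durationMsec = 60_000L; // assumed one-minute windows
        System.out.println(endOfWindow(125_000L, durationMsec)); // 179999
        System.out.println(endOfWindow(179_999L, durationMsec)); // 179999
        System.out.println(endOfWindow(180_000L, durationMsec)); // 239999
        System.out.println(isLate(125_000L, 130_000L));          // true
    }
}
```

Every event in the same window maps to the same timer timestamp, so the timer that fires always corresponds to exactly one window. That would explain why onTimer() in [2] can use the timestamp directly as a key without a comparison.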
My use case uses a CoProcessFunction, and I keep the state in
ListState. It works fine with approach [1]. When I use approach
[2], some of the events are considered late because of the watermark check.
Which approach is the correct one to use? Or which is better?
Afterwards I have to make this function fault tolerant, so my next
question is: do I have to implement CheckpointedFunction and
CheckpointedRestoring, or does it already recover from failures
because I am using ListState?
Thanks,
Felipe
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/operators/process_function/#example
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/learn-flink/event_driven/#the-ontimer-method
*--*
*-- Felipe Gutierrez*
*-- skype: felipe.o.gutierrez*