Hi community,

I have implemented a join function using CoProcessFunction with
CheckpointedFunction to recover from failures. I added some debug lines
and confirmed that the state is restored. Before the crash, I process
events that arrive at processElement2(). Snapshots are taken in
snapshotState(), and when the application comes back it restores those
events. That part works fine.

After the restore, I process events that arrive at processElement1(). I
register event-time timers for them, as I did in processElement2() before
the crash, but onTimer() is never called. The point is that I have no more
events to send to processElement2() that would make the CoProcessFunction
register a timer for them. They were all sent before the crash.

I suppose that onTimer() is called only when
"timerService.registerEventTimeTimer(endOfWindow);" has been executed in
both processElement1() and processElement2(), because when I run the same
application without a crash the CoProcessFunction does trigger the
onTimer() method.
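
To make this supposition concrete, here is a small plain-Java model of how
I understand event-time timers in a two-input operator. It is not Flink
code, and the class and method names (TwoInputTimerModel, advanceInput1,
advanceInput2) are made up for illustration. It assumes that the operator's
event-time clock is the minimum of the watermarks received on the two
inputs, that a timer fires once that clock reaches its timestamp, and that
both watermarks start again from Long.MIN_VALUE after a restore.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Plain-Java model, NOT Flink code: every name in this class is hypothetical.
final class TwoInputTimerModel {
    // Assumption: watermarks are not restored, so both start at Long.MIN_VALUE.
    private long watermark1 = Long.MIN_VALUE; // last watermark seen on input 1
    private long watermark2 = Long.MIN_VALUE; // last watermark seen on input 2

    private final TreeSet<Long> pendingTimers = new TreeSet<>();
    private final List<Long> firedTimers = new ArrayList<>();

    // Stands in for timerService.registerEventTimeTimer(endOfWindow).
    void registerEventTimeTimer(long timestamp) {
        pendingTimers.add(timestamp);
    }

    // A watermark arrives on input 1 (the stream handled by processElement1).
    void advanceInput1(long watermark) {
        watermark1 = Math.max(watermark1, watermark);
        fireDueTimers();
    }

    // A watermark arrives on input 2 (the stream handled by processElement2).
    void advanceInput2(long watermark) {
        watermark2 = Math.max(watermark2, watermark);
        fireDueTimers();
    }

    // Assumption: the operator's event-time clock is the minimum of both inputs.
    long currentWatermark() {
        return Math.min(watermark1, watermark2);
    }

    // Moves every timer whose timestamp is <= the current watermark to firedTimers.
    private void fireDueTimers() {
        long watermark = currentWatermark();
        while (!pendingTimers.isEmpty() && pendingTimers.first() <= watermark) {
            firedTimers.add(pendingTimers.pollFirst());
        }
    }

    List<Long> firedTimers() {
        return firedTimers;
    }

    public static void main(String[] args) {
        TwoInputTimerModel operator = new TwoInputTimerModel();
        operator.registerEventTimeTimer(1000L);

        // Only input 1 makes progress, as in my job after the restore.
        operator.advanceInput1(2000L);
        System.out.println(operator.firedTimers()); // prints []

        // Once input 2 also advances, the pending timer fires.
        operator.advanceInput2(1500L);
        System.out.println(operator.firedTimers()); // prints [1000]
    }
}
```

In this model a timer registered from one input stays pending for as long
as the other input's watermark does not advance, which would be consistent
with what I see after the restore.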

But if there is a crash in the middle, the CoProcessFunction does not call
onTimer(). Why is that? Is that normal? What do I have to do to make the
CoProcessFunction trigger onTimer() even if only one stream is processed,
let's say in processElement2(), while the other stream is restored from a
snapshot? I imagine that I have to register a timer during recovery
(in initializeState()). But how?

Thanks,
Felipe
