Hi community, I have implemented a join function using CoProcessFunction with CheckpointedFunction to recover from failures. I added some debug lines to check if it is restoring and it does. Before the crash, I process events that fall at processElement2. I create snapshots at snapshotState(), the application comes back and restores the events. That is fine.
After the restore, I process events that fall on processElement1. I register event timers for them as I did on processElement2 before the crash. But the onTimer() is never called. The point is that I don't have any events to send to processElement2() to make the CoProcessFunction register a time for them. They were sent before the crash. I suppose that the onTimer() is called only when there are "timerService.registerEventTimeTimer(endOfWindow);" for processElement1 and processElement2. Because when I test the same application without crashing and the CoProcessFunction triggers the onTimer() method. But if I have a crash in the middle the CoProcessFunction does not call onTimer(). Why is that? Is that normal? What do I have to do to make the CoProcessFunction trigger the onTime() method even if only one stream is processed let's say at the processElement2() method and the other stream is restored from a snapshot? I imagine that I have to register a time during the recovery (initializeState()). But how? thanks, Felipe