Hi, I have a problem in my stream pipeline where the events in a CoGroupFunction are not restored after the application crashes. The application looks like this:
    stream01.coGroup(stream02)
        .where(...).equalTo(...)
        .window(TumblingEventTimeWindows.of(Time.minutes(1)))
        .apply(new MyCoGroupFunction())
        .process(new MyProcessFunction())
        .addSink(new MySinkFunction());

The checkpoint interval is configured to 20 seconds and the window size is 1 minute. I follow this sequence to reproduce the error:

1 - send 6 events to stream01
2 - after 25 seconds, send an event that makes the application crash
3 - in the meantime, the application recovers
4 - after 25 seconds, send 6 events to stream02

Then MyCoGroupFunction receives only the events from stream02. Is this a case where I have to use RichCoGroupFunction and save the state by implementing CheckpointedFunction?

I am confused because the CoGroupFunction.coGroup() method is called only when the window closes, and only then do I see the output events of this operator, that is, when Collector.collect() is called. My understanding is that the events are held in memory until the window closes and CoGroupFunction.coGroup() is called. So I would have to snapshot the state in an operator before the CoGroupFunction. Is that correct?

If anyone has a toy example of this (a CoGroupFunction with checkpointing, tested in a unit test), could you please send me the link?

Thanks,
Felipe