Hi Felipe,

Which data source are you using?
> Then, in the MyCoGroupFunction there are only events of stream02

Are you storing events in your state?

> Is this the case where I have to use RichCoGroupFunction and save the state by implementing the CheckpointedFunction?

If you want your state to be persisted with each checkpoint and recovered after a failure, yes.

On Tue, Jun 15, 2021 at 6:18 PM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:

> Hi,
>
> I have a problem in my stream pipeline where the events in a
> CoGroupFunction are not restored after the application crashes. The
> application is like this:
>
> stream01.coGroup(stream02)
>     .where(...).equalTo(...)
>     .window(TumblingEventTimeWindows.of(Time.minutes(1)))
>     .apply(new MyCoGroupFunction())
>     .process(new MyProcessFunction())
>     .addSink(new MySinkFunction());
>
> The checkpoint interval is configured to 20 seconds and the window is 1 minute.
> I follow this sequence to reproduce the error:
> 1 - I send 6 events to stream01
> 2 - after 25 seconds, I send an event that makes the application crash
> 3 - in the meantime, the application recovers
> 4 - after 25 seconds, I send 6 events to stream02
>
> Then, in MyCoGroupFunction there are only events of stream02. Is this
> the case where I have to use RichCoGroupFunction and save the state by
> implementing CheckpointedFunction? I am confused because
> CoGroupFunction.coGroup() is called only when the window closes, and
> only then do I see the output events of this operator, that is, when
> Collector.collect() is called.
>
> What I think is that the events are held in memory and, when the window
> closes, CoGroupFunction.coGroup() is called. So do I have to snapshot the
> state in an operator before the CoGroupFunction? Is that correct? In case
> anyone has a toy example of this (a CoGroupFunction with checkpointing,
> tested in a unit test), could you please send me the link?
>
> Thanks,
> Felipe
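To make the timing in the reproduction steps concrete, here is a small, Flink-free Java model of the timeline. It is only a sketch of the arithmetic under stated assumptions, not Flink code: it assumes checkpoints complete at every multiple of the 20 s interval, that each event's timestamp equals the second it was sent, that recovery restores the window contents and source position from the last checkpoint completed before the crash, and that only a replayable source re-emits events sent between that checkpoint and the crash. The class and method names (`CheckpointTimeline`, `windowContents`, `batch`, `count`) are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical model of the reproduction timeline; it calls no Flink API. */
public class CheckpointTimeline {

    /** An event tagged with its input stream and the second it was sent. */
    static final class Event {
        final String stream;
        final int sentAt;

        Event(String stream, int sentAt) {
            this.stream = stream;
            this.sentAt = sentAt;
        }
    }

    /**
     * Returns the events the tumbling window [windowStart, windowStart + windowSize)
     * holds when it fires. Assumes checkpoints complete at k * checkpointInterval
     * (k >= 1), crashAt >= checkpointInterval, and recovery restores from the last
     * checkpoint completed before crashAt.
     */
    static List<Event> windowContents(List<Event> events, int checkpointInterval,
                                      int crashAt, boolean replayableSource,
                                      int windowStart, int windowSize) {
        int lastCheckpoint = (crashAt / checkpointInterval) * checkpointInterval;
        List<Event> kept = new ArrayList<>();
        for (Event e : events) {
            if (e.sentAt < windowStart || e.sentAt >= windowStart + windowSize) {
                continue; // belongs to a different window
            }
            boolean survives;
            if (e.sentAt < lastCheckpoint) {
                survives = true;             // already in the checkpointed window state
            } else if (e.sentAt < crashAt) {
                survives = replayableSource; // lost unless the source can replay it
            } else {
                survives = true;             // assumed sent after the job recovered
            }
            if (survives) {
                kept.add(e);
            }
        }
        return kept;
    }

    /** Counts the events that came from the given stream. */
    static long count(List<Event> events, String stream) {
        return events.stream().filter(e -> e.stream.equals(stream)).count();
    }

    /** Builds n identical events sent on one stream at one second. */
    static List<Event> batch(String stream, int sentAt, int n) {
        return new ArrayList<>(Collections.nCopies(n, new Event(stream, sentAt)));
    }

    public static void main(String[] args) {
        // Steps 1-4: 6 stream01 events at 0 s, crash at 25 s, 6 stream02 events
        // at 50 s; checkpoints every 20 s; one 1-minute window [0, 60).
        List<Event> events = new ArrayList<>(batch("stream01", 0, 6));
        events.addAll(batch("stream02", 50, 6));
        List<Event> fired = windowContents(events, 20, 25, false, 0, 60);
        System.out.println("stream01=" + count(fired, "stream01")
                + " stream02=" + count(fired, "stream02"));
    }
}
```

Under these assumptions, the stream01 events sent at 0 s precede the 20 s checkpoint, so only events sent between the last completed checkpoint and the crash (here, between 20 s and 25 s) depend on whether the data source can replay them.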