Hi Robert,

1 - I am using Kafka010 as the data source.
2 - No, I am not using any kind of ListState. I think that is what I must use.
3 - Good. I am going to use CheckpointedFunction.
Just a follow-up question. I was reimplementing it with a CoProcessFunction to save the state and trigger the window myself, so based on your answer I think I am overcomplicating it. If I just use a RichCoGroupFunction, save the state in a ListState, and implement CheckpointedFunction, will that do everything I need? Then I don't have to implement the event-time window trigger in onTimer() and can just use the regular Flink window. Is that correct?

Thanks
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez

On Wed, Jun 16, 2021 at 2:16 PM Robert Metzger <rmetz...@apache.org> wrote:

> Hi Felipe,
>
> Which data source are you using?
>
> > Then, in the MyCoGroupFunction there are only events of stream02
>
> Are you storing events in your state?
>
> > Is this the case where I have to use RichCoGroupFunction and save the
> > state by implementing the CheckpointedFunction?
>
> If you want your state to be persisted with each checkpoint, and recovered
> after a failure, yes.
>
> On Tue, Jun 15, 2021 at 6:18 PM Felipe Gutierrez <
> felipe.o.gutier...@gmail.com> wrote:
>
>> Hi,
>>
>> I have a problem in my stream pipeline where the events in a
>> CoGroupFunction are not restored after the application crashes. The
>> application looks like this:
>>
>> stream01.coGroup(stream02)
>>     .where(...).equalTo(...)
>>     .window(TumblingEventTimeWindows.of(Time.minutes(1)))
>>     .apply(new MyCoGroupFunction())
>>     .process(new MyProcessFunction())
>>     .addSink(new MySinkFunction());
>>
>> The checkpoint interval is configured to 20 seconds and the window is
>> 1 minute long. I follow this sequence to reproduce the error:
>> 1 - send 6 events to stream01
>> 2 - after 25 seconds, I send an event that makes the application crash
>> 3 - in the meantime, the application recovers
>> 4 - after 25 seconds, I send 6 events to stream02
>>
>> Then, in the MyCoGroupFunction there are only events of stream02.
>> Is this the case where I have to use RichCoGroupFunction and save the
>> state by implementing the CheckpointedFunction? I am confused because
>> the CoGroupFunction.coGroup() method is called only when the window
>> closes, and only then do I see the output events of this operator, that
>> is, when Collector.collect() is called.
>>
>> My understanding is that the events are held in memory until the window
>> closes, and then CoGroupFunction.coGroup() is called. So I have to
>> snapshot the state in an operator before the CoGroupFunction. Is that
>> correct? If anyone has a toy example of this (a CoGroupFunction with
>> checkpointing, tested in a unit test), could you please send me the link?
>>
>> Thanks,
>> Felipe
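The reproduction sequence in the original question can be made concrete with a toy model in plain Java (no Flink dependency). This is a hypothetical sketch, not Flink's window operator: the class `ToyWindowBuffer`, its `snapshot()` method and the `run()` timeline are invented for illustration, mirroring the 20-second checkpoint interval, the crash at 25 seconds and the 1-minute window from the thread. It only illustrates the question being asked (events buffered for a window survive a restart only if that buffer is part of the checkpointed state) and does not say how Flink itself stores window contents. It also ignores source replay, i.e. a source such as Kafka resuming from its checkpointed offsets.

```java
import java.util.ArrayList;
import java.util.List;

public class CheckpointToyModel {

    /** Hypothetical stand-in for the elements a window buffers before coGroup() fires. */
    static class ToyWindowBuffer {
        final List<String> stream01 = new ArrayList<>();
        final List<String> stream02 = new ArrayList<>();

        /** Returns an independent copy of the buffered elements, as a checkpoint would. */
        ToyWindowBuffer snapshot() {
            ToyWindowBuffer copy = new ToyWindowBuffer();
            copy.stream01.addAll(stream01);
            copy.stream02.addAll(stream02);
            return copy;
        }
    }

    /**
     * Toy timeline: 6 events on stream01 at t=0, checkpoint at t=20,
     * crash at t=25, 6 events on stream02 after recovery, window fires at t=60.
     * Returns "stream01Count,stream02Count" as seen when the window fires.
     */
    static String run(boolean bufferIsCheckpointed) {
        ToyWindowBuffer live = new ToyWindowBuffer();
        ToyWindowBuffer lastCheckpoint = new ToyWindowBuffer(); // empty state before any checkpoint

        for (int i = 0; i < 6; i++) {
            live.stream01.add("a" + i); // t=0: six events on stream01
        }
        if (bufferIsCheckpointed) {
            lastCheckpoint = live.snapshot(); // t=20: the checkpoint captures the buffer
        }
        // t=25: crash. The in-memory buffer is lost and the job restarts
        // from a copy of the last completed checkpoint.
        live = lastCheckpoint.snapshot();

        for (int i = 0; i < 6; i++) {
            live.stream02.add("b" + i); // t=50: six events on stream02
        }
        // t=60: the window fires and coGroup() sees whatever is buffered.
        return live.stream01.size() + "," + live.stream02.size();
    }

    public static void main(String[] args) {
        System.out.println("buffer checkpointed:     " + run(true));
        System.out.println("buffer not checkpointed: " + run(false));
    }
}
```

Running `main` prints the counts `6,6` when the buffer is part of the checkpoint and `0,6` when it is not; the second case matches the symptom described above, where only the stream02 events reach MyCoGroupFunction.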