Hi,

I have a problem in my stream pipeline: the events that reach a
CoGroupFunction are not restored after the application crashes. The
application looks like this:

stream01.coGroup(stream02)
    .where(...).equalTo(...)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .apply(new MyCoGroupFunction())
    .process(new MyProcessFunction())
    .addSink(new MySinkFunction());

The checkpoint interval is 20 seconds and the window is 1 minute long. I
follow this sequence to reproduce the problem:
1 - send 6 events to stream01
2 - after 25 seconds, send an event that makes the application crash
3 - meanwhile, the application recovers
4 - after 25 seconds, send 6 events to stream02
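To make the timing concrete, here is a small plain-Java sketch (no Flink dependency) of where the events land. It assumes that event timestamps equal the second at which I send them, that the job starts at t = 0 s, that checkpoints complete exactly every 20 s, and that recovery takes negligible time, so step 4 happens at t = 50 s. `ReproTimeline` and its methods are hypothetical names used only for illustration. The window start is the usual tumbling-window formula with zero offset.

```java
// Plain-Java sketch of the reproduction timeline (no Flink dependency).
// Assumptions: event time = second at which an event is sent, job starts at t = 0 s,
// checkpoints complete exactly every 20 s, recovery time is negligible.
class ReproTimeline {
    static final long WINDOW_SIZE_S = 60;         // tumbling window of 1 minute
    static final long CHECKPOINT_INTERVAL_S = 20; // checkpoint interval

    // Start of the tumbling window that contains timestamp ts (zero offset, ts >= 0).
    static long windowStart(long ts) {
        return ts - (ts % WINDOW_SIZE_S);
    }

    // Time of the latest checkpoint completed at or before t; 0 means "none yet".
    static long lastCheckpointAtOrBefore(long t) {
        return (t / CHECKPOINT_INTERVAL_S) * CHECKPOINT_INTERVAL_S;
    }

    public static void main(String[] args) {
        long stream01At = 0;  // step 1
        long crashAt = 25;    // step 2
        long stream02At = 50; // step 4, assumed 25 s after the crash
        System.out.println("stream01 events -> window starting at " + windowStart(stream01At) + " s");
        System.out.println("stream02 events -> window starting at " + windowStart(stream02At) + " s");
        System.out.println("restore from checkpoint taken at " + lastCheckpointAtOrBefore(crashAt) + " s");
        System.out.println("window end (exclusive): " + (windowStart(stream01At) + WINDOW_SIZE_S) + " s");
    }
}
```

Under these assumptions both batches fall into the window [0 s, 60 s), and the last checkpoint before the crash (t = 20 s) is taken after the stream01 events arrive but before that window fires. If recovery plus the wait pushed step 4 past t = 60 s, the stream02 events would land in the next window instead.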

Then MyCoGroupFunction receives only the events from stream02. Is this a
case where I have to use a RichCoGroupFunction and save the state by
implementing CheckpointedFunction? I am confused because
CoGroupFunction.coGroup() is called only when the window closes, and only
then do I see this operator's output events, i.e. when
Collector.collect() is called.
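The behaviour described above (nothing reaches `coGroup()` until the window closes) can be modelled in plain Java as below. This is a toy model under my own assumptions, not Flink's window operator: `ToyCoGroupWindow` and its methods are hypothetical, each instance represents a single window, and it fires once the watermark reaches the window's last millisecond (end - 1), which is how I understand the event-time trigger to behave.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Toy model (not Flink's WindowOperator): one tumbling event-time window that
// buffers both inputs per key and calls the co-group logic only when it fires.
class ToyCoGroupWindow {
    private final long windowEndMs;                                  // exclusive end, e.g. 60_000
    private final Map<String, List<String>> left = new HashMap<>();  // stream01 elements per key
    private final Map<String, List<String>> right = new HashMap<>(); // stream02 elements per key
    private boolean fired = false;

    ToyCoGroupWindow(long windowEndMs) {
        this.windowEndMs = windowEndMs;
    }

    void addLeft(String key, String value) {
        left.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    void addRight(String key, String value) {
        right.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    // Returns what the co-group function would emit for this watermark:
    // nothing before windowEndMs - 1, one line per key when the window fires, nothing afterwards.
    List<String> advanceWatermark(long watermarkMs) {
        List<String> out = new ArrayList<>();
        if (fired || watermarkMs < windowEndMs - 1) {
            return out; // window still open (or already fired): no output
        }
        fired = true;
        Set<String> keys = new TreeSet<>(left.keySet()); // sorted for deterministic output
        keys.addAll(right.keySet());
        for (String key : keys) {
            List<String> l = left.getOrDefault(key, List.of());
            List<String> r = right.getOrDefault(key, List.of());
            // Stands in for MyCoGroupFunction.coGroup(l, r, collector)
            out.add(key + ": left=" + l.size() + " right=" + r.size());
        }
        return out;
    }
}
```

In this model the per-key buffers are the only place the stream01 events live between step 1 and the firing, so whether they survive a crash depends entirely on whether that buffer is part of the checkpoint.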

My understanding is that the events are held in memory and
CoGroupFunction.coGroup() is called when the window closes. So do I have to
snapshot the state in an operator placed before the CoGroupFunction? Is that
correct? If anyone has a toy example of this (a CoGroupFunction with
checkpointing, tested in a unit test), could you please send me the link?
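A unit test of the kind I am asking for would check that the window buffer survives a crash. As a stand-in, here is a plain-Java sketch that replays the four steps: it snapshots the buffer at the 20 s checkpoint, discards the in-memory state at the crash, restores from the snapshot, and then adds the stream02 events. `BufferedCoGroupState` and `CrashRecoveryScenario` are hypothetical names, not Flink APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy state for one open co-group window (plain Java, not a Flink API).
class BufferedCoGroupState {
    private final Map<String, List<String>> left = new HashMap<>();  // stream01 per key
    private final Map<String, List<String>> right = new HashMap<>(); // stream02 per key

    void add(boolean fromStream01, String key, String value) {
        (fromStream01 ? left : right).computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    // Checkpoint: deep copy, so elements added later do not leak into the snapshot.
    BufferedCoGroupState snapshot() {
        BufferedCoGroupState copy = new BufferedCoGroupState();
        left.forEach((k, v) -> copy.left.put(k, new ArrayList<>(v)));
        right.forEach((k, v) -> copy.right.put(k, new ArrayList<>(v)));
        return copy;
    }

    // Restore: a fresh instance starts from a copy of the last completed snapshot.
    static BufferedCoGroupState restore(BufferedCoGroupState checkpoint) {
        return checkpoint.snapshot();
    }

    int size(boolean fromStream01, String key) {
        return (fromStream01 ? left : right).getOrDefault(key, List.of()).size();
    }
}

class CrashRecoveryScenario {
    // Replays the reported steps for key "k" and returns {stream01 count, stream02 count}
    // as the co-group function would see them when the window fires.
    static int[] run() {
        BufferedCoGroupState state = new BufferedCoGroupState();
        for (int i = 0; i < 6; i++) {
            state.add(true, "k", "s1-" + i);                      // step 1 (t = 0 s)
        }
        BufferedCoGroupState checkpoint20s = state.snapshot();   // checkpoint at t = 20 s
        // Steps 2-3: the crash loses the in-memory state; recovery starts from the checkpoint.
        state = BufferedCoGroupState.restore(checkpoint20s);
        for (int i = 0; i < 6; i++) {
            state.add(false, "k", "s2-" + i);                     // step 4
        }
        return new int[] {state.size(true, "k"), state.size(false, "k")};
    }
}
```

In a real job the snapshot and restore would be done by the framework (or by a CheckpointedFunction), not by hand; the sketch only shows what the assertion in such a test should be: both inputs hold 6 elements when the window fires.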

Thanks,
Felipe
