Hi Felipe,

Which data source are you using?

> Then, in the MyCoGroupFunction there are only events of stream02

Are you storing events in your state?

> Is this the case where I have to use RichCoGroupFunction and save the
state by implementing the CheckpointedFunction?

If you want your state to be persisted with each checkpoint and recovered
after a failure, yes.
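
To illustrate what "persisted with each checkpoint and recovered after a
failure" means, here is a minimal plain-Java sketch. It does not use any Flink
API: the class BufferingOperator and its methods onEvent, snapshot, restore and
closeWindow are hypothetical names that only mimic the idea of an operator
whose buffered events are copied at a checkpoint and put back after a crash.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an operator that buffers events until its window closes.
// Not Flink API: snapshot() and restore() only mimic the idea of checkpointed state.
class BufferingOperator {
    private final List<String> buffer = new ArrayList<>();

    // Buffers one incoming event in memory.
    void onEvent(String event) {
        buffer.add(event);
    }

    // Called at each checkpoint: returns a copy of the buffered events.
    List<String> snapshot() {
        return new ArrayList<>(buffer);
    }

    // Called on recovery: replaces the buffer with the last checkpointed copy.
    void restore(List<String> checkpoint) {
        buffer.clear();
        buffer.addAll(checkpoint);
    }

    // Called when the window closes: returns and clears everything buffered.
    List<String> closeWindow() {
        List<String> out = new ArrayList<>(buffer);
        buffer.clear();
        return out;
    }

    public static void main(String[] args) {
        BufferingOperator op = new BufferingOperator();
        op.onEvent("a1");                         // events from stream01
        op.onEvent("a2");
        List<String> checkpoint = op.snapshot();  // a checkpoint completes
        op = new BufferingOperator();             // crash: the in-memory buffer is gone
        op.restore(checkpoint);                   // recovery from the last checkpoint
        op.onEvent("b1");                         // event from stream02
        System.out.println(op.closeWindow());     // prints [a1, a2, b1]
    }
}
```

In this sketch, anything buffered after the last snapshot is lost on a crash
unless the source can replay it, which is why the data source matters.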

On Tue, Jun 15, 2021 at 6:18 PM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> Hi,
>
> I have a problem in my stream pipeline where the events in a
> CoGroupFunction are not restored after the application crashes. The
> application is like this:
>
> stream01.coGroup(stream02)
> .where(...).equalTo(...)
> .window(TumblingEventTimeWindows.of(Time.minutes(1)))
> .apply(new MyCoGroupFunction())
> .process(new MyProcessFunction())
> .addSink(new MySinkFunction())
>
> The checkpoint interval is configured to 20 seconds and the window is 1 minute.
> I follow this sequence to reproduce the error:
> 1 - send 6 events to stream01
> 2 - after 25 seconds I send an event to make the application crash
> 3 - in the meantime the application recovers
> 4 - after 25 seconds I send 6 events to stream02
>
> Then, in the MyCoGroupFunction there are only events of stream02. Is this
> the case where I have to use RichCoGroupFunction and save the state by
> implementing the CheckpointedFunction? I am confused because
> the CoGroupFunction.coGroup() method is called only when the Window closes
> and then I see the output stream events of this operator. That is when
> the Collector.collect() is called.
>
> What I think is that the events are held in memory and when the window
> closes the CoGroupFunction.coGroup() is called. So I have to snapshot the
> state in an operator before the CoGroupFunction. Is that correct? In case
> anyone has a toy example of it (a CoGroupFunction with checkpointing,
> tested in a unit test), could you please send me the link?
>
> Thanks,
> Felipe
>
>
