Hi Robert,

1 - I am using Kafka010 as data source.
2 - No, I am not using any kind of ListState. I think that is what I must use.
3 - Good. I am going to use CheckpointedFunction.

Just a follow-up question. I was reimplementing it using CoProcessFunction
to save the state and trigger the window. So, based on your answer I think
I am overcomplicating it. If I just use RichCoGroupFunction, save the
state in a ListState, and implement CheckpointedFunction, it will do
everything that I need. Is that correct? Then I don't have to implement the
event window trigger in onTimer(); I can just use the regular window from
Flink. Is that correct?

Thanks

*--*
*-- Felipe Gutierrez*
*-- skype: felipe.o.gutierrez*


On Wed, Jun 16, 2021 at 2:16 PM Robert Metzger <rmetz...@apache.org> wrote:

> Hi Felipe,
>
> Which data source are you using?
>
> > Then, in the MyCoGroupFunction there are only events of stream02
>
> Are you storing events in your state?
>
> > Is this the case where I have to use RichCoGroupFunction and save the
> state by implementing the CheckpointedFunction?
>
> If you want your state to be persisted with each checkpoint, and recovered
> after a failure, yes.
>
> On Tue, Jun 15, 2021 at 6:18 PM Felipe Gutierrez <
> felipe.o.gutier...@gmail.com> wrote:
>
>> Hi,
>>
>> I have a problem in my stream pipeline where the events on a
>> CoGroupFunction are not restored after the application crashes. The
>> application is like this:
>>
>> stream01.coGroup(stream02)
>> .where(...).equalTo(...)
>> .window(TumblingEventTimeWindows.of(Time.minutes(1)))
>> .apply(new MyCoGroupFunction())
>> .process(new MyProcessFunction())
>> .addSink(new MySinkFunction())
>>
>> The checkpoint interval is set to 20 seconds and the window is 1 minute long.
>> I follow this sequence to reproduce the error:
>> 1 - send 6 events to stream01
>> 2 - after 25 seconds I send an event to make the application crash
>> 3 - in the meantime the application recovers
>> 4 - after 25 seconds I send 6 events to stream02
>>
>> Then, in the MyCoGroupFunction there are only events of stream02. Is this
>> the case where I have to use RichCoGroupFunction and save the state by
>> implementing the CheckpointedFunction? I am confused because
>> the CoGroupFunction.coGroup() method is called only when the Window closes
>> and then I see the output stream events of this operator. That is when
>> the Collector.collect() is called.
>>
>> What I think is that the events are held in memory and when the window
>> closes the CoGroupFunction.coGroup() is called. So I have to snapshot the
>> state in an operator before the CoGroupFunction. Is that correct? In case
>> anyone has a toy example of it (CoGroupFunction with checkpointing, tested
>> in a unit test), could you please send me the link?
>>
>> Thanks,
>> Felipe
>>
>>
