I'm glad I could help, I hope it will solve your problem :)

Best,
Piotrek

pt., 18 cze 2021 o 14:38 Felipe Gutierrez <felipe.o.gutier...@gmail.com>
napisał(a):

> On Fri, Jun 18, 2021 at 1:41 PM Piotr Nowojski <pnowoj...@apache.org>
> wrote:
>
>> Hi,
>>
>> Keep in mind that this is a quite low level approach to this problem. It
>> would be much better to make sure that after recovery watermarks are still
>> being emitted.
>>
>
> yes. Indeed it looks like a very low level. I did a small test to emit one
> watermark for the stream that was recovered and then it can process
> the join. It has the same behavior on using a CoGroupFunction nad a
> CoProcessFunction. So in the end I don't need to implement
> MyCoProcessFunction with checkpoint. I just need to emit a new watermark
> after the job recovers.
>
> In my case, I am using Kafka source. so, if I make Kafka keeping emitting
> watermarks I solve the problem. Otherwise, I have to implement this custom
> operator.
>
> Thanks for your answer!
> Felipe
>
>
>>
>> If you are using a built-in source, it's probably easier to do it in a
>> custom operator. I would try to implement a custom one based on
>> AbstractStreamOperator. Your class would also need to implement the
>> OneInputStreamOperator interface. `processElement` you could implement as
>> an identity function (just pass down the stream element unchanged). In
>> `processWatermark` you would need to store the latest watermark on the
>> `ListState<Long>` field (you can declare it inside
>> `AbstractStreamOperator#initializeState` via `context.getListState(new
>> ListStateDescriptor<>("your-field-name", Long.class));`). During normal
>> processing (`processWatermark`) make sure it's a singleton list. During
>> recovery (`AbstractStreamOpeartor#initializeState()`) without rescaling,
>> you would just access this state field and re-emit the only element on that
>> list. However during recovery, depending if you are scaling up (a) or down
>> (b), you could have a case where you sometimes have either (a) empty list
>> (in that case you can not emit anything), or (b) many elements on the list
>> (in that case you would need to calculate a minimum of all elements).
>>
>> As operator API is not a very oficial one, it's not well documented. For
>> an example you would need to take a look in the Flink code itself by
>> finding existing implementations of the `AbstractStreamOperator` or
>> `OneInputStreamOperator`.
>>
>> Best,
>> Piotrek
>>
>> pt., 18 cze 2021 o 12:49 Felipe Gutierrez <felipe.o.gutier...@gmail.com>
>> napisał(a):
>>
>>> Hello Piotrek,
>>>
>>> On Fri, Jun 18, 2021 at 11:48 AM Piotr Nowojski <pnowoj...@apache.org>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> As far as I can tell timers should be checkpointed and recovered. What
>>>> may be happening is that the state of the last seen watermarks by operators
>>>> on different inputs and different channels inside an input is not
>>>> persisted. Flink is assuming that after the restart, watermark assigners
>>>> will emit newer watermarks after the recovery. However if one of your
>>>> inputs is dormant and it has already emitted some very high watermark long
>>>> time before the failure, after recovery if no new watermark is emitted,
>>>> this input/input channel might be preventing timers from firing. Can you
>>>> check if that's what's happening in your case?
>>>>
>>>
>>> I think you are correct. at least when I reproduce the bug it is like
>>> you said.
>>>
>>>
>>>> If so you would have to make sure one way or another that some
>>>> watermarks will be emitted after recovery. As a last resort, you could
>>>> manually store the watermarks in the operators/sources state and re-emit
>>>> last seen watermark during recovery.
>>>>
>>>
>>> Could you please point how I can checkpoint the watermarks on a source
>>> operator? Is it done by this code below from here (
>>> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
>>> )?
>>>
>>> FlinkKafkaConsumer<MyType> kafkaSource = new
>>> FlinkKafkaConsumer<>("myTopic", schema, props);
>>> kafkaSource.assignTimestampsAndWatermarks(
>>>         WatermarkStrategy.
>>>                 .forBoundedOutOfOrderness(Duration.ofSeconds(20)));
>>>
>>> Thanks,
>>> Felipe
>>>
>>>
>>>>
>>>> Best,
>>>> Piotrek
>>>>
>>>> czw., 17 cze 2021 o 13:46 Felipe Gutierrez <
>>>> felipe.o.gutier...@gmail.com> napisał(a):
>>>>
>>>>> Hi community,
>>>>>
>>>>> I have implemented a join function using CoProcessFunction with
>>>>> CheckpointedFunction to recover from failures. I added some debug lines to
>>>>> check if it is restoring and it does. Before the crash, I process events
>>>>> that fall at processElement2. I create snapshots at snapshotState(), the
>>>>> application comes back and restores the events. That is fine.
>>>>>
>>>>> After the restore, I process events that fall on processElement1. I
>>>>> register event timers for them as I did on processElement2 before the
>>>>> crash. But the onTimer() is never called. The point is that I don't have
>>>>> any events to send to processElement2() to make the CoProcessFunction
>>>>> register a time for them. They were sent before the crash.
>>>>>
>>>>> I suppose that the onTimer() is called only when there are
>>>>> "timerService.registerEventTimeTimer(endOfWindow);" for processElement1 
>>>>> and
>>>>> processElement2. Because when I test the same application without crashing
>>>>> and the CoProcessFunction triggers the onTimer() method.
>>>>>
>>>>> But if I have a crash in the middle the CoProcessFunction does not
>>>>> call onTimer(). Why is that? Is that normal? What do I have to do to make
>>>>> the CoProcessFunction trigger the onTime() method even if only one stream
>>>>> is processed let's say at the processElement2() method and the other 
>>>>> stream
>>>>> is restored from a snapshot? I imagine that I have to register a time
>>>>> during the recovery (initializeState()). But how?
>>>>>
>>>>> thanks,
>>>>> Felipe
>>>>>
>>>>

Reply via email to