On Fri, Jun 18, 2021 at 1:41 PM Piotr Nowojski <pnowoj...@apache.org> wrote:

> Hi,
>
> Keep in mind that this is a quite low level approach to this problem. It
> would be much better to make sure that after recovery watermarks are still
> being emitted.
>

Yes, indeed it looks very low level. I did a small test that emits one
watermark for the stream that was recovered, and then the operator can process
the join. It behaves the same with a CoGroupFunction and a
CoProcessFunction. So in the end I don't need to implement
MyCoProcessFunction with checkpointing; I just need to emit a new watermark
after the job recovers.

In my case, I am using a Kafka source, so if I make Kafka keep emitting
watermarks, I solve the problem. Otherwise, I have to implement this custom
operator.

Thanks for your answer!
Felipe


>
> If you are using a built-in source, it's probably easier to do it in a
> custom operator. I would try to implement a custom one based on
> AbstractStreamOperator. Your class would also need to implement the
> OneInputStreamOperator interface. You could implement `processElement` as
> an identity function (just pass the stream element down unchanged). In
> `processWatermark` you would need to store the latest watermark in a
> `ListState<Long>` field and still forward the watermark downstream (for
> example by calling `super.processWatermark(mark)`). You can declare the
> field inside `AbstractStreamOperator#initializeState` via
> `context.getOperatorStateStore().getListState(new
> ListStateDescriptor<>("your-field-name", Long.class));`. During normal
> processing (`processWatermark`), make sure it's a singleton list. During
> recovery (`AbstractStreamOperator#initializeState()`) without rescaling,
> you would just access this state field and re-emit the only element on that
> list. However, during recovery, depending on whether you are scaling up (a)
> or down (b), you could end up with either (a) an empty list (in that case
> you cannot emit anything), or (b) many elements on the list (in that case
> you would need to calculate the minimum of all elements).
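The state handling described above can be sketched as follows: keep a singleton list during normal processing, and on restore re-emit the minimum or nothing at all. To keep the sketch runnable without a Flink dependency, it is a plain-Java model rather than a real operator. The hypothetical `LastWatermarkModel` class stands in for the operator, and a `java.util.List` stands in for the `ListState<Long>`. The value returned by `restore` is what the operator would re-emit as a watermark. This is an illustration under those assumptions, not Flink's API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.OptionalLong;

// Plain-Java model (no Flink dependency) of the watermark bookkeeping described above.
// Hypothetical names: in a real operator, `stored` would be the ListState<Long>
// obtained in initializeState(), and restore()'s result would be emitted as a Watermark.
class LastWatermarkModel {
    // Stands in for ListState<Long>; kept as a singleton list during normal processing.
    private final List<Long> stored = new ArrayList<>();

    // Mirrors processWatermark(): remember only the latest watermark.
    void processWatermark(long watermark) {
        stored.clear();
        stored.add(watermark);
    }

    // Mirrors what a checkpoint would capture: a copy of the list.
    List<Long> snapshot() {
        return new ArrayList<>(stored);
    }

    // Mirrors initializeState() on restore. After rescaling, this subtask may get
    // no entries (scale up) or entries from several old subtasks (scale down).
    // Returns the watermark to re-emit, if any.
    OptionalLong restore(List<Long> redistributed) {
        stored.clear();
        if (redistributed.isEmpty()) {
            return OptionalLong.empty(); // nothing safe to emit
        }
        long min = Collections.min(redistributed); // conservative choice across old subtasks
        stored.add(min); // back to a singleton list
        return OptionalLong.of(min);
    }
}
```

Taking the minimum is the conservative choice. Re-emitting a larger value could claim event-time progress that not every merged old subtask had made, which could turn some later records into late data.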
>
> As the operator API is not a very official one, it's not well documented.
> For examples, you would need to look at the Flink code itself and find
> existing implementations of `AbstractStreamOperator` or
> `OneInputStreamOperator`.
>
> Best,
> Piotrek
>
> On Fri, Jun 18, 2021 at 12:49 PM Felipe Gutierrez <felipe.o.gutier...@gmail.com>
> wrote:
>
>> Hello Piotrek,
>>
>> On Fri, Jun 18, 2021 at 11:48 AM Piotr Nowojski <pnowoj...@apache.org>
>> wrote:
>>
>>> Hi,
>>>
>>> As far as I can tell, timers should be checkpointed and recovered. What
>>> may be happening is that the state of the last watermarks seen by operators
>>> on different inputs, and on different channels inside an input, is not
>>> persisted. Flink assumes that after the restart, watermark assigners
>>> will emit newer watermarks. Since an operator only advances its event time
>>> to the minimum watermark across all of its inputs and channels, a dormant
>>> input can cause trouble. If it already emitted some very high watermark a
>>> long time before the failure, and no new watermark is emitted after
>>> recovery, this input/input channel might be preventing timers from firing.
>>> Can you check if that's what's happening in your case?
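The mechanism described above can be illustrated with a small plain-Java model. It has no Flink dependency, and the `TwoInputWatermarkModel` class and its methods are hypothetical. The model assumes three things, as described in the thread:

- An operator's event time is the minimum of the latest watermark on each input.
- Each input starts again from `Long.MIN_VALUE` after recovery.
- Timers restored from the checkpoint fire only once that minimum reaches them.

```java
import java.util.Arrays;
import java.util.PriorityQueue;

// Plain-Java model (no Flink dependency) of why restored event-time timers can stall.
// Hypothetical names; input 0 ~ processElement1's stream, input 1 ~ processElement2's stream.
class TwoInputWatermarkModel {
    private final long[] inputWatermarks = new long[2];
    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    private long currentWatermark = Long.MIN_VALUE;

    TwoInputWatermarkModel() {
        // After a (re)start nothing is known yet, so every input begins at Long.MIN_VALUE.
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
    }

    // Timers are checkpointed in Flink; a restored timer is modelled by registering it here.
    void registerEventTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    // Returns how many timers fired because of this watermark.
    int processWatermark(int input, long watermark) {
        inputWatermarks[input] = Math.max(inputWatermarks[input], watermark);
        long combined = Math.min(inputWatermarks[0], inputWatermarks[1]);
        int fired = 0;
        if (combined > currentWatermark) {
            currentWatermark = combined;
            while (!timers.isEmpty() && timers.peek() <= currentWatermark) {
                timers.poll(); // onTimer() would be called here
                fired++;
            }
        }
        return fired;
    }
}
```

Treat input 0 as the active stream and input 1 as the dormant one. Watermarks that arrive only on input 0 never fire the restored timer, because the combined watermark stays at `Long.MIN_VALUE`. A single watermark on input 1 does fire it, which matches the behavior Felipe reports after emitting one watermark for the recovered stream.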
>>>
>>
>> I think you are correct. At least when I reproduce the bug, it behaves as
>> you described.
>>
>>
>>> If so you would have to make sure one way or another that some
>>> watermarks will be emitted after recovery. As a last resort, you could
>>> manually store the watermarks in the operators' or sources' state and
>>> re-emit the last seen watermark during recovery.
>>>
>>
>> Could you please point out how I can checkpoint the watermarks on a source
>> operator? Is it done by the code below, from here (
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
>> )?
>>
>> FlinkKafkaConsumer<MyType> kafkaSource = new
>> FlinkKafkaConsumer<>("myTopic", schema, props);
>> kafkaSource.assignTimestampsAndWatermarks(
>>         WatermarkStrategy
>>                 .forBoundedOutOfOrderness(Duration.ofSeconds(20)));
>>
>> Thanks,
>> Felipe
>>
>>
>>>
>>> Best,
>>> Piotrek
>>>
>>> On Thu, Jun 17, 2021 at 1:46 PM Felipe Gutierrez <felipe.o.gutier...@gmail.com>
>>> wrote:
>>>
>>>> Hi community,
>>>>
>>>> I have implemented a join function using a CoProcessFunction with
>>>> CheckpointedFunction to recover from failures. I added some debug lines to
>>>> check whether it is restoring, and it is. Before the crash, I process events
>>>> that arrive at processElement2(). I create snapshots in snapshotState(); the
>>>> application comes back and restores the events. That is fine.
>>>>
>>>> After the restore, I process events that arrive at processElement1(). I
>>>> register event-time timers for them, as I did in processElement2() before
>>>> the crash. But onTimer() is never called. The point is that I don't have
>>>> any events to send to processElement2() to make the CoProcessFunction
>>>> register a timer for them. They were sent before the crash.
>>>>
>>>> I suppose that onTimer() is called only when
>>>> "timerService.registerEventTimeTimer(endOfWindow);" has been called from
>>>> both processElement1() and processElement2(), because when I test the same
>>>> application without a crash, the CoProcessFunction does trigger onTimer().
>>>>
>>>> But if there is a crash in the middle, the CoProcessFunction does not call
>>>> onTimer(). Why is that? Is that normal? What do I have to do to make the
>>>> CoProcessFunction trigger the onTimer() method even if only one stream is
>>>> processed, let's say in the processElement2() method, and the other stream
>>>> is restored from a snapshot? I imagine that I have to register a timer
>>>> during recovery (initializeState()). But how?
>>>>
>>>> thanks,
>>>> Felipe
>>>>
>>>
