Hi,

Keep in mind that this is a rather low-level approach to the problem. It
would be much better to make sure that watermarks are still being emitted
after recovery.
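
To show why this matters, here is a small plain-Java model of the situation described further down in this thread. It has no Flink dependency, and the class and method names are hypothetical. It only mimics two rules: a two-input operator's event-time clock is the minimum of the last watermark seen on each input, and those per-input watermarks start again from Long.MIN_VALUE after a restart, while the timers themselves are restored:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Simplified, Flink-free model (illustrative only; not Flink's actual classes) of how a
// two-input operator, such as the one running a CoProcessFunction, derives its event-time
// clock: the combined watermark is the minimum of the last watermark seen on each input.
// Both inputs start at Long.MIN_VALUE after a (re)start, since they are not checkpointed.
class TwoInputWatermarkModel {
    private long input1Watermark = Long.MIN_VALUE;
    private long input2Watermark = Long.MIN_VALUE;
    // Event-time timers; in Flink these are checkpointed and survive recovery.
    private final List<Long> pendingTimers = new ArrayList<>();
    private final List<Long> firedTimers = new ArrayList<>();

    void registerEventTimeTimer(long timestamp) {
        pendingTimers.add(timestamp);
    }

    void processWatermark1(long watermark) {
        input1Watermark = watermark;
        fireDueTimers();
    }

    void processWatermark2(long watermark) {
        input2Watermark = watermark;
        fireDueTimers();
    }

    long combinedWatermark() {
        return Math.min(input1Watermark, input2Watermark);
    }

    List<Long> firedTimers() {
        return firedTimers;
    }

    // "Fire" (here: just record) every timer whose timestamp is <= the combined watermark.
    private void fireDueTimers() {
        long now = combinedWatermark();
        Iterator<Long> it = pendingTimers.iterator();
        while (it.hasNext()) {
            long timestamp = it.next();
            if (timestamp <= now) {
                firedTimers.add(timestamp);
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        TwoInputWatermarkModel op = new TwoInputWatermarkModel();
        op.registerEventTimeTimer(1_000L);  // timer restored from the checkpoint
        op.processWatermark1(5_000L);       // only input 1 is active after recovery
        System.out.println("after input 1: fired=" + op.firedTimers());
        op.processWatermark2(5_000L);       // input 2 finally emits a watermark again
        System.out.println("after input 2: fired=" + op.firedTimers());
    }
}
```

In this model, the restored timer stays pending while only input 1 advances, and fires only once input 2 emits a watermark again, which matches the symptom in the original question.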

If you are using a built-in source, it's probably easier to do this in a
custom operator. I would try to implement one based on
AbstractStreamOperator; your class would also need to implement the
OneInputStreamOperator interface. You could implement `processElement` as
an identity function (just pass the stream element downstream unchanged).
In `processWatermark` you would need to store the latest watermark in a
`ListState<Long>` field (you can declare it inside
`AbstractStreamOperator#initializeState` via
`context.getOperatorStateStore().getListState(new
ListStateDescriptor<>("your-field-name", Long.class));`), and still forward
the watermark downstream, for example by calling
`super.processWatermark(mark)`. During normal processing
(`processWatermark`), make sure the list holds exactly one element. During
recovery (`AbstractStreamOperator#initializeState()`) without rescaling,
you would just read this state field and re-emit the only element on that
list. However, if the job is rescaled during recovery, the list on a given
subtask can either be (a) empty when scaling up (in that case you cannot
emit anything), or (b) contain many elements when scaling down (in that
case you would need to re-emit the minimum of all of them).
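
The state handling described above can be sketched without any Flink dependency. In this hypothetical model, a plain `List<Long>` stands in for the operator's `ListState<Long>`, and the method names only mirror the operator hooks; it is a sketch under those assumptions, not a real operator:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.OptionalLong;

// Flink-free model (illustrative only) of the bookkeeping the custom operator would do.
// In Flink, `state` would be the operator ListState<Long>; here it is a plain List<Long>.
class LastWatermarkBookkeeping {
    private List<Long> state = Collections.emptyList();

    // processWatermark: overwrite the state so it stays a singleton list holding the
    // latest watermark (the real operator would also forward the watermark downstream).
    void processWatermark(long watermark) {
        state = Collections.singletonList(watermark);
    }

    // What this parallel instance would write into a checkpoint.
    List<Long> snapshot() {
        return new ArrayList<>(state);
    }

    // Pick the watermark to re-emit from the entries assigned to this instance on restore:
    //  - empty list (possible after scaling up): nothing to re-emit;
    //  - one element (no rescaling): re-emit it;
    //  - several elements (possible after scaling down): re-emit the minimum, since a
    //    larger value could mark records from a slower former instance as late.
    static OptionalLong watermarkToReemit(List<Long> restored) {
        return restored.stream().mapToLong(Long::longValue).min();
    }

    // initializeState: restore the state (collapsed back to a singleton) and return
    // the watermark that should be re-emitted, if any.
    OptionalLong initializeState(List<Long> restored) {
        OptionalLong toReemit = watermarkToReemit(restored);
        if (toReemit.isPresent()) {
            state = Collections.singletonList(toReemit.getAsLong());
        } else {
            state = Collections.emptyList();
        }
        return toReemit;
    }

    public static void main(String[] args) {
        // Simulate scaling down from two instances to one: both snapshots end up here.
        LastWatermarkBookkeeping a = new LastWatermarkBookkeeping();
        LastWatermarkBookkeeping b = new LastWatermarkBookkeeping();
        a.processWatermark(9_000L);
        b.processWatermark(4_000L);
        List<Long> merged = new ArrayList<>(a.snapshot());
        merged.addAll(b.snapshot());

        LastWatermarkBookkeeping restored = new LastWatermarkBookkeeping();
        System.out.println("re-emit: " + restored.initializeState(merged));
    }
}
```

In a real operator, the value returned by `initializeState` would then have to be wrapped in a `Watermark` and emitted downstream; how exactly to wire that up is best checked against existing `AbstractStreamOperator` implementations.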

As the operator API is not a very official one, it's not well documented.
For examples, you would need to look at the Flink code itself and find
existing implementations of `AbstractStreamOperator` or
`OneInputStreamOperator`.

Best,
Piotrek

On Fri, 18 Jun 2021 at 12:49 Felipe Gutierrez <felipe.o.gutier...@gmail.com>
wrote:

> Hello Piotrek,
>
> On Fri, Jun 18, 2021 at 11:48 AM Piotr Nowojski <pnowoj...@apache.org>
> wrote:
>
>> Hi,
>>
>> As far as I can tell, timers should be checkpointed and recovered. What
>> may be happening is that the last watermarks seen by operators, per
>> input and per input channel within an input, are not persisted. Flink
>> assumes that after a restart, watermark assigners will emit newer
>> watermarks. However, if one of your inputs is dormant and had already
>> emitted some very high watermark long before the failure, then after
>> recovery, if no new watermark is emitted, this input/input channel might
>> be preventing timers from firing. Can you check if that's what's
>> happening in your case?
>>
>
> I think you are correct. At least when I reproduce the bug, it behaves as
> you described.
>
>
>> If so, you would have to make sure, one way or another, that some
>> watermarks are emitted after recovery. As a last resort, you could
>> manually store the watermarks in the operator's/source's state and
>> re-emit the last seen watermark during recovery.
>>
>
> Could you please point out how I can checkpoint the watermarks on a source
> operator? Is it done by the code below, from here (
> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
> )?
>
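> import java.time.Duration;
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
>
> FlinkKafkaConsumer<MyType> kafkaSource =
>         new FlinkKafkaConsumer<>("myTopic", schema, props);
> kafkaSource.assignTimestampsAndWatermarks(
>         WatermarkStrategy
>                 .<MyType>forBoundedOutOfOrderness(Duration.ofSeconds(20)));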
>
> Thanks,
> Felipe
>
>
>>
>> Best,
>> Piotrek
>>
>> On Thu, 17 Jun 2021 at 13:46 Felipe Gutierrez <felipe.o.gutier...@gmail.com>
>> wrote:
>>
>>> Hi community,
>>>
>>> I have implemented a join function using CoProcessFunction with
>>> CheckpointedFunction to recover from failures. I added some debug lines to
>>> check whether it restores, and it does. Before the crash, I process events
>>> that arrive at processElement2. I create snapshots in snapshotState(); the
>>> application comes back and restores the events. That is fine.
>>>
>>> After the restore, I process events that arrive at processElement1. I
>>> register event-time timers for them as I did in processElement2 before the
>>> crash. But onTimer() is never called. The point is that I don't have any
>>> events to send to processElement2() to make the CoProcessFunction register
>>> a timer for them. They were sent before the crash.
>>>
>>> I suppose that onTimer() is called only when
>>> "timerService.registerEventTimeTimer(endOfWindow);" has been called for
>>> both processElement1 and processElement2, because when I test the same
>>> application without a crash, the CoProcessFunction does trigger the
>>> onTimer() method.
>>>
>>> But if there is a crash in the middle, the CoProcessFunction does not call
>>> onTimer(). Why is that? Is that normal? What do I have to do to make the
>>> CoProcessFunction trigger the onTimer() method even if only one stream is
>>> processed, let's say in the processElement2() method, and the other stream
>>> is restored from a snapshot? I imagine that I have to register a timer
>>> during recovery (initializeState()). But how?
>>>
>>> thanks,
>>> Felipe
>>>
>>
