Hello Piotrek,

On Fri, Jun 18, 2021 at 11:48 AM Piotr Nowojski <pnowoj...@apache.org>
wrote:

> Hi,
>
> As far as I can tell timers should be checkpointed and recovered. What may
> be happening is that the state of the last seen watermarks by operators on
> different inputs and different channels inside an input is not persisted.
> Flink is assuming that after the restart, watermark assigners will emit
> newer watermarks after the recovery. However, if one of your inputs is
> dormant and had already emitted a very high watermark a long time
> before the failure, and no new watermark is emitted after recovery, this
> input/input channel might be preventing timers from firing. Can you check
> if that's what's happening in your case?
>

I think you are correct. At least when I reproduce the bug, it behaves as
you described.
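
To make the mechanism concrete for anyone reading the archive, here is a
minimal plain-Java sketch of it. It is not Flink code: the class and method
names are hypothetical, and it only assumes that a two-input operator
advances its event-time clock to the minimum of the watermarks seen on its
inputs, and that an input with no watermark yet counts as Long.MIN_VALUE.

```java
import java.util.Arrays;

/** Hypothetical model of a two-input operator's event-time clock; not Flink API. */
class WatermarkStallSketch {
    // Last watermark seen per input; Long.MIN_VALUE means "nothing seen yet".
    private final long[] inputWatermarks = {Long.MIN_VALUE, Long.MIN_VALUE};

    /** Records a watermark on one input and returns the combined watermark. */
    long onWatermark(int input, long watermark) {
        inputWatermarks[input] = Math.max(inputWatermarks[input], watermark);
        return combined();
    }

    /** The operator's clock is held back by its slowest input. */
    long combined() {
        return Math.min(inputWatermarks[0], inputWatermarks[1]);
    }

    /** An event-time timer fires once the combined watermark reaches it. */
    boolean timerFires(long timerTimestamp) {
        return combined() >= timerTimestamp;
    }

    /** Models a restart in which the per-input watermarks are not persisted. */
    void restartWithoutWatermarkState() {
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        WatermarkStallSketch op = new WatermarkStallSketch();
        op.onWatermark(0, 5000L); // input 0 emits a high watermark, then goes dormant
        op.onWatermark(1, 1000L);
        System.out.println(op.timerFires(1000L)); // true

        op.restartWithoutWatermarkState();
        op.onWatermark(1, 2000L); // only input 1 is active after recovery
        System.out.println(op.timerFires(1500L)); // false: input 0 is still at MIN_VALUE

        op.onWatermark(0, 5000L); // input 0 emits (or re-emits) a watermark
        System.out.println(op.timerFires(1500L)); // true
    }
}
```

In this model the timer at 1500 stays pending after the restart until the
dormant input produces a watermark again, which matches what I observe.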


> If so you would have to make sure one way or another that some watermarks
> will be emitted after recovery. As a last resort, you could manually store
> the watermarks in the operators/sources state and re-emit last seen
> watermark during recovery.
>
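
As I understand the "last resort" suggestion, the idea would look roughly
like the plain-Java sketch below. Everything in it is hypothetical and
illustrative, not Flink API: in a real job the snapshotted value would have
to live in operator or source state (for example via CheckpointedFunction),
and how the watermark is actually re-emitted depends on the source
implementation, which this sketch leaves out.

```java
import java.util.OptionalLong;

/** Hypothetical source-side model; method names are illustrative, not Flink API. */
class ReemittingSourceSketch {
    // Highest watermark emitted so far; Long.MIN_VALUE means "none yet".
    private long lastWatermark = Long.MIN_VALUE;

    /** Called whenever the source emits a watermark during normal operation. */
    long emitWatermark(long watermark) {
        lastWatermark = Math.max(lastWatermark, watermark);
        return lastWatermark;
    }

    /** The value that would be written into checkpointed state. */
    long snapshot() {
        return lastWatermark;
    }

    /** Restores from a snapshot and returns the watermark to re-emit, if any. */
    OptionalLong restore(long snapshotted) {
        lastWatermark = snapshotted;
        return lastWatermark == Long.MIN_VALUE
                ? OptionalLong.empty()
                : OptionalLong.of(lastWatermark);
    }

    public static void main(String[] args) {
        ReemittingSourceSketch beforeCrash = new ReemittingSourceSketch();
        beforeCrash.emitWatermark(5000L);
        long checkpointed = beforeCrash.snapshot();

        // After the crash, a fresh instance re-emits the stored watermark.
        ReemittingSourceSketch afterCrash = new ReemittingSourceSketch();
        OptionalLong toReemit = afterCrash.restore(checkpointed);
        System.out.println(toReemit.getAsLong()); // 5000
    }
}
```

With the dormant input re-emitting its stored watermark on recovery, the
downstream operator would no longer be held back by that input.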

Could you please point out how I can checkpoint the watermarks on a source
operator? Is it done by the code below, taken from here (
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
)?

FlinkKafkaConsumer<MyType> kafkaSource =
        new FlinkKafkaConsumer<>("myTopic", schema, props);
kafkaSource.assignTimestampsAndWatermarks(
        WatermarkStrategy
                .forBoundedOutOfOrderness(Duration.ofSeconds(20)));

Thanks,
Felipe


>
> Best,
> Piotrek
>
> On Thu, Jun 17, 2021 at 1:46 PM Felipe Gutierrez <felipe.o.gutier...@gmail.com>
> wrote:
>
>> Hi community,
>>
>> I have implemented a join function using CoProcessFunction with
>> CheckpointedFunction to recover from failures. I added some debug lines to
>> check if it is restoring and it does. Before the crash, I process events
>> that fall at processElement2. I create snapshots at snapshotState(), the
>> application comes back and restores the events. That is fine.
>>
>> After the restore, I process events that fall on processElement1. I
>> register event timers for them as I did on processElement2 before the
>> crash. But onTimer() is never called. The point is that I don't have
>> any events to send to processElement2() to make the CoProcessFunction
>> register a timer for them. They were sent before the crash.
>>
>> I suppose that onTimer() is called only when
>> "timerService.registerEventTimeTimer(endOfWindow);" has been invoked for
>> both processElement1 and processElement2, because when I test the same
>> application without crashing, the CoProcessFunction does trigger the
>> onTimer() method.
>>
>> But if I have a crash in the middle, the CoProcessFunction does not call
>> onTimer(). Why is that? Is that normal? What do I have to do to make the
>> CoProcessFunction trigger the onTimer() method even if only one stream is
>> processed, let's say at the processElement2() method, and the other stream
>> is restored from a snapshot? I imagine that I have to register a timer
>> during the recovery (initializeState()). But how?
>>
>> thanks,
>> Felipe
>>
>
