Hello Piotr,

Could you please help me to ensure that I am implementing it in the correct
way?

I created the WatermarkFunction [1] based on the FilterFunction from Flink
and the WatermarkStreamOperator [2] and I am doing unit test [3]. Then
there are things that I am not sure how to do.

How to make the ListState singleton on all parallel operators?

When my job restarts I don't even have to call "processWatermark(new
Watermark(maxWatermark));" on the end of the "initializeState()". I can see
that the job process the previous watermarks before it fails. Is it because
the source is one that I created at the end of the unit test "MySource"? Or
is it because I don't have a join on the stream pipeline? I have the output
of my unit test below at this message in case you are not able to runt the
test.

[1]
https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/main/java/org/sense/flink/examples/stream/operator/watermark/WatermarkFunction.java
[2]
https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/main/java/org/sense/flink/examples/stream/operator/watermark/WatermarkStreamOperator.java
[3]
https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/test/java/org/sense/flink/examples/stream/operator/watermark/WatermarkStreamOperatorTest.java#L113

$ cd explore-flink/docker/ops-playground-image/java/explore-flink/
$ mvn -Dtest=WatermarkStreamOperatorTest#testRestartWithLatestWatermark test

WatermarkStreamOperator.initializeState
WatermarkStreamOperator.initializeState
WatermarkStreamOperator.initializeState
WatermarkStreamOperator.initializeState
initializeState... 0
initializeState... 0
initializeState... 0
initializeState... 0
maxWatermark: 0
maxWatermark: 0
maxWatermark: 0
maxWatermark: 0
processing watermark: 0
processing watermark: 0
processing watermark: 0
processing watermark: 0
processing watermark: 0
processing watermark: 0
processing watermark: 0
processing watermark: 0
Attempts restart: 0
processing watermark: 1
processing watermark: 1
processing watermark: 1
processing watermark: 1
Attempts restart: 0
processing watermark: 2
processing watermark: 2
processing watermark: 2
processing watermark: 2
Attempts restart: 0
processing watermark: 3
processing watermark: 3
processing watermark: 3
processing watermark: 3
Attempts restart: 0
processing watermark: 9223372036854775807
processing watermark: 9223372036854775807
processing watermark: 9223372036854775807
processing watermark: 9223372036854775807
This exception will trigger until the reference time [2021-06-21
16:57:19.531] reaches the trigger time [2021-06-21 16:57:21.672] // HERE
THE JOB IS RESTARTING
initializeState... 1
initializeState... 1
initializeState... 1
WatermarkStreamOperator.initializeState
WatermarkStreamOperator.initializeState
WatermarkStreamOperator.initializeState
WatermarkStreamOperator.initializeState
watermarkList recovered: 0
watermarkList recovered: 0
watermarkList recovered: 0
watermarkList recovered: 0
watermarkList recovered: 0
watermarkList recovered: 1
watermarkList recovered: 2
initializeState... 1
maxWatermark: 2 // HERE IS THE LATEST WATERMARK
processing watermark: 2 // I PROCESS IT HERE
watermarkList recovered: 0
watermarkList recovered: 1
watermarkList recovered: 0
watermarkList recovered: 0
watermarkList recovered: 1
watermarkList recovered: 1
watermarkList recovered: 2
watermarkList recovered: 2
watermarkList recovered: 2
maxWatermark: 2
maxWatermark: 2
processing watermark: 2
processing watermark: 2
maxWatermark: 2
processing watermark: 2
processing watermark: 0 // IS IS ALSO PROCESSING THE OTHER WATERMARKS. WHY?
processing watermark: 0
processing watermark: 0
processing watermark: 0
Attempts restart: 1
processing watermark: 1
processing watermark: 1
processing watermark: 1
processing watermark: 1
Attempts restart: 1
processing watermark: 2
processing watermark: 2
processing watermark: 2
processing watermark: 2
Attempts restart: 1
processing watermark: 3
processing watermark: 3
processing watermark: 3
processing watermark: 3
Attempts restart: 1
processing watermark: 9223372036854775807
processing watermark: 9223372036854775807
processing watermark: 9223372036854775807
processing watermark: 9223372036854775807
This is a poison but we do NOT throw an exception because the reference
time passed :) [2021-06-21 16:57:22.849] >= [2021-06-21 16:57:21.672]
Attempts restart: 1
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 6.836 sec


On Fri, Jun 18, 2021 at 2:46 PM Piotr Nowojski <pnowoj...@apache.org> wrote:

> I'm glad I could help, I hope it will solve your problem :)
>
> Best,
> Piotrek
>
> pt., 18 cze 2021 o 14:38 Felipe Gutierrez <felipe.o.gutier...@gmail.com>
> napisał(a):
>
>> On Fri, Jun 18, 2021 at 1:41 PM Piotr Nowojski <pnowoj...@apache.org>
>> wrote:
>>
>>> Hi,
>>>
>>> Keep in mind that this is a quite low level approach to this problem. It
>>> would be much better to make sure that after recovery watermarks are still
>>> being emitted.
>>>
>>
>> yes. Indeed it looks like a very low level. I did a small test to emit
>> one watermark for the stream that was recovered and then it can process
>> the join. It has the same behavior on using a CoGroupFunction nad a
>> CoProcessFunction. So in the end I don't need to implement
>> MyCoProcessFunction with checkpoint. I just need to emit a new watermark
>> after the job recovers.
>>
>> In my case, I am using Kafka source. so, if I make Kafka keeping emitting
>> watermarks I solve the problem. Otherwise, I have to implement this custom
>> operator.
>>
>> Thanks for your answer!
>> Felipe
>>
>>
>>>
>>> If you are using a built-in source, it's probably easier to do it in a
>>> custom operator. I would try to implement a custom one based on
>>> AbstractStreamOperator. Your class would also need to implement the
>>> OneInputStreamOperator interface. `processElement` you could implement as
>>> an identity function (just pass down the stream element unchanged). In
>>> `processWatermark` you would need to store the latest watermark on the
>>> `ListState<Long>` field (you can declare it inside
>>> `AbstractStreamOperator#initializeState` via `context.getListState(new
>>> ListStateDescriptor<>("your-field-name", Long.class));`). During normal
>>> processing (`processWatermark`) make sure it's a singleton list. During
>>> recovery (`AbstractStreamOpeartor#initializeState()`) without rescaling,
>>> you would just access this state field and re-emit the only element on that
>>> list. However during recovery, depending if you are scaling up (a) or down
>>> (b), you could have a case where you sometimes have either (a) empty list
>>> (in that case you can not emit anything), or (b) many elements on the list
>>> (in that case you would need to calculate a minimum of all elements).
>>>
>>> As operator API is not a very oficial one, it's not well documented. For
>>> an example you would need to take a look in the Flink code itself by
>>> finding existing implementations of the `AbstractStreamOperator` or
>>> `OneInputStreamOperator`.
>>>
>>> Best,
>>> Piotrek
>>>
>>> pt., 18 cze 2021 o 12:49 Felipe Gutierrez <felipe.o.gutier...@gmail.com>
>>> napisał(a):
>>>
>>>> Hello Piotrek,
>>>>
>>>> On Fri, Jun 18, 2021 at 11:48 AM Piotr Nowojski <pnowoj...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> As far as I can tell timers should be checkpointed and recovered. What
>>>>> may be happening is that the state of the last seen watermarks by 
>>>>> operators
>>>>> on different inputs and different channels inside an input is not
>>>>> persisted. Flink is assuming that after the restart, watermark assigners
>>>>> will emit newer watermarks after the recovery. However if one of your
>>>>> inputs is dormant and it has already emitted some very high watermark long
>>>>> time before the failure, after recovery if no new watermark is emitted,
>>>>> this input/input channel might be preventing timers from firing. Can you
>>>>> check if that's what's happening in your case?
>>>>>
>>>>
>>>> I think you are correct. at least when I reproduce the bug it is like
>>>> you said.
>>>>
>>>>
>>>>> If so you would have to make sure one way or another that some
>>>>> watermarks will be emitted after recovery. As a last resort, you could
>>>>> manually store the watermarks in the operators/sources state and re-emit
>>>>> last seen watermark during recovery.
>>>>>
>>>>
>>>> Could you please point how I can checkpoint the watermarks on a source
>>>> operator? Is it done by this code below from here (
>>>> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
>>>> )?
>>>>
>>>> FlinkKafkaConsumer<MyType> kafkaSource = new
>>>> FlinkKafkaConsumer<>("myTopic", schema, props);
>>>> kafkaSource.assignTimestampsAndWatermarks(
>>>>         WatermarkStrategy.
>>>>                 .forBoundedOutOfOrderness(Duration.ofSeconds(20)));
>>>>
>>>> Thanks,
>>>> Felipe
>>>>
>>>>
>>>>>
>>>>> Best,
>>>>> Piotrek
>>>>>
>>>>> czw., 17 cze 2021 o 13:46 Felipe Gutierrez <
>>>>> felipe.o.gutier...@gmail.com> napisał(a):
>>>>>
>>>>>> Hi community,
>>>>>>
>>>>>> I have implemented a join function using CoProcessFunction with
>>>>>> CheckpointedFunction to recover from failures. I added some debug lines 
>>>>>> to
>>>>>> check if it is restoring and it does. Before the crash, I process events
>>>>>> that fall at processElement2. I create snapshots at snapshotState(), the
>>>>>> application comes back and restores the events. That is fine.
>>>>>>
>>>>>> After the restore, I process events that fall on processElement1. I
>>>>>> register event timers for them as I did on processElement2 before the
>>>>>> crash. But the onTimer() is never called. The point is that I don't have
>>>>>> any events to send to processElement2() to make the CoProcessFunction
>>>>>> register a time for them. They were sent before the crash.
>>>>>>
>>>>>> I suppose that the onTimer() is called only when there are
>>>>>> "timerService.registerEventTimeTimer(endOfWindow);" for processElement1 
>>>>>> and
>>>>>> processElement2. Because when I test the same application without 
>>>>>> crashing
>>>>>> and the CoProcessFunction triggers the onTimer() method.
>>>>>>
>>>>>> But if I have a crash in the middle the CoProcessFunction does not
>>>>>> call onTimer(). Why is that? Is that normal? What do I have to do to make
>>>>>> the CoProcessFunction trigger the onTime() method even if only one stream
>>>>>> is processed let's say at the processElement2() method and the other 
>>>>>> stream
>>>>>> is restored from a snapshot? I imagine that I have to register a time
>>>>>> during the recovery (initializeState()). But how?
>>>>>>
>>>>>> thanks,
>>>>>> Felipe
>>>>>>
>>>>>

Reply via email to