Hello Piotr,

Could you please help me check whether I am implementing it the correct way?
I created the WatermarkFunction [1], based on Flink's FilterFunction, and the WatermarkStreamOperator [2], and I am covering them with a unit test [3]. There are a couple of things I am not sure how to do. How do I make the ListState a singleton across all parallel operator instances? When my job restarts, I don't even have to call "processWatermark(new Watermark(maxWatermark));" at the end of "initializeState()": I can see that the job processes the previous watermarks from before it failed. Is it because the source is the one that I created at the end of the unit test, "MySource"? Or is it because I don't have a join in the stream pipeline? I have included the output of my unit test below, in case you are not able to run the test.

[1] https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/main/java/org/sense/flink/examples/stream/operator/watermark/WatermarkFunction.java
[2] https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/main/java/org/sense/flink/examples/stream/operator/watermark/WatermarkStreamOperator.java
[3] https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/test/java/org/sense/flink/examples/stream/operator/watermark/WatermarkStreamOperatorTest.java#L113

$ cd explore-flink/docker/ops-playground-image/java/explore-flink/
$ mvn -Dtest=WatermarkStreamOperatorTest#testRestartWithLatestWatermark test

WatermarkStreamOperator.initializeState
WatermarkStreamOperator.initializeState
WatermarkStreamOperator.initializeState
WatermarkStreamOperator.initializeState
initializeState... 0
initializeState... 0
initializeState... 0
initializeState... 0
maxWatermark: 0
maxWatermark: 0
maxWatermark: 0
maxWatermark: 0
processing watermark: 0
processing watermark: 0
processing watermark: 0
processing watermark: 0
processing watermark: 0
processing watermark: 0
processing watermark: 0
processing watermark: 0
Attempts restart: 0
processing watermark: 1
processing watermark: 1
processing watermark: 1
processing watermark: 1
Attempts restart: 0
processing watermark: 2
processing watermark: 2
processing watermark: 2
processing watermark: 2
Attempts restart: 0
processing watermark: 3
processing watermark: 3
processing watermark: 3
processing watermark: 3
Attempts restart: 0
processing watermark: 9223372036854775807
processing watermark: 9223372036854775807
processing watermark: 9223372036854775807
processing watermark: 9223372036854775807
This exception will trigger until the reference time [2021-06-21 16:57:19.531] reaches the trigger time [2021-06-21 16:57:21.672]
// HERE THE JOB IS RESTARTING
initializeState... 1
initializeState... 1
initializeState... 1
WatermarkStreamOperator.initializeState
WatermarkStreamOperator.initializeState
WatermarkStreamOperator.initializeState
WatermarkStreamOperator.initializeState
watermarkList recovered: 0
watermarkList recovered: 0
watermarkList recovered: 0
watermarkList recovered: 0
watermarkList recovered: 0
watermarkList recovered: 1
watermarkList recovered: 2
initializeState... 1
maxWatermark: 2 // HERE IS THE LATEST WATERMARK
processing watermark: 2 // I PROCESS IT HERE
watermarkList recovered: 0
watermarkList recovered: 1
watermarkList recovered: 0
watermarkList recovered: 0
watermarkList recovered: 1
watermarkList recovered: 1
watermarkList recovered: 2
watermarkList recovered: 2
watermarkList recovered: 2
maxWatermark: 2
maxWatermark: 2
processing watermark: 2
processing watermark: 2
maxWatermark: 2
processing watermark: 2
processing watermark: 0 // IT IS ALSO PROCESSING THE OTHER WATERMARKS. WHY?
processing watermark: 0
processing watermark: 0
processing watermark: 0
Attempts restart: 1
processing watermark: 1
processing watermark: 1
processing watermark: 1
processing watermark: 1
Attempts restart: 1
processing watermark: 2
processing watermark: 2
processing watermark: 2
processing watermark: 2
Attempts restart: 1
processing watermark: 3
processing watermark: 3
processing watermark: 3
processing watermark: 3
Attempts restart: 1
processing watermark: 9223372036854775807
processing watermark: 9223372036854775807
processing watermark: 9223372036854775807
processing watermark: 9223372036854775807
This is a poison but we do NOT throw an exception because the reference time passed :) [2021-06-21 16:57:22.849] >= [2021-06-21 16:57:21.672]
Attempts restart: 1
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 6.836 sec
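For reference, here is roughly the shape of the operator I am testing. This is only a simplified sketch and not a copy of [2]; the class, field, and state names are illustrative and may differ from the real code:

import java.util.Collections;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class WatermarkStreamOperator<IN> extends AbstractStreamOperator<IN>
        implements OneInputStreamOperator<IN, IN> {

    private static final long serialVersionUID = 1L;

    /** Operator state holding the latest watermark seen by this subtask. */
    private transient ListState<Long> watermarkList;

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        watermarkList = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("watermark-list", Long.class));

        // On recovery, re-emit the largest persisted watermark so that downstream
        // event-time timers can fire again without waiting for new input.
        long maxWatermark = Long.MIN_VALUE;
        boolean restored = false;
        for (Long watermark : watermarkList.get()) {
            System.out.println("watermarkList recovered: " + watermark);
            maxWatermark = Math.max(maxWatermark, watermark);
            restored = true;
        }
        if (restored) {
            processWatermark(new Watermark(maxWatermark));
        }
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        // Identity function: pass the stream element downstream unchanged.
        output.collect(element);
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        // Keep the state a singleton list that always holds only the latest watermark.
        watermarkList.update(Collections.singletonList(mark.getTimestamp()));
        // Forward the watermark downstream (advances event-time timers as usual).
        super.processWatermark(mark);
    }
}

In the test pipeline I attach it with something like stream.transform("watermark-operator", stream.getType(), new WatermarkStreamOperator<>()).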
On Fri, Jun 18, 2021 at 2:46 PM Piotr Nowojski <pnowoj...@apache.org> wrote:

> I'm glad I could help, I hope it will solve your problem :)
>
> Best,
> Piotrek
>
> Fri, Jun 18, 2021 at 14:38 Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:
>
>> On Fri, Jun 18, 2021 at 1:41 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
>>
>>> Hi,
>>>
>>> Keep in mind that this is a quite low-level approach to this problem. It would be much better to make sure that after recovery watermarks are still being emitted.
>>
>> Yes, indeed it looks very low level. I did a small test where I emit one watermark for the stream that was recovered, and then it can process the join. It has the same behavior when using a CoGroupFunction and a CoProcessFunction. So in the end I don't need to implement MyCoProcessFunction with checkpointing; I just need to emit a new watermark after the job recovers.
>>
>> In my case I am using a Kafka source, so if I make Kafka keep emitting watermarks I solve the problem. Otherwise, I have to implement this custom operator.
>>
>> Thanks for your answer!
>> Felipe
>>
>>> If you are using a built-in source, it's probably easier to do it in a custom operator. I would try to implement a custom one based on AbstractStreamOperator. Your class would also need to implement the OneInputStreamOperator interface. You could implement `processElement` as an identity function (just pass the stream element down unchanged). In `processWatermark` you would need to store the latest watermark in a `ListState<Long>` field (you can declare it inside `AbstractStreamOperator#initializeState` via `context.getListState(new ListStateDescriptor<>("your-field-name", Long.class));`). During normal processing (`processWatermark`) make sure it stays a singleton list. During recovery (`AbstractStreamOperator#initializeState()`) without rescaling, you would just access this state field and re-emit the only element on that list.
>>>
>>> However, during recovery, depending on whether you are scaling up (a) or down (b), you could have a case where you sometimes have either (a) an empty list (in that case you cannot emit anything), or (b) many elements on the list (in that case you would need to calculate the minimum of all elements).
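Just to confirm that I understood the rescaling caveat: I imagine the recovery path would then look roughly like the sketch below. This is only a sketch, not the code in [2] (which currently re-emits the maximum, as the "maxWatermark" lines in my log show); it assumes the same "watermarkList" ListState<Long> field as in the sketch earlier in this mail and takes the minimum instead, as you suggested:

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        watermarkList = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("watermark-list", Long.class));

        // (b) After scaling down, several elements can end up on one subtask's list,
        //     so take the minimum of all of them.
        // (a) After scaling up, the list can be empty, so do not emit anything.
        Long minWatermark = null;
        for (Long watermark : watermarkList.get()) {
            minWatermark = (minWatermark == null) ? watermark : Math.min(minWatermark, watermark);
        }
        if (minWatermark != null) {
            processWatermark(new Watermark(minWatermark));
        }
    }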
>>> As the operator API is not a very official one, it's not well documented. For an example you would need to take a look at the Flink code itself, by finding existing implementations of `AbstractStreamOperator` or `OneInputStreamOperator`.
>>>
>>> Best,
>>> Piotrek
>>>
>>> Fri, Jun 18, 2021 at 12:49 Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:
>>>
>>>> Hello Piotrek,
>>>>
>>>> On Fri, Jun 18, 2021 at 11:48 AM Piotr Nowojski <pnowoj...@apache.org> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> As far as I can tell, timers should be checkpointed and recovered. What may be happening is that the state of the last watermarks seen by operators on different inputs, and on different channels inside an input, is not persisted. Flink assumes that watermark assigners will emit newer watermarks after the recovery. However, if one of your inputs is dormant and it already emitted some very high watermark a long time before the failure, and no new watermark is emitted after recovery, this input/input channel might be preventing timers from firing. Can you check whether that's what's happening in your case?
>>>>
>>>> I think you are correct. At least when I reproduce the bug it behaves exactly as you said.
>>>>
>>>>> If so, you would have to make sure one way or another that some watermarks will be emitted after recovery. As a last resort, you could manually store the watermarks in the operators'/sources' state and re-emit the last seen watermark during recovery.
>>>>
>>>> Could you please point out how I can checkpoint the watermarks on a source operator? Is it done by the code below, from https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector ?
>>>>
>>>> FlinkKafkaConsumer<MyType> kafkaSource = new FlinkKafkaConsumer<>("myTopic", schema, props);
>>>> kafkaSource.assignTimestampsAndWatermarks(
>>>>         WatermarkStrategy
>>>>                 .forBoundedOutOfOrderness(Duration.ofSeconds(20)));
>>>>
>>>> Thanks,
>>>> Felipe
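A side note on the Kafka option quoted above: if I stay with the Kafka source, I could presumably also declare the source idle after a timeout, so that a dormant input does not hold back event time after recovery. A rough, untested variant of the quoted snippet (the one-minute idleness timeout is an arbitrary value I picked):

FlinkKafkaConsumer<MyType> kafkaSource = new FlinkKafkaConsumer<>("myTopic", schema, props);
kafkaSource.assignTimestampsAndWatermarks(
        WatermarkStrategy.<MyType>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                .withIdleness(Duration.ofMinutes(1)));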
>>>>> Best,
>>>>> Piotrek
>>>>>
>>>>> Thu, Jun 17, 2021 at 13:46 Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:
>>>>>
>>>>>> Hi community,
>>>>>>
>>>>>> I have implemented a join function using a CoProcessFunction with CheckpointedFunction to recover from failures. I added some debug lines to check whether it is restoring, and it does restore. Before the crash, I process events that arrive at processElement2. I create snapshots in snapshotState(), the application comes back, and it restores the events. That is fine.
>>>>>>
>>>>>> After the restore, I process events that arrive at processElement1. I register event timers for them as I did in processElement2 before the crash, but onTimer() is never called. The point is that I don't have any events to send to processElement2() that would make the CoProcessFunction register a timer for them; they were sent before the crash.
>>>>>>
>>>>>> I suppose that onTimer() is only called when there are "timerService.registerEventTimeTimer(endOfWindow);" calls for both processElement1 and processElement2, because when I test the same application without crashing, the CoProcessFunction does trigger the onTimer() method.
>>>>>>
>>>>>> But if I have a crash in the middle, the CoProcessFunction does not call onTimer(). Why is that? Is that normal? What do I have to do to make the CoProcessFunction trigger the onTimer() method even if only one stream is processed (say, at the processElement2() method) and the other stream is restored from a snapshot? I imagine that I have to register a timer during recovery (initializeState()). But how?
>>>>>>
>>>>>> thanks,
>>>>>> Felipe