I'm glad I could help, I hope it will solve your problem :)

Best,
Piotrek
On Fri, Jun 18, 2021 at 2:38 PM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:

> On Fri, Jun 18, 2021 at 1:41 PM Piotr Nowojski <pnowoj...@apache.org>
> wrote:
>
>> Hi,
>>
>> Keep in mind that this is a quite low-level approach to this problem. It
>> would be much better to make sure that after recovery watermarks are
>> still being emitted.
>
> Yes, indeed it is a very low-level approach. I did a small test emitting
> one watermark for the stream that was recovered, and after that it could
> process the join. The behavior is the same with a CoGroupFunction and a
> CoProcessFunction, so in the end I don't need to implement
> MyCoProcessFunction with checkpointing. I just need to emit a new
> watermark after the job recovers.
>
> In my case I am using a Kafka source, so if I make Kafka keep emitting
> watermarks, I solve the problem. Otherwise, I have to implement this
> custom operator.
>
> Thanks for your answer!
> Felipe
>
>> If you are using a built-in source, it's probably easier to do it in a
>> custom operator. I would try to implement a custom one based on
>> AbstractStreamOperator. Your class would also need to implement the
>> OneInputStreamOperator interface. `processElement` you could implement
>> as an identity function (just pass the stream element down unchanged).
>> In `processWatermark` you would need to store the latest watermark in a
>> `ListState<Long>` field (you can declare it inside
>> `AbstractStreamOperator#initializeState` via `context.getListState(new
>> ListStateDescriptor<>("your-field-name", Long.class));`). During normal
>> processing (`processWatermark`), make sure it's a singleton list. During
>> recovery (`AbstractStreamOperator#initializeState()`) without rescaling,
>> you would just access this state field and re-emit the only element on
>> that list.
>> However during recovery, depending on whether you are scaling up (a) or
>> down (b), you could have a case where you sometimes have either (a) an
>> empty list (in that case you cannot emit anything), or (b) many elements
>> on the list (in that case you would need to calculate the minimum of all
>> elements).
>>
>> As the operator API is not a very official one, it's not well
>> documented. For an example you would need to take a look at the Flink
>> code itself by finding existing implementations of
>> `AbstractStreamOperator` or `OneInputStreamOperator`.
>>
>> Best,
>> Piotrek
>>
>> On Fri, Jun 18, 2021 at 12:49 PM Felipe Gutierrez
>> <felipe.o.gutier...@gmail.com> wrote:
>>
>>> Hello Piotrek,
>>>
>>> On Fri, Jun 18, 2021 at 11:48 AM Piotr Nowojski <pnowoj...@apache.org>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> As far as I can tell, timers should be checkpointed and recovered.
>>>> What may be happening is that the state of the last watermarks seen by
>>>> operators on different inputs, and on different channels inside an
>>>> input, is not persisted. Flink assumes that after the restart,
>>>> watermark assigners will emit newer watermarks. However, if one of
>>>> your inputs is dormant and it already emitted some very high watermark
>>>> long before the failure, then after recovery, if no new watermark is
>>>> emitted, this input/input channel might be preventing timers from
>>>> firing. Can you check if that's what's happening in your case?
>>>
>>> I think you are correct. At least when I reproduce the bug, it is like
>>> you said.
>>>
>>>> If so, you would have to make sure one way or another that some
>>>> watermarks will be emitted after recovery. As a last resort, you could
>>>> manually store the watermarks in the operators'/sources' state and
>>>> re-emit the last seen watermark during recovery.
>>>
>>> Could you please point out how I can checkpoint the watermarks on a
>>> source operator?
>>> Is it done by this code below from here (
>>> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector
>>> )?
>>>
>>> FlinkKafkaConsumer<MyType> kafkaSource =
>>>         new FlinkKafkaConsumer<>("myTopic", schema, props);
>>> kafkaSource.assignTimestampsAndWatermarks(
>>>         WatermarkStrategy
>>>                 .forBoundedOutOfOrderness(Duration.ofSeconds(20)));
>>>
>>> Thanks,
>>> Felipe
>>>
>>>> Best,
>>>> Piotrek
>>>>
>>>> On Thu, Jun 17, 2021 at 1:46 PM Felipe Gutierrez
>>>> <felipe.o.gutier...@gmail.com> wrote:
>>>>
>>>>> Hi community,
>>>>>
>>>>> I have implemented a join function using a CoProcessFunction with
>>>>> CheckpointedFunction to recover from failures. I added some debug
>>>>> lines to check if it is restoring, and it does. Before the crash, I
>>>>> process events that arrive at processElement2. I create snapshots at
>>>>> snapshotState(), the application comes back, and it restores the
>>>>> events. That is fine.
>>>>>
>>>>> After the restore, I process events that arrive at processElement1.
>>>>> I register event-time timers for them as I did in processElement2
>>>>> before the crash. But onTimer() is never called. The point is that I
>>>>> don't have any events to send to processElement2() to make the
>>>>> CoProcessFunction register a timer for them. They were sent before
>>>>> the crash.
>>>>>
>>>>> I suppose that onTimer() is called only when there are
>>>>> "timerService.registerEventTimeTimer(endOfWindow);" calls for both
>>>>> processElement1 and processElement2, because when I test the same
>>>>> application without crashing, the CoProcessFunction does trigger the
>>>>> onTimer() method.
>>>>>
>>>>> But if I have a crash in the middle, the CoProcessFunction does not
>>>>> call onTimer(). Why is that? Is that normal?
>>>>> What do I have to do to make the CoProcessFunction trigger the
>>>>> onTimer() method even if only one stream is processed, let's say at
>>>>> the processElement2() method, and the other stream is restored from
>>>>> a snapshot? I imagine that I have to register a timer during the
>>>>> recovery (initializeState()). But how?
>>>>>
>>>>> Thanks,
>>>>> Felipe
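[For the archive: a rough, untested sketch of the custom operator Piotrek describes above. The class name, the state name "last-watermark", and the choice of open() as the place to re-emit the restored watermark are my own assumptions, not verified against a specific Flink version; also note that AbstractStreamOperator#initializeState receives a StateInitializationContext, so the list state is obtained through getOperatorStateStore() rather than directly via context.getListState().]

```java
import java.util.Collections;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/**
 * Sketch: passes elements through unchanged, remembers the last seen
 * watermark in operator ListState, and re-emits it after recovery so
 * that downstream event-time timers can fire again.
 */
public class WatermarkPreservingOperator<T>
        extends AbstractStreamOperator<T>
        implements OneInputStreamOperator<T, T> {

    private transient ListState<Long> lastWatermark;
    private Long restoredWatermark; // minimum of the restored entries, if any

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        lastWatermark = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("last-watermark", Long.class));
        // (a) scale-up: the restored list may be empty -> emit nothing;
        // (b) scale-down: several entries may be restored -> take the minimum.
        for (Long ts : lastWatermark.get()) {
            restoredWatermark = (restoredWatermark == null)
                    ? ts : Math.min(restoredWatermark, ts);
        }
    }

    @Override
    public void open() throws Exception {
        super.open();
        if (restoredWatermark != null) {
            // Re-emit the watermark that was in flight before the failure.
            output.emitWatermark(new Watermark(restoredWatermark));
        }
    }

    @Override
    public void processElement(StreamRecord<T> element) {
        // Identity: pass the stream element down unchanged.
        output.collect(element);
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        // Keep the state a singleton list holding only the latest watermark.
        lastWatermark.update(Collections.singletonList(mark.getTimestamp()));
        super.processWatermark(mark); // forward the watermark downstream
    }
}
```

[It would presumably be wired in with something like `stream.transform("preserve-watermark", stream.getType(), new WatermarkPreservingOperator<>())`, again not verified here.]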