Hi Nara and Sihua,

That's indeed unexpected behavior, and it would be good to identify the reason for the late data.
As Sihua said, watermarks are currently not checkpointed and are reset to Long.MIN_VALUE upon restart.
AFAIK, the main reason why WMs are not checkpointed is that the special type of operator state that is required for this (union-list state) wasn't available when the mechanism was implemented. I think there are plans to address this shortcoming (see FLINK-5601 [1]).

Best, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-5601

2018-05-30 19:00 GMT+02:00 Narayanan Arunachalam <narayanan.arunacha...@gmail.com>:

> Thanks Sihua. If it's reset to Long.MIN_VALUE, I can't explain why
> outOfOrderEvents are reported, because the event time on the data will
> always be greater than Long.MIN_VALUE.
>
> Following are the steps to reproduce this scenario.
> - A source that produces events with timestamps that increase for every
>   event produced
> - Use TimeCharacteristic.EventTime
> - Use BoundedOutOfOrdernessTimestampExtractor with maxOutOfOrderness set
>   to 60s.
> - Enable checkpoints
> - ProcessFunction impl to report a counter to some metrics backend when
>   the timestamp of the event is less than currentWatermark
> - No out-of-order events will be reported initially. After a few checkpoints
>   are created, cancel and restart the job from a previous checkpoint.
>
> *Note*: The event stream really doesn't have out-of-order data. Restarting
> the job from a checkpoint causes these artificial out-of-order events
> because of the watermark value.
>
> Regards,
> Nara
>
> On Tue, May 29, 2018 at 7:54 PM, sihua zhou <summerle...@163.com> wrote:
>
>> Hi Nara,
>>
>> yes, the watermark in TimerService is not covered by the checkpoint;
>> every time the job is restarted from a previous checkpoint, it is reset to
>> Long.MIN_VALUE. I can see that it is a bit tricky to cover it in the
>> checkpoint, especially when we need to support rescaling (it does not seem
>> to be purely keyed state or operator state). Maybe @Stefan or @Aljoscha
>> could give you more useful information about why it wasn't covered by the
>> checkpoint.
>>
>> Best, Sihua
>>
>> On 05/30/2018 05:44, Narayanan Arunachalam <narayanan.arunacha...@gmail.com> wrote:
>>
>> Hi,
>>
>> Is it possible that the watermark in TimerService is not getting reset when
>> a job is restarted from a previous checkpoint? I would expect the watermark
>> in a TimerService also to go back in time.
>>
>> I have the following ProcessFunction implementation.
>>
>> override def processElement(
>>     e: TraceEvent,
>>     ctx: ProcessFunction[TraceEvent, Trace]#Context,
>>     out: Collector[Trace]
>> ): Unit = {
>>   if (e.getAnnotationTime() < ctx.timerService().currentWatermark()) {
>>     registry.counter("tracing.outOfOrderEvents").increment()
>>   } else {
>>     ....
>>   }
>> }
>>
>> I am noticing that outOfOrderEvents are reported until new events are read
>> into the stream since the last restart.
>>
>> Regards,
>> Nara
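
The two hypotheses discussed in this thread can be compared with a small toy model. The sketch below is plain Scala and does not use Flink at all: `ToyOperator`, `run`, the 300 one-second events, and the checkpoint position after event 100 are all hypothetical names and numbers chosen for illustration. It also advances the watermark on every event, whereas a periodic extractor would only emit it at intervals. Under those assumptions, it counts how many replayed events would satisfy the `eventTime < currentWatermark` check when the watermark starts at Long.MIN_VALUE versus when it keeps its pre-cancel value.

```scala
object WatermarkRestartSketch {
  // Same bound as in the thread: maxOutOfOrderness of 60s, in milliseconds.
  val MaxOutOfOrdernessMs: Long = 60000L

  // Toy stand-in for a single operator instance; this is NOT Flink code.
  final class ToyOperator(initialWatermark: Long) {
    private var watermark: Long = initialWatermark
    private var late: Int = 0

    def currentWatermark: Long = watermark
    def lateCount: Int = late

    def process(eventTime: Long): Unit = {
      // Same comparison as the ProcessFunction quoted in the thread.
      if (eventTime < watermark) late += 1
      // Simplification: advance the watermark on every event, not periodically.
      watermark = math.max(watermark, eventTime - MaxOutOfOrdernessMs)
    }
  }

  // Feeds the events to a fresh operator and returns it for inspection.
  def run(initialWatermark: Long, events: Seq[Long]): ToyOperator = {
    val op = new ToyOperator(initialWatermark)
    events.foreach(op.process)
    op
  }

  // Strictly increasing timestamps, one event per second: 1s to 300s.
  val events: Seq[Long] = (1L to 300L).map(_ * 1000L)

  // Assumed scenario: the checkpoint covers the first 100 events, and the
  // job is cancelled after all 300 events have been processed.
  val replayed: Seq[Long] = events.drop(100)
  val watermarkAtCancel: Long = run(Long.MinValue, events).currentWatermark

  // Hypothesis A: the watermark is reset to Long.MIN_VALUE on restore.
  val lateAfterReset: Int = run(Long.MinValue, replayed).lateCount
  // Hypothesis B: the watermark keeps the value it had when the job was cancelled.
  val lateIfNotReset: Int = run(watermarkAtCancel, replayed).lateCount

  def main(args: Array[String]): Unit = {
    println(s"watermark at cancel:       $watermarkAtCancel")
    println(s"late events, reset model:  $lateAfterReset")
    println(s"late events, sticky model: $lateIfNotReset")
  }
}
```

In this toy model no replayed event is counted as late when the watermark starts from Long.MIN_VALUE, while a watermark that survives the restart flags every replayed event older than the pre-cancel watermark. That second pattern resembles the observation that outOfOrderEvents are reported until new events are read, but the model says nothing about where such a stale watermark would come from in an actual job.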