Thanks Sihua. If it is reset to Long.MIN_VALUE, I can't explain why outOfOrderEvents are reported, because the event time on the data will always be greater than Long.MIN_VALUE.
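The reasoning above can be checked with a small, Flink-free Scala model. This is a sketch under stated assumptions, not Flink's implementation. It mimics the idea behind BoundedOutOfOrdernessTimestampExtractor (watermark = highest timestamp seen so far minus maxOutOfOrderness) and keeps the function's watermark monotonic. As a simplification, it advances the watermark after every element instead of on Flink's periodic watermark interval. The names WatermarkModel, BoundedOutOfOrdernessModel, countOutOfOrder and initialWatermark are hypothetical. With strictly increasing timestamps and a watermark that starts at Long.MinValue, no event is counted as out of order. Only a watermark carried over from later in the original run (a hypothetical initialWatermark) reproduces the reported symptom, where replayed events are counted until the stream catches up.

```scala
// Hypothetical, Flink-free model of the scenario in this thread (not Flink code).
object WatermarkModel {

  // Models the idea behind BoundedOutOfOrdernessTimestampExtractor:
  // remember the highest timestamp seen and lag it by maxOutOfOrdernessMs.
  final class BoundedOutOfOrdernessModel(maxOutOfOrdernessMs: Long) {
    // Start here so the first watermark is Long.MinValue without underflow.
    private var currentMaxTimestamp: Long = Long.MinValue + maxOutOfOrdernessMs

    def onEvent(timestamp: Long): Unit =
      currentMaxTimestamp = math.max(currentMaxTimestamp, timestamp)

    def currentWatermark: Long = currentMaxTimestamp - maxOutOfOrdernessMs
  }

  /** Counts events whose timestamp is below the watermark the function sees,
    * i.e. the `timestamp < currentWatermark()` check from the ProcessFunction.
    * `initialWatermark` is the watermark the function starts with after a
    * (re)start: Long.MinValue if it is reset, or an assumed stale value. */
  def countOutOfOrder(
      timestamps: Seq[Long],
      maxOutOfOrdernessMs: Long,
      initialWatermark: Long = Long.MinValue
  ): Int = {
    val extractor = new BoundedOutOfOrdernessModel(maxOutOfOrdernessMs)
    var functionWatermark = initialWatermark
    var outOfOrder = 0
    for (ts <- timestamps) {
      extractor.onEvent(ts)
      if (ts < functionWatermark) outOfOrder += 1
      // Simplification: advance after every element; watermarks never go back.
      functionWatermark = math.max(functionWatermark, extractor.currentWatermark)
    }
    outOfOrder
  }
}
```

For example, countOutOfOrder((1 to 10).map(_ * 1000L), 60000L) counts nothing. Passing initialWatermark = 5500L counts the five replayed events stamped 1000 to 5000 ms, after which the stream is past the stale watermark and no more events are reported.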
Following are the steps to reproduce this scenario:

- A source that produces events with timestamps that increase with every event produced
- Use TimeCharacteristic.EventTime
- Use BoundedOutOfOrdernessTimestampExtractor with maxOutOfOrderness set to 60s
- Enable checkpoints
- A ProcessFunction implementation that reports a counter to some metrics backend when the timestamp of the event is less than currentWatermark
- No out-of-order events are reported initially. After a few checkpoints have been created, cancel the job and restart it from a previous checkpoint.

*Note*: The event stream really doesn't have out-of-order data. Restarting the job from a checkpoint causes these artificial out-of-order events because of the watermark value.

Regards,
Nara

On Tue, May 29, 2018 at 7:54 PM, sihua zhou <summerle...@163.com> wrote:

> Hi Nara,
>
> yes, the watermark in TimerService is not covered by the checkpoint;
> every time the job is restarted from a previous checkpoint, it is reset to
> Long.MIN_VALUE. I can see that it is a bit tricky to cover it in the checkpoint,
> especially when we need to support rescaling (it doesn't seem to be purely
> keyed or operator state). Maybe @Stefan or @Aljoscha could give you more
> useful information about why it wasn't covered by the checkpoint.
>
> Best, Sihua
>
> On 05/30/2018 05:44, Narayanan Arunachalam <narayanan.arunacha...@gmail.com> wrote:
>
> Hi,
>
> Is it possible that the watermark in TimerService is not getting reset when a job
> is restarted from a previous checkpoint? I would expect the watermark in a
> TimerService to go back in time as well.
>
> I have the following ProcessFunction implementation.
>
> override def processElement(
>     e: TraceEvent,
>     ctx: ProcessFunction[TraceEvent, Trace]#Context,
>     out: Collector[Trace]
> ): Unit = {
>
>   if (e.getAnnotationTime() < ctx.timerService().currentWatermark()) {
>     registry.counter("tracing.outOfOrderEvents").increment()
>   } else {
>     ....
>   }
> }
>
> I am noticing that outOfOrderEvents are reported until new events, read
> after the last restart, arrive in the stream.
>
> Regards,
> Nara