Hi,

Is it possible that the watermark in the TimerService is not reset when a job is restarted from a previous checkpoint? I would expect the watermark in the TimerService to go back in time as well.

I have the following ProcessFunction implementation:

    override def processElement(
        e: TraceEvent,
        ctx: ProcessFunction[TraceEvent, Trace]#Context,
        out: Collector[Trace]
    ): Unit = {
      if (e.getAnnotationTime() < ctx.timerService().currentWatermark()) {
        registry.counter("tracing.outOfOrderEvents").increment()
      } else {
        ....
      }
    }

I am noticing that, after a restart, outOfOrderEvents are reported until new events are read into the stream.

Regards,
Nara