Hi,

Is it possible the watermark in TimerService not getting reset when a job
is restarted from a previous checkpoint? I would expect the watermark in a
TimerService also to go back in time.

I have the following ProcessFunction implementation.

  override def processElement(
    e: TraceEvent,
    ctx: ProcessFunction[
      TraceEvent,
      Trace
    ]#Context,
    out: Collector[Trace]
  ): Unit = {

    if (e.getAnnotationTime() < ctx.timerService().currentWatermark()) {
      registry.counter("tracing.outOfOrderEvents").increment()
    } else {
    ....
    }

I am noticing the outOfOrderEvents are reported until new events are read
in to the stream since the last restart.

Regards,
Nara

Reply via email to