Thanks Sihua. If it's reset to Long.MIN_VALUE, I can't explain why
outOfOrderEvents are reported, because the event time on the data will
always be greater than Long.MIN_VALUE.
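
To make that concrete, here is a small plain-Scala sketch (no Flink
dependency) of the comparison my ProcessFunction does, evaluated against a
watermark of Long.MinValue. The object and helper names (`LateCheck`,
`isLate`) are hypothetical; `isLate` just mirrors the
`e.getAnnotationTime() < ctx.timerService().currentWatermark()` check.

```scala
object LateCheck {
  // Hypothetical helper mirroring the ProcessFunction check:
  // an event counts as out of order if its timestamp is behind the watermark.
  def isLate(eventTime: Long, currentWatermark: Long): Boolean =
    eventTime < currentWatermark

  def main(args: Array[String]): Unit = {
    val resetWatermark = Long.MinValue
    // Sample event timestamps, including the smallest possible Long.
    val samples = Seq(Long.MinValue, 0L, 1527638400000L)
    // No Long is strictly smaller than Long.MinValue, so nothing can be late.
    println(samples.exists(ts => isLate(ts, resetWatermark)))
  }
}
```

It prints `false`: with a reset watermark the counter should never fire.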

Following are the steps to reproduce this scenario:
- A source that produces events whose timestamps increase with every
event produced
- Use TimeCharacteristic.EventTime
- Use BoundedOutOfOrdernessTimestampExtractor with maxOutOfOrderness set to
60s
- Enable checkpoints
- A ProcessFunction implementation that reports a counter to some metrics
backend when the timestamp of an event is less than currentWatermark
- No out-of-order events are reported initially. After a few checkpoints
have been created, cancel the job and restart it from a previous checkpoint.
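
The steps above can be modelled roughly without Flink. This is only a
sketch under a few assumptions: the `Extractor` class approximates
BoundedOutOfOrdernessTimestampExtractor as I understand it ("highest
timestamp seen minus maxOutOfOrderness, never moving backwards"), the
watermark is treated as if it were emitted after every event (Flink emits
it periodically), and there is a single input channel. `RestartSimulation`,
`runJob` and the replay offset are hypothetical names. It compares a
restart where the watermark starts again from Long.MinValue with a
hypothetical restart that keeps the watermark from before the cancel.

```scala
object RestartSimulation {
  val MaxOutOfOrdernessMs = 60000L

  // Approximation of the bounded-out-of-orderness watermark logic.
  final class Extractor(maxOutOfOrderness: Long) {
    private var currentMaxTimestamp = Long.MinValue + maxOutOfOrderness
    private var lastEmittedWatermark = Long.MinValue

    def onEvent(ts: Long): Unit =
      if (ts > currentMaxTimestamp) currentMaxTimestamp = ts

    // Watermark never goes backwards.
    def currentWatermark: Long = {
      val potential = currentMaxTimestamp - maxOutOfOrderness
      if (potential >= lastEmittedWatermark) lastEmittedWatermark = potential
      lastEmittedWatermark
    }
  }

  // Processes events in order and counts those behind the watermark, like the
  // processElement check. Returns (out-of-order count, final watermark).
  def runJob(events: Seq[Long], initialWatermark: Long): (Int, Long) = {
    val extractor = new Extractor(MaxOutOfOrdernessMs)
    var watermark = initialWatermark // watermark as seen by the ProcessFunction
    var outOfOrder = 0
    for (ts <- events) {
      if (ts < watermark) outOfOrder += 1
      extractor.onEvent(ts)
      watermark = math.max(watermark, extractor.currentWatermark)
    }
    (outOfOrder, watermark)
  }

  def main(args: Array[String]): Unit = {
    // Strictly increasing event times, one second apart.
    val events = (0 until 600).map(i => 1000000L + i * 1000L)
    val checkpointOffset = 300 // hypothetical position restored from a checkpoint

    // First run processes everything, then the job is "cancelled".
    val (firstRunLate, watermarkAtCancel) = runJob(events, Long.MinValue)
    // Restart with the watermark reset to Long.MinValue.
    val (resetLate, _) = runJob(events.drop(checkpointOffset), Long.MinValue)
    // Hypothetical restart that kept the pre-cancel watermark.
    val (staleLate, _) = runJob(events.drop(checkpointOffset), watermarkAtCancel)

    println(s"first run: $firstRunLate, reset restart: $resetLate, " +
      s"stale-watermark restart: $staleLate")
  }
}
```

Running it prints `first run: 0, reset restart: 0, stale-watermark
restart: 239`. In this model, only a watermark that survived the restart
flags the replayed events, which is what the counter seems to show.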

*Note*: The event stream really doesn't contain out-of-order data.
Restarting the job from a checkpoint causes these artificial out-of-order
events because of the watermark value.

Regards,
Nara




On Tue, May 29, 2018 at 7:54 PM, sihua zhou <summerle...@163.com> wrote:

> Hi Nara,
>
> yes, the watermark in TimerService is not covered by the checkpoint;
> every time the job is restarted from a previous checkpoint, it is reset to
> Long.MIN_VALUE. I can see it being a bit tricky to cover it in the
> checkpoint, especially when we need to support rescaling (it does not seem
> to be purely keyed state or operator state). Maybe @Stefan or @Aljoscha can
> give you more useful information about why it isn't covered by the
> checkpoint.
>
> Best, Sihua
>
>
>
> On 05/30/2018 05:44, Narayanan Arunachalam <narayanan.arunacha...@gmail.com> wrote:
>
> Hi,
>
> Is it possible that the watermark in TimerService is not reset when a job
> is restarted from a previous checkpoint? I would expect the watermark in a
> TimerService to go back in time as well.
>
> I have the following ProcessFunction implementation.
>
>   override def processElement(
>     e: TraceEvent,
>     ctx: ProcessFunction[TraceEvent, Trace]#Context,
>     out: Collector[Trace]
>   ): Unit = {
>
>     // Count events whose event time is already behind the current watermark.
>     if (e.getAnnotationTime() < ctx.timerService().currentWatermark()) {
>       registry.counter("tracing.outOfOrderEvents").increment()
>     } else {
>       // ....
>     }
>   }
>
> I am noticing that, after the last restart, outOfOrderEvents are reported
> until new events are read into the stream.
>
> Regards,
> Nara
>
>
