Thanks for the explanation. I looked at this metric closely and noticed that some events do arrive out of order. My hypothesis is that when the job is restarted, all of the small out-of-order chunks add up and show up as a significant number. I graphed the number of out-of-order events per minute. The job was started with fresh state at 11:53 am and then restarted from the previous checkpoint at 1:24 pm.
That said, the number of out-of-order events after the restart is very high.

On Wed, May 30, 2018 at 1:55 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Nara and Sihua,
>
> That's indeed unexpected behavior, and it would be good to identify the
> reason for the late data.
>
> As Sihua said, watermarks are currently not checkpointed and are reset to
> Long.MIN_VALUE upon restart.
> AFAIK, the main reason why WMs are not checkpointed is that the special
> type of operator state required for this (union-list state) wasn't
> available when the mechanism was implemented.
> I think there are plans to address this shortcoming (see FLINK-5601 [1]).
>
> Best, Fabian
>
> [1] https://issues.apache.org/jira/browse/FLINK-5601
>
> 2018-05-30 19:00 GMT+02:00 Narayanan Arunachalam <
> narayanan.arunacha...@gmail.com>:
>
>> Thanks Sihua. If it's reset to Long.MIN_VALUE, I can't explain why
>> outOfOrderEvents are reported, because the event time on the data will
>> always be greater than Long.MIN_VALUE.
>>
>> Following are the steps to reproduce this scenario:
>> - A source that produces events with timestamps that increase with
>>   every event produced
>> - Use TimeCharacteristic.EventTime
>> - Use BoundedOutOfOrdernessTimestampExtractor with maxOutOfOrderness set
>>   to 60s
>> - Enable checkpoints
>> - A ProcessFunction implementation that reports a counter to some metrics
>>   backend when the timestamp of the event is less than currentWatermark
>> - No out-of-order events will be reported initially. After a few
>>   checkpoints are created, cancel the job and restart it from a previous
>>   checkpoint.
>>
>> *Note*: The event stream really doesn't have out-of-order data.
>> Restarting the job from a checkpoint causes these artificial out-of-order
>> events because of the watermark value.
>>
>> Regards,
>> Nara
>>
>> On Tue, May 29, 2018 at 7:54 PM, sihua zhou <summerle...@163.com> wrote:
>>
>>> Hi Nara,
>>>
>>> Yes, the watermark in TimerService is not covered by the checkpoint;
>>> every time the job is restarted from a previous checkpoint, it is reset
>>> to Long.MIN_VALUE. I can see that it is a bit tricky to include it in
>>> the checkpoint, especially when we need to support rescaling (it doesn't
>>> seem to be purely keyed state or operator state). Maybe @Stefan or
>>> @Aljoscha could give you more useful information about why it isn't
>>> covered by the checkpoint.
>>>
>>> Best, Sihua
>>>
>>> On 05/30/2018 05:44, Narayanan Arunachalam
>>> <narayanan.arunacha...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> Is it possible that the watermark in TimerService is not reset when a
>>> job is restarted from a previous checkpoint? I would expect the
>>> watermark in a TimerService to go back in time as well.
>>>
>>> I have the following ProcessFunction implementation:
>>>
>>> override def processElement(
>>>     e: TraceEvent,
>>>     ctx: ProcessFunction[TraceEvent, Trace]#Context,
>>>     out: Collector[Trace]
>>> ): Unit = {
>>>   // Count events whose timestamp is behind the operator's current watermark
>>>   if (e.getAnnotationTime() < ctx.timerService().currentWatermark()) {
>>>     registry.counter("tracing.outOfOrderEvents").increment()
>>>   } else {
>>>     ....
>>>   }
>>> }
>>>
>>> I notice that outOfOrderEvents are reported after the last restart
>>> until new events are read into the stream.
>>>
>>> Regards,
>>> Nara
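To reason about the counter discussed above, here is a minimal, dependency-free sketch that models the pieces in the thread. It is not Flink code: `WatermarkModel`, `ExtractorModel`, and `countLateEvents` are hypothetical names. The model assumes the extractor behaves like BoundedOutOfOrdernessTimestampExtractor (watermark = highest timestamp seen minus maxOutOfOrderness), that the operator's watermark starts at Long.MinValue as after a restart, and that watermarks reach the operator only periodically. It ignores parallelism (a real operator takes the minimum watermark across its input channels).

```scala
// Hypothetical, dependency-free model of the behaviour discussed in this
// thread. This is NOT Flink code; all names here are illustrative.
object WatermarkModel {

  // Models BoundedOutOfOrdernessTimestampExtractor: the watermark trails the
  // highest timestamp seen so far by maxOutOfOrdernessMs.
  final class ExtractorModel(maxOutOfOrdernessMs: Long) {
    // Chosen so that the very first watermark equals Long.MinValue.
    private var currentMaxTimestamp: Long = Long.MinValue + maxOutOfOrdernessMs

    def observe(timestamp: Long): Unit =
      currentMaxTimestamp = math.max(currentMaxTimestamp, timestamp)

    def currentWatermark: Long = currentMaxTimestamp - maxOutOfOrdernessMs
  }

  // Same check as the outOfOrderEvents counter: count events whose timestamp
  // is below the watermark the downstream operator has seen so far.
  // The operator watermark starts at Long.MinValue (the post-restart value)
  // and is refreshed only every `watermarkEvery` events to mimic periodic
  // watermark emission.
  def countLateEvents(
      timestamps: Seq[Long],
      maxOutOfOrdernessMs: Long,
      watermarkEvery: Int
  ): Int = {
    val extractor = new ExtractorModel(maxOutOfOrdernessMs)
    var operatorWatermark = Long.MinValue
    var late = 0
    timestamps.zipWithIndex.foreach { case (ts, i) =>
      extractor.observe(ts)
      if (ts < operatorWatermark) late += 1
      if ((i + 1) % watermarkEvery == 0)
        operatorWatermark = math.max(operatorWatermark, extractor.currentWatermark)
    }
    late
  }
}

// One event per second of event time, strictly increasing.
val inOrder: Seq[Long] = (0 until 1000).map(_ * 1000L)
// Same stream, but every 100th event is delayed by 90 s (more than 60 s).
val withStragglers: Seq[Long] =
  inOrder.zipWithIndex.map { case (ts, i) => if (i % 100 == 99) ts - 90000L else ts }

println(WatermarkModel.countLateEvents(inOrder, 60000L, 10))        // 0
println(WatermarkModel.countLateEvents(withStragglers, 60000L, 10)) // 10
```

With strictly increasing timestamps the count stays at 0 even though the operator's watermark starts at Long.MinValue, which matches Nara's point that the reset alone cannot produce the counter. Once some events trail the maximum by more than 60 s, each one is counted whenever it is processed. Under this model, if the job replays the backlog since an older checkpoint much faster than real time, late events that were spread over many minutes of event time would land in a few minutes of wall-clock time, which is consistent with (but does not prove) the hypothesis in the latest message.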