Thanks for the explanation. I looked at this metric closely and noticed
that some events arrive out of order. My hypothesis is that when the job
is restarted, all of the small out-of-order chunks add up and show up as
a significant number. The graph below shows the number of out-of-order
events every minute. The job was started with fresh state at 11:53 am and
then restarted from the previous checkpoint at 1:24 pm.

That said, the number of out-of-order events right after the restart is
still very high though :thinking_face:

[graph: out-of-order events per minute]

On Wed, May 30, 2018 at 1:55 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Nara and Sihua,
>
> That's indeed an unexpected behavior and it would be good to identify the
> reason for the late data.
>
> As Sihua said, watermarks are currently not checkpointed and are reset to
> Long.MIN_VALUE upon restart.
> AFAIK, the main reason why WMs are not checkpointed is that the special
> type of operator state that is required for this (union-list state) wasn't
> available when the mechanism was implemented.
> I think there are plans to address this shortcoming (see FLINK-5601 [1]).
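> Just to illustrate what union-list state looks like from user code (this
> is only a rough sketch, not Flink's internal mechanism and not the
> FLINK-5601 implementation), a function can remember the last watermark it
> observed and snapshot/restore it via CheckpointedFunction. Class and
> field names here are hypothetical:
>
> import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
> import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
> import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
> import org.apache.flink.streaming.api.functions.ProcessFunction
> import org.apache.flink.util.Collector
> import scala.collection.JavaConverters._
>
> class WatermarkTrackingFunction
>     extends ProcessFunction[TraceEvent, Trace]
>     with CheckpointedFunction {
>
>   // last watermark seen in processElement; lost on failure unless snapshotted
>   @transient private var lastSeenWatermark: Long = Long.MinValue
>   @transient private var watermarkState: ListState[java.lang.Long] = _
>
>   override def processElement(
>     e: TraceEvent,
>     ctx: ProcessFunction[TraceEvent, Trace]#Context,
>     out: Collector[Trace]
>   ): Unit = {
>     // remember the most recent watermark so snapshotState can persist it
>     lastSeenWatermark = ctx.timerService().currentWatermark()
>     // ... regular processing ...
>   }
>
>   override def snapshotState(context: FunctionSnapshotContext): Unit = {
>     // each parallel subtask stores its own last watermark
>     watermarkState.clear()
>     watermarkState.add(lastSeenWatermark)
>   }
>
>   override def initializeState(context: FunctionInitializationContext): Unit = {
>     // union-list state: on restore, every subtask sees the entries of all subtasks
>     watermarkState = context.getOperatorStateStore.getUnionListState(
>       new ListStateDescriptor[java.lang.Long]("last-watermark", classOf[java.lang.Long]))
>     if (context.isRestored) {
>       val restored = watermarkState.get().asScala.map(_.longValue())
>       if (restored.nonEmpty) {
>         // conservative choice: resume from the smallest restored watermark
>         lastSeenWatermark = restored.min
>       }
>     }
>   }
> }
>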
>
> Best, Fabian
>
> [1] https://issues.apache.org/jira/browse/FLINK-5601
>
> 2018-05-30 19:00 GMT+02:00 Narayanan Arunachalam <
> narayanan.arunacha...@gmail.com>:
>
>> Thanks Sihua. If it's reset to Long.MIN_VALUE, I can't explain why
>> outOfOrderEvents are reported, because the event time on the data will
>> always be greater than Long.MIN_VALUE.
>>
>> Following are the steps to reproduce this scenario (a minimal job sketch
>> is included below):
>> - A source that produces events with a timestamp that increases with
>> every event produced
>> - Use TimeCharacteristic.EventTime
>> - Use BoundedOutOfOrdernessTimestampExtractor with maxOutOfOrderness set
>> to 60s
>> - Enable checkpoints
>> - A ProcessFunction implementation that reports a counter to some metrics
>> backend when the timestamp of the event is less than currentWatermark
>> - No out of order events will be reported initially. After a few
>> checkpoints are created, cancel and restart the job from a previous
>> checkpoint.
>>
>> *Note*: The event stream really doesn't have out-of-order data. Restarting
>> the job from a checkpoint causes these artificial out-of-order events
>> because of the watermark value.
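>>
>> For reference, a minimal sketch of this job (the source class, the key
>> selector, the ProcessFunction class name, and the checkpoint interval are
>> hypothetical; the processElement logic is the one from my first mail,
>> quoted below) could look like this:
>>
>> import org.apache.flink.streaming.api.TimeCharacteristic
>> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
>> import org.apache.flink.streaming.api.scala._
>> import org.apache.flink.streaming.api.windowing.time.Time
>>
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>> env.enableCheckpointing(60000) // checkpoint interval is an assumption
>>
>> env
>>   // hypothetical source emitting strictly increasing event timestamps
>>   .addSource(new IncreasingTimestampSource)
>>   .assignTimestampsAndWatermarks(
>>     new BoundedOutOfOrdernessTimestampExtractor[TraceEvent](Time.seconds(60)) {
>>       override def extractTimestamp(e: TraceEvent): Long = e.getAnnotationTime()
>>     })
>>   .keyBy(_.getTraceId) // hypothetical key selector
>>   .process(new TraceProcessFunction) // counts events behind the watermark
>>   .print()
>>
>> env.execute("out-of-order-repro")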
>>
>> Regards,
>> Nara
>>
>>
>>
>>
>> On Tue, May 29, 2018 at 7:54 PM, sihua zhou <summerle...@163.com> wrote:
>>
>>> Hi Nara,
>>>
>>> yes, the watermark in the TimerService is not covered by the checkpoint;
>>> every time the job is restarted from a previous checkpoint, it is reset to
>>> Long.MIN_VALUE. I can see it being a bit tricky to cover it in the
>>> checkpoint, especially when we need to support rescaling (it doesn't seem
>>> like a purely keyed or operator state), maybe @Stefan or @Aljoscha could
>>> give you more useful information about why it wasn't covered by the
>>> checkpoint.
>>>
>>> Best, Sihua
>>>
>>>
>>>
>>> On 05/30/2018 05:44, Narayanan Arunachalam
>>> <narayanan.arunacha...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> Is it possible that the watermark in the TimerService is not getting reset
>>> when a job is restarted from a previous checkpoint? I would expect the
>>> watermark in a TimerService to also go back in time.
>>>
>>> I have the following ProcessFunction implementation.
>>>
>>>   override def processElement(
>>>     e: TraceEvent,
>>>     ctx: ProcessFunction[
>>>       TraceEvent,
>>>       Trace
>>>     ]#Context,
>>>     out: Collector[Trace]
>>>   ): Unit = {
>>>
>>>     if (e.getAnnotationTime() < ctx.timerService().currentWatermark()) {
>>>       registry.counter("tracing.outOfOrderEvents").increment()
>>>     } else {
>>>     ....
>>>     }
>>>
>>> I am noticing that outOfOrderEvents are reported until new events are
>>> read into the stream after the last restart.
>>>
>>> Regards,
>>> Nara
>>>
>>>
>>
>
