Flink was NOT at fault. It turns out our Kafka producer had OS-level clock
sync problems :(

Because of that, our Kafka topics occasionally contained messages with
incorrect timestamps. In practice those timestamps were about 7 days older
than they should have been.

I'm really sorry for wasting your time on this. But thank you once more for
taking the time to answer.

For any similar case, I would first advise users to very carefully check
the actual timestamps of their input data. For me it was helpful to make
this change in my Flink job: in the late data output, include both the
processing time (DateTime.now()) and the event time (the original timestamp).
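
Roughly what I mean (a sketch only; "result" stands for the
SingleOutputStreamOperator returned by the windowed reduce, and
MyEvent/getTimestamp() are placeholders for the actual event type):

DataStream<MyEvent> late = result.getSideOutput(lateDataTag);

late.map(new MapFunction<MyEvent, String>() {
    @Override
    public String map(MyEvent event) {
        // Log both clocks side by side: if they are days apart, the
        // producer's timestamps are suspect rather than Flink's windowing.
        return "processingTime=" + DateTime.now()
            + " eventTime=" + new DateTime(event.getTimestamp());
    }
}).print();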

On Mon, May 14, 2018 at 12:42 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Thanks for correcting me, Piotr. I didn't look closely enough at the code.
> With the currently implemented logic, a record should not be emitted to the
> side output if its window hasn't been closed yet.
>
> 2018-05-11 14:13 GMT+02:00 Piotr Nowojski <pi...@data-artisans.com>:
>
>> Generally speaking, the best practice is always to simplify your program as
>> much as possible to narrow down the scope of the search: replace the data
>> source with statically generated events, remove unnecessary components, etc.
>> Such a process either helps you figure out what's wrong on your own or, if
>> not, leaves you with a minimal program that reproduces the issue, which you
>> can share with us so we can debug it.
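>>
>> For example, something like this (just a sketch; the element type and
>> values are made up):
>>
>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>
>> // Replace the Kafka source with a few hard-coded events so that timestamp
>> // extraction, watermarks and windowing can be tested in isolation.
>> DataStream<Tuple2<String, Long>> input = env
>>     .fromElements(
>>         Tuple2.of("a", 1000L),
>>         Tuple2.of("b", 7000L),
>>         Tuple2.of("c", 2000L)) // 5 s out of order relative to "b"
>>     .assignTimestampsAndWatermarks(
>>         new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(
>>             Time.seconds(10)) {
>>                 @Override
>>                 public long extractTimestamp(Tuple2<String, Long> element) {
>>                     return element.f1;
>>                 }
>>             });
>> // ...plug the rest of the original pipeline in here, unchanged.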
>>
>> Piotrek
>>
>>
>> On 11 May 2018, at 13:54, Juho Autio <juho.au...@rovio.com> wrote:
>>
>> Thanks for that code snippet, I should try it out to simulate my DAG. If you
>> have any suggestions on how to debug further what's causing late data on a
>> production stream job, please let me know.
>>
>> On Fri, May 11, 2018 at 2:18 PM, Piotr Nowojski <pi...@data-artisans.com>
>> wrote:
>>
>>> Hey,
>>>
>>> Actually I think Fabian's initial message was incorrect. As far as I can
>>> see in the code of WindowOperator (the last lines of
>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator#processElement),
>>> an element is sent to the late side output if it is late AND it wasn't
>>> assigned to any of the existing windows (because those windows were late
>>> as well). In other words, it should work as you, Juho, are wishing: an
>>> element should be marked as late only once it is overdue/late for the
>>> window, i.e. after one full day.
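>>>
>>> Roughly, the relevant check at the end of processElement looks like this
>>> (simplified sketch, not the literal source):
>>>
>>> // isSkippedElement stays true only if the element was not added to any
>>> // window, i.e. every window it maps to is already past
>>> // watermark + allowedLateness.
>>> if (isSkippedElement && isElementLate(element)) {
>>>     sideOutput(element);
>>> }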
>>>
>>> I have tested it and it works as expected. The following program:
>>>
>>> https://gist.github.com/pnowojski/8cd650170925cf35be521cf236f1d97a
>>>
>>> prints only ONE number to standard err:
>>>
>>> > 1394
>>>
>>> And there is nothing on the side output.
>>>
>>> Piotrek
>>>
>>> On 11 May 2018, at 12:32, Juho Autio <juho.au...@rovio.com> wrote:
>>>
>>> Thanks. What I still don't get is why my message got filtered out in the
>>> first place. Even if the allowed-lateness filtering were done "on the
>>> window", data should not be dropped as late if it is not in fact late by
>>> more than the allowedLateness setting.
>>>
>>> Assuming that these conditions hold:
>>> - messages (and thus the extracted timestamps) were not out of order by
>>> more than 5 seconds (assuming I didn't make any mistake in my
>>> partition-level analysis)
>>> - allowedLateness = 1 minute
>>> - watermarks are assigned on the Kafka consumer, meaning that they are
>>> synchronized across all partitions
>>>
>>> I don't see how the watermark could ever have been more than 5 seconds
>>> ahead when the message arrives at the isElementLate filter. Do you have
>>> any idea about this? Is there some existing test that simulates
>>> out-of-order input to Flink's Kafka consumer? I could try to build a test
>>> case based on that to reproduce my problem. I'm not sure how to gather
>>> enough debug information on the production stream so that it would clearly
>>> show the watermarks, how they progressed in each Kafka partition, and
>>> later in the chain in case isElementLate filters something out.
>>>
>>> On Fri, May 11, 2018 at 12:12 PM, Fabian Hueske <fhue...@gmail.com>
>>> wrote:
>>>
>>>> Hi Juho,
>>>>
>>>> Thanks for bringing up this topic! I share your intuition.
>>>> IMO, records should only be filtered out and sent to a side output if
>>>> any of the windows they would be assigned to is already closed.
>>>>
>>>> I had a look into the code and found that records are filtered out as
>>>> late based on the following condition:
>>>>
>>>> protected boolean isElementLate(StreamRecord<IN> element) {
>>>>     return (windowAssigner.isEventTime()) &&
>>>>         (element.getTimestamp() + allowedLateness
>>>>             <= internalTimerService.currentWatermark());
>>>> }
>>>>
>>>>
>>>> This code shows that your analysis is correct.
>>>> Records are filtered out based on their timestamp and the current
>>>> watermark, even if they arrive before the window is closed.
>>>>
>>>> OTOH, filtering out records based on the window they would end up in
>>>> can also be tricky if records are assigned to multiple windows (e.g.,
>>>> sliding windows). In this case, a side-outputted record could still
>>>> belong to some windows that are open and others that are already closed
>>>> (for example, with a 1-hour window sliding every 10 minutes, each record
>>>> is assigned to six windows).
>>>>
>>>> @Aljoscha (CC) might have an explanation for the current behavior.
>>>>
>>>> Thanks,
>>>> Fabian
>>>>
>>>>
>>>> 2018-05-11 10:55 GMT+02:00 Juho Autio <juho.au...@rovio.com>:
>>>>
>>>>> I don't understand why some data gets discarded as late on my Flink
>>>>> stream job a long time before the window even closes.
>>>>>
>>>>> I cannot be 100% sure, but to me it seems like the Kafka consumer is
>>>>> basically causing the data to be dropped as "late", not the window. I
>>>>> didn't expect this to ever happen.
>>>>>
>>>>> I have a Flink stream job that gathers distinct values using a 24-hour
>>>>> window. It reads the data from Kafka, using a
>>>>> BoundedOutOfOrdernessTimestampExtractor on the Kafka consumer to
>>>>> synchronize watermarks across all Kafka partitions. The
>>>>> maxOutOfOrderness of the extractor is set to 10 seconds.
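>>>>>
>>>>> Roughly like this (simplified; the consumer version, MyEvent and
>>>>> getTimestamp() are placeholders for what the job actually uses):
>>>>>
>>>>> FlinkKafkaConsumer010<MyEvent> kafkaConsumer =
>>>>>     new FlinkKafkaConsumer010<>(topic, deserializationSchema, kafkaProps);
>>>>> // Assigning the extractor on the consumer itself gives per-partition
>>>>> // watermarking; the overall watermark is the minimum across partitions.
>>>>> kafkaConsumer.assignTimestampsAndWatermarks(
>>>>>     new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
>>>>>         @Override
>>>>>         public long extractTimestamp(MyEvent event) {
>>>>>             return event.getTimestamp();
>>>>>         }
>>>>>     });
>>>>> DataStream<MyEvent> stream = env.addSource(kafkaConsumer);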
>>>>>
>>>>> I have also enabled allowedLateness with 1 minute lateness on the
>>>>> 24-hour window:
>>>>>
>>>>> .timeWindow(Time.days(1))
>>>>> .allowedLateness(Time.minutes(1))
>>>>> .sideOutputLateData(lateDataTag)
>>>>> .reduce(new DistinctFunction())
>>>>>
>>>>> I have used accumulators to see that there is some late data; there
>>>>> have been multiple occurrences of it.
>>>>>
>>>>> Now focusing on a particular case that I was investigating more
>>>>> closely: around ~12:15 o'clock my late data accumulator started showing
>>>>> that 1 message had been late. That's in the middle of the time window,
>>>>> so why would this happen? I would expect late data to be discarded only
>>>>> sometime after 00:01, if some data arrives late for the window that
>>>>> just closed at 00:00 and doesn't get emitted within the 1 minute of
>>>>> allowedLateness.
>>>>>
>>>>> To analyze the timestamps I read all messages in sequence separately
>>>>> from each Kafka partition and calculated the difference in timestamps
>>>>> between consecutive messages. I had exactly one message categorized as
>>>>> late by Flink in this case, and at the time I was using
>>>>> maxOutOfOrderness = 5 seconds. I found exactly one message in one Kafka
>>>>> partition where the timestamp difference between consecutive messages
>>>>> was 5 seconds (they were out of order by 5 s), which makes me wonder:
>>>>> did Flink drop the event as late because it violated maxOutOfOrderness?
>>>>> Have I misunderstood the concept of late data somehow? I only expected
>>>>> late data to occur on window operations. I would expect the Kafka
>>>>> consumer to pass "late" messages onward even though the watermark
>>>>> doesn't advance.
>>>>>
>>>>> Thank you very much if you can find the time to look at this!
>>>>>
>>>>
>>>>
>>>
>>>
>>
>>
>>
>
