Flink was NOT at fault. It turns out our Kafka producer had OS-level clock sync problems :(
Because of that, our Kafka occasionally had some messages in between with an incorrect timestamp. In practice they were about 7 days older than they should have been. I'm really sorry for wasting your time on this. But thank you once more for taking the time to answer.

For any similar case, I would first advise users to compare the actual timestamps of their input data extra carefully. For me it was helpful to make this change in my Flink job: for the late data output, include both the processing time (DateTime.now()) and the event time (original timestamp).
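(A minimal sketch of what that change could look like, not the job's actual code: "windowedResult" is assumed to be the SingleOutputStreamOperator returned by the window reduce quoted later in this thread, "MyEvent" and its getEventTimestamp() accessor are placeholder names, and lateDataTag is the tag passed to sideOutputLateData(...).)

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.util.OutputTag;
    import org.joda.time.DateTime;

    // Tag used with .sideOutputLateData(lateDataTag) on the window.
    final OutputTag<MyEvent> lateDataTag = new OutputTag<MyEvent>("late-data") {};

    // Attach processing time next to the original event time for every late element,
    // so the two can be compared whenever something unexpectedly shows up as late.
    DataStream<String> lateDebug = windowedResult
        .getSideOutput(lateDataTag)
        .map(e -> "late element: eventTime=" + new DateTime(e.getEventTimestamp())
            + ", processingTime=" + DateTime.now());

    lateDebug.print();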
On Mon, May 14, 2018 at 12:42 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Thanks for correcting me, Piotr. I didn't look closely enough at the code. With the presently implemented logic, a record should not be emitted to a side output if its window wasn't closed yet.
>
> 2018-05-11 14:13 GMT+02:00 Piotr Nowojski <pi...@data-artisans.com>:
>
>> Generally speaking, the best practice is always to simplify your program as much as possible to narrow down the scope of the search. Replace the data source with statically generated events, remove unnecessary components, etc. Either such a process helps you figure out what's wrong on your own, or, if it doesn't, sharing such a minimal program that reproduces the issue with us will allow us to debug it.
>>
>> Piotrek
>>
>> On 11 May 2018, at 13:54, Juho Autio <juho.au...@rovio.com> wrote:
>>
>> Thanks for that code snippet, I should try it out to simulate my DAG. If you have any suggestions on how to debug further what's causing late data on a production stream job, please let me know.
>>
>> On Fri, May 11, 2018 at 2:18 PM, Piotr Nowojski <pi...@data-artisans.com> wrote:
>>
>>> Hey,
>>>
>>> Actually I think Fabian's initial message was incorrect. As far as I can see in the code of WindowOperator (last lines of org.apache.flink.streaming.runtime.operators.windowing.WindowOperator#processElement), the element is sent to the late side output if it is late AND it wasn't assigned to any of the existing windows (because they were late as well). In other words, it should work as you, Juho, are wishing: an element should be marked as late once it is overdue/late for the window, after one full day.
>>>
>>> I have tested it and it works as expected. The following program:
>>>
>>> https://gist.github.com/pnowojski/8cd650170925cf35be521cf236f1d97a
>>>
>>> prints only ONE number to standard err:
>>>
>>> > 1394
>>>
>>> And there is nothing on the side output.
>>>
>>> Piotrek
>>>
>>> On 11 May 2018, at 12:32, Juho Autio <juho.au...@rovio.com> wrote:
>>>
>>> Thanks. What I still don't get is why my message got filtered in the first place. Even if the allowed lateness filtering were done "on the window", data should not be dropped as late if it's not in fact late by more than the allowedLateness setting.
>>>
>>> Assuming that these conditions hold:
>>> - messages (and thus the extracted timestamps) were not out of order by more than 5 seconds (as far as I didn't make any mistake in my partition-level analysis)
>>> - allowedLateness = 1 minute
>>> - watermarks are assigned on the Kafka consumer, meaning that they are synchronized across all partitions
>>>
>>> I don't see how the watermark could ever have been more than 5 seconds further along when the message arrives at the isElementLate filter. Do you have any idea on this? Is there some existing test that simulates out-of-order input to Flink's Kafka consumer? I could try to build a test case based on that to possibly reproduce my problem. I'm not sure how to gather enough debug information on the production stream so that it would clearly show the watermarks, how they progressed on each Kafka partition and later in the chain in case isElementLate filters something.
>>>
>>> On Fri, May 11, 2018 at 12:12 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> Hi Juho,
>>>>
>>>> Thanks for bringing up this topic! I share your intuition. IMO, records should only be filtered out and sent to a side output if any of the windows they would be assigned to is closed already.
>>>>
>>>> I had a look into the code and found that records are filtered out as late based on the following condition:
>>>>
>>>>     protected boolean isElementLate(StreamRecord<IN> element) {
>>>>         return (windowAssigner.isEventTime()) &&
>>>>             (element.getTimestamp() + allowedLateness <= internalTimerService.currentWatermark());
>>>>     }
>>>>
>>>> This code shows that your analysis is correct. Records are filtered out based on their timestamp and the current watermark, even though they arrive before the window is closed.
>>>>
>>>> OTOH, filtering out records based on the window they would end up in can also be tricky if records are assigned to multiple windows (e.g., sliding windows). In this case, a side-outputted record could still be in some windows and not in others.
>>>>
>>>> @Aljoscha (CC) might have an explanation for the current behavior.
>>>>
>>>> Thanks,
>>>> Fabian
>>>>
>>>> 2018-05-11 10:55 GMT+02:00 Juho Autio <juho.au...@rovio.com>:
>>>>
>>>>> I don't understand why I'm getting some data discarded as late on my Flink stream job a long time before the window even closes.
>>>>>
>>>>> I cannot be 100% sure, but to me it seems like the Kafka consumer is basically causing the data to be dropped as "late", not the window. I didn't expect this to ever happen?
>>>>>
>>>>> I have a Flink stream job that gathers distinct values using a 24-hour window. It reads the data from Kafka, using a BoundedOutOfOrdernessTimestampExtractor on the Kafka consumer to synchronize watermarks across all Kafka partitions. The maxOutOfOrderness of the extractor is set to 10 seconds.
>>>>>
>>>>> I have also enabled allowedLateness with 1 minute of lateness on the 24-hour window:
>>>>>
>>>>>     .timeWindow(Time.days(1))
>>>>>     .allowedLateness(Time.minutes(1))
>>>>>     .sideOutputLateData(lateDataTag)
>>>>>     .reduce(new DistinctFunction())
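(For reference, a minimal sketch of the kind of source setup described in the quoted message. The topic name, Kafka properties, consumer version (FlinkKafkaConsumer010), deserialization schema and the "MyEvent" type with its getEventTimestamp() accessor are all assumptions; the relevant part is that the BoundedOutOfOrdernessTimestampExtractor is attached to the Kafka consumer itself, so watermarks are tracked per partition and merged inside the consumer.)

    import java.util.Properties;

    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    Properties kafkaProps = new Properties();
    kafkaProps.setProperty("bootstrap.servers", "kafka:9092");  // assumed broker address
    kafkaProps.setProperty("group.id", "distinct-values-job");  // assumed group id

    FlinkKafkaConsumer010<MyEvent> consumer =
        new FlinkKafkaConsumer010<>("events", new MyEventDeserializationSchema(), kafkaProps);

    // Timestamps/watermarks assigned on the consumer: each Kafka partition gets its own
    // watermark and the consumer emits the minimum across partitions.
    consumer.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
            @Override
            public long extractTimestamp(MyEvent event) {
                return event.getEventTimestamp();
            }
        });

    DataStream<MyEvent> events = env.addSource(consumer);

The windowed pipeline quoted above would then be applied to "events" after a keyBy.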
>>>>>
>>>>> I have used accumulators to see that there is some late data; I have had multiple occurrences of those.
>>>>>
>>>>> Now, focusing on a particular case that I was investigating more closely: around 12:15 o'clock my late data accumulator started showing that 1 message had been late. That's in the middle of the time window, so why would this happen? I would expect late data to be discarded only sometime after 00:01, if some data arrives late for the window that just closed at 00:00 and doesn't get emitted as part of the 1-minute allowedLateness.
>>>>>
>>>>> To analyze the timestamps I read all messages in sequence separately from each Kafka partition and calculated the difference in timestamps between consecutive messages. I had exactly one message categorized as late by Flink in this case, and at the time I was using maxOutOfOrderness = 5 seconds. I found exactly one message in one Kafka partition where the timestamp difference between consecutive messages was 5 seconds (they were out of order by 5 s), which makes me wonder: did Flink drop the event as late because it violated maxOutOfOrderness? Have I misunderstood the concept of late data somehow? I only expected late data to happen on window operations. I would expect the Kafka consumer to pass "late" messages onward even though the watermark doesn't change.
>>>>>
>>>>> Thank you very much if you can find the time to look at this!
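(Regarding the question earlier in the thread about gathering debug information that clearly shows the watermarks on a production stream: one option, sketched here under the assumption that the job's allowed lateness is 1 minute and with "WatermarkLagLogger" as an invented name, is a pass-through ProcessFunction placed between the watermark assignment and the window that logs elements that are already behind the current watermark by more than the allowed lateness.)

    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;

    public class WatermarkLagLogger<T> extends ProcessFunction<T, T> {

        // Mirrors allowedLateness(Time.minutes(1)) from the job above.
        private static final long ALLOWED_LATENESS_MS = 60_000L;

        @Override
        public void processElement(T value, Context ctx, Collector<T> out) {
            Long timestamp = ctx.timestamp();                       // event timestamp of this element
            long watermark = ctx.timerService().currentWatermark(); // watermark seen at this operator
            if (timestamp != null && timestamp + ALLOWED_LATENESS_MS <= watermark) {
                // The element meets the isElementLate condition quoted above; whether the window
                // operator actually side-outputs it also depends on whether any window still
                // accepts it, as discussed earlier in the thread.
                System.err.println("behind watermark: eventTime=" + timestamp
                    + ", watermark=" + watermark + ", lag=" + (watermark - timestamp) + " ms");
            }
            out.collect(value);
        }
    }

Applied as, for example, events.process(new WatermarkLagLogger<>()) right after the timestamp extractor and before the keyBy/window, it would show how far behind the watermark each suspicious element is when it reaches the operator chain.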