Stefan (or anyone!), could I please have some feedback on the findings I reported on Dec 21, 2018? This is still a major blocker.
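For reference, the "side effect kafka output" tracking idea from the Dec 21 findings quoted below could start from a pass-through tap roughly like the sketch here. It is only a sketch: the class name, the tag name and the "id" field are illustrative, and it assumes the Map<String, String> record type used elsewhere in this thread.

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.util.Map;

// Pass-through tap: forwards each record unchanged and mirrors its id into a
// side output, tagged with the position in the pipeline where the tap sits.
public class DebugTap extends ProcessFunction<Map<String, String>, Map<String, String>> {

    public static final OutputTag<String> DEBUG_IDS =
            new OutputTag<>("debug-ids", Types.STRING);

    private final String position; // e.g. "before-window" or "after-reduce" (illustrative)

    public DebugTap(String position) {
        this.position = position;
    }

    @Override
    public void processElement(Map<String, String> value, Context ctx,
                               Collector<Map<String, String>> out) {
        ctx.output(DEBUG_IDS, position + ":" + value.get("id"));
        out.collect(value); // the main stream is not modified
    }
}

The side output would then be obtained with getSideOutput(DebugTap.DEBUG_IDS) on the operator that applies the tap and written to a separate Kafka topic for offline comparison. Whether taps between the job's own operators are enough, or whether the probe would have to sit inside Flink's window internals, is exactly the open question below.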
On Thu, Jan 31, 2019 at 11:46 AM Juho Autio <juho.au...@rovio.com> wrote: > Hello, is there anyone that could help with this? > > On Fri, Jan 11, 2019 at 8:14 AM Juho Autio <juho.au...@rovio.com> wrote: > >> Stefan, would you have time to comment? >> >> On Wednesday, January 2, 2019, Juho Autio <juho.au...@rovio.com> wrote: >> >>> Bump – does anyone know if Stefan will be available to comment the >>> latest findings? Thanks. >>> >>> On Fri, Dec 21, 2018 at 2:33 PM Juho Autio <juho.au...@rovio.com> wrote: >>> >>>> Stefan, I managed to analyze savepoint with bravo. It seems that the >>>> data that's missing from output *is* found in savepoint. >>>> >>>> I simplified my test case to the following: >>>> >>>> - job 1 has bee running for ~10 days >>>> - savepoint X created & job 1 cancelled >>>> - job 2 started with restore from savepoint X >>>> >>>> Then I waited until the next day so that job 2 has triggered the 24 >>>> hour window. >>>> >>>> Then I analyzed the output & savepoint: >>>> >>>> - compare job 2 output with the output of a batch pyspark script => >>>> find 4223 missing rows >>>> - pick one of the missing rows (say, id Z) >>>> - read savepoint X with bravo, filter for id Z => Z was found in the >>>> savepoint! >>>> >>>> How can it be possible that the value is in state but doesn't end up in >>>> output after state has been restored & window is eventually triggered? >>>> >>>> I also did similar analysis on the previous case where I savepointed & >>>> restored the job multiple times (5) within the same 24-hour window. A >>>> missing id that I drilled down to, was found in all of those savepoints, >>>> yet missing from the output that gets written at the end of the day. This >>>> is even more surprising: that the missing ID was written to the new >>>> savepoints also after restoring. Is the reducer state somehow decoupled >>>> from the window contents? >>>> >>>> Big thanks to bravo-developer Gyula for guiding me through to be able >>>> read the reducer state! https://github.com/king/bravo/pull/11 >>>> >>>> Gyula also had an idea for how to troubleshoot the missing data in a >>>> scalable way: I could add some "side effect kafka output" on individual >>>> operators. This should allow tracking more closely at which point the data >>>> gets lost. However, maybe this would have to be in some Flink's internal >>>> components, and I'm not sure which those would be. >>>> >>>> Cheers, >>>> Juho >>>> >>>> On Mon, Nov 19, 2018 at 11:52 AM Juho Autio <juho.au...@rovio.com> >>>> wrote: >>>> >>>>> >>>>> Hi Stefan, >>>>> >>>>> Bravo doesn't currently support reading a reducer state. I gave it a >>>>> try but couldn't get to a working implementation yet. If anyone can >>>>> provide >>>>> some insight on how to make this work, please share at github: >>>>> https://github.com/king/bravo/pull/11 >>>>> >>>>> Thanks. >>>>> >>>>> On Tue, Oct 23, 2018 at 3:32 PM Juho Autio <juho.au...@rovio.com> >>>>> wrote: >>>>> >>>>>> I was glad to find that bravo had now been updated to support >>>>>> installing bravo to a local maven repo. >>>>>> >>>>>> I was able to load a checkpoint created by my job, thanks to the >>>>>> example provided in bravo README, but I'm still missing the essential >>>>>> piece. 
>>>>>> >>>>>> My code was: >>>>>> >>>>>> OperatorStateReader reader = new OperatorStateReader(env2, >>>>>> savepoint, "DistinctFunction"); >>>>>> DontKnowWhatTypeThisIs reducingState = >>>>>> reader.readKeyedStates(what should I put here?); >>>>>> >>>>>> I don't know how to read the values collected from reduce() calls in >>>>>> the state. Is there a way to access the reducing state of the window with >>>>>> bravo? I'm a bit confused how this works, because when I check with >>>>>> debugger, flink internally uses a ReducingStateDescriptor >>>>>> with name=window-contents, but still reading operator state for >>>>>> "DistinctFunction" didn't at least throw an exception ("window-contents" >>>>>> threw – obviously there's no operator by that name). >>>>>> >>>>>> Cheers, >>>>>> Juho >>>>>> >>>>>> On Mon, Oct 15, 2018 at 2:25 PM Juho Autio <juho.au...@rovio.com> >>>>>> wrote: >>>>>> >>>>>>> Hi Stefan, >>>>>>> >>>>>>> Sorry but it doesn't seem immediately clear to me what's a good way >>>>>>> to use https://github.com/king/bravo. >>>>>>> >>>>>>> How are people using it? Would you for example modify build.gradle >>>>>>> somehow to publish the bravo as a library locally/internally? Or add >>>>>>> code >>>>>>> directly in the bravo project (locally) and run it from there (using an >>>>>>> IDE, for example)? Also it doesn't seem like the bravo gradle project >>>>>>> supports building a flink job jar, but if it does, how do I do it? >>>>>>> >>>>>>> Thanks. >>>>>>> >>>>>>> On Thu, Oct 4, 2018 at 9:30 PM Juho Autio <juho.au...@rovio.com> >>>>>>> wrote: >>>>>>> >>>>>>>> Good then, I'll try to analyze the savepoints with Bravo. Thanks! >>>>>>>> >>>>>>>> > How would you assume that backpressure would influence your >>>>>>>> updates? Updates to each local state still happen event-by-event, in a >>>>>>>> single reader/writing thread. >>>>>>>> >>>>>>>> Sure, just an ignorant guess by me. I'm not familiar with most of >>>>>>>> Flink's internals. Any way high backpressure is not a seen on this job >>>>>>>> after it has caught up the lag, so at I thought it would be worth >>>>>>>> mentioning. >>>>>>>> >>>>>>>> On Thu, Oct 4, 2018 at 6:24 PM Stefan Richter < >>>>>>>> s.rich...@data-artisans.com> wrote: >>>>>>>> >>>>>>>>> Hi, >>>>>>>>> >>>>>>>>> Am 04.10.2018 um 16:08 schrieb Juho Autio <juho.au...@rovio.com>: >>>>>>>>> >>>>>>>>> > you could take a look at Bravo [1] to query your savepoints and >>>>>>>>> to check if the state in the savepoint complete w.r.t your >>>>>>>>> expectations >>>>>>>>> >>>>>>>>> Thanks. I'm not 100% if this is the case, but to me it seemed like >>>>>>>>> the missed ids were being logged by the reducer soon after the job had >>>>>>>>> started (after restoring a savepoint). But on the other hand, after >>>>>>>>> that I >>>>>>>>> also made another savepoint & restored that, so what I could check >>>>>>>>> is: does >>>>>>>>> that next savepoint have the missed ids that were logged (a couple of >>>>>>>>> minutes before the savepoint was created, so there should've been >>>>>>>>> more than >>>>>>>>> enough time to add them to the state before the savepoint was >>>>>>>>> triggered) or >>>>>>>>> not. Any way, if I would be able to verify with Bravo that the ids are >>>>>>>>> missing from the savepoint (even though reduced logged that it saw >>>>>>>>> them), >>>>>>>>> would that help in figuring out where they are lost? Is there some >>>>>>>>> major >>>>>>>>> difference compared to just looking at the final output after window >>>>>>>>> has >>>>>>>>> been triggered? 
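Regarding the "window-contents" descriptor mentioned in the Oct 23 message above: that is the state descriptor Flink's WindowedStream.reduce() registers internally on the window operator (here named "DistinctFunction"), so a state reader would need an equivalent descriptor to deserialize the reducing state. Whether bravo accepts it in this form is an assumption; the sketch below only shows how such a descriptor could be rebuilt with plain Flink classes.

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.util.Map;

public class WindowContentsDescriptor {

    // Rebuilds a descriptor equivalent to the one the window operator uses for its
    // per-key reducing state. The name "window-contents" and the element type must
    // match the job; DistinctFunction is the job's own reduce function.
    public static ReducingStateDescriptor<Map<String, String>> create() {
        TypeInformation<Map<String, String>> recordType =
                TypeInformation.of(new TypeHint<Map<String, String>>() {});
        return new ReducingStateDescriptor<>(
                "window-contents",
                new DistinctFunction(),
                recordType.createSerializer(new ExecutionConfig()));
    }
}

Keep in mind that the window operator scopes this state not only by key but also by a window namespace, which is part of what makes reading it back from a savepoint non-trivial.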
>>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> I think that makes a difference. For example, you can investigate >>>>>>>>> if there is a state loss or a problem with the windowing. In the >>>>>>>>> savepoint >>>>>>>>> you could see which keys exists and to which windows they are >>>>>>>>> assigned. >>>>>>>>> Also just to make sure there is no misunderstanding: only elements >>>>>>>>> that are >>>>>>>>> in the state at the start of a savepoint are expected to be part of >>>>>>>>> the >>>>>>>>> savepoint; all elements between start and completion of the savepoint >>>>>>>>> are >>>>>>>>> not expected to be part of the savepoint. >>>>>>>>> >>>>>>>>> >>>>>>>>> > I also doubt that the problem is about backpressure after >>>>>>>>> restore, because the job will only continue running after the state >>>>>>>>> restore >>>>>>>>> is already completed. >>>>>>>>> >>>>>>>>> Yes, I'm not suspecting that the state restoring would be the >>>>>>>>> problem either. My concern was about backpressure possibly messing >>>>>>>>> with the >>>>>>>>> updates of reducing state? I would tend to suspect that updating the >>>>>>>>> state >>>>>>>>> consistently is what fails, where heavy load / backpressure might be a >>>>>>>>> factor. >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> How would you assume that backpressure would influence your >>>>>>>>> updates? Updates to each local state still happen event-by-event, in a >>>>>>>>> single reader/writing thread. >>>>>>>>> >>>>>>>>> >>>>>>>>> On Thu, Oct 4, 2018 at 4:18 PM Stefan Richter < >>>>>>>>> s.rich...@data-artisans.com> wrote: >>>>>>>>> >>>>>>>>>> Hi, >>>>>>>>>> >>>>>>>>>> you could take a look at Bravo [1] to query your savepoints and >>>>>>>>>> to check if the state in the savepoint complete w.r.t your >>>>>>>>>> expectations. I >>>>>>>>>> somewhat doubt that there is a general problem with the >>>>>>>>>> state/savepoints >>>>>>>>>> because many users are successfully running it on a large state and >>>>>>>>>> I am >>>>>>>>>> not aware of any data loss problems, but nothing is impossible. What >>>>>>>>>> the >>>>>>>>>> savepoint does is also straight forward: iterate a db snapshot and >>>>>>>>>> write >>>>>>>>>> all key/value pairs to disk, so all data that was in the db at the >>>>>>>>>> time of >>>>>>>>>> the savepoint, should show up. I also doubt that the problem is about >>>>>>>>>> backpressure after restore, because the job will only continue >>>>>>>>>> running >>>>>>>>>> after the state restore is already completed. Did you check if you >>>>>>>>>> are >>>>>>>>>> using exactly-one-semantics or at-least-once semantics? Also did you >>>>>>>>>> check >>>>>>>>>> that the kafka consumer start position is configured properly [2]? >>>>>>>>>> Are >>>>>>>>>> watermarks generated as expected after restore? >>>>>>>>>> >>>>>>>>>> One more unrelated high-level comment that I have: for a >>>>>>>>>> granularity of 24h windows, I wonder if it would not make sense to >>>>>>>>>> use a >>>>>>>>>> batch job instead? >>>>>>>>>> >>>>>>>>>> Best, >>>>>>>>>> Stefan >>>>>>>>>> >>>>>>>>>> [1] https://github.com/king/bravo >>>>>>>>>> [2] >>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration >>>>>>>>>> >>>>>>>>>> Am 04.10.2018 um 14:53 schrieb Juho Autio <juho.au...@rovio.com>: >>>>>>>>>> >>>>>>>>>> Thanks for the suggestions! >>>>>>>>>> >>>>>>>>>> > In general, it would be tremendously helpful to have a minimal >>>>>>>>>> working example which allows to reproduce the problem. >>>>>>>>>> >>>>>>>>>> Definitely. 
The problem with reproducing has been that this only >>>>>>>>>> seems to happen in the bigger production data volumes. >>>>>>>>>> >>>>>>>>>> That's why I'm hoping to find a way to debug this with the >>>>>>>>>> production data. With that it seems to consistently cause some >>>>>>>>>> misses every >>>>>>>>>> time the job is killed/restored. >>>>>>>>>> >>>>>>>>>> > check if it happens for shorter windows, like 1h etc >>>>>>>>>> >>>>>>>>>> What would be the benefit of that compared to 24h window? >>>>>>>>>> >>>>>>>>>> > simplify the job to not use a reduce window but simply a time >>>>>>>>>> window which outputs the window events. Then counting the input and >>>>>>>>>> output >>>>>>>>>> events should allow you to verify the results. If you are not seeing >>>>>>>>>> missing events, then it could have something to do with the reducing >>>>>>>>>> state >>>>>>>>>> used in the reduce function. >>>>>>>>>> >>>>>>>>>> Hm, maybe, but not sure how useful that would be, because it >>>>>>>>>> wouldn't yet prove that it's related to reducing, because not having >>>>>>>>>> a >>>>>>>>>> reduce function could also mean smaller load on the job, which might >>>>>>>>>> alone >>>>>>>>>> be enough to make the problem not manifest. >>>>>>>>>> >>>>>>>>>> Is there a way to debug what goes into the reducing state >>>>>>>>>> (including what gets removed or overwritten and what restored), if >>>>>>>>>> that >>>>>>>>>> makes sense..? Maybe some suitable logging could be used to prove >>>>>>>>>> that the >>>>>>>>>> lost data is written to the reducing state (or at least asked to be >>>>>>>>>> written), but not found any more when the window closes and state is >>>>>>>>>> flushed? >>>>>>>>>> >>>>>>>>>> On configuration once more, we're using RocksDB state backend >>>>>>>>>> with asynchronous incremental checkpointing. The state is restored >>>>>>>>>> from >>>>>>>>>> savepoints though, we haven't been using those checkpoints in these >>>>>>>>>> tests >>>>>>>>>> (although they could be used in case of crashes – but we haven't had >>>>>>>>>> those >>>>>>>>>> now). >>>>>>>>>> >>>>>>>>>> On Thu, Oct 4, 2018 at 3:25 PM Till Rohrmann < >>>>>>>>>> trohrm...@apache.org> wrote: >>>>>>>>>> >>>>>>>>>>> Hi Juho, >>>>>>>>>>> >>>>>>>>>>> another idea to further narrow down the problem could be to >>>>>>>>>>> simplify the job to not use a reduce window but simply a time >>>>>>>>>>> window which >>>>>>>>>>> outputs the window events. Then counting the input and output >>>>>>>>>>> events should >>>>>>>>>>> allow you to verify the results. If you are not seeing missing >>>>>>>>>>> events, then >>>>>>>>>>> it could have something to do with the reducing state used in the >>>>>>>>>>> reduce >>>>>>>>>>> function. >>>>>>>>>>> >>>>>>>>>>> In general, it would be tremendously helpful to have a minimal >>>>>>>>>>> working example which allows to reproduce the problem. >>>>>>>>>>> >>>>>>>>>>> Cheers, >>>>>>>>>>> Till >>>>>>>>>>> >>>>>>>>>>> On Thu, Oct 4, 2018 at 2:02 PM Andrey Zagrebin < >>>>>>>>>>> and...@data-artisans.com> wrote: >>>>>>>>>>> >>>>>>>>>>>> Hi Juho, >>>>>>>>>>>> >>>>>>>>>>>> can you try to reduce the job to minimal reproducible example >>>>>>>>>>>> and share the job and input? >>>>>>>>>>>> >>>>>>>>>>>> For example: >>>>>>>>>>>> - some simple records as input, e.g. 
tuples of primitive types >>>>>>>>>>>> saved as cvs >>>>>>>>>>>> - minimal deduplication job which processes them and misses >>>>>>>>>>>> records >>>>>>>>>>>> - check if it happens for shorter windows, like 1h etc >>>>>>>>>>>> - setup which you use for the job, ideally locally reproducible >>>>>>>>>>>> or cloud >>>>>>>>>>>> >>>>>>>>>>>> Best, >>>>>>>>>>>> Andrey >>>>>>>>>>>> >>>>>>>>>>>> On 4 Oct 2018, at 11:13, Juho Autio <juho.au...@rovio.com> >>>>>>>>>>>> wrote: >>>>>>>>>>>> >>>>>>>>>>>> Sorry to insist, but we seem to be blocked for any serious >>>>>>>>>>>> usage of state in Flink if we can't rely on it to not miss data in >>>>>>>>>>>> case of >>>>>>>>>>>> restore. >>>>>>>>>>>> >>>>>>>>>>>> Would anyone have suggestions for how to troubleshoot this? So >>>>>>>>>>>> far I have verified with DEBUG logs that our reduce function gets >>>>>>>>>>>> to >>>>>>>>>>>> process also the data that is missing from window output. >>>>>>>>>>>> >>>>>>>>>>>> On Mon, Oct 1, 2018 at 11:56 AM Juho Autio < >>>>>>>>>>>> juho.au...@rovio.com> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> Hi Andrey, >>>>>>>>>>>>> >>>>>>>>>>>>> To rule out for good any questions about sink behaviour, the >>>>>>>>>>>>> job was killed and started with an additional Kafka sink. >>>>>>>>>>>>> >>>>>>>>>>>>> The same number of ids were missed in both outputs: KafkaSink >>>>>>>>>>>>> & BucketingSink. >>>>>>>>>>>>> >>>>>>>>>>>>> I wonder what would be the next steps in debugging? >>>>>>>>>>>>> >>>>>>>>>>>>> On Fri, Sep 21, 2018 at 3:49 PM Juho Autio < >>>>>>>>>>>>> juho.au...@rovio.com> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>>> Thanks, Andrey. >>>>>>>>>>>>>> >>>>>>>>>>>>>> > so it means that the savepoint does not loose at least some >>>>>>>>>>>>>> dropped records. >>>>>>>>>>>>>> >>>>>>>>>>>>>> I'm not sure what you mean by that? I mean, it was known from >>>>>>>>>>>>>> the beginning, that not everything is lost before/after >>>>>>>>>>>>>> restoring a >>>>>>>>>>>>>> savepoint, just some records around the time of restoration. >>>>>>>>>>>>>> It's not 100% >>>>>>>>>>>>>> clear whether records are lost before making a savepoint or >>>>>>>>>>>>>> after restoring >>>>>>>>>>>>>> it. Although, based on the new DEBUG logs it seems more like >>>>>>>>>>>>>> losing some >>>>>>>>>>>>>> records that are seen ~soon after restoring. It seems like Flink >>>>>>>>>>>>>> would be >>>>>>>>>>>>>> somehow confused either about the restored state vs. new inserts >>>>>>>>>>>>>> to state. >>>>>>>>>>>>>> This could also be somehow linked to the high back pressure on >>>>>>>>>>>>>> the kafka >>>>>>>>>>>>>> source while the stream is catching up. >>>>>>>>>>>>>> >>>>>>>>>>>>>> > If it is feasible for your setup, I suggest to insert one >>>>>>>>>>>>>> more map function after reduce and before sink. >>>>>>>>>>>>>> > etc. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Isn't that the same thing that we discussed before? Nothing >>>>>>>>>>>>>> is sent to BucketingSink before the window closes, so I don't >>>>>>>>>>>>>> see how it >>>>>>>>>>>>>> would make any difference if we replace the BucketingSink with a >>>>>>>>>>>>>> map >>>>>>>>>>>>>> function or another sink type. We don't create or restore >>>>>>>>>>>>>> savepoints during >>>>>>>>>>>>>> the time when BucketingSink gets input or has open buckets – >>>>>>>>>>>>>> that happens >>>>>>>>>>>>>> at a much later time of day. I would focus on figuring out why >>>>>>>>>>>>>> the records >>>>>>>>>>>>>> are lost while the window is open. But I don't know how to do >>>>>>>>>>>>>> that. Would >>>>>>>>>>>>>> you have any additional suggestions? 
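The "additional Kafka sink" check mentioned above boils down to fanning the reduced stream out to two sinks. A rough, simplified fragment in the style of the job snippet quoted later in this thread (it reuses kafkaStream, sink, ExtractFieldsFunction, MapKeySelector and DistinctFunction from there; the broker address and topic name are placeholders):

DataStream<Map<String, String>> distinct = kafkaStream
        .flatMap(new ExtractFieldsFunction())
        .keyBy(new MapKeySelector("ID", "PLAYER_ID", "FIELD", "KEY_NAME", "KEY_VALUE"))
        .timeWindow(Time.days(1))
        .reduce(new DistinctFunction());

// Main output, unchanged (the existing BucketingSink to S3).
distinct.addSink(sink);

// Second, independent output of the very same records, for comparison.
distinct
        .map(new MapFunction<Map<String, String>, String>() {
            @Override
            public String map(Map<String, String> value) {
                return value.toString(); // or the job's own JSON writer
            }
        })
        .addSink(new FlinkKafkaProducer010<>("broker:9092", "distinct-debug", new SimpleStringSchema()));

Because both sinks consume the same reduced stream, a record missing from both outputs points at the window/reduce side rather than at any particular sink, which is what the test above indicated.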
>>>>>>>>>>>>>> >>>>>>>>>>>>>> On Fri, Sep 21, 2018 at 3:30 PM Andrey Zagrebin < >>>>>>>>>>>>>> and...@data-artisans.com> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>>> Hi Juho, >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> so it means that the savepoint does not loose at least some >>>>>>>>>>>>>>> dropped records. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> If it is feasible for your setup, I suggest to insert one >>>>>>>>>>>>>>> more map function after reduce and before sink. >>>>>>>>>>>>>>> The map function should be called right after window is >>>>>>>>>>>>>>> triggered but before flushing to s3. >>>>>>>>>>>>>>> The result of reduce (deduped record) could be logged there. >>>>>>>>>>>>>>> This should allow to check whether the processed distinct >>>>>>>>>>>>>>> records were buffered in the state after the restoration from >>>>>>>>>>>>>>> the savepoint >>>>>>>>>>>>>>> or not. If they were buffered we should see that there was an >>>>>>>>>>>>>>> attempt to >>>>>>>>>>>>>>> write them to the sink from the state. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Another suggestion is to try to write records to some other >>>>>>>>>>>>>>> sink or to both. >>>>>>>>>>>>>>> E.g. if you can access file system of workers, maybe just >>>>>>>>>>>>>>> into local files and check whether the records are also dropped >>>>>>>>>>>>>>> there. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Best, >>>>>>>>>>>>>>> Andrey >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On 20 Sep 2018, at 15:37, Juho Autio <juho.au...@rovio.com> >>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Hi Andrey! >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I was finally able to gather the DEBUG logs that you >>>>>>>>>>>>>>> suggested. In short, the reducer logged that it processed at >>>>>>>>>>>>>>> least some of >>>>>>>>>>>>>>> the ids that were missing from the output. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> "At least some", because I didn't have the job running with >>>>>>>>>>>>>>> DEBUG logs for the full 24-hour window period. So I was only >>>>>>>>>>>>>>> able to look >>>>>>>>>>>>>>> up if I can find *some* of the missing ids in the DEBUG >>>>>>>>>>>>>>> logs. Which I did indeed. 
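For reference, the extra map between reduce() and the sink suggested in Andrey's message above could look roughly like the sketch below; it is separate from the change described next, which only adds a log line inside reduce(). The class and logger names are made up.

import org.apache.flink.api.common.functions.MapFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

// Pass-through map placed between .reduce(new DistinctFunction()) and the sink.
// It only runs when the window fires, so every record logged here is a record
// that the window operator actually emitted towards the sink.
public class LogWindowResult implements MapFunction<Map<String, String>, Map<String, String>> {

    private static final Logger LOG = LoggerFactory.getLogger(LogWindowResult.class);

    @Override
    public Map<String, String> map(Map<String, String> value) {
        LOG.debug("window result: {}={}", value.get("field"), value.get("id"));
        return value;
    }
}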
>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I changed the DistinctFunction.java to do this: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> @Override >>>>>>>>>>>>>>> public Map<String, String> reduce(Map<String, String> >>>>>>>>>>>>>>> value1, Map<String, String> value2) { >>>>>>>>>>>>>>> LOG.debug("DistinctFunction.reduce returns: {}={}", >>>>>>>>>>>>>>> value1.get("field"), value1.get("id")); >>>>>>>>>>>>>>> return value1; >>>>>>>>>>>>>>> } >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Then: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> vi flink-1.6.0/conf/log4j.properties >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> log4j.logger.org.apache.flink.streaming.runtime.tasks.StreamTask=DEBUG >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> log4j.logger.com.rovio.ds.flink.uniqueid.DistinctFunction=DEBUG >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Then I ran the following kind of test: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> - Cancelled the on-going job with savepoint created at ~Sep >>>>>>>>>>>>>>> 18 08:35 UTC 2018 >>>>>>>>>>>>>>> - Started a new cluster & job with DEBUG enabled at ~09:13, >>>>>>>>>>>>>>> restored from that previous cluster's savepoint >>>>>>>>>>>>>>> - Ran until caught up offsets >>>>>>>>>>>>>>> - Cancelled the job with a new savepoint >>>>>>>>>>>>>>> - Started a new job _without_ DEBUG, which restored the new >>>>>>>>>>>>>>> savepoint, let it keep running so that it will eventually write >>>>>>>>>>>>>>> the output >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Then on the next day, after results had been flushed when >>>>>>>>>>>>>>> the 24-hour window closed, I compared the results again with a >>>>>>>>>>>>>>> batch >>>>>>>>>>>>>>> version's output. And found some missing ids as usual. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I drilled down to one specific missing id (I'm replacing the >>>>>>>>>>>>>>> actual value with AN12345 below), which was not found in the >>>>>>>>>>>>>>> stream output, >>>>>>>>>>>>>>> but was found in batch output & flink DEBUG logs. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Related to that id, I gathered the following information: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> 2018-09-18~09:13:21,000 job started & savepoint is restored >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> 2018-09-18 09:14:29,085 missing id is processed for the >>>>>>>>>>>>>>> first time, proved by this log line: >>>>>>>>>>>>>>> 2018-09-18 09:14:29,085 DEBUG >>>>>>>>>>>>>>> com.rovio.ds.flink.uniqueid.DistinctFunction - >>>>>>>>>>>>>>> DistinctFunction.reduce returns: s.aid1=AN12345 >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> 2018-09-18 09:15:14,264 first synchronous part of checkpoint >>>>>>>>>>>>>>> 2018-09-18 09:15:16,544 first asynchronous part of checkpoint >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> ( >>>>>>>>>>>>>>> more occurrences of checkpoints (~1 min checkpointing time + >>>>>>>>>>>>>>> ~1 min delay before next) >>>>>>>>>>>>>>> / >>>>>>>>>>>>>>> more occurrences of DistinctFunction.reduce >>>>>>>>>>>>>>> ) >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> 2018-09-18 09:23:45,053 missing id is processed for the last >>>>>>>>>>>>>>> time >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> 2018-09-18~10:20:00,000 savepoint created & job cancelled >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> To be noted, there was high backpressure after restoring >>>>>>>>>>>>>>> from savepoint until the stream caught up with the kafka >>>>>>>>>>>>>>> offsets. Although, >>>>>>>>>>>>>>> our job uses assign timestamps & watermarks on the flink kafka >>>>>>>>>>>>>>> consumer >>>>>>>>>>>>>>> itself, so event time of all partitions is synchronized. As >>>>>>>>>>>>>>> expected, we >>>>>>>>>>>>>>> don't get any late data in the late data side output. 
>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> From this we can see that the missing ids are processed by >>>>>>>>>>>>>>> the reducer, but they must get lost somewhere before the >>>>>>>>>>>>>>> 24-hour window is >>>>>>>>>>>>>>> triggered. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I think it's worth mentioning once more that the stream >>>>>>>>>>>>>>> doesn't miss any ids if we let it's running without >>>>>>>>>>>>>>> interruptions / state >>>>>>>>>>>>>>> restoring. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> What's next? >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On Wed, Aug 29, 2018 at 3:49 PM Andrey Zagrebin < >>>>>>>>>>>>>>> and...@data-artisans.com> wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Hi Juho, >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> > only when the 24-hour window triggers, BucketingSink gets >>>>>>>>>>>>>>>> a burst of input >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> This is of course totally true, my understanding is the >>>>>>>>>>>>>>>> same. We cannot exclude problem there for sure, just >>>>>>>>>>>>>>>> savepoints are used a >>>>>>>>>>>>>>>> lot w/o problem reports and BucketingSink is known to be >>>>>>>>>>>>>>>> problematic with >>>>>>>>>>>>>>>> s3. That is why, I asked you: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> > You also wrote that the timestamps of lost event are >>>>>>>>>>>>>>>> 'probably' around the time of the savepoint, if it is not yet >>>>>>>>>>>>>>>> for sure I >>>>>>>>>>>>>>>> would also check it. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Although, bucketing sink might loose any data at the end of >>>>>>>>>>>>>>>> the day (also from the middle). The fact, that it is always >>>>>>>>>>>>>>>> around the time >>>>>>>>>>>>>>>> of taking a savepoint and not random, is surely suspicious and >>>>>>>>>>>>>>>> possible >>>>>>>>>>>>>>>> savepoint failures need to be investigated. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Regarding the s3 problem, s3 doc says: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> > The caveat is that if you make a HEAD or GET request to >>>>>>>>>>>>>>>> the key name (to find if the object exists) before creating >>>>>>>>>>>>>>>> the object, >>>>>>>>>>>>>>>> Amazon S3 provides 'eventual consistency' for read-after-write. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> The algorithm you suggest is how it is roughly implemented >>>>>>>>>>>>>>>> now (BucketingSink.openNewPartFile). My understanding is that >>>>>>>>>>>>>>>> 'eventual consistency’ means that even if you just created >>>>>>>>>>>>>>>> file (its name >>>>>>>>>>>>>>>> is key) it can be that you do not get it in the list or exists >>>>>>>>>>>>>>>> (HEAD) >>>>>>>>>>>>>>>> returns false and you risk to rewrite the previous part. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> The BucketingSink was designed for a standard file system. >>>>>>>>>>>>>>>> s3 is used over a file system wrapper atm but does not always >>>>>>>>>>>>>>>> provide >>>>>>>>>>>>>>>> normal file system guarantees. See also last example in [1]. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>>>> Andrey >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> [1] >>>>>>>>>>>>>>>> https://codeburst.io/quick-explanation-of-the-s3-consistency-model-6c9f325e3f82 >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> On 29 Aug 2018, at 12:11, Juho Autio <juho.au...@rovio.com> >>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Andrey, thank you very much for the debugging suggestions, >>>>>>>>>>>>>>>> I'll try them. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> In the meanwhile two more questions, please: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> > Just to keep in mind this problem with s3 and exclude it >>>>>>>>>>>>>>>> for sure. 
I would also check whether the size of missing >>>>>>>>>>>>>>>> events is around >>>>>>>>>>>>>>>> the batch size of BucketingSink or not. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Fair enough, but I also want to focus on debugging the most >>>>>>>>>>>>>>>> probable subject first. So what do you think about this – true >>>>>>>>>>>>>>>> or false: >>>>>>>>>>>>>>>> only when the 24-hour window triggers, BucketinSink gets a >>>>>>>>>>>>>>>> burst of input. >>>>>>>>>>>>>>>> Around the state restoring point (middle of the day) it >>>>>>>>>>>>>>>> doesn't get any >>>>>>>>>>>>>>>> input, so it can't lose anything either. Isn't this true, or >>>>>>>>>>>>>>>> have I totally >>>>>>>>>>>>>>>> missed how Flink works in triggering window results? I would >>>>>>>>>>>>>>>> not expect >>>>>>>>>>>>>>>> there to be any optimization that speculatively triggers early >>>>>>>>>>>>>>>> results of a >>>>>>>>>>>>>>>> regular time window to the downstream operators. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> > The old BucketingSink has in general problem with s3. >>>>>>>>>>>>>>>> Internally BucketingSink queries s3 as a file system to list >>>>>>>>>>>>>>>> already >>>>>>>>>>>>>>>> written file parts (batches) and determine index of the next >>>>>>>>>>>>>>>> part to start. >>>>>>>>>>>>>>>> Due to eventual consistency of checking file existence in s3 >>>>>>>>>>>>>>>> [1], the >>>>>>>>>>>>>>>> BucketingSink can rewrite the previously written part and >>>>>>>>>>>>>>>> basically loose >>>>>>>>>>>>>>>> it. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> I was wondering, what does S3's "read-after-write >>>>>>>>>>>>>>>> consistency" (mentioned on the page you linked) actually mean. >>>>>>>>>>>>>>>> It seems >>>>>>>>>>>>>>>> that this might be possible: >>>>>>>>>>>>>>>> - LIST keys, find current max index >>>>>>>>>>>>>>>> - choose next index = max + 1 >>>>>>>>>>>>>>>> - HEAD next index: if it exists, keep adding + 1 until key >>>>>>>>>>>>>>>> doesn't exist on S3 >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> But definitely sounds easier if a sink keeps track of files >>>>>>>>>>>>>>>> in a way that's guaranteed to be consistent. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>>>> Juho >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> On Mon, Aug 27, 2018 at 2:04 PM Andrey Zagrebin < >>>>>>>>>>>>>>>> and...@data-artisans.com> wrote: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Hi, >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> true, StreamingFileSink does not support s3 in 1.6.0, it >>>>>>>>>>>>>>>>> is planned for the next 1.7 release, sorry for confusion. >>>>>>>>>>>>>>>>> The old BucketingSink has in general problem with s3. >>>>>>>>>>>>>>>>> Internally BucketingSink queries s3 as a file system >>>>>>>>>>>>>>>>> to list already written file parts (batches) and determine >>>>>>>>>>>>>>>>> index of the next part to start. Due to eventual consistency >>>>>>>>>>>>>>>>> of checking >>>>>>>>>>>>>>>>> file existence in s3 [1], the BucketingSink can rewrite the >>>>>>>>>>>>>>>>> previously >>>>>>>>>>>>>>>>> written part and basically loose it. It should be fixed for >>>>>>>>>>>>>>>>> StreamingFileSink in 1.7 where Flink keeps its own track of >>>>>>>>>>>>>>>>> written parts >>>>>>>>>>>>>>>>> and does not rely on s3 as a file system. >>>>>>>>>>>>>>>>> I also include Kostas, he might add more details. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Just to keep in mind this problem with s3 and exclude it >>>>>>>>>>>>>>>>> for sure I would also check whether the size of missing >>>>>>>>>>>>>>>>> events is around >>>>>>>>>>>>>>>>> the batch size of BucketingSink or not. 
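To make the part-index discussion concrete: the probing is essentially an exists() loop against the file system, and with S3's eventual consistency that check can return false for a just-written part, so the chosen index may collide with an existing file and overwrite it. The sketch below is a simplification of that idea, not BucketingSink's actual code; the path and prefix arguments are hypothetical.

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

public class NextPartIndex {

    // Simplified "find the next free part index" logic: keep incrementing the
    // counter while a part file with that index exists. If exists() lies
    // (eventual consistency), the returned index may already be taken.
    public static int nextPartIndex(FileSystem fs, Path bucketPath, String prefix) throws IOException {
        int partIndex = 0;
        while (fs.exists(new Path(bucketPath, prefix + "-" + partIndex))) {
            partIndex++;
        }
        return partIndex;
    }
}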
You also wrote that >>>>>>>>>>>>>>>>> the timestamps >>>>>>>>>>>>>>>>> of lost event are 'probably' around the time of the >>>>>>>>>>>>>>>>> savepoint, if it is not >>>>>>>>>>>>>>>>> yet for sure I would also check it. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Have you already checked the log files of job manager and >>>>>>>>>>>>>>>>> task managers for the job running before and after the >>>>>>>>>>>>>>>>> restore from the >>>>>>>>>>>>>>>>> check point? Is everything successful there, no errors, >>>>>>>>>>>>>>>>> relevant warnings >>>>>>>>>>>>>>>>> or exceptions? >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> As the next step, I would suggest to log all encountered >>>>>>>>>>>>>>>>> events in DistinctFunction.reduce if possible for production >>>>>>>>>>>>>>>>> data and check >>>>>>>>>>>>>>>>> whether the missed events are eventually processed before or >>>>>>>>>>>>>>>>> after the >>>>>>>>>>>>>>>>> savepoint. The following log message indicates a border >>>>>>>>>>>>>>>>> between the events >>>>>>>>>>>>>>>>> that should be included into the savepoint (logged before) or >>>>>>>>>>>>>>>>> not: >>>>>>>>>>>>>>>>> “{} ({}, synchronous part) in thread {} took {} ms” >>>>>>>>>>>>>>>>> (template) >>>>>>>>>>>>>>>>> Also check if the savepoint has been overall completed: >>>>>>>>>>>>>>>>> "{} ({}, asynchronous part) in thread {} took {} ms." >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Best, >>>>>>>>>>>>>>>>> Andrey >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> [1] >>>>>>>>>>>>>>>>> https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> On 24 Aug 2018, at 20:41, Juho Autio <juho.au...@rovio.com> >>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Hi, >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Using StreamingFileSink is not a convenient option for >>>>>>>>>>>>>>>>> production use for us as it doesn't support s3*. I could use >>>>>>>>>>>>>>>>> StreamingFileSink just to verify, but I don't see much point >>>>>>>>>>>>>>>>> in doing so. >>>>>>>>>>>>>>>>> Please consider my previous comment: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> > I realized that BucketingSink must not play any role in >>>>>>>>>>>>>>>>> this problem. This is because only when the 24-hour window >>>>>>>>>>>>>>>>> triggers, >>>>>>>>>>>>>>>>> BucketingSink gets a burst of input. Around the state >>>>>>>>>>>>>>>>> restoring point >>>>>>>>>>>>>>>>> (middle of the day) it doesn't get any input, so it can't >>>>>>>>>>>>>>>>> lose anything >>>>>>>>>>>>>>>>> either (right?). >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> I could also use a kafka sink instead, but I can't imagine >>>>>>>>>>>>>>>>> how there could be any difference. It's very real that the >>>>>>>>>>>>>>>>> sink doesn't get >>>>>>>>>>>>>>>>> any input for a long time until the 24-hour window closes, >>>>>>>>>>>>>>>>> and then it >>>>>>>>>>>>>>>>> quickly writes out everything because it's not that much data >>>>>>>>>>>>>>>>> eventually >>>>>>>>>>>>>>>>> for the distinct values. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Any ideas for debugging what's happening around the >>>>>>>>>>>>>>>>> savepoint & restoration time? >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> *) I actually implemented StreamingFileSink as an >>>>>>>>>>>>>>>>> alternative sink. This was before I came to realize that most >>>>>>>>>>>>>>>>> likely the >>>>>>>>>>>>>>>>> sink component has nothing to do with the data loss problem. >>>>>>>>>>>>>>>>> I tried it >>>>>>>>>>>>>>>>> with s3n:// path just to see an exception being thrown. 
In >>>>>>>>>>>>>>>>> the source code >>>>>>>>>>>>>>>>> I indeed then found an explicit check for the target path >>>>>>>>>>>>>>>>> scheme to be >>>>>>>>>>>>>>>>> "hdfs://". >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> On Fri, Aug 24, 2018 at 7:49 PM Andrey Zagrebin < >>>>>>>>>>>>>>>>> and...@data-artisans.com> wrote: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Ok, I think before further debugging the window reduced >>>>>>>>>>>>>>>>>> state, >>>>>>>>>>>>>>>>>> could you try the new ‘StreamingFileSink’ [1] introduced >>>>>>>>>>>>>>>>>> in Flink 1.6.0 instead of the previous 'BucketingSink’? >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>>>>>> Andrey >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> [1] >>>>>>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> On 24 Aug 2018, at 18:03, Juho Autio < >>>>>>>>>>>>>>>>>> juho.au...@rovio.com> wrote: >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Yes, sorry for my confusing comment. I just meant that it >>>>>>>>>>>>>>>>>> seems like there's a bug somewhere now that the output is >>>>>>>>>>>>>>>>>> missing some data. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> > I would wait and check the actual output in s3 because >>>>>>>>>>>>>>>>>> it is the main result of the job >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Yes, and that's what I have already done. There seems to >>>>>>>>>>>>>>>>>> be always some data loss with the production data volumes, >>>>>>>>>>>>>>>>>> if the job has >>>>>>>>>>>>>>>>>> been restarted on that day. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Would you have any suggestions for how to debug this >>>>>>>>>>>>>>>>>> further? >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Many thanks for stepping in. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> On Fri, Aug 24, 2018 at 6:37 PM Andrey Zagrebin < >>>>>>>>>>>>>>>>>> and...@data-artisans.com> wrote: >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Hi Juho, >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> So it is a per key deduplication job. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Yes, I would wait and check the actual output in s3 >>>>>>>>>>>>>>>>>>> because it is the main result of the job and >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> > The late data around the time of taking savepoint >>>>>>>>>>>>>>>>>>> might be not included into the savepoint but it should be >>>>>>>>>>>>>>>>>>> behind the >>>>>>>>>>>>>>>>>>> snapshotted offset in Kafka. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> is not a bug, it is a possible behaviour. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> The savepoint is a snapshot of the data in transient >>>>>>>>>>>>>>>>>>> which is already consumed from Kafka. >>>>>>>>>>>>>>>>>>> Basically the full contents of the window result is >>>>>>>>>>>>>>>>>>> split between the savepoint and what can come after the >>>>>>>>>>>>>>>>>>> savepoint'ed offset >>>>>>>>>>>>>>>>>>> in Kafka but before the window result is written into s3. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Allowed lateness should not affect it, I am just saying >>>>>>>>>>>>>>>>>>> that the final result in s3 should include all records >>>>>>>>>>>>>>>>>>> after it. >>>>>>>>>>>>>>>>>>> This is what should be guaranteed but not the contents >>>>>>>>>>>>>>>>>>> of the intermediate savepoint. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>>>>>>> Andrey >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> On 24 Aug 2018, at 16:52, Juho Autio < >>>>>>>>>>>>>>>>>>> juho.au...@rovio.com> wrote: >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Thanks for your answer! 
>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> I check for the missed data from the final output on s3. >>>>>>>>>>>>>>>>>>> So I wait until the next day, then run the same thing >>>>>>>>>>>>>>>>>>> re-implemented in >>>>>>>>>>>>>>>>>>> batch, and compare the output. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> > The late data around the time of taking savepoint >>>>>>>>>>>>>>>>>>> might be not included into the savepoint but it should be >>>>>>>>>>>>>>>>>>> behind the >>>>>>>>>>>>>>>>>>> snapshotted offset in Kafka. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Yes, I would definitely expect that. It seems like >>>>>>>>>>>>>>>>>>> there's a bug somewhere. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> > Then it should just come later after the restore and >>>>>>>>>>>>>>>>>>> should be reduced within the allowed lateness into the >>>>>>>>>>>>>>>>>>> final result which >>>>>>>>>>>>>>>>>>> is saved into s3. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Well, as far as I know, allowed lateness doesn't play >>>>>>>>>>>>>>>>>>> any role here, because I started running the job with >>>>>>>>>>>>>>>>>>> allowedLateness=0, >>>>>>>>>>>>>>>>>>> and still get the data loss, while my late data output >>>>>>>>>>>>>>>>>>> doesn't receive >>>>>>>>>>>>>>>>>>> anything. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> > Also, is this `DistinctFunction.reduce` just an >>>>>>>>>>>>>>>>>>> example or the actual implementation, basically saving just >>>>>>>>>>>>>>>>>>> one of records >>>>>>>>>>>>>>>>>>> inside the 24h window in s3? then what is missing there? >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Yes, it's the actual implementation. Note that there's a >>>>>>>>>>>>>>>>>>> keyBy before the DistinctFunction. So there's one record >>>>>>>>>>>>>>>>>>> for each key >>>>>>>>>>>>>>>>>>> (which is the combination of a couple of fields). In >>>>>>>>>>>>>>>>>>> practice I've seen >>>>>>>>>>>>>>>>>>> that we're missing ~2000-4000 elements on each restore, and >>>>>>>>>>>>>>>>>>> the total >>>>>>>>>>>>>>>>>>> output is obviously much more than that. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Here's the full code for the key selector: >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> public class MapKeySelector implements >>>>>>>>>>>>>>>>>>> KeySelector<Map<String,String>, Object> { >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> private final String[] fields; >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> public MapKeySelector(String... fields) { >>>>>>>>>>>>>>>>>>> this.fields = fields; >>>>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> @Override >>>>>>>>>>>>>>>>>>> public Object getKey(Map<String, String> event) >>>>>>>>>>>>>>>>>>> throws Exception { >>>>>>>>>>>>>>>>>>> Tuple key = >>>>>>>>>>>>>>>>>>> Tuple.getTupleClass(fields.length).newInstance(); >>>>>>>>>>>>>>>>>>> for (int i = 0; i < fields.length; i++) { >>>>>>>>>>>>>>>>>>> key.setField(event.getOrDefault(fields[i], >>>>>>>>>>>>>>>>>>> ""), i); >>>>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>>>> return key; >>>>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> And a more exact example on how it's used: >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> .keyBy(new MapKeySelector("ID", >>>>>>>>>>>>>>>>>>> "PLAYER_ID", "FIELD", "KEY_NAME", "KEY_VALUE")) >>>>>>>>>>>>>>>>>>> .timeWindow(Time.days(1)) >>>>>>>>>>>>>>>>>>> .reduce(new DistinctFunction()) >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> On Fri, Aug 24, 2018 at 5:26 PM Andrey Zagrebin < >>>>>>>>>>>>>>>>>>> and...@data-artisans.com> wrote: >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Hi Juho, >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Where exactly does the data miss? 
When do you notice >>>>>>>>>>>>>>>>>>>> that? >>>>>>>>>>>>>>>>>>>> Do you check it: >>>>>>>>>>>>>>>>>>>> - debugging `DistinctFunction.reduce` right after >>>>>>>>>>>>>>>>>>>> resume in the middle of the day >>>>>>>>>>>>>>>>>>>> or >>>>>>>>>>>>>>>>>>>> - some distinct records miss in the final output >>>>>>>>>>>>>>>>>>>> of BucketingSink in s3 after window result is actually >>>>>>>>>>>>>>>>>>>> triggered and saved >>>>>>>>>>>>>>>>>>>> into s3 at the end of the day? is this the main output? >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> The late data around the time of taking savepoint might >>>>>>>>>>>>>>>>>>>> be not included into the savepoint but it should be behind >>>>>>>>>>>>>>>>>>>> the snapshotted >>>>>>>>>>>>>>>>>>>> offset in Kafka. Then it should just come later after the >>>>>>>>>>>>>>>>>>>> restore and >>>>>>>>>>>>>>>>>>>> should be reduced within the allowed lateness into the >>>>>>>>>>>>>>>>>>>> final result which >>>>>>>>>>>>>>>>>>>> is saved into s3. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Also, is this `DistinctFunction.reduce` just an example >>>>>>>>>>>>>>>>>>>> or the actual implementation, basically saving just one of >>>>>>>>>>>>>>>>>>>> records inside >>>>>>>>>>>>>>>>>>>> the 24h window in s3? then what is missing there? >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>>>>>>>> Andrey >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> On 23 Aug 2018, at 15:42, Juho Autio < >>>>>>>>>>>>>>>>>>>> juho.au...@rovio.com> wrote: >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> I changed to allowedLateness=0, no change, still >>>>>>>>>>>>>>>>>>>> missing data when restoring from savepoint. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> On Tue, Aug 21, 2018 at 10:43 AM Juho Autio < >>>>>>>>>>>>>>>>>>>> juho.au...@rovio.com> wrote: >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> I realized that BucketingSink must not play any role >>>>>>>>>>>>>>>>>>>>> in this problem. This is because only when the 24-hour >>>>>>>>>>>>>>>>>>>>> window triggers, >>>>>>>>>>>>>>>>>>>>> BucketinSink gets a burst of input. Around the state >>>>>>>>>>>>>>>>>>>>> restoring point >>>>>>>>>>>>>>>>>>>>> (middle of the day) it doesn't get any input, so it can't >>>>>>>>>>>>>>>>>>>>> lose anything >>>>>>>>>>>>>>>>>>>>> either (right?). >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> I will next try removing the allowedLateness entirely >>>>>>>>>>>>>>>>>>>>> from the equation. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> In the meanwhile, please let me know if you have any >>>>>>>>>>>>>>>>>>>>> suggestions for debugging the lost data, for example what >>>>>>>>>>>>>>>>>>>>> logs to enable. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> We use FlinkKafkaConsumer010 btw. Are there any known >>>>>>>>>>>>>>>>>>>>> issues with that, that could contribute to lost data when >>>>>>>>>>>>>>>>>>>>> restoring a >>>>>>>>>>>>>>>>>>>>> savepoint? >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 4:23 PM Juho Autio < >>>>>>>>>>>>>>>>>>>>> juho.au...@rovio.com> wrote: >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> Some data is silently lost on my Flink stream job >>>>>>>>>>>>>>>>>>>>>> when state is restored from a savepoint. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> Do you have any debugging hints to find out where >>>>>>>>>>>>>>>>>>>>>> exactly the data gets dropped? >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> My job gathers distinct values using a 24-hour >>>>>>>>>>>>>>>>>>>>>> window. It doesn't have any custom state management. 
>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> When I cancel the job with savepoint and restore from >>>>>>>>>>>>>>>>>>>>>> that savepoint, some data is missed. It seems to be >>>>>>>>>>>>>>>>>>>>>> losing just a small >>>>>>>>>>>>>>>>>>>>>> amount of data. The event time of lost data is probably >>>>>>>>>>>>>>>>>>>>>> around the time of >>>>>>>>>>>>>>>>>>>>>> savepoint. In other words the rest of the time window is >>>>>>>>>>>>>>>>>>>>>> not entirely >>>>>>>>>>>>>>>>>>>>>> missed – collection works correctly also for (most of >>>>>>>>>>>>>>>>>>>>>> the) events that come >>>>>>>>>>>>>>>>>>>>>> in after restoring. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> When the job processes a full 24-hour window without >>>>>>>>>>>>>>>>>>>>>> interruptions it doesn't miss anything. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> Usually the problem doesn't happen in test >>>>>>>>>>>>>>>>>>>>>> environments that have smaller parallelism and smaller >>>>>>>>>>>>>>>>>>>>>> data volumes. But in >>>>>>>>>>>>>>>>>>>>>> production volumes the job seems to be consistently >>>>>>>>>>>>>>>>>>>>>> missing at least >>>>>>>>>>>>>>>>>>>>>> something on every restore. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> This issue has consistently happened since the job >>>>>>>>>>>>>>>>>>>>>> was initially created. It was at first run on an older >>>>>>>>>>>>>>>>>>>>>> version of Flink >>>>>>>>>>>>>>>>>>>>>> 1.5-SNAPSHOT and it still happens on both Flink 1.5.2 & >>>>>>>>>>>>>>>>>>>>>> 1.6.0. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> I'm wondering if this could be for example some >>>>>>>>>>>>>>>>>>>>>> synchronization issue between the kafka consumer offsets >>>>>>>>>>>>>>>>>>>>>> vs. what's been >>>>>>>>>>>>>>>>>>>>>> written by BucketingSink? >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> 1. Job content, simplified >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> kafkaStream >>>>>>>>>>>>>>>>>>>>>> .flatMap(new ExtractFieldsFunction()) >>>>>>>>>>>>>>>>>>>>>> .keyBy(new MapKeySelector(1, 2, 3, 4)) >>>>>>>>>>>>>>>>>>>>>> .timeWindow(Time.days(1)) >>>>>>>>>>>>>>>>>>>>>> .allowedLateness(allowedLateness) >>>>>>>>>>>>>>>>>>>>>> .sideOutputLateData(lateDataTag) >>>>>>>>>>>>>>>>>>>>>> .reduce(new DistinctFunction()) >>>>>>>>>>>>>>>>>>>>>> .addSink(sink) >>>>>>>>>>>>>>>>>>>>>> // use a fixed number of output >>>>>>>>>>>>>>>>>>>>>> partitions >>>>>>>>>>>>>>>>>>>>>> .setParallelism(8)) >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> /** >>>>>>>>>>>>>>>>>>>>>> * Usage: .keyBy("the", "distinct", >>>>>>>>>>>>>>>>>>>>>> "fields").reduce(new DistinctFunction()) >>>>>>>>>>>>>>>>>>>>>> */ >>>>>>>>>>>>>>>>>>>>>> public class DistinctFunction implements >>>>>>>>>>>>>>>>>>>>>> ReduceFunction<java.util.Map<String, String>> { >>>>>>>>>>>>>>>>>>>>>> @Override >>>>>>>>>>>>>>>>>>>>>> public Map<String, String> reduce(Map<String, >>>>>>>>>>>>>>>>>>>>>> String> value1, Map<String, String> value2) { >>>>>>>>>>>>>>>>>>>>>> return value1; >>>>>>>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> 2. 
State configuration >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> boolean enableIncrementalCheckpointing = true; >>>>>>>>>>>>>>>>>>>>>> String statePath = "s3n://bucket/savepoints"; >>>>>>>>>>>>>>>>>>>>>> new RocksDBStateBackend(statePath, >>>>>>>>>>>>>>>>>>>>>> enableIncrementalCheckpointing); >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> Checkpointing Mode Exactly Once >>>>>>>>>>>>>>>>>>>>>> Interval 1m 0s >>>>>>>>>>>>>>>>>>>>>> Timeout 10m 0s >>>>>>>>>>>>>>>>>>>>>> Minimum Pause Between Checkpoints 1m 0s >>>>>>>>>>>>>>>>>>>>>> Maximum Concurrent Checkpoints 1 >>>>>>>>>>>>>>>>>>>>>> Persist Checkpoints Externally Enabled (retain on >>>>>>>>>>>>>>>>>>>>>> cancellation) >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> 3. BucketingSink configuration >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> We use BucketingSink, I don't think there's anything >>>>>>>>>>>>>>>>>>>>>> special here, if not the fact that we're writing to S3. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> String outputPath = "s3://bucket/output"; >>>>>>>>>>>>>>>>>>>>>> BucketingSink<Map<String, String>> sink = new >>>>>>>>>>>>>>>>>>>>>> BucketingSink<Map<String, String>>(outputPath) >>>>>>>>>>>>>>>>>>>>>> .setBucketer(new >>>>>>>>>>>>>>>>>>>>>> ProcessdateBucketer()) >>>>>>>>>>>>>>>>>>>>>> .setBatchSize(batchSize) >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> .setInactiveBucketThreshold(inactiveBucketThreshold) >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> .setInactiveBucketCheckInterval(inactiveBucketCheckInterval); >>>>>>>>>>>>>>>>>>>>>> sink.setWriter(new IdJsonWriter()); >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> 4. Kafka & event time >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> My flink job reads the data from Kafka, using a >>>>>>>>>>>>>>>>>>>>>> BoundedOutOfOrdernessTimestampExtractor on the kafka >>>>>>>>>>>>>>>>>>>>>> consumer to >>>>>>>>>>>>>>>>>>>>>> synchronize watermarks accross all kafka partitions. We >>>>>>>>>>>>>>>>>>>>>> also write late >>>>>>>>>>>>>>>>>>>>>> data to side output, but nothing is written there – if >>>>>>>>>>>>>>>>>>>>>> it would, it could >>>>>>>>>>>>>>>>>>>>>> explain missed data in the main output (I'm also sure >>>>>>>>>>>>>>>>>>>>>> that our late data >>>>>>>>>>>>>>>>>>>>>> writing works, because we previously had some actual >>>>>>>>>>>>>>>>>>>>>> late data which ended >>>>>>>>>>>>>>>>>>>>>> up there). >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> 5. allowedLateness >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> It may be or may not be relevant that I have also >>>>>>>>>>>>>>>>>>>>>> enabled allowedLateness with 1 minute lateness on the >>>>>>>>>>>>>>>>>>>>>> 24-hour window: >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> If that makes sense, I could try removing >>>>>>>>>>>>>>>>>>>>>> allowedLateness entirely? That would be just to rule out >>>>>>>>>>>>>>>>>>>>>> that Flink doesn't >>>>>>>>>>>>>>>>>>>>>> have a bug that's related to restoring state in >>>>>>>>>>>>>>>>>>>>>> combination with the >>>>>>>>>>>>>>>>>>>>>> allowedLateness feature. After all, all of our data >>>>>>>>>>>>>>>>>>>>>> should be in a good >>>>>>>>>>>>>>>>>>>>>> enough order to not be late, given the max out of >>>>>>>>>>>>>>>>>>>>>> orderness used on kafka >>>>>>>>>>>>>>>>>>>>>> consumer timestamp extractor. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> Thank you in advance! 
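The "assign timestamps & watermarks on the Kafka consumer itself" setup from section 4 above would look roughly like the sketch below. The deserialization schema (IdJsonDeserializationSchema), the timestamp field name and the one-minute max out-of-orderness are placeholders, not the job's actual values.

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

import java.util.Map;
import java.util.Properties;

public class KafkaSourceSetup {

    public static FlinkKafkaConsumer010<Map<String, String>> createConsumer(
            String topic, Properties kafkaProps) {

        FlinkKafkaConsumer010<Map<String, String>> consumer =
                new FlinkKafkaConsumer010<>(topic, new IdJsonDeserializationSchema(), kafkaProps);

        // Assigning the extractor on the consumer (not on the DataStream) makes Flink
        // track watermarks per Kafka partition and emit their minimum, which is what
        // keeps event time synchronized across partitions after a restore.
        consumer.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Map<String, String>>(Time.minutes(1)) {
                    @Override
                    public long extractTimestamp(Map<String, String> event) {
                        return Long.parseLong(event.get("timestamp")); // placeholder field name
                    }
                });
        return consumer;
    }
}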