Bump – does anyone know if Stefan will be available to comment on the latest findings? Thanks.
On Fri, Dec 21, 2018 at 2:33 PM Juho Autio <juho.au...@rovio.com> wrote: > Stefan, I managed to analyze savepoint with bravo. It seems that the data > that's missing from output *is* found in savepoint. > > I simplified my test case to the following: > > - job 1 has bee running for ~10 days > - savepoint X created & job 1 cancelled > - job 2 started with restore from savepoint X > > Then I waited until the next day so that job 2 has triggered the 24 hour > window. > > Then I analyzed the output & savepoint: > > - compare job 2 output with the output of a batch pyspark script => find > 4223 missing rows > - pick one of the missing rows (say, id Z) > - read savepoint X with bravo, filter for id Z => Z was found in the > savepoint! > > How can it be possible that the value is in state but doesn't end up in > output after state has been restored & window is eventually triggered? > > I also did similar analysis on the previous case where I savepointed & > restored the job multiple times (5) within the same 24-hour window. A > missing id that I drilled down to, was found in all of those savepoints, > yet missing from the output that gets written at the end of the day. This > is even more surprising: that the missing ID was written to the new > savepoints also after restoring. Is the reducer state somehow decoupled > from the window contents? > > Big thanks to bravo-developer Gyula for guiding me through to be able read > the reducer state! https://github.com/king/bravo/pull/11 > > Gyula also had an idea for how to troubleshoot the missing data in a > scalable way: I could add some "side effect kafka output" on individual > operators. This should allow tracking more closely at which point the data > gets lost. However, maybe this would have to be in some Flink's internal > components, and I'm not sure which those would be. > > Cheers, > Juho > > On Mon, Nov 19, 2018 at 11:52 AM Juho Autio <juho.au...@rovio.com> wrote: > >> >> Hi Stefan, >> >> Bravo doesn't currently support reading a reducer state. I gave it a try >> but couldn't get to a working implementation yet. If anyone can provide >> some insight on how to make this work, please share at github: >> https://github.com/king/bravo/pull/11 >> >> Thanks. >> >> On Tue, Oct 23, 2018 at 3:32 PM Juho Autio <juho.au...@rovio.com> wrote: >> >>> I was glad to find that bravo had now been updated to support installing >>> bravo to a local maven repo. >>> >>> I was able to load a checkpoint created by my job, thanks to the example >>> provided in bravo README, but I'm still missing the essential piece. >>> >>> My code was: >>> >>> OperatorStateReader reader = new OperatorStateReader(env2, >>> savepoint, "DistinctFunction"); >>> DontKnowWhatTypeThisIs reducingState = >>> reader.readKeyedStates(what should I put here?); >>> >>> I don't know how to read the values collected from reduce() calls in the >>> state. Is there a way to access the reducing state of the window with >>> bravo? I'm a bit confused how this works, because when I check with >>> debugger, flink internally uses a ReducingStateDescriptor >>> with name=window-contents, but still reading operator state for >>> "DistinctFunction" didn't at least throw an exception ("window-contents" >>> threw – obviously there's no operator by that name). >>> >>> Cheers, >>> Juho >>> >>> On Mon, Oct 15, 2018 at 2:25 PM Juho Autio <juho.au...@rovio.com> wrote: >>> >>>> Hi Stefan, >>>> >>>> Sorry but it doesn't seem immediately clear to me what's a good way to >>>> use https://github.com/king/bravo. 
>>>> >>>> How are people using it? Would you for example modify build.gradle >>>> somehow to publish the bravo as a library locally/internally? Or add code >>>> directly in the bravo project (locally) and run it from there (using an >>>> IDE, for example)? Also it doesn't seem like the bravo gradle project >>>> supports building a flink job jar, but if it does, how do I do it? >>>> >>>> Thanks. >>>> >>>> On Thu, Oct 4, 2018 at 9:30 PM Juho Autio <juho.au...@rovio.com> wrote: >>>> >>>>> Good then, I'll try to analyze the savepoints with Bravo. Thanks! >>>>> >>>>> > How would you assume that backpressure would influence your updates? >>>>> Updates to each local state still happen event-by-event, in a single >>>>> reader/writing thread. >>>>> >>>>> Sure, just an ignorant guess by me. I'm not familiar with most of >>>>> Flink's internals. Any way high backpressure is not a seen on this job >>>>> after it has caught up the lag, so at I thought it would be worth >>>>> mentioning. >>>>> >>>>> On Thu, Oct 4, 2018 at 6:24 PM Stefan Richter < >>>>> s.rich...@data-artisans.com> wrote: >>>>> >>>>>> Hi, >>>>>> >>>>>> Am 04.10.2018 um 16:08 schrieb Juho Autio <juho.au...@rovio.com>: >>>>>> >>>>>> > you could take a look at Bravo [1] to query your savepoints and to >>>>>> check if the state in the savepoint complete w.r.t your expectations >>>>>> >>>>>> Thanks. I'm not 100% if this is the case, but to me it seemed like >>>>>> the missed ids were being logged by the reducer soon after the job had >>>>>> started (after restoring a savepoint). But on the other hand, after that >>>>>> I >>>>>> also made another savepoint & restored that, so what I could check is: >>>>>> does >>>>>> that next savepoint have the missed ids that were logged (a couple of >>>>>> minutes before the savepoint was created, so there should've been more >>>>>> than >>>>>> enough time to add them to the state before the savepoint was triggered) >>>>>> or >>>>>> not. Any way, if I would be able to verify with Bravo that the ids are >>>>>> missing from the savepoint (even though reduced logged that it saw them), >>>>>> would that help in figuring out where they are lost? Is there some major >>>>>> difference compared to just looking at the final output after window has >>>>>> been triggered? >>>>>> >>>>>> >>>>>> >>>>>> I think that makes a difference. For example, you can investigate if >>>>>> there is a state loss or a problem with the windowing. In the savepoint >>>>>> you >>>>>> could see which keys exists and to which windows they are assigned. Also >>>>>> just to make sure there is no misunderstanding: only elements that are in >>>>>> the state at the start of a savepoint are expected to be part of the >>>>>> savepoint; all elements between start and completion of the savepoint are >>>>>> not expected to be part of the savepoint. >>>>>> >>>>>> >>>>>> > I also doubt that the problem is about backpressure after restore, >>>>>> because the job will only continue running after the state restore is >>>>>> already completed. >>>>>> >>>>>> Yes, I'm not suspecting that the state restoring would be the problem >>>>>> either. My concern was about backpressure possibly messing with the >>>>>> updates >>>>>> of reducing state? I would tend to suspect that updating the state >>>>>> consistently is what fails, where heavy load / backpressure might be a >>>>>> factor. >>>>>> >>>>>> >>>>>> >>>>>> How would you assume that backpressure would influence your updates? 
>>>>>> Updates to each local state still happen event-by-event, in a single >>>>>> reader/writing thread. >>>>>> >>>>>> >>>>>> On Thu, Oct 4, 2018 at 4:18 PM Stefan Richter < >>>>>> s.rich...@data-artisans.com> wrote: >>>>>> >>>>>>> Hi, >>>>>>> >>>>>>> you could take a look at Bravo [1] to query your savepoints and to >>>>>>> check if the state in the savepoint complete w.r.t your expectations. I >>>>>>> somewhat doubt that there is a general problem with the state/savepoints >>>>>>> because many users are successfully running it on a large state and I am >>>>>>> not aware of any data loss problems, but nothing is impossible. What the >>>>>>> savepoint does is also straight forward: iterate a db snapshot and write >>>>>>> all key/value pairs to disk, so all data that was in the db at the time >>>>>>> of >>>>>>> the savepoint, should show up. I also doubt that the problem is about >>>>>>> backpressure after restore, because the job will only continue running >>>>>>> after the state restore is already completed. Did you check if you are >>>>>>> using exactly-one-semantics or at-least-once semantics? Also did you >>>>>>> check >>>>>>> that the kafka consumer start position is configured properly [2]? Are >>>>>>> watermarks generated as expected after restore? >>>>>>> >>>>>>> One more unrelated high-level comment that I have: for a granularity >>>>>>> of 24h windows, I wonder if it would not make sense to use a batch job >>>>>>> instead? >>>>>>> >>>>>>> Best, >>>>>>> Stefan >>>>>>> >>>>>>> [1] https://github.com/king/bravo >>>>>>> [2] >>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration >>>>>>> >>>>>>> Am 04.10.2018 um 14:53 schrieb Juho Autio <juho.au...@rovio.com>: >>>>>>> >>>>>>> Thanks for the suggestions! >>>>>>> >>>>>>> > In general, it would be tremendously helpful to have a minimal >>>>>>> working example which allows to reproduce the problem. >>>>>>> >>>>>>> Definitely. The problem with reproducing has been that this only >>>>>>> seems to happen in the bigger production data volumes. >>>>>>> >>>>>>> That's why I'm hoping to find a way to debug this with the >>>>>>> production data. With that it seems to consistently cause some misses >>>>>>> every >>>>>>> time the job is killed/restored. >>>>>>> >>>>>>> > check if it happens for shorter windows, like 1h etc >>>>>>> >>>>>>> What would be the benefit of that compared to 24h window? >>>>>>> >>>>>>> > simplify the job to not use a reduce window but simply a time >>>>>>> window which outputs the window events. Then counting the input and >>>>>>> output >>>>>>> events should allow you to verify the results. If you are not seeing >>>>>>> missing events, then it could have something to do with the reducing >>>>>>> state >>>>>>> used in the reduce function. >>>>>>> >>>>>>> Hm, maybe, but not sure how useful that would be, because it >>>>>>> wouldn't yet prove that it's related to reducing, because not having a >>>>>>> reduce function could also mean smaller load on the job, which might >>>>>>> alone >>>>>>> be enough to make the problem not manifest. >>>>>>> >>>>>>> Is there a way to debug what goes into the reducing state (including >>>>>>> what gets removed or overwritten and what restored), if that makes >>>>>>> sense..? >>>>>>> Maybe some suitable logging could be used to prove that the lost data is >>>>>>> written to the reducing state (or at least asked to be written), but not >>>>>>> found any more when the window closes and state is flushed? 
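One possible way to get that kind of evidence for the second half of the question (whether a given id is still in the reducing state when the window finally fires) is to attach a ProcessWindowFunction to the same reduce call: it receives the single reduced element per key at trigger time, so logging it for a few known-missing ids shows what actually came out of the window state. This is only a rough sketch, assuming the same Map<String, String> records used elsewhere in this thread; the tracked-id set and the "id" field name are illustrative:

import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Attached as the second argument of reduce(), e.g.
 *   .reduce(new DistinctFunction(), new WindowDebugFunction())
 * Logs the single reduced element per key at the moment the window fires,
 * then forwards it unchanged.
 */
public class WindowDebugFunction
        extends ProcessWindowFunction<Map<String, String>, Map<String, String>, Object, TimeWindow> {

    private static final Logger LOG = LoggerFactory.getLogger(WindowDebugFunction.class);

    // Illustrative allow-list of ids under investigation
    private static final Set<String> TRACKED_IDS = new HashSet<>(Arrays.asList("AN12345"));

    @Override
    public void process(Object key,
                        Context context,
                        Iterable<Map<String, String>> elements,
                        Collector<Map<String, String>> out) {
        // With a preceding reduce there is exactly one element per key
        for (Map<String, String> reduced : elements) {
            if (TRACKED_IDS.contains(reduced.get("id"))) {
                LOG.info("Window {} fired for key {} with tracked id {}",
                        context.window(), key, reduced.get("id"));
            }
            out.collect(reduced);
        }
    }
}

If an id shows up in the reduce-side DEBUG log but never in this log, the loss would be inside the window state; if it shows up here but not in s3, the sink would be back under suspicion.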
>>>>>>> >>>>>>> On configuration once more, we're using RocksDB state backend with >>>>>>> asynchronous incremental checkpointing. The state is restored from >>>>>>> savepoints though, we haven't been using those checkpoints in these >>>>>>> tests >>>>>>> (although they could be used in case of crashes – but we haven't had >>>>>>> those >>>>>>> now). >>>>>>> >>>>>>> On Thu, Oct 4, 2018 at 3:25 PM Till Rohrmann <trohrm...@apache.org> >>>>>>> wrote: >>>>>>> >>>>>>>> Hi Juho, >>>>>>>> >>>>>>>> another idea to further narrow down the problem could be to >>>>>>>> simplify the job to not use a reduce window but simply a time window >>>>>>>> which >>>>>>>> outputs the window events. Then counting the input and output events >>>>>>>> should >>>>>>>> allow you to verify the results. If you are not seeing missing events, >>>>>>>> then >>>>>>>> it could have something to do with the reducing state used in the >>>>>>>> reduce >>>>>>>> function. >>>>>>>> >>>>>>>> In general, it would be tremendously helpful to have a minimal >>>>>>>> working example which allows to reproduce the problem. >>>>>>>> >>>>>>>> Cheers, >>>>>>>> Till >>>>>>>> >>>>>>>> On Thu, Oct 4, 2018 at 2:02 PM Andrey Zagrebin < >>>>>>>> and...@data-artisans.com> wrote: >>>>>>>> >>>>>>>>> Hi Juho, >>>>>>>>> >>>>>>>>> can you try to reduce the job to minimal reproducible example and >>>>>>>>> share the job and input? >>>>>>>>> >>>>>>>>> For example: >>>>>>>>> - some simple records as input, e.g. tuples of primitive types >>>>>>>>> saved as cvs >>>>>>>>> - minimal deduplication job which processes them and misses records >>>>>>>>> - check if it happens for shorter windows, like 1h etc >>>>>>>>> - setup which you use for the job, ideally locally reproducible or >>>>>>>>> cloud >>>>>>>>> >>>>>>>>> Best, >>>>>>>>> Andrey >>>>>>>>> >>>>>>>>> On 4 Oct 2018, at 11:13, Juho Autio <juho.au...@rovio.com> wrote: >>>>>>>>> >>>>>>>>> Sorry to insist, but we seem to be blocked for any serious usage >>>>>>>>> of state in Flink if we can't rely on it to not miss data in case of >>>>>>>>> restore. >>>>>>>>> >>>>>>>>> Would anyone have suggestions for how to troubleshoot this? So far >>>>>>>>> I have verified with DEBUG logs that our reduce function gets to >>>>>>>>> process >>>>>>>>> also the data that is missing from window output. >>>>>>>>> >>>>>>>>> On Mon, Oct 1, 2018 at 11:56 AM Juho Autio <juho.au...@rovio.com> >>>>>>>>> wrote: >>>>>>>>> >>>>>>>>>> Hi Andrey, >>>>>>>>>> >>>>>>>>>> To rule out for good any questions about sink behaviour, the job >>>>>>>>>> was killed and started with an additional Kafka sink. >>>>>>>>>> >>>>>>>>>> The same number of ids were missed in both outputs: KafkaSink & >>>>>>>>>> BucketingSink. >>>>>>>>>> >>>>>>>>>> I wonder what would be the next steps in debugging? >>>>>>>>>> >>>>>>>>>> On Fri, Sep 21, 2018 at 3:49 PM Juho Autio <juho.au...@rovio.com> >>>>>>>>>> wrote: >>>>>>>>>> >>>>>>>>>>> Thanks, Andrey. >>>>>>>>>>> >>>>>>>>>>> > so it means that the savepoint does not loose at least some >>>>>>>>>>> dropped records. >>>>>>>>>>> >>>>>>>>>>> I'm not sure what you mean by that? I mean, it was known from >>>>>>>>>>> the beginning, that not everything is lost before/after restoring a >>>>>>>>>>> savepoint, just some records around the time of restoration. It's >>>>>>>>>>> not 100% >>>>>>>>>>> clear whether records are lost before making a savepoint or after >>>>>>>>>>> restoring >>>>>>>>>>> it. 
Although, based on the new DEBUG logs it seems more like losing >>>>>>>>>>> some >>>>>>>>>>> records that are seen ~soon after restoring. It seems like Flink >>>>>>>>>>> would be >>>>>>>>>>> somehow confused either about the restored state vs. new inserts to >>>>>>>>>>> state. >>>>>>>>>>> This could also be somehow linked to the high back pressure on the >>>>>>>>>>> kafka >>>>>>>>>>> source while the stream is catching up. >>>>>>>>>>> >>>>>>>>>>> > If it is feasible for your setup, I suggest to insert one more >>>>>>>>>>> map function after reduce and before sink. >>>>>>>>>>> > etc. >>>>>>>>>>> >>>>>>>>>>> Isn't that the same thing that we discussed before? Nothing is >>>>>>>>>>> sent to BucketingSink before the window closes, so I don't see how >>>>>>>>>>> it would >>>>>>>>>>> make any difference if we replace the BucketingSink with a map >>>>>>>>>>> function or >>>>>>>>>>> another sink type. We don't create or restore savepoints during the >>>>>>>>>>> time >>>>>>>>>>> when BucketingSink gets input or has open buckets – that happens at >>>>>>>>>>> a much >>>>>>>>>>> later time of day. I would focus on figuring out why the records >>>>>>>>>>> are lost >>>>>>>>>>> while the window is open. But I don't know how to do that. Would >>>>>>>>>>> you have >>>>>>>>>>> any additional suggestions? >>>>>>>>>>> >>>>>>>>>>> On Fri, Sep 21, 2018 at 3:30 PM Andrey Zagrebin < >>>>>>>>>>> and...@data-artisans.com> wrote: >>>>>>>>>>> >>>>>>>>>>>> Hi Juho, >>>>>>>>>>>> >>>>>>>>>>>> so it means that the savepoint does not loose at least some >>>>>>>>>>>> dropped records. >>>>>>>>>>>> >>>>>>>>>>>> If it is feasible for your setup, I suggest to insert one more >>>>>>>>>>>> map function after reduce and before sink. >>>>>>>>>>>> The map function should be called right after window is >>>>>>>>>>>> triggered but before flushing to s3. >>>>>>>>>>>> The result of reduce (deduped record) could be logged there. >>>>>>>>>>>> This should allow to check whether the processed distinct >>>>>>>>>>>> records were buffered in the state after the restoration from the >>>>>>>>>>>> savepoint >>>>>>>>>>>> or not. If they were buffered we should see that there was an >>>>>>>>>>>> attempt to >>>>>>>>>>>> write them to the sink from the state. >>>>>>>>>>>> >>>>>>>>>>>> Another suggestion is to try to write records to some other >>>>>>>>>>>> sink or to both. >>>>>>>>>>>> E.g. if you can access file system of workers, maybe just into >>>>>>>>>>>> local files and check whether the records are also dropped there. >>>>>>>>>>>> >>>>>>>>>>>> Best, >>>>>>>>>>>> Andrey >>>>>>>>>>>> >>>>>>>>>>>> On 20 Sep 2018, at 15:37, Juho Autio <juho.au...@rovio.com> >>>>>>>>>>>> wrote: >>>>>>>>>>>> >>>>>>>>>>>> Hi Andrey! >>>>>>>>>>>> >>>>>>>>>>>> I was finally able to gather the DEBUG logs that you suggested. >>>>>>>>>>>> In short, the reducer logged that it processed at least some of >>>>>>>>>>>> the ids >>>>>>>>>>>> that were missing from the output. >>>>>>>>>>>> >>>>>>>>>>>> "At least some", because I didn't have the job running with >>>>>>>>>>>> DEBUG logs for the full 24-hour window period. So I was only able >>>>>>>>>>>> to look >>>>>>>>>>>> up if I can find *some* of the missing ids in the DEBUG logs. >>>>>>>>>>>> Which I did indeed. 
>>>>>>>>>>>> >>>>>>>>>>>> I changed the DistinctFunction.java to do this: >>>>>>>>>>>> >>>>>>>>>>>> @Override >>>>>>>>>>>> public Map<String, String> reduce(Map<String, String> >>>>>>>>>>>> value1, Map<String, String> value2) { >>>>>>>>>>>> LOG.debug("DistinctFunction.reduce returns: {}={}", >>>>>>>>>>>> value1.get("field"), value1.get("id")); >>>>>>>>>>>> return value1; >>>>>>>>>>>> } >>>>>>>>>>>> >>>>>>>>>>>> Then: >>>>>>>>>>>> >>>>>>>>>>>> vi flink-1.6.0/conf/log4j.properties >>>>>>>>>>>> >>>>>>>>>>>> log4j.logger.org.apache.flink.streaming.runtime.tasks.StreamTask=DEBUG >>>>>>>>>>>> log4j.logger.com.rovio.ds.flink.uniqueid.DistinctFunction=DEBUG >>>>>>>>>>>> >>>>>>>>>>>> Then I ran the following kind of test: >>>>>>>>>>>> >>>>>>>>>>>> - Cancelled the on-going job with savepoint created at ~Sep 18 >>>>>>>>>>>> 08:35 UTC 2018 >>>>>>>>>>>> - Started a new cluster & job with DEBUG enabled at ~09:13, >>>>>>>>>>>> restored from that previous cluster's savepoint >>>>>>>>>>>> - Ran until caught up offsets >>>>>>>>>>>> - Cancelled the job with a new savepoint >>>>>>>>>>>> - Started a new job _without_ DEBUG, which restored the new >>>>>>>>>>>> savepoint, let it keep running so that it will eventually write >>>>>>>>>>>> the output >>>>>>>>>>>> >>>>>>>>>>>> Then on the next day, after results had been flushed when the >>>>>>>>>>>> 24-hour window closed, I compared the results again with a batch >>>>>>>>>>>> version's >>>>>>>>>>>> output. And found some missing ids as usual. >>>>>>>>>>>> >>>>>>>>>>>> I drilled down to one specific missing id (I'm replacing the >>>>>>>>>>>> actual value with AN12345 below), which was not found in the >>>>>>>>>>>> stream output, >>>>>>>>>>>> but was found in batch output & flink DEBUG logs. >>>>>>>>>>>> >>>>>>>>>>>> Related to that id, I gathered the following information: >>>>>>>>>>>> >>>>>>>>>>>> 2018-09-18~09:13:21,000 job started & savepoint is restored >>>>>>>>>>>> >>>>>>>>>>>> 2018-09-18 09:14:29,085 missing id is processed for the first >>>>>>>>>>>> time, proved by this log line: >>>>>>>>>>>> 2018-09-18 09:14:29,085 DEBUG >>>>>>>>>>>> com.rovio.ds.flink.uniqueid.DistinctFunction - >>>>>>>>>>>> DistinctFunction.reduce returns: s.aid1=AN12345 >>>>>>>>>>>> >>>>>>>>>>>> 2018-09-18 09:15:14,264 first synchronous part of checkpoint >>>>>>>>>>>> 2018-09-18 09:15:16,544 first asynchronous part of checkpoint >>>>>>>>>>>> >>>>>>>>>>>> ( >>>>>>>>>>>> more occurrences of checkpoints (~1 min checkpointing time + ~1 >>>>>>>>>>>> min delay before next) >>>>>>>>>>>> / >>>>>>>>>>>> more occurrences of DistinctFunction.reduce >>>>>>>>>>>> ) >>>>>>>>>>>> >>>>>>>>>>>> 2018-09-18 09:23:45,053 missing id is processed for the last >>>>>>>>>>>> time >>>>>>>>>>>> >>>>>>>>>>>> 2018-09-18~10:20:00,000 savepoint created & job cancelled >>>>>>>>>>>> >>>>>>>>>>>> To be noted, there was high backpressure after restoring from >>>>>>>>>>>> savepoint until the stream caught up with the kafka offsets. >>>>>>>>>>>> Although, our >>>>>>>>>>>> job uses assign timestamps & watermarks on the flink kafka >>>>>>>>>>>> consumer itself, >>>>>>>>>>>> so event time of all partitions is synchronized. As expected, we >>>>>>>>>>>> don't get >>>>>>>>>>>> any late data in the late data side output. >>>>>>>>>>>> >>>>>>>>>>>> From this we can see that the missing ids are processed by the >>>>>>>>>>>> reducer, but they must get lost somewhere before the 24-hour >>>>>>>>>>>> window is >>>>>>>>>>>> triggered. 
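A lightweight way to narrow down where, between the reducer and the trigger, the ids disappear is the "side effect kafka output" idea mentioned at the top of this thread: tee the tracked ids into a separate Kafka topic at a couple of points in the job (on the stream before keyBy, and on the window output after reduce) and check at which stage a tracked id last appears. A rough sketch, assuming FlinkKafkaProducer010 and placeholder topic/broker names; the "id" field name is illustrative:

import java.util.Map;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;

public class DebugTap {

    /**
     * Forwards events carrying a tracked id to a debug Kafka topic and returns
     * the original stream unchanged, so the tap can be inserted at several
     * points without affecting the main pipeline.
     */
    public static DataStream<Map<String, String>> tap(
            DataStream<Map<String, String>> stream, String stage, String trackedId) {

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder

        stream
                .filter(event -> trackedId.equals(event.get("id")))
                .map(event -> stage + " " + event.get("id"))
                .returns(Types.STRING)
                .addSink(new FlinkKafkaProducer010<>(
                        "debug-missing-ids", new SimpleStringSchema(), props));

        return stream;
    }
}

Applied, for example, as DebugTap.tap(kafkaStream, "before-keyBy", "AN12345") and again on the output of .reduce(new DistinctFunction()), the debug topic would then show directly whether a tracked id made it past the window trigger.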
>>>>>>>>>>>> >>>>>>>>>>>> I think it's worth mentioning once more that the stream doesn't >>>>>>>>>>>> miss any ids if we let it's running without interruptions / state >>>>>>>>>>>> restoring. >>>>>>>>>>>> >>>>>>>>>>>> What's next? >>>>>>>>>>>> >>>>>>>>>>>> On Wed, Aug 29, 2018 at 3:49 PM Andrey Zagrebin < >>>>>>>>>>>> and...@data-artisans.com> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> Hi Juho, >>>>>>>>>>>>> >>>>>>>>>>>>> > only when the 24-hour window triggers, BucketingSink gets a >>>>>>>>>>>>> burst of input >>>>>>>>>>>>> >>>>>>>>>>>>> This is of course totally true, my understanding is the same. >>>>>>>>>>>>> We cannot exclude problem there for sure, just savepoints are >>>>>>>>>>>>> used a lot >>>>>>>>>>>>> w/o problem reports and BucketingSink is known to be problematic >>>>>>>>>>>>> with s3. >>>>>>>>>>>>> That is why, I asked you: >>>>>>>>>>>>> >>>>>>>>>>>>> > You also wrote that the timestamps of lost event are >>>>>>>>>>>>> 'probably' around the time of the savepoint, if it is not yet for >>>>>>>>>>>>> sure I >>>>>>>>>>>>> would also check it. >>>>>>>>>>>>> >>>>>>>>>>>>> Although, bucketing sink might loose any data at the end of >>>>>>>>>>>>> the day (also from the middle). The fact, that it is always >>>>>>>>>>>>> around the time >>>>>>>>>>>>> of taking a savepoint and not random, is surely suspicious and >>>>>>>>>>>>> possible >>>>>>>>>>>>> savepoint failures need to be investigated. >>>>>>>>>>>>> >>>>>>>>>>>>> Regarding the s3 problem, s3 doc says: >>>>>>>>>>>>> >>>>>>>>>>>>> > The caveat is that if you make a HEAD or GET request to the >>>>>>>>>>>>> key name (to find if the object exists) before creating the >>>>>>>>>>>>> object, Amazon >>>>>>>>>>>>> S3 provides 'eventual consistency' for read-after-write. >>>>>>>>>>>>> >>>>>>>>>>>>> The algorithm you suggest is how it is roughly implemented now >>>>>>>>>>>>> (BucketingSink.openNewPartFile). My understanding is that >>>>>>>>>>>>> 'eventual consistency’ means that even if you just created file >>>>>>>>>>>>> (its name >>>>>>>>>>>>> is key) it can be that you do not get it in the list or exists >>>>>>>>>>>>> (HEAD) >>>>>>>>>>>>> returns false and you risk to rewrite the previous part. >>>>>>>>>>>>> >>>>>>>>>>>>> The BucketingSink was designed for a standard file system. s3 >>>>>>>>>>>>> is used over a file system wrapper atm but does not always >>>>>>>>>>>>> provide normal >>>>>>>>>>>>> file system guarantees. See also last example in [1]. >>>>>>>>>>>>> >>>>>>>>>>>>> Cheers, >>>>>>>>>>>>> Andrey >>>>>>>>>>>>> >>>>>>>>>>>>> [1] >>>>>>>>>>>>> https://codeburst.io/quick-explanation-of-the-s3-consistency-model-6c9f325e3f82 >>>>>>>>>>>>> >>>>>>>>>>>>> On 29 Aug 2018, at 12:11, Juho Autio <juho.au...@rovio.com> >>>>>>>>>>>>> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>> Andrey, thank you very much for the debugging suggestions, >>>>>>>>>>>>> I'll try them. >>>>>>>>>>>>> >>>>>>>>>>>>> In the meanwhile two more questions, please: >>>>>>>>>>>>> >>>>>>>>>>>>> > Just to keep in mind this problem with s3 and exclude it for >>>>>>>>>>>>> sure. I would also check whether the size of missing events is >>>>>>>>>>>>> around the >>>>>>>>>>>>> batch size of BucketingSink or not. >>>>>>>>>>>>> >>>>>>>>>>>>> Fair enough, but I also want to focus on debugging the most >>>>>>>>>>>>> probable subject first. So what do you think about this – true or >>>>>>>>>>>>> false: >>>>>>>>>>>>> only when the 24-hour window triggers, BucketinSink gets a burst >>>>>>>>>>>>> of input. 
>>>>>>>>>>>>> Around the state restoring point (middle of the day) it doesn't >>>>>>>>>>>>> get any >>>>>>>>>>>>> input, so it can't lose anything either. Isn't this true, or have >>>>>>>>>>>>> I totally >>>>>>>>>>>>> missed how Flink works in triggering window results? I would not >>>>>>>>>>>>> expect >>>>>>>>>>>>> there to be any optimization that speculatively triggers early >>>>>>>>>>>>> results of a >>>>>>>>>>>>> regular time window to the downstream operators. >>>>>>>>>>>>> >>>>>>>>>>>>> > The old BucketingSink has in general problem with s3. >>>>>>>>>>>>> Internally BucketingSink queries s3 as a file system to list >>>>>>>>>>>>> already >>>>>>>>>>>>> written file parts (batches) and determine index of the next part >>>>>>>>>>>>> to start. >>>>>>>>>>>>> Due to eventual consistency of checking file existence in s3 [1], >>>>>>>>>>>>> the >>>>>>>>>>>>> BucketingSink can rewrite the previously written part and >>>>>>>>>>>>> basically loose >>>>>>>>>>>>> it. >>>>>>>>>>>>> >>>>>>>>>>>>> I was wondering, what does S3's "read-after-write consistency" >>>>>>>>>>>>> (mentioned on the page you linked) actually mean. It seems that >>>>>>>>>>>>> this might >>>>>>>>>>>>> be possible: >>>>>>>>>>>>> - LIST keys, find current max index >>>>>>>>>>>>> - choose next index = max + 1 >>>>>>>>>>>>> - HEAD next index: if it exists, keep adding + 1 until key >>>>>>>>>>>>> doesn't exist on S3 >>>>>>>>>>>>> >>>>>>>>>>>>> But definitely sounds easier if a sink keeps track of files in >>>>>>>>>>>>> a way that's guaranteed to be consistent. >>>>>>>>>>>>> >>>>>>>>>>>>> Cheers, >>>>>>>>>>>>> Juho >>>>>>>>>>>>> >>>>>>>>>>>>> On Mon, Aug 27, 2018 at 2:04 PM Andrey Zagrebin < >>>>>>>>>>>>> and...@data-artisans.com> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>>> Hi, >>>>>>>>>>>>>> >>>>>>>>>>>>>> true, StreamingFileSink does not support s3 in 1.6.0, it is >>>>>>>>>>>>>> planned for the next 1.7 release, sorry for confusion. >>>>>>>>>>>>>> The old BucketingSink has in general problem with s3. >>>>>>>>>>>>>> Internally BucketingSink queries s3 as a file system >>>>>>>>>>>>>> to list already written file parts (batches) and determine >>>>>>>>>>>>>> index of the next part to start. Due to eventual consistency of >>>>>>>>>>>>>> checking >>>>>>>>>>>>>> file existence in s3 [1], the BucketingSink can rewrite the >>>>>>>>>>>>>> previously >>>>>>>>>>>>>> written part and basically loose it. It should be fixed for >>>>>>>>>>>>>> StreamingFileSink in 1.7 where Flink keeps its own track of >>>>>>>>>>>>>> written parts >>>>>>>>>>>>>> and does not rely on s3 as a file system. >>>>>>>>>>>>>> I also include Kostas, he might add more details. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Just to keep in mind this problem with s3 and exclude it for >>>>>>>>>>>>>> sure I would also check whether the size of missing events is >>>>>>>>>>>>>> around the >>>>>>>>>>>>>> batch size of BucketingSink or not. You also wrote that the >>>>>>>>>>>>>> timestamps of >>>>>>>>>>>>>> lost event are 'probably' around the time of the savepoint, if >>>>>>>>>>>>>> it is not >>>>>>>>>>>>>> yet for sure I would also check it. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Have you already checked the log files of job manager and >>>>>>>>>>>>>> task managers for the job running before and after the restore >>>>>>>>>>>>>> from the >>>>>>>>>>>>>> check point? Is everything successful there, no errors, relevant >>>>>>>>>>>>>> warnings >>>>>>>>>>>>>> or exceptions? 
>>>>>>>>>>>>>> >>>>>>>>>>>>>> As the next step, I would suggest to log all encountered >>>>>>>>>>>>>> events in DistinctFunction.reduce if possible for production >>>>>>>>>>>>>> data and check >>>>>>>>>>>>>> whether the missed events are eventually processed before or >>>>>>>>>>>>>> after the >>>>>>>>>>>>>> savepoint. The following log message indicates a border between >>>>>>>>>>>>>> the events >>>>>>>>>>>>>> that should be included into the savepoint (logged before) or >>>>>>>>>>>>>> not: >>>>>>>>>>>>>> “{} ({}, synchronous part) in thread {} took {} ms” (template) >>>>>>>>>>>>>> Also check if the savepoint has been overall completed: >>>>>>>>>>>>>> "{} ({}, asynchronous part) in thread {} took {} ms." >>>>>>>>>>>>>> >>>>>>>>>>>>>> Best, >>>>>>>>>>>>>> Andrey >>>>>>>>>>>>>> >>>>>>>>>>>>>> [1] >>>>>>>>>>>>>> https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html >>>>>>>>>>>>>> >>>>>>>>>>>>>> On 24 Aug 2018, at 20:41, Juho Autio <juho.au...@rovio.com> >>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>> Hi, >>>>>>>>>>>>>> >>>>>>>>>>>>>> Using StreamingFileSink is not a convenient option for >>>>>>>>>>>>>> production use for us as it doesn't support s3*. I could use >>>>>>>>>>>>>> StreamingFileSink just to verify, but I don't see much point in >>>>>>>>>>>>>> doing so. >>>>>>>>>>>>>> Please consider my previous comment: >>>>>>>>>>>>>> >>>>>>>>>>>>>> > I realized that BucketingSink must not play any role in >>>>>>>>>>>>>> this problem. This is because only when the 24-hour window >>>>>>>>>>>>>> triggers, >>>>>>>>>>>>>> BucketingSink gets a burst of input. Around the state restoring >>>>>>>>>>>>>> point >>>>>>>>>>>>>> (middle of the day) it doesn't get any input, so it can't lose >>>>>>>>>>>>>> anything >>>>>>>>>>>>>> either (right?). >>>>>>>>>>>>>> >>>>>>>>>>>>>> I could also use a kafka sink instead, but I can't imagine >>>>>>>>>>>>>> how there could be any difference. It's very real that the sink >>>>>>>>>>>>>> doesn't get >>>>>>>>>>>>>> any input for a long time until the 24-hour window closes, and >>>>>>>>>>>>>> then it >>>>>>>>>>>>>> quickly writes out everything because it's not that much data >>>>>>>>>>>>>> eventually >>>>>>>>>>>>>> for the distinct values. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Any ideas for debugging what's happening around the savepoint >>>>>>>>>>>>>> & restoration time? >>>>>>>>>>>>>> >>>>>>>>>>>>>> *) I actually implemented StreamingFileSink as an alternative >>>>>>>>>>>>>> sink. This was before I came to realize that most likely the >>>>>>>>>>>>>> sink component >>>>>>>>>>>>>> has nothing to do with the data loss problem. I tried it with >>>>>>>>>>>>>> s3n:// path >>>>>>>>>>>>>> just to see an exception being thrown. In the source code I >>>>>>>>>>>>>> indeed then >>>>>>>>>>>>>> found an explicit check for the target path scheme to be >>>>>>>>>>>>>> "hdfs://". >>>>>>>>>>>>>> >>>>>>>>>>>>>> On Fri, Aug 24, 2018 at 7:49 PM Andrey Zagrebin < >>>>>>>>>>>>>> and...@data-artisans.com> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>>> Ok, I think before further debugging the window reduced >>>>>>>>>>>>>>> state, >>>>>>>>>>>>>>> could you try the new ‘StreamingFileSink’ [1] introduced in >>>>>>>>>>>>>>> Flink 1.6.0 instead of the previous 'BucketingSink’? 
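For reference, a minimal way to wire that in with the 1.6 row format might look like the sketch below. The output path is a placeholder (and, as noted elsewhere in the thread, StreamingFileSink in 1.6.0 does not support s3, so an hdfs:// target is assumed); SimpleStringEncoder simply writes each record's toString() as one line:

import java.util.Map;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

// Drop-in replacement for the BucketingSink while debugging, writing one
// line per distinct record:
StreamingFileSink<Map<String, String>> fileSink = StreamingFileSink
        .forRowFormat(new Path("hdfs:///tmp/distinct-debug"),              // placeholder path
                      new SimpleStringEncoder<Map<String, String>>("UTF-8"))
        .build();

// Used as before:
//   .reduce(new DistinctFunction())
//   .addSink(fileSink)
//   .setParallelism(8);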
>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>>> Andrey >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> [1] >>>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On 24 Aug 2018, at 18:03, Juho Autio <juho.au...@rovio.com> >>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Yes, sorry for my confusing comment. I just meant that it >>>>>>>>>>>>>>> seems like there's a bug somewhere now that the output is >>>>>>>>>>>>>>> missing some data. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> > I would wait and check the actual output in s3 because it >>>>>>>>>>>>>>> is the main result of the job >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Yes, and that's what I have already done. There seems to be >>>>>>>>>>>>>>> always some data loss with the production data volumes, if the >>>>>>>>>>>>>>> job has been >>>>>>>>>>>>>>> restarted on that day. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Would you have any suggestions for how to debug this further? >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Many thanks for stepping in. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On Fri, Aug 24, 2018 at 6:37 PM Andrey Zagrebin < >>>>>>>>>>>>>>> and...@data-artisans.com> wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Hi Juho, >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> So it is a per key deduplication job. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Yes, I would wait and check the actual output in s3 because >>>>>>>>>>>>>>>> it is the main result of the job and >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> > The late data around the time of taking savepoint might >>>>>>>>>>>>>>>> be not included into the savepoint but it should be behind the >>>>>>>>>>>>>>>> snapshotted >>>>>>>>>>>>>>>> offset in Kafka. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> is not a bug, it is a possible behaviour. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> The savepoint is a snapshot of the data in transient which >>>>>>>>>>>>>>>> is already consumed from Kafka. >>>>>>>>>>>>>>>> Basically the full contents of the window result is split >>>>>>>>>>>>>>>> between the savepoint and what can come after the savepoint'ed >>>>>>>>>>>>>>>> offset in >>>>>>>>>>>>>>>> Kafka but before the window result is written into s3. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Allowed lateness should not affect it, I am just saying >>>>>>>>>>>>>>>> that the final result in s3 should include all records after >>>>>>>>>>>>>>>> it. >>>>>>>>>>>>>>>> This is what should be guaranteed but not the contents of >>>>>>>>>>>>>>>> the intermediate savepoint. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>>>> Andrey >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> On 24 Aug 2018, at 16:52, Juho Autio <juho.au...@rovio.com> >>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Thanks for your answer! >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> I check for the missed data from the final output on s3. So >>>>>>>>>>>>>>>> I wait until the next day, then run the same thing >>>>>>>>>>>>>>>> re-implemented in batch, >>>>>>>>>>>>>>>> and compare the output. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> > The late data around the time of taking savepoint might >>>>>>>>>>>>>>>> be not included into the savepoint but it should be behind the >>>>>>>>>>>>>>>> snapshotted >>>>>>>>>>>>>>>> offset in Kafka. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Yes, I would definitely expect that. It seems like there's >>>>>>>>>>>>>>>> a bug somewhere. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> > Then it should just come later after the restore and >>>>>>>>>>>>>>>> should be reduced within the allowed lateness into the final >>>>>>>>>>>>>>>> result which >>>>>>>>>>>>>>>> is saved into s3. 
>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Well, as far as I know, allowed lateness doesn't play any >>>>>>>>>>>>>>>> role here, because I started running the job with >>>>>>>>>>>>>>>> allowedLateness=0, and >>>>>>>>>>>>>>>> still get the data loss, while my late data output doesn't >>>>>>>>>>>>>>>> receive anything. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> > Also, is this `DistinctFunction.reduce` just an example >>>>>>>>>>>>>>>> or the actual implementation, basically saving just one of >>>>>>>>>>>>>>>> records inside >>>>>>>>>>>>>>>> the 24h window in s3? then what is missing there? >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Yes, it's the actual implementation. Note that there's a >>>>>>>>>>>>>>>> keyBy before the DistinctFunction. So there's one record for >>>>>>>>>>>>>>>> each key >>>>>>>>>>>>>>>> (which is the combination of a couple of fields). In practice >>>>>>>>>>>>>>>> I've seen >>>>>>>>>>>>>>>> that we're missing ~2000-4000 elements on each restore, and >>>>>>>>>>>>>>>> the total >>>>>>>>>>>>>>>> output is obviously much more than that. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Here's the full code for the key selector: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> public class MapKeySelector implements >>>>>>>>>>>>>>>> KeySelector<Map<String,String>, Object> { >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> private final String[] fields; >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> public MapKeySelector(String... fields) { >>>>>>>>>>>>>>>> this.fields = fields; >>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> @Override >>>>>>>>>>>>>>>> public Object getKey(Map<String, String> event) throws >>>>>>>>>>>>>>>> Exception { >>>>>>>>>>>>>>>> Tuple key = >>>>>>>>>>>>>>>> Tuple.getTupleClass(fields.length).newInstance(); >>>>>>>>>>>>>>>> for (int i = 0; i < fields.length; i++) { >>>>>>>>>>>>>>>> key.setField(event.getOrDefault(fields[i], ""), >>>>>>>>>>>>>>>> i); >>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>> return key; >>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> And a more exact example on how it's used: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> .keyBy(new MapKeySelector("ID", >>>>>>>>>>>>>>>> "PLAYER_ID", "FIELD", "KEY_NAME", "KEY_VALUE")) >>>>>>>>>>>>>>>> .timeWindow(Time.days(1)) >>>>>>>>>>>>>>>> .reduce(new DistinctFunction()) >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> On Fri, Aug 24, 2018 at 5:26 PM Andrey Zagrebin < >>>>>>>>>>>>>>>> and...@data-artisans.com> wrote: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Hi Juho, >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Where exactly does the data miss? When do you notice that? >>>>>>>>>>>>>>>>> Do you check it: >>>>>>>>>>>>>>>>> - debugging `DistinctFunction.reduce` right after resume >>>>>>>>>>>>>>>>> in the middle of the day >>>>>>>>>>>>>>>>> or >>>>>>>>>>>>>>>>> - some distinct records miss in the final output >>>>>>>>>>>>>>>>> of BucketingSink in s3 after window result is actually >>>>>>>>>>>>>>>>> triggered and saved >>>>>>>>>>>>>>>>> into s3 at the end of the day? is this the main output? >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> The late data around the time of taking savepoint might be >>>>>>>>>>>>>>>>> not included into the savepoint but it should be behind the >>>>>>>>>>>>>>>>> snapshotted >>>>>>>>>>>>>>>>> offset in Kafka. Then it should just come later after the >>>>>>>>>>>>>>>>> restore and >>>>>>>>>>>>>>>>> should be reduced within the allowed lateness into the final >>>>>>>>>>>>>>>>> result which >>>>>>>>>>>>>>>>> is saved into s3. 
>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Also, is this `DistinctFunction.reduce` just an example or >>>>>>>>>>>>>>>>> the actual implementation, basically saving just one of >>>>>>>>>>>>>>>>> records inside the >>>>>>>>>>>>>>>>> 24h window in s3? then what is missing there? >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>>>>> Andrey >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> On 23 Aug 2018, at 15:42, Juho Autio <juho.au...@rovio.com> >>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> I changed to allowedLateness=0, no change, still missing >>>>>>>>>>>>>>>>> data when restoring from savepoint. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> On Tue, Aug 21, 2018 at 10:43 AM Juho Autio < >>>>>>>>>>>>>>>>> juho.au...@rovio.com> wrote: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> I realized that BucketingSink must not play any role in >>>>>>>>>>>>>>>>>> this problem. This is because only when the 24-hour window >>>>>>>>>>>>>>>>>> triggers, >>>>>>>>>>>>>>>>>> BucketinSink gets a burst of input. Around the state >>>>>>>>>>>>>>>>>> restoring point >>>>>>>>>>>>>>>>>> (middle of the day) it doesn't get any input, so it can't >>>>>>>>>>>>>>>>>> lose anything >>>>>>>>>>>>>>>>>> either (right?). >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> I will next try removing the allowedLateness entirely >>>>>>>>>>>>>>>>>> from the equation. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> In the meanwhile, please let me know if you have any >>>>>>>>>>>>>>>>>> suggestions for debugging the lost data, for example what >>>>>>>>>>>>>>>>>> logs to enable. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> We use FlinkKafkaConsumer010 btw. Are there any known >>>>>>>>>>>>>>>>>> issues with that, that could contribute to lost data when >>>>>>>>>>>>>>>>>> restoring a >>>>>>>>>>>>>>>>>> savepoint? >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 4:23 PM Juho Autio < >>>>>>>>>>>>>>>>>> juho.au...@rovio.com> wrote: >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Some data is silently lost on my Flink stream job when >>>>>>>>>>>>>>>>>>> state is restored from a savepoint. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Do you have any debugging hints to find out where >>>>>>>>>>>>>>>>>>> exactly the data gets dropped? >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> My job gathers distinct values using a 24-hour window. >>>>>>>>>>>>>>>>>>> It doesn't have any custom state management. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> When I cancel the job with savepoint and restore from >>>>>>>>>>>>>>>>>>> that savepoint, some data is missed. It seems to be losing >>>>>>>>>>>>>>>>>>> just a small >>>>>>>>>>>>>>>>>>> amount of data. The event time of lost data is probably >>>>>>>>>>>>>>>>>>> around the time of >>>>>>>>>>>>>>>>>>> savepoint. In other words the rest of the time window is >>>>>>>>>>>>>>>>>>> not entirely >>>>>>>>>>>>>>>>>>> missed – collection works correctly also for (most of the) >>>>>>>>>>>>>>>>>>> events that come >>>>>>>>>>>>>>>>>>> in after restoring. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> When the job processes a full 24-hour window without >>>>>>>>>>>>>>>>>>> interruptions it doesn't miss anything. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Usually the problem doesn't happen in test environments >>>>>>>>>>>>>>>>>>> that have smaller parallelism and smaller data volumes. But >>>>>>>>>>>>>>>>>>> in production >>>>>>>>>>>>>>>>>>> volumes the job seems to be consistently missing at least >>>>>>>>>>>>>>>>>>> something on >>>>>>>>>>>>>>>>>>> every restore. 
>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> This issue has consistently happened since the job was >>>>>>>>>>>>>>>>>>> initially created. It was at first run on an older version >>>>>>>>>>>>>>>>>>> of Flink >>>>>>>>>>>>>>>>>>> 1.5-SNAPSHOT and it still happens on both Flink 1.5.2 & >>>>>>>>>>>>>>>>>>> 1.6.0. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> I'm wondering if this could be for example some >>>>>>>>>>>>>>>>>>> synchronization issue between the kafka consumer offsets >>>>>>>>>>>>>>>>>>> vs. what's been >>>>>>>>>>>>>>>>>>> written by BucketingSink? >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> 1. Job content, simplified >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> kafkaStream >>>>>>>>>>>>>>>>>>> .flatMap(new ExtractFieldsFunction()) >>>>>>>>>>>>>>>>>>> .keyBy(new MapKeySelector(1, 2, 3, 4)) >>>>>>>>>>>>>>>>>>> .timeWindow(Time.days(1)) >>>>>>>>>>>>>>>>>>> .allowedLateness(allowedLateness) >>>>>>>>>>>>>>>>>>> .sideOutputLateData(lateDataTag) >>>>>>>>>>>>>>>>>>> .reduce(new DistinctFunction()) >>>>>>>>>>>>>>>>>>> .addSink(sink) >>>>>>>>>>>>>>>>>>> // use a fixed number of output >>>>>>>>>>>>>>>>>>> partitions >>>>>>>>>>>>>>>>>>> .setParallelism(8)) >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> /** >>>>>>>>>>>>>>>>>>> * Usage: .keyBy("the", "distinct", "fields").reduce(new >>>>>>>>>>>>>>>>>>> DistinctFunction()) >>>>>>>>>>>>>>>>>>> */ >>>>>>>>>>>>>>>>>>> public class DistinctFunction implements >>>>>>>>>>>>>>>>>>> ReduceFunction<java.util.Map<String, String>> { >>>>>>>>>>>>>>>>>>> @Override >>>>>>>>>>>>>>>>>>> public Map<String, String> reduce(Map<String, >>>>>>>>>>>>>>>>>>> String> value1, Map<String, String> value2) { >>>>>>>>>>>>>>>>>>> return value1; >>>>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> 2. State configuration >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> boolean enableIncrementalCheckpointing = true; >>>>>>>>>>>>>>>>>>> String statePath = "s3n://bucket/savepoints"; >>>>>>>>>>>>>>>>>>> new RocksDBStateBackend(statePath, >>>>>>>>>>>>>>>>>>> enableIncrementalCheckpointing); >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Checkpointing Mode Exactly Once >>>>>>>>>>>>>>>>>>> Interval 1m 0s >>>>>>>>>>>>>>>>>>> Timeout 10m 0s >>>>>>>>>>>>>>>>>>> Minimum Pause Between Checkpoints 1m 0s >>>>>>>>>>>>>>>>>>> Maximum Concurrent Checkpoints 1 >>>>>>>>>>>>>>>>>>> Persist Checkpoints Externally Enabled (retain on >>>>>>>>>>>>>>>>>>> cancellation) >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> 3. BucketingSink configuration >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> We use BucketingSink, I don't think there's anything >>>>>>>>>>>>>>>>>>> special here, if not the fact that we're writing to S3. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> String outputPath = "s3://bucket/output"; >>>>>>>>>>>>>>>>>>> BucketingSink<Map<String, String>> sink = new >>>>>>>>>>>>>>>>>>> BucketingSink<Map<String, String>>(outputPath) >>>>>>>>>>>>>>>>>>> .setBucketer(new ProcessdateBucketer()) >>>>>>>>>>>>>>>>>>> .setBatchSize(batchSize) >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> .setInactiveBucketThreshold(inactiveBucketThreshold) >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> .setInactiveBucketCheckInterval(inactiveBucketCheckInterval); >>>>>>>>>>>>>>>>>>> sink.setWriter(new IdJsonWriter()); >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> 4. Kafka & event time >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> My flink job reads the data from Kafka, using a >>>>>>>>>>>>>>>>>>> BoundedOutOfOrdernessTimestampExtractor on the kafka >>>>>>>>>>>>>>>>>>> consumer to >>>>>>>>>>>>>>>>>>> synchronize watermarks accross all kafka partitions. 
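For completeness, the consumer-side assignment described above (so that watermarks are tracked per Kafka partition) would look roughly like this sketch; the topic, properties, out-of-orderness bound and the JsonMapDeserializationSchema name are hypothetical placeholders:

import java.util.Map;
import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
kafkaProps.setProperty("group.id", "distinct-job");            // placeholder

FlinkKafkaConsumer010<Map<String, String>> consumer = new FlinkKafkaConsumer010<>(
        "events",                              // placeholder topic
        new JsonMapDeserializationSchema(),    // hypothetical schema producing Map<String, String>
        kafkaProps);

// Assigning the extractor on the consumer itself gives per-partition watermarks,
// which keeps event time synchronized across all Kafka partitions.
consumer.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Map<String, String>>(Time.minutes(1)) {
            @Override
            public long extractTimestamp(Map<String, String> event) {
                return Long.parseLong(event.get("timestamp")); // illustrative field name
            }
        });

// env: the job's StreamExecutionEnvironment
DataStream<Map<String, String>> kafkaStream = env.addSource(consumer);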
We >>>>>>>>>>>>>>>>>>> also write late >>>>>>>>>>>>>>>>>>> data to side output, but nothing is written there – if it >>>>>>>>>>>>>>>>>>> would, it could >>>>>>>>>>>>>>>>>>> explain missed data in the main output (I'm also sure that >>>>>>>>>>>>>>>>>>> our late data >>>>>>>>>>>>>>>>>>> writing works, because we previously had some actual late >>>>>>>>>>>>>>>>>>> data which ended >>>>>>>>>>>>>>>>>>> up there). >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> 5. allowedLateness >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> It may be or may not be relevant that I have also >>>>>>>>>>>>>>>>>>> enabled allowedLateness with 1 minute lateness on the >>>>>>>>>>>>>>>>>>> 24-hour window: >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> If that makes sense, I could try removing >>>>>>>>>>>>>>>>>>> allowedLateness entirely? That would be just to rule out >>>>>>>>>>>>>>>>>>> that Flink doesn't >>>>>>>>>>>>>>>>>>> have a bug that's related to restoring state in combination >>>>>>>>>>>>>>>>>>> with the >>>>>>>>>>>>>>>>>>> allowedLateness feature. After all, all of our data should >>>>>>>>>>>>>>>>>>> be in a good >>>>>>>>>>>>>>>>>>> enough order to not be late, given the max out of orderness >>>>>>>>>>>>>>>>>>> used on kafka >>>>>>>>>>>>>>>>>>> consumer timestamp extractor. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Thank you in advance! >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>> >>>>>>> >>>>>> >>>>>> >>>>> >>>> >>> >> >