Hello, is there anyone who could help with this? On Fri, Jan 11, 2019 at 8:14 AM Juho Autio <juho.au...@rovio.com> wrote:
> Stefan, would you have time to comment? > > On Wednesday, January 2, 2019, Juho Autio <juho.au...@rovio.com> wrote: > >> Bump – does anyone know if Stefan will be available to comment the latest >> findings? Thanks. >> >> On Fri, Dec 21, 2018 at 2:33 PM Juho Autio <juho.au...@rovio.com> wrote: >> >>> Stefan, I managed to analyze savepoint with bravo. It seems that the >>> data that's missing from output *is* found in savepoint. >>> >>> I simplified my test case to the following: >>> >>> - job 1 has bee running for ~10 days >>> - savepoint X created & job 1 cancelled >>> - job 2 started with restore from savepoint X >>> >>> Then I waited until the next day so that job 2 has triggered the 24 hour >>> window. >>> >>> Then I analyzed the output & savepoint: >>> >>> - compare job 2 output with the output of a batch pyspark script => find >>> 4223 missing rows >>> - pick one of the missing rows (say, id Z) >>> - read savepoint X with bravo, filter for id Z => Z was found in the >>> savepoint! >>> >>> How can it be possible that the value is in state but doesn't end up in >>> output after state has been restored & window is eventually triggered? >>> >>> I also did similar analysis on the previous case where I savepointed & >>> restored the job multiple times (5) within the same 24-hour window. A >>> missing id that I drilled down to, was found in all of those savepoints, >>> yet missing from the output that gets written at the end of the day. This >>> is even more surprising: that the missing ID was written to the new >>> savepoints also after restoring. Is the reducer state somehow decoupled >>> from the window contents? >>> >>> Big thanks to bravo-developer Gyula for guiding me through to be able >>> read the reducer state! https://github.com/king/bravo/pull/11 >>> >>> Gyula also had an idea for how to troubleshoot the missing data in a >>> scalable way: I could add some "side effect kafka output" on individual >>> operators. This should allow tracking more closely at which point the data >>> gets lost. However, maybe this would have to be in some Flink's internal >>> components, and I'm not sure which those would be. >>> >>> Cheers, >>> Juho >>> >>> On Mon, Nov 19, 2018 at 11:52 AM Juho Autio <juho.au...@rovio.com> >>> wrote: >>> >>>> >>>> Hi Stefan, >>>> >>>> Bravo doesn't currently support reading a reducer state. I gave it a >>>> try but couldn't get to a working implementation yet. If anyone can provide >>>> some insight on how to make this work, please share at github: >>>> https://github.com/king/bravo/pull/11 >>>> >>>> Thanks. >>>> >>>> On Tue, Oct 23, 2018 at 3:32 PM Juho Autio <juho.au...@rovio.com> >>>> wrote: >>>> >>>>> I was glad to find that bravo had now been updated to support >>>>> installing bravo to a local maven repo. >>>>> >>>>> I was able to load a checkpoint created by my job, thanks to the >>>>> example provided in bravo README, but I'm still missing the essential >>>>> piece. >>>>> >>>>> My code was: >>>>> >>>>> OperatorStateReader reader = new OperatorStateReader(env2, >>>>> savepoint, "DistinctFunction"); >>>>> DontKnowWhatTypeThisIs reducingState = >>>>> reader.readKeyedStates(what should I put here?); >>>>> >>>>> I don't know how to read the values collected from reduce() calls in >>>>> the state. Is there a way to access the reducing state of the window with >>>>> bravo? 
I'm a bit confused how this works, because when I check with >>>>> debugger, flink internally uses a ReducingStateDescriptor >>>>> with name=window-contents, but still reading operator state for >>>>> "DistinctFunction" didn't at least throw an exception ("window-contents" >>>>> threw – obviously there's no operator by that name). >>>>> >>>>> Cheers, >>>>> Juho >>>>> >>>>> On Mon, Oct 15, 2018 at 2:25 PM Juho Autio <juho.au...@rovio.com> >>>>> wrote: >>>>> >>>>>> Hi Stefan, >>>>>> >>>>>> Sorry but it doesn't seem immediately clear to me what's a good way >>>>>> to use https://github.com/king/bravo. >>>>>> >>>>>> How are people using it? Would you for example modify build.gradle >>>>>> somehow to publish the bravo as a library locally/internally? Or add code >>>>>> directly in the bravo project (locally) and run it from there (using an >>>>>> IDE, for example)? Also it doesn't seem like the bravo gradle project >>>>>> supports building a flink job jar, but if it does, how do I do it? >>>>>> >>>>>> Thanks. >>>>>> >>>>>> On Thu, Oct 4, 2018 at 9:30 PM Juho Autio <juho.au...@rovio.com> >>>>>> wrote: >>>>>> >>>>>>> Good then, I'll try to analyze the savepoints with Bravo. Thanks! >>>>>>> >>>>>>> > How would you assume that backpressure would influence your >>>>>>> updates? Updates to each local state still happen event-by-event, in a >>>>>>> single reader/writing thread. >>>>>>> >>>>>>> Sure, just an ignorant guess by me. I'm not familiar with most of >>>>>>> Flink's internals. Any way high backpressure is not a seen on this job >>>>>>> after it has caught up the lag, so at I thought it would be worth >>>>>>> mentioning. >>>>>>> >>>>>>> On Thu, Oct 4, 2018 at 6:24 PM Stefan Richter < >>>>>>> s.rich...@data-artisans.com> wrote: >>>>>>> >>>>>>>> Hi, >>>>>>>> >>>>>>>> Am 04.10.2018 um 16:08 schrieb Juho Autio <juho.au...@rovio.com>: >>>>>>>> >>>>>>>> > you could take a look at Bravo [1] to query your savepoints and >>>>>>>> to check if the state in the savepoint complete w.r.t your expectations >>>>>>>> >>>>>>>> Thanks. I'm not 100% if this is the case, but to me it seemed like >>>>>>>> the missed ids were being logged by the reducer soon after the job had >>>>>>>> started (after restoring a savepoint). But on the other hand, after >>>>>>>> that I >>>>>>>> also made another savepoint & restored that, so what I could check is: >>>>>>>> does >>>>>>>> that next savepoint have the missed ids that were logged (a couple of >>>>>>>> minutes before the savepoint was created, so there should've been more >>>>>>>> than >>>>>>>> enough time to add them to the state before the savepoint was >>>>>>>> triggered) or >>>>>>>> not. Any way, if I would be able to verify with Bravo that the ids are >>>>>>>> missing from the savepoint (even though reduced logged that it saw >>>>>>>> them), >>>>>>>> would that help in figuring out where they are lost? Is there some >>>>>>>> major >>>>>>>> difference compared to just looking at the final output after window >>>>>>>> has >>>>>>>> been triggered? >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> I think that makes a difference. For example, you can investigate >>>>>>>> if there is a state loss or a problem with the windowing. In the >>>>>>>> savepoint >>>>>>>> you could see which keys exists and to which windows they are assigned. 
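On the "window-contents" naming mentioned above: that name comes from Flink's window operator itself, not from user code, which is why the reducer state is not registered under "DistinctFunction". A small standalone sketch of the idea – not Flink's actual code; the stand-in reduce function and the String type are only illustrative:

    import org.apache.flink.api.common.state.ReducingStateDescriptor;
    import org.apache.flink.api.common.typeutils.base.StringSerializer;

    public class WindowContentsSketch {
        public static void main(String[] args) {
            // The window operator registers its per-window reducing state under the fixed
            // name "window-contents"; the operator name ("DistinctFunction") only identifies
            // which operator's keyed state backend holds that descriptor.
            ReducingStateDescriptor<String> stateDesc = new ReducingStateDescriptor<>(
                    "window-contents",
                    (a, b) -> a,                 // stands in for DistinctFunction.reduce
                    StringSerializer.INSTANCE);
            System.out.println(stateDesc.getName()); // prints: window-contents
        }
    }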
>>>>>>>> Also just to make sure there is no misunderstanding: only elements >>>>>>>> that are >>>>>>>> in the state at the start of a savepoint are expected to be part of the >>>>>>>> savepoint; all elements between start and completion of the savepoint >>>>>>>> are >>>>>>>> not expected to be part of the savepoint. >>>>>>>> >>>>>>>> >>>>>>>> > I also doubt that the problem is about backpressure after >>>>>>>> restore, because the job will only continue running after the state >>>>>>>> restore >>>>>>>> is already completed. >>>>>>>> >>>>>>>> Yes, I'm not suspecting that the state restoring would be the >>>>>>>> problem either. My concern was about backpressure possibly messing >>>>>>>> with the >>>>>>>> updates of reducing state? I would tend to suspect that updating the >>>>>>>> state >>>>>>>> consistently is what fails, where heavy load / backpressure might be a >>>>>>>> factor. >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> How would you assume that backpressure would influence your >>>>>>>> updates? Updates to each local state still happen event-by-event, in a >>>>>>>> single reader/writing thread. >>>>>>>> >>>>>>>> >>>>>>>> On Thu, Oct 4, 2018 at 4:18 PM Stefan Richter < >>>>>>>> s.rich...@data-artisans.com> wrote: >>>>>>>> >>>>>>>>> Hi, >>>>>>>>> >>>>>>>>> you could take a look at Bravo [1] to query your savepoints and to >>>>>>>>> check if the state in the savepoint complete w.r.t your expectations. >>>>>>>>> I >>>>>>>>> somewhat doubt that there is a general problem with the >>>>>>>>> state/savepoints >>>>>>>>> because many users are successfully running it on a large state and I >>>>>>>>> am >>>>>>>>> not aware of any data loss problems, but nothing is impossible. What >>>>>>>>> the >>>>>>>>> savepoint does is also straight forward: iterate a db snapshot and >>>>>>>>> write >>>>>>>>> all key/value pairs to disk, so all data that was in the db at the >>>>>>>>> time of >>>>>>>>> the savepoint, should show up. I also doubt that the problem is about >>>>>>>>> backpressure after restore, because the job will only continue running >>>>>>>>> after the state restore is already completed. Did you check if you are >>>>>>>>> using exactly-one-semantics or at-least-once semantics? Also did you >>>>>>>>> check >>>>>>>>> that the kafka consumer start position is configured properly [2]? Are >>>>>>>>> watermarks generated as expected after restore? >>>>>>>>> >>>>>>>>> One more unrelated high-level comment that I have: for a >>>>>>>>> granularity of 24h windows, I wonder if it would not make sense to >>>>>>>>> use a >>>>>>>>> batch job instead? >>>>>>>>> >>>>>>>>> Best, >>>>>>>>> Stefan >>>>>>>>> >>>>>>>>> [1] https://github.com/king/bravo >>>>>>>>> [2] >>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration >>>>>>>>> >>>>>>>>> Am 04.10.2018 um 14:53 schrieb Juho Autio <juho.au...@rovio.com>: >>>>>>>>> >>>>>>>>> Thanks for the suggestions! >>>>>>>>> >>>>>>>>> > In general, it would be tremendously helpful to have a minimal >>>>>>>>> working example which allows to reproduce the problem. >>>>>>>>> >>>>>>>>> Definitely. The problem with reproducing has been that this only >>>>>>>>> seems to happen in the bigger production data volumes. >>>>>>>>> >>>>>>>>> That's why I'm hoping to find a way to debug this with the >>>>>>>>> production data. With that it seems to consistently cause some misses >>>>>>>>> every >>>>>>>>> time the job is killed/restored. 
>>>>>>>>> >>>>>>>>> > check if it happens for shorter windows, like 1h etc >>>>>>>>> >>>>>>>>> What would be the benefit of that compared to 24h window? >>>>>>>>> >>>>>>>>> > simplify the job to not use a reduce window but simply a time >>>>>>>>> window which outputs the window events. Then counting the input and >>>>>>>>> output >>>>>>>>> events should allow you to verify the results. If you are not seeing >>>>>>>>> missing events, then it could have something to do with the reducing >>>>>>>>> state >>>>>>>>> used in the reduce function. >>>>>>>>> >>>>>>>>> Hm, maybe, but not sure how useful that would be, because it >>>>>>>>> wouldn't yet prove that it's related to reducing, because not having a >>>>>>>>> reduce function could also mean smaller load on the job, which might >>>>>>>>> alone >>>>>>>>> be enough to make the problem not manifest. >>>>>>>>> >>>>>>>>> Is there a way to debug what goes into the reducing state >>>>>>>>> (including what gets removed or overwritten and what restored), if >>>>>>>>> that >>>>>>>>> makes sense..? Maybe some suitable logging could be used to prove >>>>>>>>> that the >>>>>>>>> lost data is written to the reducing state (or at least asked to be >>>>>>>>> written), but not found any more when the window closes and state is >>>>>>>>> flushed? >>>>>>>>> >>>>>>>>> On configuration once more, we're using RocksDB state backend with >>>>>>>>> asynchronous incremental checkpointing. The state is restored from >>>>>>>>> savepoints though, we haven't been using those checkpoints in these >>>>>>>>> tests >>>>>>>>> (although they could be used in case of crashes – but we haven't had >>>>>>>>> those >>>>>>>>> now). >>>>>>>>> >>>>>>>>> On Thu, Oct 4, 2018 at 3:25 PM Till Rohrmann <trohrm...@apache.org> >>>>>>>>> wrote: >>>>>>>>> >>>>>>>>>> Hi Juho, >>>>>>>>>> >>>>>>>>>> another idea to further narrow down the problem could be to >>>>>>>>>> simplify the job to not use a reduce window but simply a time window >>>>>>>>>> which >>>>>>>>>> outputs the window events. Then counting the input and output events >>>>>>>>>> should >>>>>>>>>> allow you to verify the results. If you are not seeing missing >>>>>>>>>> events, then >>>>>>>>>> it could have something to do with the reducing state used in the >>>>>>>>>> reduce >>>>>>>>>> function. >>>>>>>>>> >>>>>>>>>> In general, it would be tremendously helpful to have a minimal >>>>>>>>>> working example which allows to reproduce the problem. >>>>>>>>>> >>>>>>>>>> Cheers, >>>>>>>>>> Till >>>>>>>>>> >>>>>>>>>> On Thu, Oct 4, 2018 at 2:02 PM Andrey Zagrebin < >>>>>>>>>> and...@data-artisans.com> wrote: >>>>>>>>>> >>>>>>>>>>> Hi Juho, >>>>>>>>>>> >>>>>>>>>>> can you try to reduce the job to minimal reproducible example >>>>>>>>>>> and share the job and input? >>>>>>>>>>> >>>>>>>>>>> For example: >>>>>>>>>>> - some simple records as input, e.g. tuples of primitive types >>>>>>>>>>> saved as cvs >>>>>>>>>>> - minimal deduplication job which processes them and misses >>>>>>>>>>> records >>>>>>>>>>> - check if it happens for shorter windows, like 1h etc >>>>>>>>>>> - setup which you use for the job, ideally locally reproducible >>>>>>>>>>> or cloud >>>>>>>>>>> >>>>>>>>>>> Best, >>>>>>>>>>> Andrey >>>>>>>>>>> >>>>>>>>>>> On 4 Oct 2018, at 11:13, Juho Autio <juho.au...@rovio.com> >>>>>>>>>>> wrote: >>>>>>>>>>> >>>>>>>>>>> Sorry to insist, but we seem to be blocked for any serious usage >>>>>>>>>>> of state in Flink if we can't rely on it to not miss data in case of >>>>>>>>>>> restore. 
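For reference, a rough sketch of the simplification Till suggested above: replace the reduce with a plain window function that emits every buffered element, so input and output counts can be compared. The surrounding skeleton reuses the pipeline shown later in this thread; the anonymous WindowFunction is new and only illustrative:

    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;
    import java.util.Map;

    kafkaStream
            .flatMap(new ExtractFieldsFunction())
            .keyBy(new MapKeySelector("ID", "PLAYER_ID", "FIELD", "KEY_NAME", "KEY_VALUE"))
            .timeWindow(Time.days(1))
            .apply(new WindowFunction<Map<String, String>, Map<String, String>, Object, TimeWindow>() {
                @Override
                public void apply(Object key, TimeWindow window,
                                  Iterable<Map<String, String>> elements,
                                  Collector<Map<String, String>> out) {
                    // No reduction: forward every buffered element so the window output
                    // can be counted against the number of input events.
                    for (Map<String, String> element : elements) {
                        out.collect(element);
                    }
                }
            })
            .addSink(sink);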
>>>>>>>>>>> >>>>>>>>>>> Would anyone have suggestions for how to troubleshoot this? So >>>>>>>>>>> far I have verified with DEBUG logs that our reduce function gets to >>>>>>>>>>> process also the data that is missing from window output. >>>>>>>>>>> >>>>>>>>>>> On Mon, Oct 1, 2018 at 11:56 AM Juho Autio <juho.au...@rovio.com> >>>>>>>>>>> wrote: >>>>>>>>>>> >>>>>>>>>>>> Hi Andrey, >>>>>>>>>>>> >>>>>>>>>>>> To rule out for good any questions about sink behaviour, the >>>>>>>>>>>> job was killed and started with an additional Kafka sink. >>>>>>>>>>>> >>>>>>>>>>>> The same number of ids were missed in both outputs: KafkaSink & >>>>>>>>>>>> BucketingSink. >>>>>>>>>>>> >>>>>>>>>>>> I wonder what would be the next steps in debugging? >>>>>>>>>>>> >>>>>>>>>>>> On Fri, Sep 21, 2018 at 3:49 PM Juho Autio < >>>>>>>>>>>> juho.au...@rovio.com> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> Thanks, Andrey. >>>>>>>>>>>>> >>>>>>>>>>>>> > so it means that the savepoint does not loose at least some >>>>>>>>>>>>> dropped records. >>>>>>>>>>>>> >>>>>>>>>>>>> I'm not sure what you mean by that? I mean, it was known from >>>>>>>>>>>>> the beginning, that not everything is lost before/after restoring >>>>>>>>>>>>> a >>>>>>>>>>>>> savepoint, just some records around the time of restoration. It's >>>>>>>>>>>>> not 100% >>>>>>>>>>>>> clear whether records are lost before making a savepoint or after >>>>>>>>>>>>> restoring >>>>>>>>>>>>> it. Although, based on the new DEBUG logs it seems more like >>>>>>>>>>>>> losing some >>>>>>>>>>>>> records that are seen ~soon after restoring. It seems like Flink >>>>>>>>>>>>> would be >>>>>>>>>>>>> somehow confused either about the restored state vs. new inserts >>>>>>>>>>>>> to state. >>>>>>>>>>>>> This could also be somehow linked to the high back pressure on >>>>>>>>>>>>> the kafka >>>>>>>>>>>>> source while the stream is catching up. >>>>>>>>>>>>> >>>>>>>>>>>>> > If it is feasible for your setup, I suggest to insert one >>>>>>>>>>>>> more map function after reduce and before sink. >>>>>>>>>>>>> > etc. >>>>>>>>>>>>> >>>>>>>>>>>>> Isn't that the same thing that we discussed before? Nothing is >>>>>>>>>>>>> sent to BucketingSink before the window closes, so I don't see >>>>>>>>>>>>> how it would >>>>>>>>>>>>> make any difference if we replace the BucketingSink with a map >>>>>>>>>>>>> function or >>>>>>>>>>>>> another sink type. We don't create or restore savepoints during >>>>>>>>>>>>> the time >>>>>>>>>>>>> when BucketingSink gets input or has open buckets – that happens >>>>>>>>>>>>> at a much >>>>>>>>>>>>> later time of day. I would focus on figuring out why the records >>>>>>>>>>>>> are lost >>>>>>>>>>>>> while the window is open. But I don't know how to do that. Would >>>>>>>>>>>>> you have >>>>>>>>>>>>> any additional suggestions? >>>>>>>>>>>>> >>>>>>>>>>>>> On Fri, Sep 21, 2018 at 3:30 PM Andrey Zagrebin < >>>>>>>>>>>>> and...@data-artisans.com> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>>> Hi Juho, >>>>>>>>>>>>>> >>>>>>>>>>>>>> so it means that the savepoint does not loose at least some >>>>>>>>>>>>>> dropped records. >>>>>>>>>>>>>> >>>>>>>>>>>>>> If it is feasible for your setup, I suggest to insert one >>>>>>>>>>>>>> more map function after reduce and before sink. >>>>>>>>>>>>>> The map function should be called right after window is >>>>>>>>>>>>>> triggered but before flushing to s3. >>>>>>>>>>>>>> The result of reduce (deduped record) could be logged there. 
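A minimal sketch of such a logging map, assuming the job's Map<String, String> records; the class name and the logged fields are illustrative:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import java.util.Map;

    public class LogWindowResultFunction implements MapFunction<Map<String, String>, Map<String, String>> {

        private static final Logger LOG = LoggerFactory.getLogger(LogWindowResultFunction.class);

        @Override
        public Map<String, String> map(Map<String, String> reduced) {
            // Runs only when the window fires, i.e. right before the record reaches the sink,
            // so it shows which deduplicated records were still in state when the window closed.
            LOG.info("window result: {}={}", reduced.get("field"), reduced.get("id"));
            return reduced;
        }
    }

Wired in between the reduce and the sink: .reduce(new DistinctFunction()).map(new LogWindowResultFunction()).addSink(sink)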
>>>>>>>>>>>>>> This should allow to check whether the processed distinct >>>>>>>>>>>>>> records were buffered in the state after the restoration from >>>>>>>>>>>>>> the savepoint >>>>>>>>>>>>>> or not. If they were buffered we should see that there was an >>>>>>>>>>>>>> attempt to >>>>>>>>>>>>>> write them to the sink from the state. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Another suggestion is to try to write records to some other >>>>>>>>>>>>>> sink or to both. >>>>>>>>>>>>>> E.g. if you can access file system of workers, maybe just >>>>>>>>>>>>>> into local files and check whether the records are also dropped >>>>>>>>>>>>>> there. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Best, >>>>>>>>>>>>>> Andrey >>>>>>>>>>>>>> >>>>>>>>>>>>>> On 20 Sep 2018, at 15:37, Juho Autio <juho.au...@rovio.com> >>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>> Hi Andrey! >>>>>>>>>>>>>> >>>>>>>>>>>>>> I was finally able to gather the DEBUG logs that you >>>>>>>>>>>>>> suggested. In short, the reducer logged that it processed at >>>>>>>>>>>>>> least some of >>>>>>>>>>>>>> the ids that were missing from the output. >>>>>>>>>>>>>> >>>>>>>>>>>>>> "At least some", because I didn't have the job running with >>>>>>>>>>>>>> DEBUG logs for the full 24-hour window period. So I was only >>>>>>>>>>>>>> able to look >>>>>>>>>>>>>> up if I can find *some* of the missing ids in the DEBUG >>>>>>>>>>>>>> logs. Which I did indeed. >>>>>>>>>>>>>> >>>>>>>>>>>>>> I changed the DistinctFunction.java to do this: >>>>>>>>>>>>>> >>>>>>>>>>>>>> @Override >>>>>>>>>>>>>> public Map<String, String> reduce(Map<String, String> >>>>>>>>>>>>>> value1, Map<String, String> value2) { >>>>>>>>>>>>>> LOG.debug("DistinctFunction.reduce returns: {}={}", >>>>>>>>>>>>>> value1.get("field"), value1.get("id")); >>>>>>>>>>>>>> return value1; >>>>>>>>>>>>>> } >>>>>>>>>>>>>> >>>>>>>>>>>>>> Then: >>>>>>>>>>>>>> >>>>>>>>>>>>>> vi flink-1.6.0/conf/log4j.properties >>>>>>>>>>>>>> >>>>>>>>>>>>>> log4j.logger.org.apache.flink.streaming.runtime.tasks.StreamTask=DEBUG >>>>>>>>>>>>>> >>>>>>>>>>>>>> log4j.logger.com.rovio.ds.flink.uniqueid.DistinctFunction=DEBUG >>>>>>>>>>>>>> >>>>>>>>>>>>>> Then I ran the following kind of test: >>>>>>>>>>>>>> >>>>>>>>>>>>>> - Cancelled the on-going job with savepoint created at ~Sep >>>>>>>>>>>>>> 18 08:35 UTC 2018 >>>>>>>>>>>>>> - Started a new cluster & job with DEBUG enabled at ~09:13, >>>>>>>>>>>>>> restored from that previous cluster's savepoint >>>>>>>>>>>>>> - Ran until caught up offsets >>>>>>>>>>>>>> - Cancelled the job with a new savepoint >>>>>>>>>>>>>> - Started a new job _without_ DEBUG, which restored the new >>>>>>>>>>>>>> savepoint, let it keep running so that it will eventually write >>>>>>>>>>>>>> the output >>>>>>>>>>>>>> >>>>>>>>>>>>>> Then on the next day, after results had been flushed when the >>>>>>>>>>>>>> 24-hour window closed, I compared the results again with a batch >>>>>>>>>>>>>> version's >>>>>>>>>>>>>> output. And found some missing ids as usual. >>>>>>>>>>>>>> >>>>>>>>>>>>>> I drilled down to one specific missing id (I'm replacing the >>>>>>>>>>>>>> actual value with AN12345 below), which was not found in the >>>>>>>>>>>>>> stream output, >>>>>>>>>>>>>> but was found in batch output & flink DEBUG logs. 
>>>>>>>>>>>>>> >>>>>>>>>>>>>> Related to that id, I gathered the following information: >>>>>>>>>>>>>> >>>>>>>>>>>>>> 2018-09-18~09:13:21,000 job started & savepoint is restored >>>>>>>>>>>>>> >>>>>>>>>>>>>> 2018-09-18 09:14:29,085 missing id is processed for the first >>>>>>>>>>>>>> time, proved by this log line: >>>>>>>>>>>>>> 2018-09-18 09:14:29,085 DEBUG >>>>>>>>>>>>>> com.rovio.ds.flink.uniqueid.DistinctFunction - >>>>>>>>>>>>>> DistinctFunction.reduce returns: s.aid1=AN12345 >>>>>>>>>>>>>> >>>>>>>>>>>>>> 2018-09-18 09:15:14,264 first synchronous part of checkpoint >>>>>>>>>>>>>> 2018-09-18 09:15:16,544 first asynchronous part of checkpoint >>>>>>>>>>>>>> >>>>>>>>>>>>>> ( >>>>>>>>>>>>>> more occurrences of checkpoints (~1 min checkpointing time + >>>>>>>>>>>>>> ~1 min delay before next) >>>>>>>>>>>>>> / >>>>>>>>>>>>>> more occurrences of DistinctFunction.reduce >>>>>>>>>>>>>> ) >>>>>>>>>>>>>> >>>>>>>>>>>>>> 2018-09-18 09:23:45,053 missing id is processed for the last >>>>>>>>>>>>>> time >>>>>>>>>>>>>> >>>>>>>>>>>>>> 2018-09-18~10:20:00,000 savepoint created & job cancelled >>>>>>>>>>>>>> >>>>>>>>>>>>>> To be noted, there was high backpressure after restoring from >>>>>>>>>>>>>> savepoint until the stream caught up with the kafka offsets. >>>>>>>>>>>>>> Although, our >>>>>>>>>>>>>> job uses assign timestamps & watermarks on the flink kafka >>>>>>>>>>>>>> consumer itself, >>>>>>>>>>>>>> so event time of all partitions is synchronized. As expected, we >>>>>>>>>>>>>> don't get >>>>>>>>>>>>>> any late data in the late data side output. >>>>>>>>>>>>>> >>>>>>>>>>>>>> From this we can see that the missing ids are processed by >>>>>>>>>>>>>> the reducer, but they must get lost somewhere before the 24-hour >>>>>>>>>>>>>> window is >>>>>>>>>>>>>> triggered. >>>>>>>>>>>>>> >>>>>>>>>>>>>> I think it's worth mentioning once more that the stream >>>>>>>>>>>>>> doesn't miss any ids if we let it's running without >>>>>>>>>>>>>> interruptions / state >>>>>>>>>>>>>> restoring. >>>>>>>>>>>>>> >>>>>>>>>>>>>> What's next? >>>>>>>>>>>>>> >>>>>>>>>>>>>> On Wed, Aug 29, 2018 at 3:49 PM Andrey Zagrebin < >>>>>>>>>>>>>> and...@data-artisans.com> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>>> Hi Juho, >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> > only when the 24-hour window triggers, BucketingSink gets >>>>>>>>>>>>>>> a burst of input >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> This is of course totally true, my understanding is the >>>>>>>>>>>>>>> same. We cannot exclude problem there for sure, just savepoints >>>>>>>>>>>>>>> are used a >>>>>>>>>>>>>>> lot w/o problem reports and BucketingSink is known to be >>>>>>>>>>>>>>> problematic with >>>>>>>>>>>>>>> s3. That is why, I asked you: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> > You also wrote that the timestamps of lost event are >>>>>>>>>>>>>>> 'probably' around the time of the savepoint, if it is not yet >>>>>>>>>>>>>>> for sure I >>>>>>>>>>>>>>> would also check it. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Although, bucketing sink might loose any data at the end of >>>>>>>>>>>>>>> the day (also from the middle). The fact, that it is always >>>>>>>>>>>>>>> around the time >>>>>>>>>>>>>>> of taking a savepoint and not random, is surely suspicious and >>>>>>>>>>>>>>> possible >>>>>>>>>>>>>>> savepoint failures need to be investigated. 
>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Regarding the s3 problem, s3 doc says: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> > The caveat is that if you make a HEAD or GET request to >>>>>>>>>>>>>>> the key name (to find if the object exists) before creating the >>>>>>>>>>>>>>> object, >>>>>>>>>>>>>>> Amazon S3 provides 'eventual consistency' for read-after-write. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> The algorithm you suggest is how it is roughly implemented >>>>>>>>>>>>>>> now (BucketingSink.openNewPartFile). My understanding is that >>>>>>>>>>>>>>> 'eventual consistency’ means that even if you just created file >>>>>>>>>>>>>>> (its name >>>>>>>>>>>>>>> is key) it can be that you do not get it in the list or exists >>>>>>>>>>>>>>> (HEAD) >>>>>>>>>>>>>>> returns false and you risk to rewrite the previous part. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> The BucketingSink was designed for a standard file system. >>>>>>>>>>>>>>> s3 is used over a file system wrapper atm but does not always >>>>>>>>>>>>>>> provide >>>>>>>>>>>>>>> normal file system guarantees. See also last example in [1]. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>>> Andrey >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> [1] >>>>>>>>>>>>>>> https://codeburst.io/quick-explanation-of-the-s3-consistency-model-6c9f325e3f82 >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On 29 Aug 2018, at 12:11, Juho Autio <juho.au...@rovio.com> >>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Andrey, thank you very much for the debugging suggestions, >>>>>>>>>>>>>>> I'll try them. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> In the meanwhile two more questions, please: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> > Just to keep in mind this problem with s3 and exclude it >>>>>>>>>>>>>>> for sure. I would also check whether the size of missing events >>>>>>>>>>>>>>> is around >>>>>>>>>>>>>>> the batch size of BucketingSink or not. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Fair enough, but I also want to focus on debugging the most >>>>>>>>>>>>>>> probable subject first. So what do you think about this – true >>>>>>>>>>>>>>> or false: >>>>>>>>>>>>>>> only when the 24-hour window triggers, BucketinSink gets a >>>>>>>>>>>>>>> burst of input. >>>>>>>>>>>>>>> Around the state restoring point (middle of the day) it doesn't >>>>>>>>>>>>>>> get any >>>>>>>>>>>>>>> input, so it can't lose anything either. Isn't this true, or >>>>>>>>>>>>>>> have I totally >>>>>>>>>>>>>>> missed how Flink works in triggering window results? I would >>>>>>>>>>>>>>> not expect >>>>>>>>>>>>>>> there to be any optimization that speculatively triggers early >>>>>>>>>>>>>>> results of a >>>>>>>>>>>>>>> regular time window to the downstream operators. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> > The old BucketingSink has in general problem with s3. >>>>>>>>>>>>>>> Internally BucketingSink queries s3 as a file system to list >>>>>>>>>>>>>>> already >>>>>>>>>>>>>>> written file parts (batches) and determine index of the next >>>>>>>>>>>>>>> part to start. >>>>>>>>>>>>>>> Due to eventual consistency of checking file existence in s3 >>>>>>>>>>>>>>> [1], the >>>>>>>>>>>>>>> BucketingSink can rewrite the previously written part and >>>>>>>>>>>>>>> basically loose >>>>>>>>>>>>>>> it. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I was wondering, what does S3's "read-after-write >>>>>>>>>>>>>>> consistency" (mentioned on the page you linked) actually mean. 
>>>>>>>>>>>>>>> It seems >>>>>>>>>>>>>>> that this might be possible: >>>>>>>>>>>>>>> - LIST keys, find current max index >>>>>>>>>>>>>>> - choose next index = max + 1 >>>>>>>>>>>>>>> - HEAD next index: if it exists, keep adding + 1 until key >>>>>>>>>>>>>>> doesn't exist on S3 >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> But definitely sounds easier if a sink keeps track of files >>>>>>>>>>>>>>> in a way that's guaranteed to be consistent. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>>> Juho >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On Mon, Aug 27, 2018 at 2:04 PM Andrey Zagrebin < >>>>>>>>>>>>>>> and...@data-artisans.com> wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Hi, >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> true, StreamingFileSink does not support s3 in 1.6.0, it is >>>>>>>>>>>>>>>> planned for the next 1.7 release, sorry for confusion. >>>>>>>>>>>>>>>> The old BucketingSink has in general problem with s3. >>>>>>>>>>>>>>>> Internally BucketingSink queries s3 as a file system >>>>>>>>>>>>>>>> to list already written file parts (batches) and determine >>>>>>>>>>>>>>>> index of the next part to start. Due to eventual consistency >>>>>>>>>>>>>>>> of checking >>>>>>>>>>>>>>>> file existence in s3 [1], the BucketingSink can rewrite the >>>>>>>>>>>>>>>> previously >>>>>>>>>>>>>>>> written part and basically loose it. It should be fixed for >>>>>>>>>>>>>>>> StreamingFileSink in 1.7 where Flink keeps its own track of >>>>>>>>>>>>>>>> written parts >>>>>>>>>>>>>>>> and does not rely on s3 as a file system. >>>>>>>>>>>>>>>> I also include Kostas, he might add more details. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Just to keep in mind this problem with s3 and exclude it >>>>>>>>>>>>>>>> for sure I would also check whether the size of missing >>>>>>>>>>>>>>>> events is around >>>>>>>>>>>>>>>> the batch size of BucketingSink or not. You also wrote that >>>>>>>>>>>>>>>> the timestamps >>>>>>>>>>>>>>>> of lost event are 'probably' around the time of the savepoint, >>>>>>>>>>>>>>>> if it is not >>>>>>>>>>>>>>>> yet for sure I would also check it. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Have you already checked the log files of job manager and >>>>>>>>>>>>>>>> task managers for the job running before and after the restore >>>>>>>>>>>>>>>> from the >>>>>>>>>>>>>>>> check point? Is everything successful there, no errors, >>>>>>>>>>>>>>>> relevant warnings >>>>>>>>>>>>>>>> or exceptions? >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> As the next step, I would suggest to log all encountered >>>>>>>>>>>>>>>> events in DistinctFunction.reduce if possible for production >>>>>>>>>>>>>>>> data and check >>>>>>>>>>>>>>>> whether the missed events are eventually processed before or >>>>>>>>>>>>>>>> after the >>>>>>>>>>>>>>>> savepoint. The following log message indicates a border >>>>>>>>>>>>>>>> between the events >>>>>>>>>>>>>>>> that should be included into the savepoint (logged before) or >>>>>>>>>>>>>>>> not: >>>>>>>>>>>>>>>> “{} ({}, synchronous part) in thread {} took {} ms” >>>>>>>>>>>>>>>> (template) >>>>>>>>>>>>>>>> Also check if the savepoint has been overall completed: >>>>>>>>>>>>>>>> "{} ({}, asynchronous part) in thread {} took {} ms." 
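To make the eventual-consistency risk discussed above concrete, here is a rough standalone sketch of the kind of part-index probing that BucketingSink.openNewPartFile does – not the actual Flink code, and the method and parameter names are made up:

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import java.io.IOException;

    public class PartIndexProbe {

        // Keep incrementing the part index until a path is found that does not exist yet.
        // On a normal file system this is safe; on S3 the exists() check is only eventually
        // consistent, so a just-written part may not be visible yet and its index can be
        // handed out again, overwriting (losing) the earlier part.
        public static Path nextPartFile(FileSystem fs, Path bucketPath, String partPrefix) throws IOException {
            int partIndex = 0;
            Path candidate = new Path(bucketPath, partPrefix + "-" + partIndex);
            while (fs.exists(candidate)) {
                partIndex++;
                candidate = new Path(bucketPath, partPrefix + "-" + partIndex);
            }
            return candidate;
        }
    }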
>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Best, >>>>>>>>>>>>>>>> Andrey >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> [1] >>>>>>>>>>>>>>>> https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> On 24 Aug 2018, at 20:41, Juho Autio <juho.au...@rovio.com> >>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Hi, >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Using StreamingFileSink is not a convenient option for >>>>>>>>>>>>>>>> production use for us as it doesn't support s3*. I could use >>>>>>>>>>>>>>>> StreamingFileSink just to verify, but I don't see much point >>>>>>>>>>>>>>>> in doing so. >>>>>>>>>>>>>>>> Please consider my previous comment: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> > I realized that BucketingSink must not play any role in >>>>>>>>>>>>>>>> this problem. This is because only when the 24-hour window >>>>>>>>>>>>>>>> triggers, >>>>>>>>>>>>>>>> BucketingSink gets a burst of input. Around the state >>>>>>>>>>>>>>>> restoring point >>>>>>>>>>>>>>>> (middle of the day) it doesn't get any input, so it can't lose >>>>>>>>>>>>>>>> anything >>>>>>>>>>>>>>>> either (right?). >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> I could also use a kafka sink instead, but I can't imagine >>>>>>>>>>>>>>>> how there could be any difference. It's very real that the >>>>>>>>>>>>>>>> sink doesn't get >>>>>>>>>>>>>>>> any input for a long time until the 24-hour window closes, and >>>>>>>>>>>>>>>> then it >>>>>>>>>>>>>>>> quickly writes out everything because it's not that much data >>>>>>>>>>>>>>>> eventually >>>>>>>>>>>>>>>> for the distinct values. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Any ideas for debugging what's happening around the >>>>>>>>>>>>>>>> savepoint & restoration time? >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> *) I actually implemented StreamingFileSink as an >>>>>>>>>>>>>>>> alternative sink. This was before I came to realize that most >>>>>>>>>>>>>>>> likely the >>>>>>>>>>>>>>>> sink component has nothing to do with the data loss problem. I >>>>>>>>>>>>>>>> tried it >>>>>>>>>>>>>>>> with s3n:// path just to see an exception being thrown. In the >>>>>>>>>>>>>>>> source code >>>>>>>>>>>>>>>> I indeed then found an explicit check for the target path >>>>>>>>>>>>>>>> scheme to be >>>>>>>>>>>>>>>> "hdfs://". >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> On Fri, Aug 24, 2018 at 7:49 PM Andrey Zagrebin < >>>>>>>>>>>>>>>> and...@data-artisans.com> wrote: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Ok, I think before further debugging the window reduced >>>>>>>>>>>>>>>>> state, >>>>>>>>>>>>>>>>> could you try the new ‘StreamingFileSink’ [1] introduced >>>>>>>>>>>>>>>>> in Flink 1.6.0 instead of the previous 'BucketingSink’? >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>>>>> Andrey >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> [1] >>>>>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> On 24 Aug 2018, at 18:03, Juho Autio <juho.au...@rovio.com> >>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Yes, sorry for my confusing comment. I just meant that it >>>>>>>>>>>>>>>>> seems like there's a bug somewhere now that the output is >>>>>>>>>>>>>>>>> missing some data. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> > I would wait and check the actual output in s3 because >>>>>>>>>>>>>>>>> it is the main result of the job >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Yes, and that's what I have already done. 
There seems to >>>>>>>>>>>>>>>>> be always some data loss with the production data volumes, if >>>>>>>>>>>>>>>>> the job has >>>>>>>>>>>>>>>>> been restarted on that day. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Would you have any suggestions for how to debug this >>>>>>>>>>>>>>>>> further? >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Many thanks for stepping in. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> On Fri, Aug 24, 2018 at 6:37 PM Andrey Zagrebin < >>>>>>>>>>>>>>>>> and...@data-artisans.com> wrote: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Hi Juho, >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> So it is a per key deduplication job. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Yes, I would wait and check the actual output in s3 >>>>>>>>>>>>>>>>>> because it is the main result of the job and >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> > The late data around the time of taking savepoint might >>>>>>>>>>>>>>>>>> be not included into the savepoint but it should be behind >>>>>>>>>>>>>>>>>> the snapshotted >>>>>>>>>>>>>>>>>> offset in Kafka. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> is not a bug, it is a possible behaviour. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> The savepoint is a snapshot of the data in transient >>>>>>>>>>>>>>>>>> which is already consumed from Kafka. >>>>>>>>>>>>>>>>>> Basically the full contents of the window result is split >>>>>>>>>>>>>>>>>> between the savepoint and what can come after the >>>>>>>>>>>>>>>>>> savepoint'ed offset in >>>>>>>>>>>>>>>>>> Kafka but before the window result is written into s3. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Allowed lateness should not affect it, I am just saying >>>>>>>>>>>>>>>>>> that the final result in s3 should include all records after >>>>>>>>>>>>>>>>>> it. >>>>>>>>>>>>>>>>>> This is what should be guaranteed but not the contents of >>>>>>>>>>>>>>>>>> the intermediate savepoint. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>>>>>> Andrey >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> On 24 Aug 2018, at 16:52, Juho Autio < >>>>>>>>>>>>>>>>>> juho.au...@rovio.com> wrote: >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Thanks for your answer! >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> I check for the missed data from the final output on s3. >>>>>>>>>>>>>>>>>> So I wait until the next day, then run the same thing >>>>>>>>>>>>>>>>>> re-implemented in >>>>>>>>>>>>>>>>>> batch, and compare the output. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> > The late data around the time of taking savepoint might >>>>>>>>>>>>>>>>>> be not included into the savepoint but it should be behind >>>>>>>>>>>>>>>>>> the snapshotted >>>>>>>>>>>>>>>>>> offset in Kafka. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Yes, I would definitely expect that. It seems like >>>>>>>>>>>>>>>>>> there's a bug somewhere. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> > Then it should just come later after the restore and >>>>>>>>>>>>>>>>>> should be reduced within the allowed lateness into the final >>>>>>>>>>>>>>>>>> result which >>>>>>>>>>>>>>>>>> is saved into s3. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Well, as far as I know, allowed lateness doesn't play any >>>>>>>>>>>>>>>>>> role here, because I started running the job with >>>>>>>>>>>>>>>>>> allowedLateness=0, and >>>>>>>>>>>>>>>>>> still get the data loss, while my late data output doesn't >>>>>>>>>>>>>>>>>> receive anything. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> > Also, is this `DistinctFunction.reduce` just an example >>>>>>>>>>>>>>>>>> or the actual implementation, basically saving just one of >>>>>>>>>>>>>>>>>> records inside >>>>>>>>>>>>>>>>>> the 24h window in s3? then what is missing there? 
>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Yes, it's the actual implementation. Note that there's a >>>>>>>>>>>>>>>>>> keyBy before the DistinctFunction. So there's one record for >>>>>>>>>>>>>>>>>> each key >>>>>>>>>>>>>>>>>> (which is the combination of a couple of fields). In >>>>>>>>>>>>>>>>>> practice I've seen >>>>>>>>>>>>>>>>>> that we're missing ~2000-4000 elements on each restore, and >>>>>>>>>>>>>>>>>> the total >>>>>>>>>>>>>>>>>> output is obviously much more than that. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Here's the full code for the key selector: >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> public class MapKeySelector implements >>>>>>>>>>>>>>>>>> KeySelector<Map<String,String>, Object> { >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> private final String[] fields; >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> public MapKeySelector(String... fields) { >>>>>>>>>>>>>>>>>> this.fields = fields; >>>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> @Override >>>>>>>>>>>>>>>>>> public Object getKey(Map<String, String> event) >>>>>>>>>>>>>>>>>> throws Exception { >>>>>>>>>>>>>>>>>> Tuple key = >>>>>>>>>>>>>>>>>> Tuple.getTupleClass(fields.length).newInstance(); >>>>>>>>>>>>>>>>>> for (int i = 0; i < fields.length; i++) { >>>>>>>>>>>>>>>>>> key.setField(event.getOrDefault(fields[i], >>>>>>>>>>>>>>>>>> ""), i); >>>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>>> return key; >>>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> And a more exact example on how it's used: >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> .keyBy(new MapKeySelector("ID", >>>>>>>>>>>>>>>>>> "PLAYER_ID", "FIELD", "KEY_NAME", "KEY_VALUE")) >>>>>>>>>>>>>>>>>> .timeWindow(Time.days(1)) >>>>>>>>>>>>>>>>>> .reduce(new DistinctFunction()) >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> On Fri, Aug 24, 2018 at 5:26 PM Andrey Zagrebin < >>>>>>>>>>>>>>>>>> and...@data-artisans.com> wrote: >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Hi Juho, >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Where exactly does the data miss? When do you notice >>>>>>>>>>>>>>>>>>> that? >>>>>>>>>>>>>>>>>>> Do you check it: >>>>>>>>>>>>>>>>>>> - debugging `DistinctFunction.reduce` right after resume >>>>>>>>>>>>>>>>>>> in the middle of the day >>>>>>>>>>>>>>>>>>> or >>>>>>>>>>>>>>>>>>> - some distinct records miss in the final output >>>>>>>>>>>>>>>>>>> of BucketingSink in s3 after window result is actually >>>>>>>>>>>>>>>>>>> triggered and saved >>>>>>>>>>>>>>>>>>> into s3 at the end of the day? is this the main output? >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> The late data around the time of taking savepoint might >>>>>>>>>>>>>>>>>>> be not included into the savepoint but it should be behind >>>>>>>>>>>>>>>>>>> the snapshotted >>>>>>>>>>>>>>>>>>> offset in Kafka. Then it should just come later after the >>>>>>>>>>>>>>>>>>> restore and >>>>>>>>>>>>>>>>>>> should be reduced within the allowed lateness into the >>>>>>>>>>>>>>>>>>> final result which >>>>>>>>>>>>>>>>>>> is saved into s3. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Also, is this `DistinctFunction.reduce` just an example >>>>>>>>>>>>>>>>>>> or the actual implementation, basically saving just one of >>>>>>>>>>>>>>>>>>> records inside >>>>>>>>>>>>>>>>>>> the 24h window in s3? then what is missing there? 
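As a quick standalone illustration of the MapKeySelector above (the field values are made up): two events that differ only in a field outside the key list produce equal Tuple keys, so they land in the same keyed window and the reduce keeps only one of them.

    import java.util.HashMap;
    import java.util.Map;

    public class MapKeySelectorCheck {
        public static void main(String[] args) throws Exception {
            Map<String, String> first = new HashMap<>();
            first.put("ID", "AN12345");
            first.put("PLAYER_ID", "p-1");
            first.put("FIELD", "s.aid1");
            first.put("KEY_NAME", "k");
            first.put("KEY_VALUE", "v");
            first.put("timestamp", "1537257269085");   // not part of the key

            Map<String, String> second = new HashMap<>(first);
            second.put("timestamp", "1537257270000");  // differs only in a non-key field

            MapKeySelector selector =
                    new MapKeySelector("ID", "PLAYER_ID", "FIELD", "KEY_NAME", "KEY_VALUE");

            // Both events produce an equal Tuple5, so they are deduplicated to one record.
            System.out.println(selector.getKey(first).equals(selector.getKey(second))); // true
        }
    }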
>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>>>>>>> Andrey >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> On 23 Aug 2018, at 15:42, Juho Autio < >>>>>>>>>>>>>>>>>>> juho.au...@rovio.com> wrote: >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> I changed to allowedLateness=0, no change, still missing >>>>>>>>>>>>>>>>>>> data when restoring from savepoint. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> On Tue, Aug 21, 2018 at 10:43 AM Juho Autio < >>>>>>>>>>>>>>>>>>> juho.au...@rovio.com> wrote: >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> I realized that BucketingSink must not play any role in >>>>>>>>>>>>>>>>>>>> this problem. This is because only when the 24-hour window >>>>>>>>>>>>>>>>>>>> triggers, >>>>>>>>>>>>>>>>>>>> BucketinSink gets a burst of input. Around the state >>>>>>>>>>>>>>>>>>>> restoring point >>>>>>>>>>>>>>>>>>>> (middle of the day) it doesn't get any input, so it can't >>>>>>>>>>>>>>>>>>>> lose anything >>>>>>>>>>>>>>>>>>>> either (right?). >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> I will next try removing the allowedLateness entirely >>>>>>>>>>>>>>>>>>>> from the equation. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> In the meanwhile, please let me know if you have any >>>>>>>>>>>>>>>>>>>> suggestions for debugging the lost data, for example what >>>>>>>>>>>>>>>>>>>> logs to enable. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> We use FlinkKafkaConsumer010 btw. Are there any known >>>>>>>>>>>>>>>>>>>> issues with that, that could contribute to lost data when >>>>>>>>>>>>>>>>>>>> restoring a >>>>>>>>>>>>>>>>>>>> savepoint? >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 4:23 PM Juho Autio < >>>>>>>>>>>>>>>>>>>> juho.au...@rovio.com> wrote: >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Some data is silently lost on my Flink stream job when >>>>>>>>>>>>>>>>>>>>> state is restored from a savepoint. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Do you have any debugging hints to find out where >>>>>>>>>>>>>>>>>>>>> exactly the data gets dropped? >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> My job gathers distinct values using a 24-hour window. >>>>>>>>>>>>>>>>>>>>> It doesn't have any custom state management. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> When I cancel the job with savepoint and restore from >>>>>>>>>>>>>>>>>>>>> that savepoint, some data is missed. It seems to be >>>>>>>>>>>>>>>>>>>>> losing just a small >>>>>>>>>>>>>>>>>>>>> amount of data. The event time of lost data is probably >>>>>>>>>>>>>>>>>>>>> around the time of >>>>>>>>>>>>>>>>>>>>> savepoint. In other words the rest of the time window is >>>>>>>>>>>>>>>>>>>>> not entirely >>>>>>>>>>>>>>>>>>>>> missed – collection works correctly also for (most of >>>>>>>>>>>>>>>>>>>>> the) events that come >>>>>>>>>>>>>>>>>>>>> in after restoring. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> When the job processes a full 24-hour window without >>>>>>>>>>>>>>>>>>>>> interruptions it doesn't miss anything. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Usually the problem doesn't happen in test >>>>>>>>>>>>>>>>>>>>> environments that have smaller parallelism and smaller >>>>>>>>>>>>>>>>>>>>> data volumes. But in >>>>>>>>>>>>>>>>>>>>> production volumes the job seems to be consistently >>>>>>>>>>>>>>>>>>>>> missing at least >>>>>>>>>>>>>>>>>>>>> something on every restore. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> This issue has consistently happened since the job was >>>>>>>>>>>>>>>>>>>>> initially created. 
It was at first run on an older >>>>>>>>>>>>>>>>>>>>> version of Flink >>>>>>>>>>>>>>>>>>>>> 1.5-SNAPSHOT and it still happens on both Flink 1.5.2 & >>>>>>>>>>>>>>>>>>>>> 1.6.0. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> I'm wondering if this could be for example some >>>>>>>>>>>>>>>>>>>>> synchronization issue between the kafka consumer offsets >>>>>>>>>>>>>>>>>>>>> vs. what's been >>>>>>>>>>>>>>>>>>>>> written by BucketingSink? >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> 1. Job content, simplified >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> kafkaStream >>>>>>>>>>>>>>>>>>>>> .flatMap(new ExtractFieldsFunction()) >>>>>>>>>>>>>>>>>>>>> .keyBy(new MapKeySelector(1, 2, 3, 4)) >>>>>>>>>>>>>>>>>>>>> .timeWindow(Time.days(1)) >>>>>>>>>>>>>>>>>>>>> .allowedLateness(allowedLateness) >>>>>>>>>>>>>>>>>>>>> .sideOutputLateData(lateDataTag) >>>>>>>>>>>>>>>>>>>>> .reduce(new DistinctFunction()) >>>>>>>>>>>>>>>>>>>>> .addSink(sink) >>>>>>>>>>>>>>>>>>>>> // use a fixed number of output >>>>>>>>>>>>>>>>>>>>> partitions >>>>>>>>>>>>>>>>>>>>> .setParallelism(8)) >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> /** >>>>>>>>>>>>>>>>>>>>> * Usage: .keyBy("the", "distinct", >>>>>>>>>>>>>>>>>>>>> "fields").reduce(new DistinctFunction()) >>>>>>>>>>>>>>>>>>>>> */ >>>>>>>>>>>>>>>>>>>>> public class DistinctFunction implements >>>>>>>>>>>>>>>>>>>>> ReduceFunction<java.util.Map<String, String>> { >>>>>>>>>>>>>>>>>>>>> @Override >>>>>>>>>>>>>>>>>>>>> public Map<String, String> reduce(Map<String, >>>>>>>>>>>>>>>>>>>>> String> value1, Map<String, String> value2) { >>>>>>>>>>>>>>>>>>>>> return value1; >>>>>>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> 2. State configuration >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> boolean enableIncrementalCheckpointing = true; >>>>>>>>>>>>>>>>>>>>> String statePath = "s3n://bucket/savepoints"; >>>>>>>>>>>>>>>>>>>>> new RocksDBStateBackend(statePath, >>>>>>>>>>>>>>>>>>>>> enableIncrementalCheckpointing); >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Checkpointing Mode Exactly Once >>>>>>>>>>>>>>>>>>>>> Interval 1m 0s >>>>>>>>>>>>>>>>>>>>> Timeout 10m 0s >>>>>>>>>>>>>>>>>>>>> Minimum Pause Between Checkpoints 1m 0s >>>>>>>>>>>>>>>>>>>>> Maximum Concurrent Checkpoints 1 >>>>>>>>>>>>>>>>>>>>> Persist Checkpoints Externally Enabled (retain on >>>>>>>>>>>>>>>>>>>>> cancellation) >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> 3. BucketingSink configuration >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> We use BucketingSink, I don't think there's anything >>>>>>>>>>>>>>>>>>>>> special here, if not the fact that we're writing to S3. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> String outputPath = "s3://bucket/output"; >>>>>>>>>>>>>>>>>>>>> BucketingSink<Map<String, String>> sink = new >>>>>>>>>>>>>>>>>>>>> BucketingSink<Map<String, String>>(outputPath) >>>>>>>>>>>>>>>>>>>>> .setBucketer(new ProcessdateBucketer()) >>>>>>>>>>>>>>>>>>>>> .setBatchSize(batchSize) >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> .setInactiveBucketThreshold(inactiveBucketThreshold) >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> .setInactiveBucketCheckInterval(inactiveBucketCheckInterval); >>>>>>>>>>>>>>>>>>>>> sink.setWriter(new IdJsonWriter()); >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> 4. Kafka & event time >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> My flink job reads the data from Kafka, using a >>>>>>>>>>>>>>>>>>>>> BoundedOutOfOrdernessTimestampExtractor on the kafka >>>>>>>>>>>>>>>>>>>>> consumer to >>>>>>>>>>>>>>>>>>>>> synchronize watermarks accross all kafka partitions. 
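For completeness, a sketch of how that timestamp/watermark assignment on the Kafka consumer typically looks; the topic, deserialization schema, properties, the 10-second max out-of-orderness and the "timestamp" field are placeholders, not the job's actual values:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
    import java.util.Map;

    FlinkKafkaConsumer010<Map<String, String>> consumer =
            new FlinkKafkaConsumer010<>(topic, deserializationSchema, kafkaProperties);

    // Assigning timestamps/watermarks directly on the consumer keeps the watermark
    // synchronized across all Kafka partitions (per-partition watermarking).
    consumer.assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor<Map<String, String>>(Time.seconds(10)) {
                @Override
                public long extractTimestamp(Map<String, String> event) {
                    return Long.parseLong(event.get("timestamp"));
                }
            });

    DataStream<Map<String, String>> kafkaStream = env.addSource(consumer);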
We >>>>>>>>>>>>>>>>>>>>> also write late >>>>>>>>>>>>>>>>>>>>> data to side output, but nothing is written there – if it >>>>>>>>>>>>>>>>>>>>> would, it could >>>>>>>>>>>>>>>>>>>>> explain missed data in the main output (I'm also sure >>>>>>>>>>>>>>>>>>>>> that our late data >>>>>>>>>>>>>>>>>>>>> writing works, because we previously had some actual late >>>>>>>>>>>>>>>>>>>>> data which ended >>>>>>>>>>>>>>>>>>>>> up there). >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> 5. allowedLateness >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> It may be or may not be relevant that I have also >>>>>>>>>>>>>>>>>>>>> enabled allowedLateness with 1 minute lateness on the >>>>>>>>>>>>>>>>>>>>> 24-hour window: >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> If that makes sense, I could try removing >>>>>>>>>>>>>>>>>>>>> allowedLateness entirely? That would be just to rule out >>>>>>>>>>>>>>>>>>>>> that Flink doesn't >>>>>>>>>>>>>>>>>>>>> have a bug that's related to restoring state in >>>>>>>>>>>>>>>>>>>>> combination with the >>>>>>>>>>>>>>>>>>>>> allowedLateness feature. After all, all of our data >>>>>>>>>>>>>>>>>>>>> should be in a good >>>>>>>>>>>>>>>>>>>>> enough order to not be late, given the max out of >>>>>>>>>>>>>>>>>>>>> orderness used on kafka >>>>>>>>>>>>>>>>>>>>> consumer timestamp extractor. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Thank you in advance! >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>> >>>>>>> >>>>>> >>>>> >>>> >>> >> > >