Sorry, not posting on the mailing list was my mistake :/
On Wed, 13 Feb 2019 at 15:01, Juho Autio <juho.au...@rovio.com> wrote: > Thanks for stepping in, did you post outside of the mailing list on > purpose btw? > > This I did long time ago: > > To rule out for good any questions about sink behaviour, the job was >> killed and started with an additional Kafka sink. >> The same number of ids were missed in both outputs: KafkaSink & >> BucketingSink. > > > (I wrote about that On Oct 1, 2018 in this email thread) > > After that I did the savepoint analysis with Bravo. > > Currently I'm indeed trying to get suggestions how to debug further, for > example, where to add additional kafka output, to catch where the data gets > lost. That would probably be somewhere in Flink's internals. > > I could try to share the full code also, but IMHO the problem has been > quite well narrowed down, considering that data can be found in savepoint, > savepoint is successfully restored, and after restoring the data doesn't go > to "user code" (like the reducer) any more. > > On Wed, Feb 13, 2019 at 3:47 PM Gyula Fóra <gyula.f...@gmail.com> wrote: > >> Hi Juho! >> I think the reason you are not getting much answers here is because it is >> very hard to debug this problem remotely. >> Seemingly you do very normal operations, the state contains all the >> required data and nobody else has hit a similar problem for ages. >> >> My best guess would be some bug with the deduplication or output writing >> logic but without a complete code example its very hard to say anything >> useful. >> Did you try writing it to Kafka to see if the output is there? (that way >> we could rule out the dedup probllem) >> >> Cheers, >> Gyula >> >> On Wed, Feb 13, 2019 at 2:37 PM Juho Autio <juho.au...@rovio.com> wrote: >> >>> Stefan (or anyone!), please, could I have some feedback on the findings >>> that I reported on Dec 21, 2018? This is still a major blocker.. >>> >>> On Thu, Jan 31, 2019 at 11:46 AM Juho Autio <juho.au...@rovio.com> >>> wrote: >>> >>>> Hello, is there anyone that could help with this? >>>> >>>> On Fri, Jan 11, 2019 at 8:14 AM Juho Autio <juho.au...@rovio.com> >>>> wrote: >>>> >>>>> Stefan, would you have time to comment? >>>>> >>>>> On Wednesday, January 2, 2019, Juho Autio <juho.au...@rovio.com> >>>>> wrote: >>>>> >>>>>> Bump – does anyone know if Stefan will be available to comment the >>>>>> latest findings? Thanks. >>>>>> >>>>>> On Fri, Dec 21, 2018 at 2:33 PM Juho Autio <juho.au...@rovio.com> >>>>>> wrote: >>>>>> >>>>>>> Stefan, I managed to analyze savepoint with bravo. It seems that the >>>>>>> data that's missing from output *is* found in savepoint. >>>>>>> >>>>>>> I simplified my test case to the following: >>>>>>> >>>>>>> - job 1 has bee running for ~10 days >>>>>>> - savepoint X created & job 1 cancelled >>>>>>> - job 2 started with restore from savepoint X >>>>>>> >>>>>>> Then I waited until the next day so that job 2 has triggered the 24 >>>>>>> hour window. >>>>>>> >>>>>>> Then I analyzed the output & savepoint: >>>>>>> >>>>>>> - compare job 2 output with the output of a batch pyspark script => >>>>>>> find 4223 missing rows >>>>>>> - pick one of the missing rows (say, id Z) >>>>>>> - read savepoint X with bravo, filter for id Z => Z was found in the >>>>>>> savepoint! >>>>>>> >>>>>>> How can it be possible that the value is in state but doesn't end up >>>>>>> in output after state has been restored & window is eventually >>>>>>> triggered? 
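To make the "side effect kafka output" idea mentioned above a bit more concrete, a minimal sketch could look like the following. This is only an illustration: the DebugTap class, the broker address and the topic name are placeholders, not anything from the actual job.

import java.util.Map;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;

/**
 * Mirrors whatever flows through a given point of the pipeline to a separate
 * Kafka topic, without touching the main path. It could be attached e.g. right
 * after .reduce(new DistinctFunction()) and, for comparison, right after the
 * flatMap, to see at which point the missing ids stop appearing.
 */
public class DebugTap {

    public static void tap(DataStream<Map<String, String>> stream, String topic) {
        stream
                // render each record as a readable string; a proper JSON writer could be used instead
                .map(new MapFunction<Map<String, String>, String>() {
                    @Override
                    public String map(Map<String, String> value) {
                        return String.valueOf(value);
                    }
                })
                .addSink(new FlinkKafkaProducer010<>("broker1:9092", topic, new SimpleStringSchema()));
    }
}

Tapping inside Flink's own window internals would of course need more than this, but for the job's own operators something along these lines should be enough to narrow down where the ids disappear.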
>>>>>>> >>>>>>> I also did similar analysis on the previous case where I savepointed >>>>>>> & restored the job multiple times (5) within the same 24-hour window. A >>>>>>> missing id that I drilled down to, was found in all of those savepoints, >>>>>>> yet missing from the output that gets written at the end of the day. >>>>>>> This >>>>>>> is even more surprising: that the missing ID was written to the new >>>>>>> savepoints also after restoring. Is the reducer state somehow decoupled >>>>>>> from the window contents? >>>>>>> >>>>>>> Big thanks to bravo-developer Gyula for guiding me through to be >>>>>>> able read the reducer state! https://github.com/king/bravo/pull/11 >>>>>>> >>>>>>> Gyula also had an idea for how to troubleshoot the missing data in a >>>>>>> scalable way: I could add some "side effect kafka output" on individual >>>>>>> operators. This should allow tracking more closely at which point the >>>>>>> data >>>>>>> gets lost. However, maybe this would have to be in some Flink's internal >>>>>>> components, and I'm not sure which those would be. >>>>>>> >>>>>>> Cheers, >>>>>>> Juho >>>>>>> >>>>>>> On Mon, Nov 19, 2018 at 11:52 AM Juho Autio <juho.au...@rovio.com> >>>>>>> wrote: >>>>>>> >>>>>>>> >>>>>>>> Hi Stefan, >>>>>>>> >>>>>>>> Bravo doesn't currently support reading a reducer state. I gave it >>>>>>>> a try but couldn't get to a working implementation yet. If anyone can >>>>>>>> provide some insight on how to make this work, please share at github: >>>>>>>> https://github.com/king/bravo/pull/11 >>>>>>>> >>>>>>>> Thanks. >>>>>>>> >>>>>>>> On Tue, Oct 23, 2018 at 3:32 PM Juho Autio <juho.au...@rovio.com> >>>>>>>> wrote: >>>>>>>> >>>>>>>>> I was glad to find that bravo had now been updated to support >>>>>>>>> installing bravo to a local maven repo. >>>>>>>>> >>>>>>>>> I was able to load a checkpoint created by my job, thanks to the >>>>>>>>> example provided in bravo README, but I'm still missing the essential >>>>>>>>> piece. >>>>>>>>> >>>>>>>>> My code was: >>>>>>>>> >>>>>>>>> OperatorStateReader reader = new OperatorStateReader(env2, >>>>>>>>> savepoint, "DistinctFunction"); >>>>>>>>> DontKnowWhatTypeThisIs reducingState = >>>>>>>>> reader.readKeyedStates(what should I put here?); >>>>>>>>> >>>>>>>>> I don't know how to read the values collected from reduce() calls >>>>>>>>> in the state. Is there a way to access the reducing state of the >>>>>>>>> window >>>>>>>>> with bravo? I'm a bit confused how this works, because when I check >>>>>>>>> with >>>>>>>>> debugger, flink internally uses a ReducingStateDescriptor >>>>>>>>> with name=window-contents, but still reading operator state for >>>>>>>>> "DistinctFunction" didn't at least throw an exception >>>>>>>>> ("window-contents" >>>>>>>>> threw – obviously there's no operator by that name). >>>>>>>>> >>>>>>>>> Cheers, >>>>>>>>> Juho >>>>>>>>> >>>>>>>>> On Mon, Oct 15, 2018 at 2:25 PM Juho Autio <juho.au...@rovio.com> >>>>>>>>> wrote: >>>>>>>>> >>>>>>>>>> Hi Stefan, >>>>>>>>>> >>>>>>>>>> Sorry but it doesn't seem immediately clear to me what's a good >>>>>>>>>> way to use https://github.com/king/bravo. >>>>>>>>>> >>>>>>>>>> How are people using it? Would you for example modify >>>>>>>>>> build.gradle somehow to publish the bravo as a library >>>>>>>>>> locally/internally? >>>>>>>>>> Or add code directly in the bravo project (locally) and run it from >>>>>>>>>> there >>>>>>>>>> (using an IDE, for example)? 
Also it doesn't seem like the bravo >>>>>>>>>> gradle >>>>>>>>>> project supports building a flink job jar, but if it does, how do I >>>>>>>>>> do it? >>>>>>>>>> >>>>>>>>>> Thanks. >>>>>>>>>> >>>>>>>>>> On Thu, Oct 4, 2018 at 9:30 PM Juho Autio <juho.au...@rovio.com> >>>>>>>>>> wrote: >>>>>>>>>> >>>>>>>>>>> Good then, I'll try to analyze the savepoints with Bravo. Thanks! >>>>>>>>>>> >>>>>>>>>>> > How would you assume that backpressure would influence your >>>>>>>>>>> updates? Updates to each local state still happen event-by-event, >>>>>>>>>>> in a >>>>>>>>>>> single reader/writing thread. >>>>>>>>>>> >>>>>>>>>>> Sure, just an ignorant guess by me. I'm not familiar with most >>>>>>>>>>> of Flink's internals. Any way high backpressure is not a seen on >>>>>>>>>>> this job >>>>>>>>>>> after it has caught up the lag, so at I thought it would be worth >>>>>>>>>>> mentioning. >>>>>>>>>>> >>>>>>>>>>> On Thu, Oct 4, 2018 at 6:24 PM Stefan Richter < >>>>>>>>>>> s.rich...@data-artisans.com> wrote: >>>>>>>>>>> >>>>>>>>>>>> Hi, >>>>>>>>>>>> >>>>>>>>>>>> Am 04.10.2018 um 16:08 schrieb Juho Autio <juho.au...@rovio.com >>>>>>>>>>>> >: >>>>>>>>>>>> >>>>>>>>>>>> > you could take a look at Bravo [1] to query your savepoints >>>>>>>>>>>> and to check if the state in the savepoint complete w.r.t your >>>>>>>>>>>> expectations >>>>>>>>>>>> >>>>>>>>>>>> Thanks. I'm not 100% if this is the case, but to me it seemed >>>>>>>>>>>> like the missed ids were being logged by the reducer soon after >>>>>>>>>>>> the job had >>>>>>>>>>>> started (after restoring a savepoint). But on the other hand, >>>>>>>>>>>> after that I >>>>>>>>>>>> also made another savepoint & restored that, so what I could check >>>>>>>>>>>> is: does >>>>>>>>>>>> that next savepoint have the missed ids that were logged (a couple >>>>>>>>>>>> of >>>>>>>>>>>> minutes before the savepoint was created, so there should've been >>>>>>>>>>>> more than >>>>>>>>>>>> enough time to add them to the state before the savepoint was >>>>>>>>>>>> triggered) or >>>>>>>>>>>> not. Any way, if I would be able to verify with Bravo that the ids >>>>>>>>>>>> are >>>>>>>>>>>> missing from the savepoint (even though reduced logged that it saw >>>>>>>>>>>> them), >>>>>>>>>>>> would that help in figuring out where they are lost? Is there some >>>>>>>>>>>> major >>>>>>>>>>>> difference compared to just looking at the final output after >>>>>>>>>>>> window has >>>>>>>>>>>> been triggered? >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> I think that makes a difference. For example, you can >>>>>>>>>>>> investigate if there is a state loss or a problem with the >>>>>>>>>>>> windowing. In >>>>>>>>>>>> the savepoint you could see which keys exists and to which windows >>>>>>>>>>>> they are >>>>>>>>>>>> assigned. Also just to make sure there is no misunderstanding: only >>>>>>>>>>>> elements that are in the state at the start of a savepoint are >>>>>>>>>>>> expected to >>>>>>>>>>>> be part of the savepoint; all elements between start and >>>>>>>>>>>> completion of the >>>>>>>>>>>> savepoint are not expected to be part of the savepoint. >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> > I also doubt that the problem is about backpressure after >>>>>>>>>>>> restore, because the job will only continue running after the >>>>>>>>>>>> state restore >>>>>>>>>>>> is already completed. >>>>>>>>>>>> >>>>>>>>>>>> Yes, I'm not suspecting that the state restoring would be the >>>>>>>>>>>> problem either. 
My concern was about backpressure possibly messing >>>>>>>>>>>> with the >>>>>>>>>>>> updates of reducing state? I would tend to suspect that updating >>>>>>>>>>>> the state >>>>>>>>>>>> consistently is what fails, where heavy load / backpressure might >>>>>>>>>>>> be a >>>>>>>>>>>> factor. >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> How would you assume that backpressure would influence your >>>>>>>>>>>> updates? Updates to each local state still happen event-by-event, >>>>>>>>>>>> in a >>>>>>>>>>>> single reader/writing thread. >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> On Thu, Oct 4, 2018 at 4:18 PM Stefan Richter < >>>>>>>>>>>> s.rich...@data-artisans.com> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> Hi, >>>>>>>>>>>>> >>>>>>>>>>>>> you could take a look at Bravo [1] to query your savepoints >>>>>>>>>>>>> and to check if the state in the savepoint complete w.r.t your >>>>>>>>>>>>> expectations. I somewhat doubt that there is a general problem >>>>>>>>>>>>> with the >>>>>>>>>>>>> state/savepoints because many users are successfully running it >>>>>>>>>>>>> on a large >>>>>>>>>>>>> state and I am not aware of any data loss problems, but nothing is >>>>>>>>>>>>> impossible. What the savepoint does is also straight forward: >>>>>>>>>>>>> iterate a db >>>>>>>>>>>>> snapshot and write all key/value pairs to disk, so all data that >>>>>>>>>>>>> was in the >>>>>>>>>>>>> db at the time of the savepoint, should show up. I also doubt >>>>>>>>>>>>> that the >>>>>>>>>>>>> problem is about backpressure after restore, because the job will >>>>>>>>>>>>> only >>>>>>>>>>>>> continue running after the state restore is already completed. >>>>>>>>>>>>> Did you >>>>>>>>>>>>> check if you are using exactly-one-semantics or at-least-once >>>>>>>>>>>>> semantics? >>>>>>>>>>>>> Also did you check that the kafka consumer start position is >>>>>>>>>>>>> configured >>>>>>>>>>>>> properly [2]? Are watermarks generated as expected after restore? >>>>>>>>>>>>> >>>>>>>>>>>>> One more unrelated high-level comment that I have: for a >>>>>>>>>>>>> granularity of 24h windows, I wonder if it would not make sense >>>>>>>>>>>>> to use a >>>>>>>>>>>>> batch job instead? >>>>>>>>>>>>> >>>>>>>>>>>>> Best, >>>>>>>>>>>>> Stefan >>>>>>>>>>>>> >>>>>>>>>>>>> [1] https://github.com/king/bravo >>>>>>>>>>>>> [2] >>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration >>>>>>>>>>>>> >>>>>>>>>>>>> Am 04.10.2018 um 14:53 schrieb Juho Autio < >>>>>>>>>>>>> juho.au...@rovio.com>: >>>>>>>>>>>>> >>>>>>>>>>>>> Thanks for the suggestions! >>>>>>>>>>>>> >>>>>>>>>>>>> > In general, it would be tremendously helpful to have a >>>>>>>>>>>>> minimal working example which allows to reproduce the problem. >>>>>>>>>>>>> >>>>>>>>>>>>> Definitely. The problem with reproducing has been that this >>>>>>>>>>>>> only seems to happen in the bigger production data volumes. >>>>>>>>>>>>> >>>>>>>>>>>>> That's why I'm hoping to find a way to debug this with the >>>>>>>>>>>>> production data. With that it seems to consistently cause some >>>>>>>>>>>>> misses every >>>>>>>>>>>>> time the job is killed/restored. >>>>>>>>>>>>> >>>>>>>>>>>>> > check if it happens for shorter windows, like 1h etc >>>>>>>>>>>>> >>>>>>>>>>>>> What would be the benefit of that compared to 24h window? >>>>>>>>>>>>> >>>>>>>>>>>>> > simplify the job to not use a reduce window but simply a >>>>>>>>>>>>> time window which outputs the window events. 
Then counting the >>>>>>>>>>>>> input and >>>>>>>>>>>>> output events should allow you to verify the results. If you are >>>>>>>>>>>>> not seeing >>>>>>>>>>>>> missing events, then it could have something to do with the >>>>>>>>>>>>> reducing state >>>>>>>>>>>>> used in the reduce function. >>>>>>>>>>>>> >>>>>>>>>>>>> Hm, maybe, but not sure how useful that would be, because it >>>>>>>>>>>>> wouldn't yet prove that it's related to reducing, because not >>>>>>>>>>>>> having a >>>>>>>>>>>>> reduce function could also mean smaller load on the job, which >>>>>>>>>>>>> might alone >>>>>>>>>>>>> be enough to make the problem not manifest. >>>>>>>>>>>>> >>>>>>>>>>>>> Is there a way to debug what goes into the reducing state >>>>>>>>>>>>> (including what gets removed or overwritten and what restored), >>>>>>>>>>>>> if that >>>>>>>>>>>>> makes sense..? Maybe some suitable logging could be used to prove >>>>>>>>>>>>> that the >>>>>>>>>>>>> lost data is written to the reducing state (or at least asked to >>>>>>>>>>>>> be >>>>>>>>>>>>> written), but not found any more when the window closes and state >>>>>>>>>>>>> is >>>>>>>>>>>>> flushed? >>>>>>>>>>>>> >>>>>>>>>>>>> On configuration once more, we're using RocksDB state backend >>>>>>>>>>>>> with asynchronous incremental checkpointing. The state is >>>>>>>>>>>>> restored from >>>>>>>>>>>>> savepoints though, we haven't been using those checkpoints in >>>>>>>>>>>>> these tests >>>>>>>>>>>>> (although they could be used in case of crashes – but we haven't >>>>>>>>>>>>> had those >>>>>>>>>>>>> now). >>>>>>>>>>>>> >>>>>>>>>>>>> On Thu, Oct 4, 2018 at 3:25 PM Till Rohrmann < >>>>>>>>>>>>> trohrm...@apache.org> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>>> Hi Juho, >>>>>>>>>>>>>> >>>>>>>>>>>>>> another idea to further narrow down the problem could be to >>>>>>>>>>>>>> simplify the job to not use a reduce window but simply a time >>>>>>>>>>>>>> window which >>>>>>>>>>>>>> outputs the window events. Then counting the input and output >>>>>>>>>>>>>> events should >>>>>>>>>>>>>> allow you to verify the results. If you are not seeing missing >>>>>>>>>>>>>> events, then >>>>>>>>>>>>>> it could have something to do with the reducing state used in >>>>>>>>>>>>>> the reduce >>>>>>>>>>>>>> function. >>>>>>>>>>>>>> >>>>>>>>>>>>>> In general, it would be tremendously helpful to have a >>>>>>>>>>>>>> minimal working example which allows to reproduce the problem. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>> Till >>>>>>>>>>>>>> >>>>>>>>>>>>>> On Thu, Oct 4, 2018 at 2:02 PM Andrey Zagrebin < >>>>>>>>>>>>>> and...@data-artisans.com> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>>> Hi Juho, >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> can you try to reduce the job to minimal reproducible >>>>>>>>>>>>>>> example and share the job and input? >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> For example: >>>>>>>>>>>>>>> - some simple records as input, e.g. 
tuples of primitive >>>>>>>>>>>>>>> types saved as cvs >>>>>>>>>>>>>>> - minimal deduplication job which processes them and misses >>>>>>>>>>>>>>> records >>>>>>>>>>>>>>> - check if it happens for shorter windows, like 1h etc >>>>>>>>>>>>>>> - setup which you use for the job, ideally >>>>>>>>>>>>>>> locally reproducible or cloud >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Best, >>>>>>>>>>>>>>> Andrey >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On 4 Oct 2018, at 11:13, Juho Autio <juho.au...@rovio.com> >>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Sorry to insist, but we seem to be blocked for any serious >>>>>>>>>>>>>>> usage of state in Flink if we can't rely on it to not miss data >>>>>>>>>>>>>>> in case of >>>>>>>>>>>>>>> restore. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Would anyone have suggestions for how to troubleshoot this? >>>>>>>>>>>>>>> So far I have verified with DEBUG logs that our reduce function >>>>>>>>>>>>>>> gets to >>>>>>>>>>>>>>> process also the data that is missing from window output. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On Mon, Oct 1, 2018 at 11:56 AM Juho Autio < >>>>>>>>>>>>>>> juho.au...@rovio.com> wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Hi Andrey, >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> To rule out for good any questions about sink behaviour, >>>>>>>>>>>>>>>> the job was killed and started with an additional Kafka sink. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> The same number of ids were missed in both outputs: >>>>>>>>>>>>>>>> KafkaSink & BucketingSink. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> I wonder what would be the next steps in debugging? >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> On Fri, Sep 21, 2018 at 3:49 PM Juho Autio < >>>>>>>>>>>>>>>> juho.au...@rovio.com> wrote: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Thanks, Andrey. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> > so it means that the savepoint does not loose at least >>>>>>>>>>>>>>>>> some dropped records. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> I'm not sure what you mean by that? I mean, it was known >>>>>>>>>>>>>>>>> from the beginning, that not everything is lost before/after >>>>>>>>>>>>>>>>> restoring a >>>>>>>>>>>>>>>>> savepoint, just some records around the time of restoration. >>>>>>>>>>>>>>>>> It's not 100% >>>>>>>>>>>>>>>>> clear whether records are lost before making a savepoint or >>>>>>>>>>>>>>>>> after restoring >>>>>>>>>>>>>>>>> it. Although, based on the new DEBUG logs it seems more like >>>>>>>>>>>>>>>>> losing some >>>>>>>>>>>>>>>>> records that are seen ~soon after restoring. It seems like >>>>>>>>>>>>>>>>> Flink would be >>>>>>>>>>>>>>>>> somehow confused either about the restored state vs. new >>>>>>>>>>>>>>>>> inserts to state. >>>>>>>>>>>>>>>>> This could also be somehow linked to the high back pressure >>>>>>>>>>>>>>>>> on the kafka >>>>>>>>>>>>>>>>> source while the stream is catching up. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> > If it is feasible for your setup, I suggest to insert >>>>>>>>>>>>>>>>> one more map function after reduce and before sink. >>>>>>>>>>>>>>>>> > etc. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Isn't that the same thing that we discussed before? >>>>>>>>>>>>>>>>> Nothing is sent to BucketingSink before the window closes, so >>>>>>>>>>>>>>>>> I don't see >>>>>>>>>>>>>>>>> how it would make any difference if we replace the >>>>>>>>>>>>>>>>> BucketingSink with a map >>>>>>>>>>>>>>>>> function or another sink type. We don't create or restore >>>>>>>>>>>>>>>>> savepoints during >>>>>>>>>>>>>>>>> the time when BucketingSink gets input or has open buckets – >>>>>>>>>>>>>>>>> that happens >>>>>>>>>>>>>>>>> at a much later time of day. 
I would focus on figuring out >>>>>>>>>>>>>>>>> why the records >>>>>>>>>>>>>>>>> are lost while the window is open. But I don't know how to do >>>>>>>>>>>>>>>>> that. Would >>>>>>>>>>>>>>>>> you have any additional suggestions? >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> On Fri, Sep 21, 2018 at 3:30 PM Andrey Zagrebin < >>>>>>>>>>>>>>>>> and...@data-artisans.com> wrote: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Hi Juho, >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> so it means that the savepoint does not loose at least >>>>>>>>>>>>>>>>>> some dropped records. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> If it is feasible for your setup, I suggest to insert one >>>>>>>>>>>>>>>>>> more map function after reduce and before sink. >>>>>>>>>>>>>>>>>> The map function should be called right after window is >>>>>>>>>>>>>>>>>> triggered but before flushing to s3. >>>>>>>>>>>>>>>>>> The result of reduce (deduped record) could be logged >>>>>>>>>>>>>>>>>> there. >>>>>>>>>>>>>>>>>> This should allow to check whether the processed distinct >>>>>>>>>>>>>>>>>> records were buffered in the state after the restoration >>>>>>>>>>>>>>>>>> from the savepoint >>>>>>>>>>>>>>>>>> or not. If they were buffered we should see that there was >>>>>>>>>>>>>>>>>> an attempt to >>>>>>>>>>>>>>>>>> write them to the sink from the state. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Another suggestion is to try to write records to some >>>>>>>>>>>>>>>>>> other sink or to both. >>>>>>>>>>>>>>>>>> E.g. if you can access file system of workers, maybe just >>>>>>>>>>>>>>>>>> into local files and check whether the records are also >>>>>>>>>>>>>>>>>> dropped there. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Best, >>>>>>>>>>>>>>>>>> Andrey >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> On 20 Sep 2018, at 15:37, Juho Autio < >>>>>>>>>>>>>>>>>> juho.au...@rovio.com> wrote: >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Hi Andrey! >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> I was finally able to gather the DEBUG logs that you >>>>>>>>>>>>>>>>>> suggested. In short, the reducer logged that it processed at >>>>>>>>>>>>>>>>>> least some of >>>>>>>>>>>>>>>>>> the ids that were missing from the output. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> "At least some", because I didn't have the job running >>>>>>>>>>>>>>>>>> with DEBUG logs for the full 24-hour window period. So I was >>>>>>>>>>>>>>>>>> only able to >>>>>>>>>>>>>>>>>> look up if I can find *some* of the missing ids in the >>>>>>>>>>>>>>>>>> DEBUG logs. Which I did indeed. 
>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> I changed the DistinctFunction.java to do this: >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> @Override >>>>>>>>>>>>>>>>>> public Map<String, String> reduce(Map<String, String> >>>>>>>>>>>>>>>>>> value1, Map<String, String> value2) { >>>>>>>>>>>>>>>>>> LOG.debug("DistinctFunction.reduce returns: >>>>>>>>>>>>>>>>>> {}={}", value1.get("field"), value1.get("id")); >>>>>>>>>>>>>>>>>> return value1; >>>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Then: >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> vi flink-1.6.0/conf/log4j.properties >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> log4j.logger.org.apache.flink.streaming.runtime.tasks.StreamTask=DEBUG >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> log4j.logger.com.rovio.ds.flink.uniqueid.DistinctFunction=DEBUG >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Then I ran the following kind of test: >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> - Cancelled the on-going job with savepoint created at >>>>>>>>>>>>>>>>>> ~Sep 18 08:35 UTC 2018 >>>>>>>>>>>>>>>>>> - Started a new cluster & job with DEBUG enabled at >>>>>>>>>>>>>>>>>> ~09:13, restored from that previous cluster's savepoint >>>>>>>>>>>>>>>>>> - Ran until caught up offsets >>>>>>>>>>>>>>>>>> - Cancelled the job with a new savepoint >>>>>>>>>>>>>>>>>> - Started a new job _without_ DEBUG, which restored the >>>>>>>>>>>>>>>>>> new savepoint, let it keep running so that it will >>>>>>>>>>>>>>>>>> eventually write the >>>>>>>>>>>>>>>>>> output >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Then on the next day, after results had been flushed when >>>>>>>>>>>>>>>>>> the 24-hour window closed, I compared the results again with >>>>>>>>>>>>>>>>>> a batch >>>>>>>>>>>>>>>>>> version's output. And found some missing ids as usual. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> I drilled down to one specific missing id (I'm replacing >>>>>>>>>>>>>>>>>> the actual value with AN12345 below), which was not found in >>>>>>>>>>>>>>>>>> the stream >>>>>>>>>>>>>>>>>> output, but was found in batch output & flink DEBUG logs. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Related to that id, I gathered the following information: >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> 2018-09-18~09:13:21,000 job started & savepoint is >>>>>>>>>>>>>>>>>> restored >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> 2018-09-18 09:14:29,085 missing id is processed for the >>>>>>>>>>>>>>>>>> first time, proved by this log line: >>>>>>>>>>>>>>>>>> 2018-09-18 09:14:29,085 DEBUG >>>>>>>>>>>>>>>>>> com.rovio.ds.flink.uniqueid.DistinctFunction >>>>>>>>>>>>>>>>>> - >>>>>>>>>>>>>>>>>> DistinctFunction.reduce returns: s.aid1=AN12345 >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> 2018-09-18 09:15:14,264 first synchronous part of >>>>>>>>>>>>>>>>>> checkpoint >>>>>>>>>>>>>>>>>> 2018-09-18 09:15:16,544 first asynchronous part of >>>>>>>>>>>>>>>>>> checkpoint >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> ( >>>>>>>>>>>>>>>>>> more occurrences of checkpoints (~1 min checkpointing >>>>>>>>>>>>>>>>>> time + ~1 min delay before next) >>>>>>>>>>>>>>>>>> / >>>>>>>>>>>>>>>>>> more occurrences of DistinctFunction.reduce >>>>>>>>>>>>>>>>>> ) >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> 2018-09-18 09:23:45,053 missing id is processed for the >>>>>>>>>>>>>>>>>> last time >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> 2018-09-18~10:20:00,000 savepoint created & job cancelled >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> To be noted, there was high backpressure after restoring >>>>>>>>>>>>>>>>>> from savepoint until the stream caught up with the kafka >>>>>>>>>>>>>>>>>> offsets. 
Although, >>>>>>>>>>>>>>>>>> our job uses assign timestamps & watermarks on the flink >>>>>>>>>>>>>>>>>> kafka consumer >>>>>>>>>>>>>>>>>> itself, so event time of all partitions is synchronized. As >>>>>>>>>>>>>>>>>> expected, we >>>>>>>>>>>>>>>>>> don't get any late data in the late data side output. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> From this we can see that the missing ids are processed >>>>>>>>>>>>>>>>>> by the reducer, but they must get lost somewhere before the >>>>>>>>>>>>>>>>>> 24-hour window >>>>>>>>>>>>>>>>>> is triggered. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> I think it's worth mentioning once more that the stream >>>>>>>>>>>>>>>>>> doesn't miss any ids if we let it's running without >>>>>>>>>>>>>>>>>> interruptions / state >>>>>>>>>>>>>>>>>> restoring. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> What's next? >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> On Wed, Aug 29, 2018 at 3:49 PM Andrey Zagrebin < >>>>>>>>>>>>>>>>>> and...@data-artisans.com> wrote: >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Hi Juho, >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> > only when the 24-hour window >>>>>>>>>>>>>>>>>>> triggers, BucketingSink gets a burst of input >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> This is of course totally true, my understanding is the >>>>>>>>>>>>>>>>>>> same. We cannot exclude problem there for sure, just >>>>>>>>>>>>>>>>>>> savepoints are used a >>>>>>>>>>>>>>>>>>> lot w/o problem reports and BucketingSink is known to be >>>>>>>>>>>>>>>>>>> problematic with >>>>>>>>>>>>>>>>>>> s3. That is why, I asked you: >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> > You also wrote that the timestamps of lost event are >>>>>>>>>>>>>>>>>>> 'probably' around the time of the savepoint, if it is not >>>>>>>>>>>>>>>>>>> yet for sure I >>>>>>>>>>>>>>>>>>> would also check it. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Although, bucketing sink might loose any data at the end >>>>>>>>>>>>>>>>>>> of the day (also from the middle). The fact, that it is >>>>>>>>>>>>>>>>>>> always around the >>>>>>>>>>>>>>>>>>> time of taking a savepoint and not random, is surely >>>>>>>>>>>>>>>>>>> suspicious and >>>>>>>>>>>>>>>>>>> possible savepoint failures need to be investigated. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Regarding the s3 problem, s3 doc says: >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> > The caveat is that if you make a HEAD or GET request >>>>>>>>>>>>>>>>>>> to the key name (to find if the object exists) before >>>>>>>>>>>>>>>>>>> creating the object, >>>>>>>>>>>>>>>>>>> Amazon S3 provides 'eventual consistency' for >>>>>>>>>>>>>>>>>>> read-after-write. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> The algorithm you suggest is how it is roughly >>>>>>>>>>>>>>>>>>> implemented now (BucketingSink.openNewPartFile). My >>>>>>>>>>>>>>>>>>> understanding is that >>>>>>>>>>>>>>>>>>> 'eventual consistency’ means that even if you just created >>>>>>>>>>>>>>>>>>> file (its name >>>>>>>>>>>>>>>>>>> is key) it can be that you do not get it in the list or >>>>>>>>>>>>>>>>>>> exists (HEAD) >>>>>>>>>>>>>>>>>>> returns false and you risk to rewrite the previous part. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> The BucketingSink was designed for a standard file >>>>>>>>>>>>>>>>>>> system. s3 is used over a file system wrapper atm but does >>>>>>>>>>>>>>>>>>> not always >>>>>>>>>>>>>>>>>>> provide normal file system guarantees. See also last >>>>>>>>>>>>>>>>>>> example in [1]. 
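Roughly, the part-index probing referred to here works like the following simplified sketch. This is only an illustration of the failure mode, not the actual BucketingSink code; the class and method names are made up.

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
 * Simplified illustration of how a bucketing-style sink picks the next part
 * index: probe "part-<subtask>-<N>" and bump N until the file system says the
 * name is free. With S3's eventually consistent existence checks, exists() may
 * return false for a part that was in fact just written, so the same index can
 * be handed out twice and the earlier part silently overwritten.
 */
public class NextPartIndex {

    public static int next(FileSystem fs, Path bucketDir, int subtaskIndex) throws IOException {
        int partIndex = 0;
        while (fs.exists(new Path(bucketDir, "part-" + subtaskIndex + "-" + partIndex))) {
            partIndex++;
        }
        return partIndex; // first index whose part file is not (yet) visible
    }
}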
>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>>>>>>> Andrey >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> [1] >>>>>>>>>>>>>>>>>>> https://codeburst.io/quick-explanation-of-the-s3-consistency-model-6c9f325e3f82 >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> On 29 Aug 2018, at 12:11, Juho Autio < >>>>>>>>>>>>>>>>>>> juho.au...@rovio.com> wrote: >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Andrey, thank you very much for the debugging >>>>>>>>>>>>>>>>>>> suggestions, I'll try them. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> In the meanwhile two more questions, please: >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> > Just to keep in mind this problem with s3 and exclude >>>>>>>>>>>>>>>>>>> it for sure. I would also check whether the size of missing >>>>>>>>>>>>>>>>>>> events is >>>>>>>>>>>>>>>>>>> around the batch size of BucketingSink or not. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Fair enough, but I also want to focus on debugging the >>>>>>>>>>>>>>>>>>> most probable subject first. So what do you think about >>>>>>>>>>>>>>>>>>> this – true or >>>>>>>>>>>>>>>>>>> false: only when the 24-hour window triggers, BucketinSink >>>>>>>>>>>>>>>>>>> gets a burst of >>>>>>>>>>>>>>>>>>> input. Around the state restoring point (middle of the day) >>>>>>>>>>>>>>>>>>> it doesn't get >>>>>>>>>>>>>>>>>>> any input, so it can't lose anything either. Isn't this >>>>>>>>>>>>>>>>>>> true, or have I >>>>>>>>>>>>>>>>>>> totally missed how Flink works in triggering window >>>>>>>>>>>>>>>>>>> results? I would not >>>>>>>>>>>>>>>>>>> expect there to be any optimization that speculatively >>>>>>>>>>>>>>>>>>> triggers early >>>>>>>>>>>>>>>>>>> results of a regular time window to the downstream >>>>>>>>>>>>>>>>>>> operators. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> > The old BucketingSink has in general problem with s3. >>>>>>>>>>>>>>>>>>> Internally BucketingSink queries s3 as a file system to >>>>>>>>>>>>>>>>>>> list already >>>>>>>>>>>>>>>>>>> written file parts (batches) and determine index of the >>>>>>>>>>>>>>>>>>> next part to start. >>>>>>>>>>>>>>>>>>> Due to eventual consistency of checking file existence in >>>>>>>>>>>>>>>>>>> s3 [1], the >>>>>>>>>>>>>>>>>>> BucketingSink can rewrite the previously written part and >>>>>>>>>>>>>>>>>>> basically loose >>>>>>>>>>>>>>>>>>> it. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> I was wondering, what does S3's "read-after-write >>>>>>>>>>>>>>>>>>> consistency" (mentioned on the page you linked) actually >>>>>>>>>>>>>>>>>>> mean. It seems >>>>>>>>>>>>>>>>>>> that this might be possible: >>>>>>>>>>>>>>>>>>> - LIST keys, find current max index >>>>>>>>>>>>>>>>>>> - choose next index = max + 1 >>>>>>>>>>>>>>>>>>> - HEAD next index: if it exists, keep adding + 1 until >>>>>>>>>>>>>>>>>>> key doesn't exist on S3 >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> But definitely sounds easier if a sink keeps track of >>>>>>>>>>>>>>>>>>> files in a way that's guaranteed to be consistent. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>>>>>>> Juho >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> On Mon, Aug 27, 2018 at 2:04 PM Andrey Zagrebin < >>>>>>>>>>>>>>>>>>> and...@data-artisans.com> wrote: >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Hi, >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> true, StreamingFileSink does not support s3 in 1.6.0, >>>>>>>>>>>>>>>>>>>> it is planned for the next 1.7 release, sorry for >>>>>>>>>>>>>>>>>>>> confusion. >>>>>>>>>>>>>>>>>>>> The old BucketingSink has in general problem with s3. 
>>>>>>>>>>>>>>>>>>>> Internally BucketingSink queries s3 as a file system >>>>>>>>>>>>>>>>>>>> to list already written file parts (batches) and >>>>>>>>>>>>>>>>>>>> determine index of the next part to start. Due to eventual >>>>>>>>>>>>>>>>>>>> consistency of >>>>>>>>>>>>>>>>>>>> checking file existence in s3 [1], the BucketingSink can >>>>>>>>>>>>>>>>>>>> rewrite the >>>>>>>>>>>>>>>>>>>> previously written part and basically loose it. It should >>>>>>>>>>>>>>>>>>>> be fixed for >>>>>>>>>>>>>>>>>>>> StreamingFileSink in 1.7 where Flink keeps its own track >>>>>>>>>>>>>>>>>>>> of written parts >>>>>>>>>>>>>>>>>>>> and does not rely on s3 as a file system. >>>>>>>>>>>>>>>>>>>> I also include Kostas, he might add more details. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Just to keep in mind this problem with s3 and exclude >>>>>>>>>>>>>>>>>>>> it for sure I would also check whether the size of >>>>>>>>>>>>>>>>>>>> missing events is >>>>>>>>>>>>>>>>>>>> around the batch size of BucketingSink or not. You also >>>>>>>>>>>>>>>>>>>> wrote that the >>>>>>>>>>>>>>>>>>>> timestamps of lost event are 'probably' around the time of >>>>>>>>>>>>>>>>>>>> the savepoint, >>>>>>>>>>>>>>>>>>>> if it is not yet for sure I would also check it. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Have you already checked the log files of job manager >>>>>>>>>>>>>>>>>>>> and task managers for the job running before and after the >>>>>>>>>>>>>>>>>>>> restore from the >>>>>>>>>>>>>>>>>>>> check point? Is everything successful there, no errors, >>>>>>>>>>>>>>>>>>>> relevant warnings >>>>>>>>>>>>>>>>>>>> or exceptions? >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> As the next step, I would suggest to log all >>>>>>>>>>>>>>>>>>>> encountered events in DistinctFunction.reduce if possible >>>>>>>>>>>>>>>>>>>> for production >>>>>>>>>>>>>>>>>>>> data and check whether the missed events are eventually >>>>>>>>>>>>>>>>>>>> processed before or >>>>>>>>>>>>>>>>>>>> after the savepoint. The following log message indicates a >>>>>>>>>>>>>>>>>>>> border between >>>>>>>>>>>>>>>>>>>> the events that should be included into the savepoint >>>>>>>>>>>>>>>>>>>> (logged before) or >>>>>>>>>>>>>>>>>>>> not: >>>>>>>>>>>>>>>>>>>> “{} ({}, synchronous part) in thread {} took {} ms” >>>>>>>>>>>>>>>>>>>> (template) >>>>>>>>>>>>>>>>>>>> Also check if the savepoint has been overall completed: >>>>>>>>>>>>>>>>>>>> "{} ({}, asynchronous part) in thread {} took {} ms." >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Best, >>>>>>>>>>>>>>>>>>>> Andrey >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> [1] >>>>>>>>>>>>>>>>>>>> https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> On 24 Aug 2018, at 20:41, Juho Autio < >>>>>>>>>>>>>>>>>>>> juho.au...@rovio.com> wrote: >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Hi, >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Using StreamingFileSink is not a convenient option for >>>>>>>>>>>>>>>>>>>> production use for us as it doesn't support s3*. I could >>>>>>>>>>>>>>>>>>>> use >>>>>>>>>>>>>>>>>>>> StreamingFileSink just to verify, but I don't see much >>>>>>>>>>>>>>>>>>>> point in doing so. >>>>>>>>>>>>>>>>>>>> Please consider my previous comment: >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> > I realized that BucketingSink must not play any role >>>>>>>>>>>>>>>>>>>> in this problem. This is because only when the 24-hour >>>>>>>>>>>>>>>>>>>> window triggers, >>>>>>>>>>>>>>>>>>>> BucketingSink gets a burst of input. 
Around the state >>>>>>>>>>>>>>>>>>>> restoring point >>>>>>>>>>>>>>>>>>>> (middle of the day) it doesn't get any input, so it can't >>>>>>>>>>>>>>>>>>>> lose anything >>>>>>>>>>>>>>>>>>>> either (right?). >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> I could also use a kafka sink instead, but I can't >>>>>>>>>>>>>>>>>>>> imagine how there could be any difference. It's very real >>>>>>>>>>>>>>>>>>>> that the sink >>>>>>>>>>>>>>>>>>>> doesn't get any input for a long time until the 24-hour >>>>>>>>>>>>>>>>>>>> window closes, and >>>>>>>>>>>>>>>>>>>> then it quickly writes out everything because it's not >>>>>>>>>>>>>>>>>>>> that much data >>>>>>>>>>>>>>>>>>>> eventually for the distinct values. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Any ideas for debugging what's happening around the >>>>>>>>>>>>>>>>>>>> savepoint & restoration time? >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> *) I actually implemented StreamingFileSink as an >>>>>>>>>>>>>>>>>>>> alternative sink. This was before I came to realize that >>>>>>>>>>>>>>>>>>>> most likely the >>>>>>>>>>>>>>>>>>>> sink component has nothing to do with the data loss >>>>>>>>>>>>>>>>>>>> problem. I tried it >>>>>>>>>>>>>>>>>>>> with s3n:// path just to see an exception being thrown. In >>>>>>>>>>>>>>>>>>>> the source code >>>>>>>>>>>>>>>>>>>> I indeed then found an explicit check for the target path >>>>>>>>>>>>>>>>>>>> scheme to be >>>>>>>>>>>>>>>>>>>> "hdfs://". >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> On Fri, Aug 24, 2018 at 7:49 PM Andrey Zagrebin < >>>>>>>>>>>>>>>>>>>> and...@data-artisans.com> wrote: >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Ok, I think before further debugging the window >>>>>>>>>>>>>>>>>>>>> reduced state, >>>>>>>>>>>>>>>>>>>>> could you try the new ‘StreamingFileSink’ [1] >>>>>>>>>>>>>>>>>>>>> introduced in Flink 1.6.0 instead of the previous >>>>>>>>>>>>>>>>>>>>> 'BucketingSink’? >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>>>>>>>>> Andrey >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> [1] >>>>>>>>>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> On 24 Aug 2018, at 18:03, Juho Autio < >>>>>>>>>>>>>>>>>>>>> juho.au...@rovio.com> wrote: >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Yes, sorry for my confusing comment. I just meant that >>>>>>>>>>>>>>>>>>>>> it seems like there's a bug somewhere now that the output >>>>>>>>>>>>>>>>>>>>> is missing some >>>>>>>>>>>>>>>>>>>>> data. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> > I would wait and check the actual output in s3 >>>>>>>>>>>>>>>>>>>>> because it is the main result of the job >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Yes, and that's what I have already done. There seems >>>>>>>>>>>>>>>>>>>>> to be always some data loss with the production data >>>>>>>>>>>>>>>>>>>>> volumes, if the job >>>>>>>>>>>>>>>>>>>>> has been restarted on that day. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Would you have any suggestions for how to debug this >>>>>>>>>>>>>>>>>>>>> further? >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Many thanks for stepping in. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> On Fri, Aug 24, 2018 at 6:37 PM Andrey Zagrebin < >>>>>>>>>>>>>>>>>>>>> and...@data-artisans.com> wrote: >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> Hi Juho, >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> So it is a per key deduplication job. 
>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> Yes, I would wait and check the actual output in s3 >>>>>>>>>>>>>>>>>>>>>> because it is the main result of the job and >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> > The late data around the time of taking savepoint >>>>>>>>>>>>>>>>>>>>>> might be not included into the savepoint but it should >>>>>>>>>>>>>>>>>>>>>> be behind the >>>>>>>>>>>>>>>>>>>>>> snapshotted offset in Kafka. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> is not a bug, it is a possible behaviour. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> The savepoint is a snapshot of the data in transient >>>>>>>>>>>>>>>>>>>>>> which is already consumed from Kafka. >>>>>>>>>>>>>>>>>>>>>> Basically the full contents of the window result is >>>>>>>>>>>>>>>>>>>>>> split between the savepoint and what can come after the >>>>>>>>>>>>>>>>>>>>>> savepoint'ed offset >>>>>>>>>>>>>>>>>>>>>> in Kafka but before the window result is written into s3. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> Allowed lateness should not affect it, I am just >>>>>>>>>>>>>>>>>>>>>> saying that the final result in s3 should include all >>>>>>>>>>>>>>>>>>>>>> records after it. >>>>>>>>>>>>>>>>>>>>>> This is what should be guaranteed but not the >>>>>>>>>>>>>>>>>>>>>> contents of the intermediate savepoint. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>>>>>>>>>> Andrey >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> On 24 Aug 2018, at 16:52, Juho Autio < >>>>>>>>>>>>>>>>>>>>>> juho.au...@rovio.com> wrote: >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> Thanks for your answer! >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> I check for the missed data from the final output on >>>>>>>>>>>>>>>>>>>>>> s3. So I wait until the next day, then run the same >>>>>>>>>>>>>>>>>>>>>> thing re-implemented in >>>>>>>>>>>>>>>>>>>>>> batch, and compare the output. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> > The late data around the time of taking savepoint >>>>>>>>>>>>>>>>>>>>>> might be not included into the savepoint but it should >>>>>>>>>>>>>>>>>>>>>> be behind the >>>>>>>>>>>>>>>>>>>>>> snapshotted offset in Kafka. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> Yes, I would definitely expect that. It seems like >>>>>>>>>>>>>>>>>>>>>> there's a bug somewhere. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> > Then it should just come later after the restore >>>>>>>>>>>>>>>>>>>>>> and should be reduced within the allowed lateness into >>>>>>>>>>>>>>>>>>>>>> the final result >>>>>>>>>>>>>>>>>>>>>> which is saved into s3. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> Well, as far as I know, allowed lateness doesn't play >>>>>>>>>>>>>>>>>>>>>> any role here, because I started running the job with >>>>>>>>>>>>>>>>>>>>>> allowedLateness=0, >>>>>>>>>>>>>>>>>>>>>> and still get the data loss, while my late data output >>>>>>>>>>>>>>>>>>>>>> doesn't receive >>>>>>>>>>>>>>>>>>>>>> anything. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> > Also, is this `DistinctFunction.reduce` just an >>>>>>>>>>>>>>>>>>>>>> example or the actual implementation, basically saving >>>>>>>>>>>>>>>>>>>>>> just one of records >>>>>>>>>>>>>>>>>>>>>> inside the 24h window in s3? then what is missing there? >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> Yes, it's the actual implementation. Note that >>>>>>>>>>>>>>>>>>>>>> there's a keyBy before the DistinctFunction. So there's >>>>>>>>>>>>>>>>>>>>>> one record for each >>>>>>>>>>>>>>>>>>>>>> key (which is the combination of a couple of fields). 
In >>>>>>>>>>>>>>>>>>>>>> practice I've seen >>>>>>>>>>>>>>>>>>>>>> that we're missing ~2000-4000 elements on each restore, >>>>>>>>>>>>>>>>>>>>>> and the total >>>>>>>>>>>>>>>>>>>>>> output is obviously much more than that. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> Here's the full code for the key selector: >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> public class MapKeySelector implements >>>>>>>>>>>>>>>>>>>>>> KeySelector<Map<String,String>, Object> { >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> private final String[] fields; >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> public MapKeySelector(String... fields) { >>>>>>>>>>>>>>>>>>>>>> this.fields = fields; >>>>>>>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> @Override >>>>>>>>>>>>>>>>>>>>>> public Object getKey(Map<String, String> event) >>>>>>>>>>>>>>>>>>>>>> throws Exception { >>>>>>>>>>>>>>>>>>>>>> Tuple key = >>>>>>>>>>>>>>>>>>>>>> Tuple.getTupleClass(fields.length).newInstance(); >>>>>>>>>>>>>>>>>>>>>> for (int i = 0; i < fields.length; i++) { >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> key.setField(event.getOrDefault(fields[i], ""), i); >>>>>>>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>>>>>>> return key; >>>>>>>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> And a more exact example on how it's used: >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> .keyBy(new MapKeySelector("ID", >>>>>>>>>>>>>>>>>>>>>> "PLAYER_ID", "FIELD", "KEY_NAME", "KEY_VALUE")) >>>>>>>>>>>>>>>>>>>>>> .timeWindow(Time.days(1)) >>>>>>>>>>>>>>>>>>>>>> .reduce(new DistinctFunction()) >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> On Fri, Aug 24, 2018 at 5:26 PM Andrey Zagrebin < >>>>>>>>>>>>>>>>>>>>>> and...@data-artisans.com> wrote: >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> Hi Juho, >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> Where exactly does the data miss? When do you notice >>>>>>>>>>>>>>>>>>>>>>> that? >>>>>>>>>>>>>>>>>>>>>>> Do you check it: >>>>>>>>>>>>>>>>>>>>>>> - debugging `DistinctFunction.reduce` right after >>>>>>>>>>>>>>>>>>>>>>> resume in the middle of the day >>>>>>>>>>>>>>>>>>>>>>> or >>>>>>>>>>>>>>>>>>>>>>> - some distinct records miss in the final output >>>>>>>>>>>>>>>>>>>>>>> of BucketingSink in s3 after window result is actually >>>>>>>>>>>>>>>>>>>>>>> triggered and saved >>>>>>>>>>>>>>>>>>>>>>> into s3 at the end of the day? is this the main output? >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> The late data around the time of taking savepoint >>>>>>>>>>>>>>>>>>>>>>> might be not included into the savepoint but it should >>>>>>>>>>>>>>>>>>>>>>> be behind the >>>>>>>>>>>>>>>>>>>>>>> snapshotted offset in Kafka. Then it should just come >>>>>>>>>>>>>>>>>>>>>>> later after the >>>>>>>>>>>>>>>>>>>>>>> restore and should be reduced within the allowed >>>>>>>>>>>>>>>>>>>>>>> lateness into the final >>>>>>>>>>>>>>>>>>>>>>> result which is saved into s3. >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> Also, is this `DistinctFunction.reduce` just an >>>>>>>>>>>>>>>>>>>>>>> example or the actual implementation, basically saving >>>>>>>>>>>>>>>>>>>>>>> just one of records >>>>>>>>>>>>>>>>>>>>>>> inside the 24h window in s3? then what is missing there? 
>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>>>>>>>>>>> Andrey >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> On 23 Aug 2018, at 15:42, Juho Autio < >>>>>>>>>>>>>>>>>>>>>>> juho.au...@rovio.com> wrote: >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> I changed to allowedLateness=0, no change, still >>>>>>>>>>>>>>>>>>>>>>> missing data when restoring from savepoint. >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> On Tue, Aug 21, 2018 at 10:43 AM Juho Autio < >>>>>>>>>>>>>>>>>>>>>>> juho.au...@rovio.com> wrote: >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> I realized that BucketingSink must not play any >>>>>>>>>>>>>>>>>>>>>>>> role in this problem. This is because only when the >>>>>>>>>>>>>>>>>>>>>>>> 24-hour window >>>>>>>>>>>>>>>>>>>>>>>> triggers, BucketinSink gets a burst of input. Around >>>>>>>>>>>>>>>>>>>>>>>> the state restoring >>>>>>>>>>>>>>>>>>>>>>>> point (middle of the day) it doesn't get any input, so >>>>>>>>>>>>>>>>>>>>>>>> it can't lose >>>>>>>>>>>>>>>>>>>>>>>> anything either (right?). >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> I will next try removing the allowedLateness >>>>>>>>>>>>>>>>>>>>>>>> entirely from the equation. >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> In the meanwhile, please let me know if you have >>>>>>>>>>>>>>>>>>>>>>>> any suggestions for debugging the lost data, for >>>>>>>>>>>>>>>>>>>>>>>> example what logs to >>>>>>>>>>>>>>>>>>>>>>>> enable. >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> We use FlinkKafkaConsumer010 btw. Are there any >>>>>>>>>>>>>>>>>>>>>>>> known issues with that, that could contribute to lost >>>>>>>>>>>>>>>>>>>>>>>> data when restoring a >>>>>>>>>>>>>>>>>>>>>>>> savepoint? >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 4:23 PM Juho Autio < >>>>>>>>>>>>>>>>>>>>>>>> juho.au...@rovio.com> wrote: >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> Some data is silently lost on my Flink stream job >>>>>>>>>>>>>>>>>>>>>>>>> when state is restored from a savepoint. >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> Do you have any debugging hints to find out where >>>>>>>>>>>>>>>>>>>>>>>>> exactly the data gets dropped? >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> My job gathers distinct values using a 24-hour >>>>>>>>>>>>>>>>>>>>>>>>> window. It doesn't have any custom state management. >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> When I cancel the job with savepoint and restore >>>>>>>>>>>>>>>>>>>>>>>>> from that savepoint, some data is missed. It seems to >>>>>>>>>>>>>>>>>>>>>>>>> be losing just a >>>>>>>>>>>>>>>>>>>>>>>>> small amount of data. The event time of lost data is >>>>>>>>>>>>>>>>>>>>>>>>> probably around the >>>>>>>>>>>>>>>>>>>>>>>>> time of savepoint. In other words the rest of the >>>>>>>>>>>>>>>>>>>>>>>>> time window is not >>>>>>>>>>>>>>>>>>>>>>>>> entirely missed – collection works correctly also for >>>>>>>>>>>>>>>>>>>>>>>>> (most of the) events >>>>>>>>>>>>>>>>>>>>>>>>> that come in after restoring. >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> When the job processes a full 24-hour window >>>>>>>>>>>>>>>>>>>>>>>>> without interruptions it doesn't miss anything. >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> Usually the problem doesn't happen in test >>>>>>>>>>>>>>>>>>>>>>>>> environments that have smaller parallelism and >>>>>>>>>>>>>>>>>>>>>>>>> smaller data volumes. 
But in >>>>>>>>>>>>>>>>>>>>>>>>> production volumes the job seems to be consistently >>>>>>>>>>>>>>>>>>>>>>>>> missing at least >>>>>>>>>>>>>>>>>>>>>>>>> something on every restore. >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> This issue has consistently happened since the job >>>>>>>>>>>>>>>>>>>>>>>>> was initially created. It was at first run on an >>>>>>>>>>>>>>>>>>>>>>>>> older version of Flink >>>>>>>>>>>>>>>>>>>>>>>>> 1.5-SNAPSHOT and it still happens on both Flink 1.5.2 >>>>>>>>>>>>>>>>>>>>>>>>> & 1.6.0. >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> I'm wondering if this could be for example some >>>>>>>>>>>>>>>>>>>>>>>>> synchronization issue between the kafka consumer >>>>>>>>>>>>>>>>>>>>>>>>> offsets vs. what's been >>>>>>>>>>>>>>>>>>>>>>>>> written by BucketingSink? >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> 1. Job content, simplified >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> kafkaStream >>>>>>>>>>>>>>>>>>>>>>>>> .flatMap(new >>>>>>>>>>>>>>>>>>>>>>>>> ExtractFieldsFunction()) >>>>>>>>>>>>>>>>>>>>>>>>> .keyBy(new MapKeySelector(1, 2, 3, >>>>>>>>>>>>>>>>>>>>>>>>> 4)) >>>>>>>>>>>>>>>>>>>>>>>>> .timeWindow(Time.days(1)) >>>>>>>>>>>>>>>>>>>>>>>>> .allowedLateness(allowedLateness) >>>>>>>>>>>>>>>>>>>>>>>>> .sideOutputLateData(lateDataTag) >>>>>>>>>>>>>>>>>>>>>>>>> .reduce(new DistinctFunction()) >>>>>>>>>>>>>>>>>>>>>>>>> .addSink(sink) >>>>>>>>>>>>>>>>>>>>>>>>> // use a fixed number of output >>>>>>>>>>>>>>>>>>>>>>>>> partitions >>>>>>>>>>>>>>>>>>>>>>>>> .setParallelism(8)) >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> /** >>>>>>>>>>>>>>>>>>>>>>>>> * Usage: .keyBy("the", "distinct", >>>>>>>>>>>>>>>>>>>>>>>>> "fields").reduce(new DistinctFunction()) >>>>>>>>>>>>>>>>>>>>>>>>> */ >>>>>>>>>>>>>>>>>>>>>>>>> public class DistinctFunction implements >>>>>>>>>>>>>>>>>>>>>>>>> ReduceFunction<java.util.Map<String, String>> { >>>>>>>>>>>>>>>>>>>>>>>>> @Override >>>>>>>>>>>>>>>>>>>>>>>>> public Map<String, String> reduce(Map<String, >>>>>>>>>>>>>>>>>>>>>>>>> String> value1, Map<String, String> value2) { >>>>>>>>>>>>>>>>>>>>>>>>> return value1; >>>>>>>>>>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> 2. State configuration >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> boolean enableIncrementalCheckpointing = true; >>>>>>>>>>>>>>>>>>>>>>>>> String statePath = "s3n://bucket/savepoints"; >>>>>>>>>>>>>>>>>>>>>>>>> new RocksDBStateBackend(statePath, >>>>>>>>>>>>>>>>>>>>>>>>> enableIncrementalCheckpointing); >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> Checkpointing Mode Exactly Once >>>>>>>>>>>>>>>>>>>>>>>>> Interval 1m 0s >>>>>>>>>>>>>>>>>>>>>>>>> Timeout 10m 0s >>>>>>>>>>>>>>>>>>>>>>>>> Minimum Pause Between Checkpoints 1m 0s >>>>>>>>>>>>>>>>>>>>>>>>> Maximum Concurrent Checkpoints 1 >>>>>>>>>>>>>>>>>>>>>>>>> Persist Checkpoints Externally Enabled (retain on >>>>>>>>>>>>>>>>>>>>>>>>> cancellation) >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> 3. BucketingSink configuration >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> We use BucketingSink, I don't think there's >>>>>>>>>>>>>>>>>>>>>>>>> anything special here, if not the fact that we're >>>>>>>>>>>>>>>>>>>>>>>>> writing to S3. 
>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> String outputPath = "s3://bucket/output"; >>>>>>>>>>>>>>>>>>>>>>>>> BucketingSink<Map<String, String>> sink = >>>>>>>>>>>>>>>>>>>>>>>>> new BucketingSink<Map<String, String>>(outputPath) >>>>>>>>>>>>>>>>>>>>>>>>> .setBucketer(new >>>>>>>>>>>>>>>>>>>>>>>>> ProcessdateBucketer()) >>>>>>>>>>>>>>>>>>>>>>>>> .setBatchSize(batchSize) >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> .setInactiveBucketThreshold(inactiveBucketThreshold) >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> .setInactiveBucketCheckInterval(inactiveBucketCheckInterval); >>>>>>>>>>>>>>>>>>>>>>>>> sink.setWriter(new IdJsonWriter()); >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> 4. Kafka & event time >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> My flink job reads the data from Kafka, using a >>>>>>>>>>>>>>>>>>>>>>>>> BoundedOutOfOrdernessTimestampExtractor on the kafka >>>>>>>>>>>>>>>>>>>>>>>>> consumer to >>>>>>>>>>>>>>>>>>>>>>>>> synchronize watermarks accross all kafka partitions. >>>>>>>>>>>>>>>>>>>>>>>>> We also write late >>>>>>>>>>>>>>>>>>>>>>>>> data to side output, but nothing is written there – >>>>>>>>>>>>>>>>>>>>>>>>> if it would, it could >>>>>>>>>>>>>>>>>>>>>>>>> explain missed data in the main output (I'm also sure >>>>>>>>>>>>>>>>>>>>>>>>> that our late data >>>>>>>>>>>>>>>>>>>>>>>>> writing works, because we previously had some actual >>>>>>>>>>>>>>>>>>>>>>>>> late data which ended >>>>>>>>>>>>>>>>>>>>>>>>> up there). >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> 5. allowedLateness >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> It may be or may not be relevant that I have also >>>>>>>>>>>>>>>>>>>>>>>>> enabled allowedLateness with 1 minute lateness on the >>>>>>>>>>>>>>>>>>>>>>>>> 24-hour window: >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> If that makes sense, I could try removing >>>>>>>>>>>>>>>>>>>>>>>>> allowedLateness entirely? That would be just to rule >>>>>>>>>>>>>>>>>>>>>>>>> out that Flink doesn't >>>>>>>>>>>>>>>>>>>>>>>>> have a bug that's related to restoring state in >>>>>>>>>>>>>>>>>>>>>>>>> combination with the >>>>>>>>>>>>>>>>>>>>>>>>> allowedLateness feature. After all, all of our data >>>>>>>>>>>>>>>>>>>>>>>>> should be in a good >>>>>>>>>>>>>>>>>>>>>>>>> enough order to not be late, given the max out of >>>>>>>>>>>>>>>>>>>>>>>>> orderness used on kafka >>>>>>>>>>>>>>>>>>>>>>>>> consumer timestamp extractor. >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> Thank you in advance! >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>> >>>>>> >>>>> >>>>> >>> > > -- > *Juho Autio* > Senior Data Engineer > > Data Engineering, Games > Rovio Entertainment Corporation > Mobile: + 358 (0)45 313 0122 > juho.au...@rovio.com > www.rovio.com > > *This message and its attachments may contain confidential information and > is intended solely for the attention and use of the named addressee(s). 
If > you are not the intended recipient and / or you have received this message > in error, please contact the sender immediately and delete all material you > have received in this message. You are hereby notified that any use of the > information, which you have received in error in whatsoever form, is > strictly prohibited. Thank you for your co-operation.* >