Stefan (or anyone!), please, could I have some feedback on the findings
that I reported on Dec 21, 2018? This is still a major blocker..

On Thu, Jan 31, 2019 at 11:46 AM Juho Autio <juho.au...@rovio.com> wrote:

> Hello, is there anyone that could help with this?
>
> On Fri, Jan 11, 2019 at 8:14 AM Juho Autio <juho.au...@rovio.com> wrote:
>
>> Stefan, would you have time to comment?
>>
>> On Wednesday, January 2, 2019, Juho Autio <juho.au...@rovio.com> wrote:
>>
>>> Bump – does anyone know if Stefan will be available to comment the
>>> latest findings? Thanks.
>>>
>>> On Fri, Dec 21, 2018 at 2:33 PM Juho Autio <juho.au...@rovio.com> wrote:
>>>
>>>> Stefan, I managed to analyze savepoint with bravo. It seems that the
>>>> data that's missing from output *is* found in savepoint.
>>>>
>>>> I simplified my test case to the following:
>>>>
>>>> - job 1 has bee running for ~10 days
>>>> - savepoint X created & job 1 cancelled
>>>> - job 2 started with restore from savepoint X
>>>>
>>>> Then I waited until the next day so that job 2 has triggered the 24
>>>> hour window.
>>>>
>>>> Then I analyzed the output & savepoint:
>>>>
>>>> - compare job 2 output with the output of a batch pyspark script =>
>>>> find 4223 missing rows
>>>> - pick one of the missing rows (say, id Z)
>>>> - read savepoint X with bravo, filter for id Z => Z was found in the
>>>> savepoint!
>>>>
>>>> How can it be possible that the value is in state but doesn't end up in
>>>> output after state has been restored & window is eventually triggered?
>>>>
>>>> I also did similar analysis on the previous case where I savepointed &
>>>> restored the job multiple times (5) within the same 24-hour window. A
>>>> missing id that I drilled down to, was found in all of those savepoints,
>>>> yet missing from the output that gets written at the end of the day. This
>>>> is even more surprising: that the missing ID was written to the new
>>>> savepoints also after restoring. Is the reducer state somehow decoupled
>>>> from the window contents?
>>>>
>>>> Big thanks to bravo-developer Gyula for guiding me through to be able
>>>> read the reducer state! https://github.com/king/bravo/pull/11
>>>>
>>>> Gyula also had an idea for how to troubleshoot the missing data in a
>>>> scalable way: I could add some "side effect kafka output" on individual
>>>> operators. This should allow tracking more closely at which point the data
>>>> gets lost. However, maybe this would have to be in some Flink's internal
>>>> components, and I'm not sure which those would be.
>>>>
>>>> Cheers,
>>>> Juho
>>>>
>>>> On Mon, Nov 19, 2018 at 11:52 AM Juho Autio <juho.au...@rovio.com>
>>>> wrote:
>>>>
>>>>>
>>>>> Hi Stefan,
>>>>>
>>>>> Bravo doesn't currently support reading a reducer state. I gave it a
>>>>> try but couldn't get to a working implementation yet. If anyone can 
>>>>> provide
>>>>> some insight on how to make this work, please share at github:
>>>>> https://github.com/king/bravo/pull/11
>>>>>
>>>>> Thanks.
>>>>>
>>>>> On Tue, Oct 23, 2018 at 3:32 PM Juho Autio <juho.au...@rovio.com>
>>>>> wrote:
>>>>>
>>>>>> I was glad to find that bravo had now been updated to support
>>>>>> installing bravo to a local maven repo.
>>>>>>
>>>>>> I was able to load a checkpoint created by my job, thanks to the
>>>>>> example provided in bravo README, but I'm still missing the essential 
>>>>>> piece.
>>>>>>
>>>>>> My code was:
>>>>>>
>>>>>>         OperatorStateReader reader = new OperatorStateReader(env2,
>>>>>> savepoint, "DistinctFunction");
>>>>>>         DontKnowWhatTypeThisIs reducingState =
>>>>>> reader.readKeyedStates(what should I put here?);
>>>>>>
>>>>>> I don't know how to read the values collected from reduce() calls in
>>>>>> the state. Is there a way to access the reducing state of the window with
>>>>>> bravo? I'm a bit confused how this works, because when I check with
>>>>>> debugger, flink internally uses a ReducingStateDescriptor
>>>>>> with name=window-contents, but still reading operator state for
>>>>>> "DistinctFunction" didn't at least throw an exception ("window-contents"
>>>>>> threw – obviously there's no operator by that name).
>>>>>>
>>>>>> Cheers,
>>>>>> Juho
>>>>>>
>>>>>> On Mon, Oct 15, 2018 at 2:25 PM Juho Autio <juho.au...@rovio.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Stefan,
>>>>>>>
>>>>>>> Sorry but it doesn't seem immediately clear to me what's a good way
>>>>>>> to use https://github.com/king/bravo.
>>>>>>>
>>>>>>> How are people using it? Would you for example modify build.gradle
>>>>>>> somehow to publish the bravo as a library locally/internally? Or add 
>>>>>>> code
>>>>>>> directly in the bravo project (locally) and run it from there (using an
>>>>>>> IDE, for example)? Also it doesn't seem like the bravo gradle project
>>>>>>> supports building a flink job jar, but if it does, how do I do it?
>>>>>>>
>>>>>>> Thanks.
>>>>>>>
>>>>>>> On Thu, Oct 4, 2018 at 9:30 PM Juho Autio <juho.au...@rovio.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Good then, I'll try to analyze the savepoints with Bravo. Thanks!
>>>>>>>>
>>>>>>>> > How would you assume that backpressure would influence your
>>>>>>>> updates? Updates to each local state still happen event-by-event, in a
>>>>>>>> single reader/writing thread.
>>>>>>>>
>>>>>>>> Sure, just an ignorant guess by me. I'm not familiar with most of
>>>>>>>> Flink's internals. Any way high backpressure is not a seen on this job
>>>>>>>> after it has caught up the lag, so at I thought it would be worth
>>>>>>>> mentioning.
>>>>>>>>
>>>>>>>> On Thu, Oct 4, 2018 at 6:24 PM Stefan Richter <
>>>>>>>> s.rich...@data-artisans.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> Am 04.10.2018 um 16:08 schrieb Juho Autio <juho.au...@rovio.com>:
>>>>>>>>>
>>>>>>>>> > you could take a look at Bravo [1] to query your savepoints and
>>>>>>>>> to check if the state in the savepoint complete w.r.t your 
>>>>>>>>> expectations
>>>>>>>>>
>>>>>>>>> Thanks. I'm not 100% if this is the case, but to me it seemed like
>>>>>>>>> the missed ids were being logged by the reducer soon after the job had
>>>>>>>>> started (after restoring a savepoint). But on the other hand, after 
>>>>>>>>> that I
>>>>>>>>> also made another savepoint & restored that, so what I could check 
>>>>>>>>> is: does
>>>>>>>>> that next savepoint have the missed ids that were logged (a couple of
>>>>>>>>> minutes before the savepoint was created, so there should've been 
>>>>>>>>> more than
>>>>>>>>> enough time to add them to the state before the savepoint was 
>>>>>>>>> triggered) or
>>>>>>>>> not. Any way, if I would be able to verify with Bravo that the ids are
>>>>>>>>> missing from the savepoint (even though reduced logged that it saw 
>>>>>>>>> them),
>>>>>>>>> would that help in figuring out where they are lost? Is there some 
>>>>>>>>> major
>>>>>>>>> difference compared to just looking at the final output after window 
>>>>>>>>> has
>>>>>>>>> been triggered?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I think that makes a difference. For example, you can investigate
>>>>>>>>> if there is a state loss or a problem with the windowing. In the 
>>>>>>>>> savepoint
>>>>>>>>> you could see which keys exists and to which windows they are 
>>>>>>>>> assigned.
>>>>>>>>> Also just to make sure there is no misunderstanding: only elements 
>>>>>>>>> that are
>>>>>>>>> in the state at the start of a savepoint are expected to be part of 
>>>>>>>>> the
>>>>>>>>> savepoint; all elements between start and completion of the savepoint 
>>>>>>>>> are
>>>>>>>>> not expected to be part of the savepoint.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> > I also doubt that the problem is about backpressure after
>>>>>>>>> restore, because the job will only continue running after the state 
>>>>>>>>> restore
>>>>>>>>> is already completed.
>>>>>>>>>
>>>>>>>>> Yes, I'm not suspecting that the state restoring would be the
>>>>>>>>> problem either. My concern was about backpressure possibly messing 
>>>>>>>>> with the
>>>>>>>>> updates of reducing state? I would tend to suspect that updating the 
>>>>>>>>> state
>>>>>>>>> consistently is what fails, where heavy load / backpressure might be a
>>>>>>>>> factor.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> How would you assume that backpressure would influence your
>>>>>>>>> updates? Updates to each local state still happen event-by-event, in a
>>>>>>>>> single reader/writing thread.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Oct 4, 2018 at 4:18 PM Stefan Richter <
>>>>>>>>> s.rich...@data-artisans.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> you could take a look at Bravo [1] to query your savepoints and
>>>>>>>>>> to check if the state in the savepoint complete w.r.t your 
>>>>>>>>>> expectations. I
>>>>>>>>>> somewhat doubt that there is a general problem with the 
>>>>>>>>>> state/savepoints
>>>>>>>>>> because many users are successfully running it on a large state and 
>>>>>>>>>> I am
>>>>>>>>>> not aware of any data loss problems, but nothing is impossible. What 
>>>>>>>>>> the
>>>>>>>>>> savepoint does is also straight forward: iterate a db snapshot and 
>>>>>>>>>> write
>>>>>>>>>> all key/value pairs to disk, so all data that was in the db at the 
>>>>>>>>>> time of
>>>>>>>>>> the savepoint, should show up. I also doubt that the problem is about
>>>>>>>>>> backpressure after restore, because the job will only continue 
>>>>>>>>>> running
>>>>>>>>>> after the state restore is already completed. Did you check if you 
>>>>>>>>>> are
>>>>>>>>>> using exactly-one-semantics or at-least-once semantics? Also did you 
>>>>>>>>>> check
>>>>>>>>>> that the kafka consumer start position is configured properly [2]? 
>>>>>>>>>> Are
>>>>>>>>>> watermarks generated as expected after restore?
>>>>>>>>>>
>>>>>>>>>> One more unrelated high-level comment that I have: for a
>>>>>>>>>> granularity of 24h windows, I wonder if it would not make sense to 
>>>>>>>>>> use a
>>>>>>>>>> batch job instead?
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Stefan
>>>>>>>>>>
>>>>>>>>>> [1] https://github.com/king/bravo
>>>>>>>>>> [2]
>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
>>>>>>>>>>
>>>>>>>>>> Am 04.10.2018 um 14:53 schrieb Juho Autio <juho.au...@rovio.com>:
>>>>>>>>>>
>>>>>>>>>> Thanks for the suggestions!
>>>>>>>>>>
>>>>>>>>>> > In general, it would be tremendously helpful to have a minimal
>>>>>>>>>> working example which allows to reproduce the problem.
>>>>>>>>>>
>>>>>>>>>> Definitely. The problem with reproducing has been that this only
>>>>>>>>>> seems to happen in the bigger production data volumes.
>>>>>>>>>>
>>>>>>>>>> That's why I'm hoping to find a way to debug this with the
>>>>>>>>>> production data. With that it seems to consistently cause some 
>>>>>>>>>> misses every
>>>>>>>>>> time the job is killed/restored.
>>>>>>>>>>
>>>>>>>>>> > check if it happens for shorter windows, like 1h etc
>>>>>>>>>>
>>>>>>>>>> What would be the benefit of that compared to 24h window?
>>>>>>>>>>
>>>>>>>>>> > simplify the job to not use a reduce window but simply a time
>>>>>>>>>> window which outputs the window events. Then counting the input and 
>>>>>>>>>> output
>>>>>>>>>> events should allow you to verify the results. If you are not seeing
>>>>>>>>>> missing events, then it could have something to do with the reducing 
>>>>>>>>>> state
>>>>>>>>>> used in the reduce function.
>>>>>>>>>>
>>>>>>>>>> Hm, maybe, but not sure how useful that would be, because it
>>>>>>>>>> wouldn't yet prove that it's related to reducing, because not having 
>>>>>>>>>> a
>>>>>>>>>> reduce function could also mean smaller load on the job, which might 
>>>>>>>>>> alone
>>>>>>>>>> be enough to make the problem not manifest.
>>>>>>>>>>
>>>>>>>>>> Is there a way to debug what goes into the reducing state
>>>>>>>>>> (including what gets removed or overwritten and what restored), if 
>>>>>>>>>> that
>>>>>>>>>> makes sense..? Maybe some suitable logging could be used to prove 
>>>>>>>>>> that the
>>>>>>>>>> lost data is written to the reducing state (or at least asked to be
>>>>>>>>>> written), but not found any more when the window closes and state is
>>>>>>>>>> flushed?
>>>>>>>>>>
>>>>>>>>>> On configuration once more, we're using RocksDB state backend
>>>>>>>>>> with asynchronous incremental checkpointing. The state is restored 
>>>>>>>>>> from
>>>>>>>>>> savepoints though, we haven't been using those checkpoints in these 
>>>>>>>>>> tests
>>>>>>>>>> (although they could be used in case of crashes – but we haven't had 
>>>>>>>>>> those
>>>>>>>>>> now).
>>>>>>>>>>
>>>>>>>>>> On Thu, Oct 4, 2018 at 3:25 PM Till Rohrmann <
>>>>>>>>>> trohrm...@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>
>>>>>>>>>>> another idea to further narrow down the problem could be to
>>>>>>>>>>> simplify the job to not use a reduce window but simply a time 
>>>>>>>>>>> window which
>>>>>>>>>>> outputs the window events. Then counting the input and output 
>>>>>>>>>>> events should
>>>>>>>>>>> allow you to verify the results. If you are not seeing missing 
>>>>>>>>>>> events, then
>>>>>>>>>>> it could have something to do with the reducing state used in the 
>>>>>>>>>>> reduce
>>>>>>>>>>> function.
>>>>>>>>>>>
>>>>>>>>>>> In general, it would be tremendously helpful to have a minimal
>>>>>>>>>>> working example which allows to reproduce the problem.
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>> Till
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Oct 4, 2018 at 2:02 PM Andrey Zagrebin <
>>>>>>>>>>> and...@data-artisans.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>>
>>>>>>>>>>>> can you try to reduce the job to minimal reproducible example
>>>>>>>>>>>> and share the job and input?
>>>>>>>>>>>>
>>>>>>>>>>>> For example:
>>>>>>>>>>>> - some simple records as input, e.g. tuples of primitive types
>>>>>>>>>>>> saved as cvs
>>>>>>>>>>>> - minimal deduplication job which processes them and misses
>>>>>>>>>>>> records
>>>>>>>>>>>> - check if it happens for shorter windows, like 1h etc
>>>>>>>>>>>> - setup which you use for the job, ideally locally reproducible
>>>>>>>>>>>> or cloud
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Andrey
>>>>>>>>>>>>
>>>>>>>>>>>> On 4 Oct 2018, at 11:13, Juho Autio <juho.au...@rovio.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Sorry to insist, but we seem to be blocked for any serious
>>>>>>>>>>>> usage of state in Flink if we can't rely on it to not miss data in 
>>>>>>>>>>>> case of
>>>>>>>>>>>> restore.
>>>>>>>>>>>>
>>>>>>>>>>>> Would anyone have suggestions for how to troubleshoot this? So
>>>>>>>>>>>> far I have verified with DEBUG logs that our reduce function gets 
>>>>>>>>>>>> to
>>>>>>>>>>>> process also the data that is missing from window output.
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Oct 1, 2018 at 11:56 AM Juho Autio <
>>>>>>>>>>>> juho.au...@rovio.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Andrey,
>>>>>>>>>>>>>
>>>>>>>>>>>>> To rule out for good any questions about sink behaviour, the
>>>>>>>>>>>>> job was killed and started with an additional Kafka sink.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The same number of ids were missed in both outputs: KafkaSink
>>>>>>>>>>>>> & BucketingSink.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I wonder what would be the next steps in debugging?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Sep 21, 2018 at 3:49 PM Juho Autio <
>>>>>>>>>>>>> juho.au...@rovio.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks, Andrey.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> > so it means that the savepoint does not loose at least some
>>>>>>>>>>>>>> dropped records.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'm not sure what you mean by that? I mean, it was known from
>>>>>>>>>>>>>> the beginning, that not everything is lost before/after 
>>>>>>>>>>>>>> restoring a
>>>>>>>>>>>>>> savepoint, just some records around the time of restoration. 
>>>>>>>>>>>>>> It's not 100%
>>>>>>>>>>>>>> clear whether records are lost before making a savepoint or 
>>>>>>>>>>>>>> after restoring
>>>>>>>>>>>>>> it. Although, based on the new DEBUG logs it seems more like 
>>>>>>>>>>>>>> losing some
>>>>>>>>>>>>>> records that are seen ~soon after restoring. It seems like Flink 
>>>>>>>>>>>>>> would be
>>>>>>>>>>>>>> somehow confused either about the restored state vs. new inserts 
>>>>>>>>>>>>>> to state.
>>>>>>>>>>>>>> This could also be somehow linked to the high back pressure on 
>>>>>>>>>>>>>> the kafka
>>>>>>>>>>>>>> source while the stream is catching up.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> > If it is feasible for your setup, I suggest to insert one
>>>>>>>>>>>>>> more map function after reduce and before sink.
>>>>>>>>>>>>>> > etc.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Isn't that the same thing that we discussed before? Nothing
>>>>>>>>>>>>>> is sent to BucketingSink before the window closes, so I don't 
>>>>>>>>>>>>>> see how it
>>>>>>>>>>>>>> would make any difference if we replace the BucketingSink with a 
>>>>>>>>>>>>>> map
>>>>>>>>>>>>>> function or another sink type. We don't create or restore 
>>>>>>>>>>>>>> savepoints during
>>>>>>>>>>>>>> the time when BucketingSink gets input or has open buckets – 
>>>>>>>>>>>>>> that happens
>>>>>>>>>>>>>> at a much later time of day. I would focus on figuring out why 
>>>>>>>>>>>>>> the records
>>>>>>>>>>>>>> are lost while the window is open. But I don't know how to do 
>>>>>>>>>>>>>> that. Would
>>>>>>>>>>>>>> you have any additional suggestions?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Sep 21, 2018 at 3:30 PM Andrey Zagrebin <
>>>>>>>>>>>>>> and...@data-artisans.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> so it means that the savepoint does not loose at least some
>>>>>>>>>>>>>>> dropped records.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> If it is feasible for your setup, I suggest to insert one
>>>>>>>>>>>>>>> more map function after reduce and before sink.
>>>>>>>>>>>>>>> The map function should be called right after window is
>>>>>>>>>>>>>>> triggered but before flushing to s3.
>>>>>>>>>>>>>>> The result of reduce (deduped record) could be logged there.
>>>>>>>>>>>>>>> This should allow to check whether the processed distinct
>>>>>>>>>>>>>>> records were buffered in the state after the restoration from 
>>>>>>>>>>>>>>> the savepoint
>>>>>>>>>>>>>>> or not. If they were buffered we should see that there was an 
>>>>>>>>>>>>>>> attempt to
>>>>>>>>>>>>>>> write them to the sink from the state.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Another suggestion is to try to write records to some other
>>>>>>>>>>>>>>> sink or to both.
>>>>>>>>>>>>>>> E.g. if you can access file system of workers, maybe just
>>>>>>>>>>>>>>> into local files and check whether the records are also dropped 
>>>>>>>>>>>>>>> there.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>> Andrey
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On 20 Sep 2018, at 15:37, Juho Autio <juho.au...@rovio.com>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Andrey!
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I was finally able to gather the DEBUG logs that you
>>>>>>>>>>>>>>> suggested. In short, the reducer logged that it processed at 
>>>>>>>>>>>>>>> least some of
>>>>>>>>>>>>>>> the ids that were missing from the output.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> "At least some", because I didn't have the job running with
>>>>>>>>>>>>>>> DEBUG logs for the full 24-hour window period. So I was only 
>>>>>>>>>>>>>>> able to look
>>>>>>>>>>>>>>> up if I can find *some* of the missing ids in the DEBUG
>>>>>>>>>>>>>>> logs. Which I did indeed.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I changed the DistinctFunction.java to do this:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>     public Map<String, String> reduce(Map<String, String>
>>>>>>>>>>>>>>> value1, Map<String, String> value2) {
>>>>>>>>>>>>>>>         LOG.debug("DistinctFunction.reduce returns: {}={}",
>>>>>>>>>>>>>>> value1.get("field"), value1.get("id"));
>>>>>>>>>>>>>>>         return value1;
>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Then:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> vi flink-1.6.0/conf/log4j.properties
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> log4j.logger.org.apache.flink.streaming.runtime.tasks.StreamTask=DEBUG
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> log4j.logger.com.rovio.ds.flink.uniqueid.DistinctFunction=DEBUG
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Then I ran the following kind of test:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> - Cancelled the on-going job with savepoint created at ~Sep
>>>>>>>>>>>>>>> 18 08:35 UTC 2018
>>>>>>>>>>>>>>> - Started a new cluster & job with DEBUG enabled at ~09:13,
>>>>>>>>>>>>>>> restored from that previous cluster's savepoint
>>>>>>>>>>>>>>> - Ran until caught up offsets
>>>>>>>>>>>>>>> - Cancelled the job with a new savepoint
>>>>>>>>>>>>>>> - Started a new job _without_ DEBUG, which restored the new
>>>>>>>>>>>>>>> savepoint, let it keep running so that it will eventually write 
>>>>>>>>>>>>>>> the output
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Then on the next day, after results had been flushed when
>>>>>>>>>>>>>>> the 24-hour window closed, I compared the results again with a 
>>>>>>>>>>>>>>> batch
>>>>>>>>>>>>>>> version's output. And found some missing ids as usual.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I drilled down to one specific missing id (I'm replacing the
>>>>>>>>>>>>>>> actual value with AN12345 below), which was not found in the 
>>>>>>>>>>>>>>> stream output,
>>>>>>>>>>>>>>> but was found in batch output & flink DEBUG logs.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Related to that id, I gathered the following information:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 2018-09-18~09:13:21,000 job started & savepoint is restored
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 2018-09-18 09:14:29,085 missing id is processed for the
>>>>>>>>>>>>>>> first time, proved by this log line:
>>>>>>>>>>>>>>> 2018-09-18 09:14:29,085 DEBUG
>>>>>>>>>>>>>>> com.rovio.ds.flink.uniqueid.DistinctFunction                  -
>>>>>>>>>>>>>>> DistinctFunction.reduce returns: s.aid1=AN12345
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 2018-09-18 09:15:14,264 first synchronous part of checkpoint
>>>>>>>>>>>>>>> 2018-09-18 09:15:16,544 first asynchronous part of checkpoint
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> (
>>>>>>>>>>>>>>> more occurrences of checkpoints (~1 min checkpointing time +
>>>>>>>>>>>>>>> ~1 min delay before next)
>>>>>>>>>>>>>>> /
>>>>>>>>>>>>>>> more occurrences of DistinctFunction.reduce
>>>>>>>>>>>>>>> )
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 2018-09-18 09:23:45,053 missing id is processed for the last
>>>>>>>>>>>>>>> time
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 2018-09-18~10:20:00,000 savepoint created & job cancelled
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> To be noted, there was high backpressure after restoring
>>>>>>>>>>>>>>> from savepoint until the stream caught up with the kafka 
>>>>>>>>>>>>>>> offsets. Although,
>>>>>>>>>>>>>>> our job uses assign timestamps & watermarks on the flink kafka 
>>>>>>>>>>>>>>> consumer
>>>>>>>>>>>>>>> itself, so event time of all partitions is synchronized. As 
>>>>>>>>>>>>>>> expected, we
>>>>>>>>>>>>>>> don't get any late data in the late data side output.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> From this we can see that the missing ids are processed by
>>>>>>>>>>>>>>> the reducer, but they must get lost somewhere before the 
>>>>>>>>>>>>>>> 24-hour window is
>>>>>>>>>>>>>>> triggered.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I think it's worth mentioning once more that the stream
>>>>>>>>>>>>>>> doesn't miss any ids if we let it's running without 
>>>>>>>>>>>>>>> interruptions / state
>>>>>>>>>>>>>>> restoring.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> What's next?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Aug 29, 2018 at 3:49 PM Andrey Zagrebin <
>>>>>>>>>>>>>>> and...@data-artisans.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> > only when the 24-hour window triggers, BucketingSink gets
>>>>>>>>>>>>>>>> a burst of input
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> This is of course totally true, my understanding is the
>>>>>>>>>>>>>>>> same. We cannot exclude problem there for sure, just 
>>>>>>>>>>>>>>>> savepoints are used a
>>>>>>>>>>>>>>>> lot w/o problem reports and BucketingSink is known to be 
>>>>>>>>>>>>>>>> problematic with
>>>>>>>>>>>>>>>> s3. That is why, I asked you:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> > You also wrote that the timestamps of lost event are
>>>>>>>>>>>>>>>> 'probably' around the time of the savepoint, if it is not yet 
>>>>>>>>>>>>>>>> for sure I
>>>>>>>>>>>>>>>> would also check it.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Although, bucketing sink might loose any data at the end of
>>>>>>>>>>>>>>>> the day (also from the middle). The fact, that it is always 
>>>>>>>>>>>>>>>> around the time
>>>>>>>>>>>>>>>> of taking a savepoint and not random, is surely suspicious and 
>>>>>>>>>>>>>>>> possible
>>>>>>>>>>>>>>>> savepoint failures need to be investigated.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Regarding the s3 problem, s3 doc says:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> > The caveat is that if you make a HEAD or GET request to
>>>>>>>>>>>>>>>> the key name (to find if the object exists) before creating 
>>>>>>>>>>>>>>>> the object,
>>>>>>>>>>>>>>>> Amazon S3 provides 'eventual consistency' for read-after-write.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The algorithm you suggest is how it is roughly implemented
>>>>>>>>>>>>>>>> now (BucketingSink.openNewPartFile). My understanding is that
>>>>>>>>>>>>>>>> 'eventual consistency’ means that even if you just created 
>>>>>>>>>>>>>>>> file (its name
>>>>>>>>>>>>>>>> is key) it can be that you do not get it in the list or exists 
>>>>>>>>>>>>>>>> (HEAD)
>>>>>>>>>>>>>>>> returns false and you risk to rewrite the previous part.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The BucketingSink was designed for a standard file system.
>>>>>>>>>>>>>>>> s3 is used over a file system wrapper atm but does not always 
>>>>>>>>>>>>>>>> provide
>>>>>>>>>>>>>>>> normal file system guarantees. See also last example in [1].
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>> Andrey
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>>> https://codeburst.io/quick-explanation-of-the-s3-consistency-model-6c9f325e3f82
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On 29 Aug 2018, at 12:11, Juho Autio <juho.au...@rovio.com>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Andrey, thank you very much for the debugging suggestions,
>>>>>>>>>>>>>>>> I'll try them.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> In the meanwhile two more questions, please:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> > Just to keep in mind this problem with s3 and exclude it
>>>>>>>>>>>>>>>> for sure. I would also check whether the size of missing 
>>>>>>>>>>>>>>>> events is around
>>>>>>>>>>>>>>>> the batch size of BucketingSink or not.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Fair enough, but I also want to focus on debugging the most
>>>>>>>>>>>>>>>> probable subject first. So what do you think about this – true 
>>>>>>>>>>>>>>>> or false:
>>>>>>>>>>>>>>>> only when the 24-hour window triggers, BucketinSink gets a 
>>>>>>>>>>>>>>>> burst of input.
>>>>>>>>>>>>>>>> Around the state restoring point (middle of the day) it 
>>>>>>>>>>>>>>>> doesn't get any
>>>>>>>>>>>>>>>> input, so it can't lose anything either. Isn't this true, or 
>>>>>>>>>>>>>>>> have I totally
>>>>>>>>>>>>>>>> missed how Flink works in triggering window results? I would 
>>>>>>>>>>>>>>>> not expect
>>>>>>>>>>>>>>>> there to be any optimization that speculatively triggers early 
>>>>>>>>>>>>>>>> results of a
>>>>>>>>>>>>>>>> regular time window to the downstream operators.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> > The old BucketingSink has in general problem with s3.
>>>>>>>>>>>>>>>> Internally BucketingSink queries s3 as a file system to list 
>>>>>>>>>>>>>>>> already
>>>>>>>>>>>>>>>> written file parts (batches) and determine index of the next 
>>>>>>>>>>>>>>>> part to start.
>>>>>>>>>>>>>>>> Due to eventual consistency of checking file existence in s3 
>>>>>>>>>>>>>>>> [1], the
>>>>>>>>>>>>>>>> BucketingSink can rewrite the previously written part and 
>>>>>>>>>>>>>>>> basically loose
>>>>>>>>>>>>>>>> it.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I was wondering, what does S3's "read-after-write
>>>>>>>>>>>>>>>> consistency" (mentioned on the page you linked) actually mean. 
>>>>>>>>>>>>>>>> It seems
>>>>>>>>>>>>>>>> that this might be possible:
>>>>>>>>>>>>>>>> - LIST keys, find current max index
>>>>>>>>>>>>>>>> - choose next index = max + 1
>>>>>>>>>>>>>>>> - HEAD next index: if it exists, keep adding + 1 until key
>>>>>>>>>>>>>>>> doesn't exist on S3
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> But definitely sounds easier if a sink keeps track of files
>>>>>>>>>>>>>>>> in a way that's guaranteed to be consistent.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>> Juho
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Mon, Aug 27, 2018 at 2:04 PM Andrey Zagrebin <
>>>>>>>>>>>>>>>> and...@data-artisans.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> true, StreamingFileSink does not support s3 in 1.6.0, it
>>>>>>>>>>>>>>>>> is planned for the next 1.7 release, sorry for confusion.
>>>>>>>>>>>>>>>>> The old BucketingSink has in general problem with s3.
>>>>>>>>>>>>>>>>> Internally BucketingSink queries s3 as a file system
>>>>>>>>>>>>>>>>> to list already written file parts (batches) and determine
>>>>>>>>>>>>>>>>> index of the next part to start. Due to eventual consistency 
>>>>>>>>>>>>>>>>> of checking
>>>>>>>>>>>>>>>>> file existence in s3 [1], the BucketingSink can rewrite the 
>>>>>>>>>>>>>>>>> previously
>>>>>>>>>>>>>>>>> written part and basically loose it. It should be fixed for
>>>>>>>>>>>>>>>>> StreamingFileSink in 1.7 where Flink keeps its own track of 
>>>>>>>>>>>>>>>>> written parts
>>>>>>>>>>>>>>>>> and does not rely on s3 as a file system.
>>>>>>>>>>>>>>>>> I also include Kostas, he might add more details.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Just to keep in mind this problem with s3 and exclude it
>>>>>>>>>>>>>>>>> for sure  I would also check whether the size of missing 
>>>>>>>>>>>>>>>>> events is around
>>>>>>>>>>>>>>>>> the batch size of BucketingSink or not. You also wrote that 
>>>>>>>>>>>>>>>>> the timestamps
>>>>>>>>>>>>>>>>> of lost event are 'probably' around the time of the 
>>>>>>>>>>>>>>>>> savepoint, if it is not
>>>>>>>>>>>>>>>>> yet for sure I would also check it.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Have you already checked the log files of job manager and
>>>>>>>>>>>>>>>>> task managers for the job running before and after the 
>>>>>>>>>>>>>>>>> restore from the
>>>>>>>>>>>>>>>>> check point? Is everything successful there, no errors, 
>>>>>>>>>>>>>>>>> relevant warnings
>>>>>>>>>>>>>>>>> or exceptions?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> As the next step, I would suggest to log all encountered
>>>>>>>>>>>>>>>>> events in DistinctFunction.reduce if possible for production 
>>>>>>>>>>>>>>>>> data and check
>>>>>>>>>>>>>>>>> whether the missed events are eventually processed before or 
>>>>>>>>>>>>>>>>> after the
>>>>>>>>>>>>>>>>> savepoint. The following log message indicates a border 
>>>>>>>>>>>>>>>>> between the events
>>>>>>>>>>>>>>>>> that should be included into the savepoint (logged before) or 
>>>>>>>>>>>>>>>>> not:
>>>>>>>>>>>>>>>>> “{} ({}, synchronous part) in thread {} took {} ms”
>>>>>>>>>>>>>>>>> (template)
>>>>>>>>>>>>>>>>> Also check if the savepoint has been overall completed:
>>>>>>>>>>>>>>>>> "{} ({}, asynchronous part) in thread {} took {} ms."
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>> Andrey
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>>>> https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On 24 Aug 2018, at 20:41, Juho Autio <juho.au...@rovio.com>
>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Using StreamingFileSink is not a convenient option for
>>>>>>>>>>>>>>>>> production use for us as it doesn't support s3*. I could use
>>>>>>>>>>>>>>>>> StreamingFileSink just to verify, but I don't see much point 
>>>>>>>>>>>>>>>>> in doing so.
>>>>>>>>>>>>>>>>> Please consider my previous comment:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> > I realized that BucketingSink must not play any role in
>>>>>>>>>>>>>>>>> this problem. This is because only when the 24-hour window 
>>>>>>>>>>>>>>>>> triggers,
>>>>>>>>>>>>>>>>> BucketingSink gets a burst of input. Around the state 
>>>>>>>>>>>>>>>>> restoring point
>>>>>>>>>>>>>>>>> (middle of the day) it doesn't get any input, so it can't 
>>>>>>>>>>>>>>>>> lose anything
>>>>>>>>>>>>>>>>> either (right?).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I could also use a kafka sink instead, but I can't imagine
>>>>>>>>>>>>>>>>> how there could be any difference. It's very real that the 
>>>>>>>>>>>>>>>>> sink doesn't get
>>>>>>>>>>>>>>>>> any input for a long time until the 24-hour window closes, 
>>>>>>>>>>>>>>>>> and then it
>>>>>>>>>>>>>>>>> quickly writes out everything because it's not that much data 
>>>>>>>>>>>>>>>>> eventually
>>>>>>>>>>>>>>>>> for the distinct values.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Any ideas for debugging what's happening around the
>>>>>>>>>>>>>>>>> savepoint & restoration time?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> *) I actually implemented StreamingFileSink as an
>>>>>>>>>>>>>>>>> alternative sink. This was before I came to realize that most 
>>>>>>>>>>>>>>>>> likely the
>>>>>>>>>>>>>>>>> sink component has nothing to do with the data loss problem. 
>>>>>>>>>>>>>>>>> I tried it
>>>>>>>>>>>>>>>>> with s3n:// path just to see an exception being thrown. In 
>>>>>>>>>>>>>>>>> the source code
>>>>>>>>>>>>>>>>> I indeed then found an explicit check for the target path 
>>>>>>>>>>>>>>>>> scheme to be
>>>>>>>>>>>>>>>>> "hdfs://".
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Fri, Aug 24, 2018 at 7:49 PM Andrey Zagrebin <
>>>>>>>>>>>>>>>>> and...@data-artisans.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Ok, I think before further debugging the window reduced
>>>>>>>>>>>>>>>>>> state,
>>>>>>>>>>>>>>>>>> could you try the new ‘StreamingFileSink’ [1] introduced
>>>>>>>>>>>>>>>>>> in Flink 1.6.0 instead of the previous 'BucketingSink’?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>> Andrey
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On 24 Aug 2018, at 18:03, Juho Autio <
>>>>>>>>>>>>>>>>>> juho.au...@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Yes, sorry for my confusing comment. I just meant that it
>>>>>>>>>>>>>>>>>> seems like there's a bug somewhere now that the output is 
>>>>>>>>>>>>>>>>>> missing some data.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> > I would wait and check the actual output in s3 because
>>>>>>>>>>>>>>>>>> it is the main result of the job
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Yes, and that's what I have already done. There seems to
>>>>>>>>>>>>>>>>>> be always some data loss with the production data volumes, 
>>>>>>>>>>>>>>>>>> if the job has
>>>>>>>>>>>>>>>>>> been restarted on that day.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Would you have any suggestions for how to debug this
>>>>>>>>>>>>>>>>>> further?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Many thanks for stepping in.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Fri, Aug 24, 2018 at 6:37 PM Andrey Zagrebin <
>>>>>>>>>>>>>>>>>> and...@data-artisans.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> So it is a per key deduplication job.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Yes, I would wait and check the actual output in s3
>>>>>>>>>>>>>>>>>>> because it is the main result of the job and
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> > The late data around the time of taking savepoint
>>>>>>>>>>>>>>>>>>> might be not included into the savepoint but it should be 
>>>>>>>>>>>>>>>>>>> behind the
>>>>>>>>>>>>>>>>>>> snapshotted offset in Kafka.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> is not a bug, it is a possible behaviour.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> The savepoint is a snapshot of the data in transient
>>>>>>>>>>>>>>>>>>> which is already consumed from Kafka.
>>>>>>>>>>>>>>>>>>> Basically the full contents of the window result is
>>>>>>>>>>>>>>>>>>> split between the savepoint and what can come after the 
>>>>>>>>>>>>>>>>>>> savepoint'ed offset
>>>>>>>>>>>>>>>>>>> in Kafka but before the window result is written into s3.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Allowed lateness should not affect it, I am just saying
>>>>>>>>>>>>>>>>>>> that the final result in s3 should include all records 
>>>>>>>>>>>>>>>>>>> after it.
>>>>>>>>>>>>>>>>>>> This is what should be guaranteed but not the contents
>>>>>>>>>>>>>>>>>>> of the intermediate savepoint.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>> Andrey
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On 24 Aug 2018, at 16:52, Juho Autio <
>>>>>>>>>>>>>>>>>>> juho.au...@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks for your answer!
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I check for the missed data from the final output on s3.
>>>>>>>>>>>>>>>>>>> So I wait until the next day, then run the same thing 
>>>>>>>>>>>>>>>>>>> re-implemented in
>>>>>>>>>>>>>>>>>>> batch, and compare the output.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> > The late data around the time of taking savepoint
>>>>>>>>>>>>>>>>>>> might be not included into the savepoint but it should be 
>>>>>>>>>>>>>>>>>>> behind the
>>>>>>>>>>>>>>>>>>> snapshotted offset in Kafka.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Yes, I would definitely expect that. It seems like
>>>>>>>>>>>>>>>>>>> there's a bug somewhere.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> > Then it should just come later after the restore and
>>>>>>>>>>>>>>>>>>> should be reduced within the allowed lateness into the 
>>>>>>>>>>>>>>>>>>> final result which
>>>>>>>>>>>>>>>>>>> is saved into s3.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Well, as far as I know, allowed lateness doesn't play
>>>>>>>>>>>>>>>>>>> any role here, because I started running the job with 
>>>>>>>>>>>>>>>>>>> allowedLateness=0,
>>>>>>>>>>>>>>>>>>> and still get the data loss, while my late data output 
>>>>>>>>>>>>>>>>>>> doesn't receive
>>>>>>>>>>>>>>>>>>> anything.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> > Also, is this `DistinctFunction.reduce` just an
>>>>>>>>>>>>>>>>>>> example or the actual implementation, basically saving just 
>>>>>>>>>>>>>>>>>>> one of records
>>>>>>>>>>>>>>>>>>> inside the 24h window in s3? then what is missing there?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Yes, it's the actual implementation. Note that there's a
>>>>>>>>>>>>>>>>>>> keyBy before the DistinctFunction. So there's one record 
>>>>>>>>>>>>>>>>>>> for each key
>>>>>>>>>>>>>>>>>>> (which is the combination of a couple of fields). In 
>>>>>>>>>>>>>>>>>>> practice I've seen
>>>>>>>>>>>>>>>>>>> that we're missing ~2000-4000 elements on each restore, and 
>>>>>>>>>>>>>>>>>>> the total
>>>>>>>>>>>>>>>>>>> output is obviously much more than that.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Here's the full code for the key selector:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> public class MapKeySelector implements
>>>>>>>>>>>>>>>>>>> KeySelector<Map<String,String>, Object> {
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>     private final String[] fields;
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>     public MapKeySelector(String... fields) {
>>>>>>>>>>>>>>>>>>>         this.fields = fields;
>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>>>>>     public Object getKey(Map<String, String> event)
>>>>>>>>>>>>>>>>>>> throws Exception {
>>>>>>>>>>>>>>>>>>>         Tuple key =
>>>>>>>>>>>>>>>>>>> Tuple.getTupleClass(fields.length).newInstance();
>>>>>>>>>>>>>>>>>>>         for (int i = 0; i < fields.length; i++) {
>>>>>>>>>>>>>>>>>>>             key.setField(event.getOrDefault(fields[i],
>>>>>>>>>>>>>>>>>>> ""), i);
>>>>>>>>>>>>>>>>>>>         }
>>>>>>>>>>>>>>>>>>>         return key;
>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> And a more exact example on how it's used:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>                 .keyBy(new MapKeySelector("ID",
>>>>>>>>>>>>>>>>>>> "PLAYER_ID", "FIELD", "KEY_NAME", "KEY_VALUE"))
>>>>>>>>>>>>>>>>>>>                 .timeWindow(Time.days(1))
>>>>>>>>>>>>>>>>>>>                 .reduce(new DistinctFunction())
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Fri, Aug 24, 2018 at 5:26 PM Andrey Zagrebin <
>>>>>>>>>>>>>>>>>>> and...@data-artisans.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Where exactly does the data miss? When do you notice
>>>>>>>>>>>>>>>>>>>> that?
>>>>>>>>>>>>>>>>>>>> Do you check it:
>>>>>>>>>>>>>>>>>>>> - debugging `DistinctFunction.reduce` right after
>>>>>>>>>>>>>>>>>>>> resume in the middle of the day
>>>>>>>>>>>>>>>>>>>> or
>>>>>>>>>>>>>>>>>>>> - some distinct records miss in the final output
>>>>>>>>>>>>>>>>>>>> of BucketingSink in s3 after window result is actually 
>>>>>>>>>>>>>>>>>>>> triggered and saved
>>>>>>>>>>>>>>>>>>>> into s3 at the end of the day? is this the main output?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> The late data around the time of taking savepoint might
>>>>>>>>>>>>>>>>>>>> be not included into the savepoint but it should be behind 
>>>>>>>>>>>>>>>>>>>> the snapshotted
>>>>>>>>>>>>>>>>>>>> offset in Kafka. Then it should just come later after the 
>>>>>>>>>>>>>>>>>>>> restore and
>>>>>>>>>>>>>>>>>>>> should be reduced within the allowed lateness into the 
>>>>>>>>>>>>>>>>>>>> final result which
>>>>>>>>>>>>>>>>>>>> is saved into s3.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Also, is this `DistinctFunction.reduce` just an example
>>>>>>>>>>>>>>>>>>>> or the actual implementation, basically saving just one of 
>>>>>>>>>>>>>>>>>>>> records inside
>>>>>>>>>>>>>>>>>>>> the 24h window in s3? then what is missing there?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>> Andrey
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On 23 Aug 2018, at 15:42, Juho Autio <
>>>>>>>>>>>>>>>>>>>> juho.au...@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I changed to allowedLateness=0, no change, still
>>>>>>>>>>>>>>>>>>>> missing data when restoring from savepoint.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Tue, Aug 21, 2018 at 10:43 AM Juho Autio <
>>>>>>>>>>>>>>>>>>>> juho.au...@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I realized that BucketingSink must not play any role
>>>>>>>>>>>>>>>>>>>>> in this problem. This is because only when the 24-hour 
>>>>>>>>>>>>>>>>>>>>> window triggers,
>>>>>>>>>>>>>>>>>>>>> BucketinSink gets a burst of input. Around the state 
>>>>>>>>>>>>>>>>>>>>> restoring point
>>>>>>>>>>>>>>>>>>>>> (middle of the day) it doesn't get any input, so it can't 
>>>>>>>>>>>>>>>>>>>>> lose anything
>>>>>>>>>>>>>>>>>>>>> either (right?).
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I will next try removing the allowedLateness entirely
>>>>>>>>>>>>>>>>>>>>> from the equation.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> In the meanwhile, please let me know if you have any
>>>>>>>>>>>>>>>>>>>>> suggestions for debugging the lost data, for example what 
>>>>>>>>>>>>>>>>>>>>> logs to enable.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> We use FlinkKafkaConsumer010 btw. Are there any known
>>>>>>>>>>>>>>>>>>>>> issues with that, that could contribute to lost data when 
>>>>>>>>>>>>>>>>>>>>> restoring a
>>>>>>>>>>>>>>>>>>>>> savepoint?
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 4:23 PM Juho Autio <
>>>>>>>>>>>>>>>>>>>>> juho.au...@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Some data is silently lost on my Flink stream job
>>>>>>>>>>>>>>>>>>>>>> when state is restored from a savepoint.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Do you have any debugging hints to find out where
>>>>>>>>>>>>>>>>>>>>>> exactly the data gets dropped?
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> My job gathers distinct values using a 24-hour
>>>>>>>>>>>>>>>>>>>>>> window. It doesn't have any custom state management.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> When I cancel the job with savepoint and restore from
>>>>>>>>>>>>>>>>>>>>>> that savepoint, some data is missed. It seems to be 
>>>>>>>>>>>>>>>>>>>>>> losing just a small
>>>>>>>>>>>>>>>>>>>>>> amount of data. The event time of lost data is probably 
>>>>>>>>>>>>>>>>>>>>>> around the time of
>>>>>>>>>>>>>>>>>>>>>> savepoint. In other words the rest of the time window is 
>>>>>>>>>>>>>>>>>>>>>> not entirely
>>>>>>>>>>>>>>>>>>>>>> missed – collection works correctly also for (most of 
>>>>>>>>>>>>>>>>>>>>>> the) events that come
>>>>>>>>>>>>>>>>>>>>>> in after restoring.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> When the job processes a full 24-hour window without
>>>>>>>>>>>>>>>>>>>>>> interruptions it doesn't miss anything.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Usually the problem doesn't happen in test
>>>>>>>>>>>>>>>>>>>>>> environments that have smaller parallelism and smaller 
>>>>>>>>>>>>>>>>>>>>>> data volumes. But in
>>>>>>>>>>>>>>>>>>>>>> production volumes the job seems to be consistently 
>>>>>>>>>>>>>>>>>>>>>> missing at least
>>>>>>>>>>>>>>>>>>>>>> something on every restore.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> This issue has consistently happened since the job
>>>>>>>>>>>>>>>>>>>>>> was initially created. It was at first run on an older 
>>>>>>>>>>>>>>>>>>>>>> version of Flink
>>>>>>>>>>>>>>>>>>>>>> 1.5-SNAPSHOT and it still happens on both Flink 1.5.2 & 
>>>>>>>>>>>>>>>>>>>>>> 1.6.0.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> I'm wondering if this could be for example some
>>>>>>>>>>>>>>>>>>>>>> synchronization issue between the kafka consumer offsets 
>>>>>>>>>>>>>>>>>>>>>> vs. what's been
>>>>>>>>>>>>>>>>>>>>>> written by BucketingSink?
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> 1. Job content, simplified
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>         kafkaStream
>>>>>>>>>>>>>>>>>>>>>>                 .flatMap(new ExtractFieldsFunction())
>>>>>>>>>>>>>>>>>>>>>>                 .keyBy(new MapKeySelector(1, 2, 3, 4))
>>>>>>>>>>>>>>>>>>>>>>                 .timeWindow(Time.days(1))
>>>>>>>>>>>>>>>>>>>>>>                 .allowedLateness(allowedLateness)
>>>>>>>>>>>>>>>>>>>>>>                 .sideOutputLateData(lateDataTag)
>>>>>>>>>>>>>>>>>>>>>>                 .reduce(new DistinctFunction())
>>>>>>>>>>>>>>>>>>>>>>                 .addSink(sink)
>>>>>>>>>>>>>>>>>>>>>>                 // use a fixed number of output
>>>>>>>>>>>>>>>>>>>>>> partitions
>>>>>>>>>>>>>>>>>>>>>>                 .setParallelism(8))
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> /**
>>>>>>>>>>>>>>>>>>>>>>  * Usage: .keyBy("the", "distinct",
>>>>>>>>>>>>>>>>>>>>>> "fields").reduce(new DistinctFunction())
>>>>>>>>>>>>>>>>>>>>>>  */
>>>>>>>>>>>>>>>>>>>>>> public class DistinctFunction implements
>>>>>>>>>>>>>>>>>>>>>> ReduceFunction<java.util.Map<String, String>> {
>>>>>>>>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>>>>>>>>     public Map<String, String> reduce(Map<String,
>>>>>>>>>>>>>>>>>>>>>> String> value1, Map<String, String> value2) {
>>>>>>>>>>>>>>>>>>>>>>         return value1;
>>>>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> 2. State configuration
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> boolean enableIncrementalCheckpointing = true;
>>>>>>>>>>>>>>>>>>>>>> String statePath = "s3n://bucket/savepoints";
>>>>>>>>>>>>>>>>>>>>>> new RocksDBStateBackend(statePath,
>>>>>>>>>>>>>>>>>>>>>> enableIncrementalCheckpointing);
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Checkpointing Mode Exactly Once
>>>>>>>>>>>>>>>>>>>>>> Interval 1m 0s
>>>>>>>>>>>>>>>>>>>>>> Timeout 10m 0s
>>>>>>>>>>>>>>>>>>>>>> Minimum Pause Between Checkpoints 1m 0s
>>>>>>>>>>>>>>>>>>>>>> Maximum Concurrent Checkpoints 1
>>>>>>>>>>>>>>>>>>>>>> Persist Checkpoints Externally Enabled (retain on
>>>>>>>>>>>>>>>>>>>>>> cancellation)
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> 3. BucketingSink configuration
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> We use BucketingSink, I don't think there's anything
>>>>>>>>>>>>>>>>>>>>>> special here, if not the fact that we're writing to S3.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>         String outputPath = "s3://bucket/output";
>>>>>>>>>>>>>>>>>>>>>>         BucketingSink<Map<String, String>> sink = new
>>>>>>>>>>>>>>>>>>>>>> BucketingSink<Map<String, String>>(outputPath)
>>>>>>>>>>>>>>>>>>>>>>                 .setBucketer(new
>>>>>>>>>>>>>>>>>>>>>> ProcessdateBucketer())
>>>>>>>>>>>>>>>>>>>>>>                 .setBatchSize(batchSize)
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> .setInactiveBucketThreshold(inactiveBucketThreshold)
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> .setInactiveBucketCheckInterval(inactiveBucketCheckInterval);
>>>>>>>>>>>>>>>>>>>>>>         sink.setWriter(new IdJsonWriter());
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> 4. Kafka & event time
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> My flink job reads the data from Kafka, using a
>>>>>>>>>>>>>>>>>>>>>> BoundedOutOfOrdernessTimestampExtractor on the kafka 
>>>>>>>>>>>>>>>>>>>>>> consumer to
>>>>>>>>>>>>>>>>>>>>>> synchronize watermarks accross all kafka partitions. We 
>>>>>>>>>>>>>>>>>>>>>> also write late
>>>>>>>>>>>>>>>>>>>>>> data to side output, but nothing is written there – if 
>>>>>>>>>>>>>>>>>>>>>> it would, it could
>>>>>>>>>>>>>>>>>>>>>> explain missed data in the main output (I'm also sure 
>>>>>>>>>>>>>>>>>>>>>> that our late data
>>>>>>>>>>>>>>>>>>>>>> writing works, because we previously had some actual 
>>>>>>>>>>>>>>>>>>>>>> late data which ended
>>>>>>>>>>>>>>>>>>>>>> up there).
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> 5. allowedLateness
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> It may be or may not be relevant that I have also
>>>>>>>>>>>>>>>>>>>>>> enabled allowedLateness with 1 minute lateness on the 
>>>>>>>>>>>>>>>>>>>>>> 24-hour window:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> If that makes sense, I could try removing
>>>>>>>>>>>>>>>>>>>>>> allowedLateness entirely? That would be just to rule out 
>>>>>>>>>>>>>>>>>>>>>> that Flink doesn't
>>>>>>>>>>>>>>>>>>>>>> have a bug that's related to restoring state in 
>>>>>>>>>>>>>>>>>>>>>> combination with the
>>>>>>>>>>>>>>>>>>>>>> allowedLateness feature. After all, all of our data 
>>>>>>>>>>>>>>>>>>>>>> should be in a good
>>>>>>>>>>>>>>>>>>>>>> enough order to not be late, given the max out of 
>>>>>>>>>>>>>>>>>>>>>> orderness used on kafka
>>>>>>>>>>>>>>>>>>>>>> consumer timestamp extractor.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Thank you in advance!
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>>

Reply via email to