Bump – does anyone know if Stefan will be available to comment the latest
findings? Thanks.

On Fri, Dec 21, 2018 at 2:33 PM Juho Autio <juho.au...@rovio.com> wrote:

> Stefan, I managed to analyze savepoint with bravo. It seems that the data
> that's missing from output *is* found in savepoint.
>
> I simplified my test case to the following:
>
> - job 1 has bee running for ~10 days
> - savepoint X created & job 1 cancelled
> - job 2 started with restore from savepoint X
>
> Then I waited until the next day so that job 2 has triggered the 24 hour
> window.
>
> Then I analyzed the output & savepoint:
>
> - compare job 2 output with the output of a batch pyspark script => find
> 4223 missing rows
> - pick one of the missing rows (say, id Z)
> - read savepoint X with bravo, filter for id Z => Z was found in the
> savepoint!
>
> How can it be possible that the value is in state but doesn't end up in
> output after state has been restored & window is eventually triggered?
>
> I also did similar analysis on the previous case where I savepointed &
> restored the job multiple times (5) within the same 24-hour window. A
> missing id that I drilled down to, was found in all of those savepoints,
> yet missing from the output that gets written at the end of the day. This
> is even more surprising: that the missing ID was written to the new
> savepoints also after restoring. Is the reducer state somehow decoupled
> from the window contents?
>
> Big thanks to bravo-developer Gyula for guiding me through to be able read
> the reducer state! https://github.com/king/bravo/pull/11
>
> Gyula also had an idea for how to troubleshoot the missing data in a
> scalable way: I could add some "side effect kafka output" on individual
> operators. This should allow tracking more closely at which point the data
> gets lost. However, maybe this would have to be in some Flink's internal
> components, and I'm not sure which those would be.
>
> Cheers,
> Juho
>
> On Mon, Nov 19, 2018 at 11:52 AM Juho Autio <juho.au...@rovio.com> wrote:
>
>>
>> Hi Stefan,
>>
>> Bravo doesn't currently support reading a reducer state. I gave it a try
>> but couldn't get to a working implementation yet. If anyone can provide
>> some insight on how to make this work, please share at github:
>> https://github.com/king/bravo/pull/11
>>
>> Thanks.
>>
>> On Tue, Oct 23, 2018 at 3:32 PM Juho Autio <juho.au...@rovio.com> wrote:
>>
>>> I was glad to find that bravo had now been updated to support installing
>>> bravo to a local maven repo.
>>>
>>> I was able to load a checkpoint created by my job, thanks to the example
>>> provided in bravo README, but I'm still missing the essential piece.
>>>
>>> My code was:
>>>
>>>         OperatorStateReader reader = new OperatorStateReader(env2,
>>> savepoint, "DistinctFunction");
>>>         DontKnowWhatTypeThisIs reducingState =
>>> reader.readKeyedStates(what should I put here?);
>>>
>>> I don't know how to read the values collected from reduce() calls in the
>>> state. Is there a way to access the reducing state of the window with
>>> bravo? I'm a bit confused how this works, because when I check with
>>> debugger, flink internally uses a ReducingStateDescriptor
>>> with name=window-contents, but still reading operator state for
>>> "DistinctFunction" didn't at least throw an exception ("window-contents"
>>> threw – obviously there's no operator by that name).
>>>
>>> Cheers,
>>> Juho
>>>
>>> On Mon, Oct 15, 2018 at 2:25 PM Juho Autio <juho.au...@rovio.com> wrote:
>>>
>>>> Hi Stefan,
>>>>
>>>> Sorry but it doesn't seem immediately clear to me what's a good way to
>>>> use https://github.com/king/bravo.
>>>>
>>>> How are people using it? Would you for example modify build.gradle
>>>> somehow to publish the bravo as a library locally/internally? Or add code
>>>> directly in the bravo project (locally) and run it from there (using an
>>>> IDE, for example)? Also it doesn't seem like the bravo gradle project
>>>> supports building a flink job jar, but if it does, how do I do it?
>>>>
>>>> Thanks.
>>>>
>>>> On Thu, Oct 4, 2018 at 9:30 PM Juho Autio <juho.au...@rovio.com> wrote:
>>>>
>>>>> Good then, I'll try to analyze the savepoints with Bravo. Thanks!
>>>>>
>>>>> > How would you assume that backpressure would influence your updates?
>>>>> Updates to each local state still happen event-by-event, in a single
>>>>> reader/writing thread.
>>>>>
>>>>> Sure, just an ignorant guess by me. I'm not familiar with most of
>>>>> Flink's internals. Any way high backpressure is not a seen on this job
>>>>> after it has caught up the lag, so at I thought it would be worth
>>>>> mentioning.
>>>>>
>>>>> On Thu, Oct 4, 2018 at 6:24 PM Stefan Richter <
>>>>> s.rich...@data-artisans.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Am 04.10.2018 um 16:08 schrieb Juho Autio <juho.au...@rovio.com>:
>>>>>>
>>>>>> > you could take a look at Bravo [1] to query your savepoints and to
>>>>>> check if the state in the savepoint complete w.r.t your expectations
>>>>>>
>>>>>> Thanks. I'm not 100% if this is the case, but to me it seemed like
>>>>>> the missed ids were being logged by the reducer soon after the job had
>>>>>> started (after restoring a savepoint). But on the other hand, after that 
>>>>>> I
>>>>>> also made another savepoint & restored that, so what I could check is: 
>>>>>> does
>>>>>> that next savepoint have the missed ids that were logged (a couple of
>>>>>> minutes before the savepoint was created, so there should've been more 
>>>>>> than
>>>>>> enough time to add them to the state before the savepoint was triggered) 
>>>>>> or
>>>>>> not. Any way, if I would be able to verify with Bravo that the ids are
>>>>>> missing from the savepoint (even though reduced logged that it saw them),
>>>>>> would that help in figuring out where they are lost? Is there some major
>>>>>> difference compared to just looking at the final output after window has
>>>>>> been triggered?
>>>>>>
>>>>>>
>>>>>>
>>>>>> I think that makes a difference. For example, you can investigate if
>>>>>> there is a state loss or a problem with the windowing. In the savepoint 
>>>>>> you
>>>>>> could see which keys exists and to which windows they are assigned. Also
>>>>>> just to make sure there is no misunderstanding: only elements that are in
>>>>>> the state at the start of a savepoint are expected to be part of the
>>>>>> savepoint; all elements between start and completion of the savepoint are
>>>>>> not expected to be part of the savepoint.
>>>>>>
>>>>>>
>>>>>> > I also doubt that the problem is about backpressure after restore,
>>>>>> because the job will only continue running after the state restore is
>>>>>> already completed.
>>>>>>
>>>>>> Yes, I'm not suspecting that the state restoring would be the problem
>>>>>> either. My concern was about backpressure possibly messing with the 
>>>>>> updates
>>>>>> of reducing state? I would tend to suspect that updating the state
>>>>>> consistently is what fails, where heavy load / backpressure might be a
>>>>>> factor.
>>>>>>
>>>>>>
>>>>>>
>>>>>> How would you assume that backpressure would influence your updates?
>>>>>> Updates to each local state still happen event-by-event, in a single
>>>>>> reader/writing thread.
>>>>>>
>>>>>>
>>>>>> On Thu, Oct 4, 2018 at 4:18 PM Stefan Richter <
>>>>>> s.rich...@data-artisans.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> you could take a look at Bravo [1] to query your savepoints and to
>>>>>>> check if the state in the savepoint complete w.r.t your expectations. I
>>>>>>> somewhat doubt that there is a general problem with the state/savepoints
>>>>>>> because many users are successfully running it on a large state and I am
>>>>>>> not aware of any data loss problems, but nothing is impossible. What the
>>>>>>> savepoint does is also straight forward: iterate a db snapshot and write
>>>>>>> all key/value pairs to disk, so all data that was in the db at the time 
>>>>>>> of
>>>>>>> the savepoint, should show up. I also doubt that the problem is about
>>>>>>> backpressure after restore, because the job will only continue running
>>>>>>> after the state restore is already completed. Did you check if you are
>>>>>>> using exactly-one-semantics or at-least-once semantics? Also did you 
>>>>>>> check
>>>>>>> that the kafka consumer start position is configured properly [2]? Are
>>>>>>> watermarks generated as expected after restore?
>>>>>>>
>>>>>>> One more unrelated high-level comment that I have: for a granularity
>>>>>>> of 24h windows, I wonder if it would not make sense to use a batch job
>>>>>>> instead?
>>>>>>>
>>>>>>> Best,
>>>>>>> Stefan
>>>>>>>
>>>>>>> [1] https://github.com/king/bravo
>>>>>>> [2]
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
>>>>>>>
>>>>>>> Am 04.10.2018 um 14:53 schrieb Juho Autio <juho.au...@rovio.com>:
>>>>>>>
>>>>>>> Thanks for the suggestions!
>>>>>>>
>>>>>>> > In general, it would be tremendously helpful to have a minimal
>>>>>>> working example which allows to reproduce the problem.
>>>>>>>
>>>>>>> Definitely. The problem with reproducing has been that this only
>>>>>>> seems to happen in the bigger production data volumes.
>>>>>>>
>>>>>>> That's why I'm hoping to find a way to debug this with the
>>>>>>> production data. With that it seems to consistently cause some misses 
>>>>>>> every
>>>>>>> time the job is killed/restored.
>>>>>>>
>>>>>>> > check if it happens for shorter windows, like 1h etc
>>>>>>>
>>>>>>> What would be the benefit of that compared to 24h window?
>>>>>>>
>>>>>>> > simplify the job to not use a reduce window but simply a time
>>>>>>> window which outputs the window events. Then counting the input and 
>>>>>>> output
>>>>>>> events should allow you to verify the results. If you are not seeing
>>>>>>> missing events, then it could have something to do with the reducing 
>>>>>>> state
>>>>>>> used in the reduce function.
>>>>>>>
>>>>>>> Hm, maybe, but not sure how useful that would be, because it
>>>>>>> wouldn't yet prove that it's related to reducing, because not having a
>>>>>>> reduce function could also mean smaller load on the job, which might 
>>>>>>> alone
>>>>>>> be enough to make the problem not manifest.
>>>>>>>
>>>>>>> Is there a way to debug what goes into the reducing state (including
>>>>>>> what gets removed or overwritten and what restored), if that makes 
>>>>>>> sense..?
>>>>>>> Maybe some suitable logging could be used to prove that the lost data is
>>>>>>> written to the reducing state (or at least asked to be written), but not
>>>>>>> found any more when the window closes and state is flushed?
>>>>>>>
>>>>>>> On configuration once more, we're using RocksDB state backend with
>>>>>>> asynchronous incremental checkpointing. The state is restored from
>>>>>>> savepoints though, we haven't been using those checkpoints in these 
>>>>>>> tests
>>>>>>> (although they could be used in case of crashes – but we haven't had 
>>>>>>> those
>>>>>>> now).
>>>>>>>
>>>>>>> On Thu, Oct 4, 2018 at 3:25 PM Till Rohrmann <trohrm...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Juho,
>>>>>>>>
>>>>>>>> another idea to further narrow down the problem could be to
>>>>>>>> simplify the job to not use a reduce window but simply a time window 
>>>>>>>> which
>>>>>>>> outputs the window events. Then counting the input and output events 
>>>>>>>> should
>>>>>>>> allow you to verify the results. If you are not seeing missing events, 
>>>>>>>> then
>>>>>>>> it could have something to do with the reducing state used in the 
>>>>>>>> reduce
>>>>>>>> function.
>>>>>>>>
>>>>>>>> In general, it would be tremendously helpful to have a minimal
>>>>>>>> working example which allows to reproduce the problem.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Till
>>>>>>>>
>>>>>>>> On Thu, Oct 4, 2018 at 2:02 PM Andrey Zagrebin <
>>>>>>>> and...@data-artisans.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Juho,
>>>>>>>>>
>>>>>>>>> can you try to reduce the job to minimal reproducible example and
>>>>>>>>> share the job and input?
>>>>>>>>>
>>>>>>>>> For example:
>>>>>>>>> - some simple records as input, e.g. tuples of primitive types
>>>>>>>>> saved as cvs
>>>>>>>>> - minimal deduplication job which processes them and misses records
>>>>>>>>> - check if it happens for shorter windows, like 1h etc
>>>>>>>>> - setup which you use for the job, ideally locally reproducible or
>>>>>>>>> cloud
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Andrey
>>>>>>>>>
>>>>>>>>> On 4 Oct 2018, at 11:13, Juho Autio <juho.au...@rovio.com> wrote:
>>>>>>>>>
>>>>>>>>> Sorry to insist, but we seem to be blocked for any serious usage
>>>>>>>>> of state in Flink if we can't rely on it to not miss data in case of
>>>>>>>>> restore.
>>>>>>>>>
>>>>>>>>> Would anyone have suggestions for how to troubleshoot this? So far
>>>>>>>>> I have verified with DEBUG logs that our reduce function gets to 
>>>>>>>>> process
>>>>>>>>> also the data that is missing from window output.
>>>>>>>>>
>>>>>>>>> On Mon, Oct 1, 2018 at 11:56 AM Juho Autio <juho.au...@rovio.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Andrey,
>>>>>>>>>>
>>>>>>>>>> To rule out for good any questions about sink behaviour, the job
>>>>>>>>>> was killed and started with an additional Kafka sink.
>>>>>>>>>>
>>>>>>>>>> The same number of ids were missed in both outputs: KafkaSink &
>>>>>>>>>> BucketingSink.
>>>>>>>>>>
>>>>>>>>>> I wonder what would be the next steps in debugging?
>>>>>>>>>>
>>>>>>>>>> On Fri, Sep 21, 2018 at 3:49 PM Juho Autio <juho.au...@rovio.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks, Andrey.
>>>>>>>>>>>
>>>>>>>>>>> > so it means that the savepoint does not loose at least some
>>>>>>>>>>> dropped records.
>>>>>>>>>>>
>>>>>>>>>>> I'm not sure what you mean by that? I mean, it was known from
>>>>>>>>>>> the beginning, that not everything is lost before/after restoring a
>>>>>>>>>>> savepoint, just some records around the time of restoration. It's 
>>>>>>>>>>> not 100%
>>>>>>>>>>> clear whether records are lost before making a savepoint or after 
>>>>>>>>>>> restoring
>>>>>>>>>>> it. Although, based on the new DEBUG logs it seems more like losing 
>>>>>>>>>>> some
>>>>>>>>>>> records that are seen ~soon after restoring. It seems like Flink 
>>>>>>>>>>> would be
>>>>>>>>>>> somehow confused either about the restored state vs. new inserts to 
>>>>>>>>>>> state.
>>>>>>>>>>> This could also be somehow linked to the high back pressure on the 
>>>>>>>>>>> kafka
>>>>>>>>>>> source while the stream is catching up.
>>>>>>>>>>>
>>>>>>>>>>> > If it is feasible for your setup, I suggest to insert one more
>>>>>>>>>>> map function after reduce and before sink.
>>>>>>>>>>> > etc.
>>>>>>>>>>>
>>>>>>>>>>> Isn't that the same thing that we discussed before? Nothing is
>>>>>>>>>>> sent to BucketingSink before the window closes, so I don't see how 
>>>>>>>>>>> it would
>>>>>>>>>>> make any difference if we replace the BucketingSink with a map 
>>>>>>>>>>> function or
>>>>>>>>>>> another sink type. We don't create or restore savepoints during the 
>>>>>>>>>>> time
>>>>>>>>>>> when BucketingSink gets input or has open buckets – that happens at 
>>>>>>>>>>> a much
>>>>>>>>>>> later time of day. I would focus on figuring out why the records 
>>>>>>>>>>> are lost
>>>>>>>>>>> while the window is open. But I don't know how to do that. Would 
>>>>>>>>>>> you have
>>>>>>>>>>> any additional suggestions?
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Sep 21, 2018 at 3:30 PM Andrey Zagrebin <
>>>>>>>>>>> and...@data-artisans.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>>
>>>>>>>>>>>> so it means that the savepoint does not loose at least some
>>>>>>>>>>>> dropped records.
>>>>>>>>>>>>
>>>>>>>>>>>> If it is feasible for your setup, I suggest to insert one more
>>>>>>>>>>>> map function after reduce and before sink.
>>>>>>>>>>>> The map function should be called right after window is
>>>>>>>>>>>> triggered but before flushing to s3.
>>>>>>>>>>>> The result of reduce (deduped record) could be logged there.
>>>>>>>>>>>> This should allow to check whether the processed distinct
>>>>>>>>>>>> records were buffered in the state after the restoration from the 
>>>>>>>>>>>> savepoint
>>>>>>>>>>>> or not. If they were buffered we should see that there was an 
>>>>>>>>>>>> attempt to
>>>>>>>>>>>> write them to the sink from the state.
>>>>>>>>>>>>
>>>>>>>>>>>> Another suggestion is to try to write records to some other
>>>>>>>>>>>> sink or to both.
>>>>>>>>>>>> E.g. if you can access file system of workers, maybe just into
>>>>>>>>>>>> local files and check whether the records are also dropped there.
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Andrey
>>>>>>>>>>>>
>>>>>>>>>>>> On 20 Sep 2018, at 15:37, Juho Autio <juho.au...@rovio.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Hi Andrey!
>>>>>>>>>>>>
>>>>>>>>>>>> I was finally able to gather the DEBUG logs that you suggested.
>>>>>>>>>>>> In short, the reducer logged that it processed at least some of 
>>>>>>>>>>>> the ids
>>>>>>>>>>>> that were missing from the output.
>>>>>>>>>>>>
>>>>>>>>>>>> "At least some", because I didn't have the job running with
>>>>>>>>>>>> DEBUG logs for the full 24-hour window period. So I was only able 
>>>>>>>>>>>> to look
>>>>>>>>>>>> up if I can find *some* of the missing ids in the DEBUG logs.
>>>>>>>>>>>> Which I did indeed.
>>>>>>>>>>>>
>>>>>>>>>>>> I changed the DistinctFunction.java to do this:
>>>>>>>>>>>>
>>>>>>>>>>>>     @Override
>>>>>>>>>>>>     public Map<String, String> reduce(Map<String, String>
>>>>>>>>>>>> value1, Map<String, String> value2) {
>>>>>>>>>>>>         LOG.debug("DistinctFunction.reduce returns: {}={}",
>>>>>>>>>>>> value1.get("field"), value1.get("id"));
>>>>>>>>>>>>         return value1;
>>>>>>>>>>>>     }
>>>>>>>>>>>>
>>>>>>>>>>>> Then:
>>>>>>>>>>>>
>>>>>>>>>>>> vi flink-1.6.0/conf/log4j.properties
>>>>>>>>>>>>
>>>>>>>>>>>> log4j.logger.org.apache.flink.streaming.runtime.tasks.StreamTask=DEBUG
>>>>>>>>>>>> log4j.logger.com.rovio.ds.flink.uniqueid.DistinctFunction=DEBUG
>>>>>>>>>>>>
>>>>>>>>>>>> Then I ran the following kind of test:
>>>>>>>>>>>>
>>>>>>>>>>>> - Cancelled the on-going job with savepoint created at ~Sep 18
>>>>>>>>>>>> 08:35 UTC 2018
>>>>>>>>>>>> - Started a new cluster & job with DEBUG enabled at ~09:13,
>>>>>>>>>>>> restored from that previous cluster's savepoint
>>>>>>>>>>>> - Ran until caught up offsets
>>>>>>>>>>>> - Cancelled the job with a new savepoint
>>>>>>>>>>>> - Started a new job _without_ DEBUG, which restored the new
>>>>>>>>>>>> savepoint, let it keep running so that it will eventually write 
>>>>>>>>>>>> the output
>>>>>>>>>>>>
>>>>>>>>>>>> Then on the next day, after results had been flushed when the
>>>>>>>>>>>> 24-hour window closed, I compared the results again with a batch 
>>>>>>>>>>>> version's
>>>>>>>>>>>> output. And found some missing ids as usual.
>>>>>>>>>>>>
>>>>>>>>>>>> I drilled down to one specific missing id (I'm replacing the
>>>>>>>>>>>> actual value with AN12345 below), which was not found in the 
>>>>>>>>>>>> stream output,
>>>>>>>>>>>> but was found in batch output & flink DEBUG logs.
>>>>>>>>>>>>
>>>>>>>>>>>> Related to that id, I gathered the following information:
>>>>>>>>>>>>
>>>>>>>>>>>> 2018-09-18~09:13:21,000 job started & savepoint is restored
>>>>>>>>>>>>
>>>>>>>>>>>> 2018-09-18 09:14:29,085 missing id is processed for the first
>>>>>>>>>>>> time, proved by this log line:
>>>>>>>>>>>> 2018-09-18 09:14:29,085 DEBUG
>>>>>>>>>>>> com.rovio.ds.flink.uniqueid.DistinctFunction                  -
>>>>>>>>>>>> DistinctFunction.reduce returns: s.aid1=AN12345
>>>>>>>>>>>>
>>>>>>>>>>>> 2018-09-18 09:15:14,264 first synchronous part of checkpoint
>>>>>>>>>>>> 2018-09-18 09:15:16,544 first asynchronous part of checkpoint
>>>>>>>>>>>>
>>>>>>>>>>>> (
>>>>>>>>>>>> more occurrences of checkpoints (~1 min checkpointing time + ~1
>>>>>>>>>>>> min delay before next)
>>>>>>>>>>>> /
>>>>>>>>>>>> more occurrences of DistinctFunction.reduce
>>>>>>>>>>>> )
>>>>>>>>>>>>
>>>>>>>>>>>> 2018-09-18 09:23:45,053 missing id is processed for the last
>>>>>>>>>>>> time
>>>>>>>>>>>>
>>>>>>>>>>>> 2018-09-18~10:20:00,000 savepoint created & job cancelled
>>>>>>>>>>>>
>>>>>>>>>>>> To be noted, there was high backpressure after restoring from
>>>>>>>>>>>> savepoint until the stream caught up with the kafka offsets. 
>>>>>>>>>>>> Although, our
>>>>>>>>>>>> job uses assign timestamps & watermarks on the flink kafka 
>>>>>>>>>>>> consumer itself,
>>>>>>>>>>>> so event time of all partitions is synchronized. As expected, we 
>>>>>>>>>>>> don't get
>>>>>>>>>>>> any late data in the late data side output.
>>>>>>>>>>>>
>>>>>>>>>>>> From this we can see that the missing ids are processed by the
>>>>>>>>>>>> reducer, but they must get lost somewhere before the 24-hour 
>>>>>>>>>>>> window is
>>>>>>>>>>>> triggered.
>>>>>>>>>>>>
>>>>>>>>>>>> I think it's worth mentioning once more that the stream doesn't
>>>>>>>>>>>> miss any ids if we let it's running without interruptions / state 
>>>>>>>>>>>> restoring.
>>>>>>>>>>>>
>>>>>>>>>>>> What's next?
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Aug 29, 2018 at 3:49 PM Andrey Zagrebin <
>>>>>>>>>>>> and...@data-artisans.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>>>
>>>>>>>>>>>>> > only when the 24-hour window triggers, BucketingSink gets a
>>>>>>>>>>>>> burst of input
>>>>>>>>>>>>>
>>>>>>>>>>>>> This is of course totally true, my understanding is the same.
>>>>>>>>>>>>> We cannot exclude problem there for sure, just savepoints are 
>>>>>>>>>>>>> used a lot
>>>>>>>>>>>>> w/o problem reports and BucketingSink is known to be problematic 
>>>>>>>>>>>>> with s3.
>>>>>>>>>>>>> That is why, I asked you:
>>>>>>>>>>>>>
>>>>>>>>>>>>> > You also wrote that the timestamps of lost event are
>>>>>>>>>>>>> 'probably' around the time of the savepoint, if it is not yet for 
>>>>>>>>>>>>> sure I
>>>>>>>>>>>>> would also check it.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Although, bucketing sink might loose any data at the end of
>>>>>>>>>>>>> the day (also from the middle). The fact, that it is always 
>>>>>>>>>>>>> around the time
>>>>>>>>>>>>> of taking a savepoint and not random, is surely suspicious and 
>>>>>>>>>>>>> possible
>>>>>>>>>>>>> savepoint failures need to be investigated.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regarding the s3 problem, s3 doc says:
>>>>>>>>>>>>>
>>>>>>>>>>>>> > The caveat is that if you make a HEAD or GET request to the
>>>>>>>>>>>>> key name (to find if the object exists) before creating the 
>>>>>>>>>>>>> object, Amazon
>>>>>>>>>>>>> S3 provides 'eventual consistency' for read-after-write.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The algorithm you suggest is how it is roughly implemented now
>>>>>>>>>>>>> (BucketingSink.openNewPartFile). My understanding is that
>>>>>>>>>>>>> 'eventual consistency’ means that even if you just created file 
>>>>>>>>>>>>> (its name
>>>>>>>>>>>>> is key) it can be that you do not get it in the list or exists 
>>>>>>>>>>>>> (HEAD)
>>>>>>>>>>>>> returns false and you risk to rewrite the previous part.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The BucketingSink was designed for a standard file system. s3
>>>>>>>>>>>>> is used over a file system wrapper atm but does not always 
>>>>>>>>>>>>> provide normal
>>>>>>>>>>>>> file system guarantees. See also last example in [1].
>>>>>>>>>>>>>
>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>> Andrey
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1]
>>>>>>>>>>>>> https://codeburst.io/quick-explanation-of-the-s3-consistency-model-6c9f325e3f82
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 29 Aug 2018, at 12:11, Juho Autio <juho.au...@rovio.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Andrey, thank you very much for the debugging suggestions,
>>>>>>>>>>>>> I'll try them.
>>>>>>>>>>>>>
>>>>>>>>>>>>> In the meanwhile two more questions, please:
>>>>>>>>>>>>>
>>>>>>>>>>>>> > Just to keep in mind this problem with s3 and exclude it for
>>>>>>>>>>>>> sure. I would also check whether the size of missing events is 
>>>>>>>>>>>>> around the
>>>>>>>>>>>>> batch size of BucketingSink or not.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Fair enough, but I also want to focus on debugging the most
>>>>>>>>>>>>> probable subject first. So what do you think about this – true or 
>>>>>>>>>>>>> false:
>>>>>>>>>>>>> only when the 24-hour window triggers, BucketinSink gets a burst 
>>>>>>>>>>>>> of input.
>>>>>>>>>>>>> Around the state restoring point (middle of the day) it doesn't 
>>>>>>>>>>>>> get any
>>>>>>>>>>>>> input, so it can't lose anything either. Isn't this true, or have 
>>>>>>>>>>>>> I totally
>>>>>>>>>>>>> missed how Flink works in triggering window results? I would not 
>>>>>>>>>>>>> expect
>>>>>>>>>>>>> there to be any optimization that speculatively triggers early 
>>>>>>>>>>>>> results of a
>>>>>>>>>>>>> regular time window to the downstream operators.
>>>>>>>>>>>>>
>>>>>>>>>>>>> > The old BucketingSink has in general problem with s3.
>>>>>>>>>>>>> Internally BucketingSink queries s3 as a file system to list 
>>>>>>>>>>>>> already
>>>>>>>>>>>>> written file parts (batches) and determine index of the next part 
>>>>>>>>>>>>> to start.
>>>>>>>>>>>>> Due to eventual consistency of checking file existence in s3 [1], 
>>>>>>>>>>>>> the
>>>>>>>>>>>>> BucketingSink can rewrite the previously written part and 
>>>>>>>>>>>>> basically loose
>>>>>>>>>>>>> it.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I was wondering, what does S3's "read-after-write consistency"
>>>>>>>>>>>>> (mentioned on the page you linked) actually mean. It seems that 
>>>>>>>>>>>>> this might
>>>>>>>>>>>>> be possible:
>>>>>>>>>>>>> - LIST keys, find current max index
>>>>>>>>>>>>> - choose next index = max + 1
>>>>>>>>>>>>> - HEAD next index: if it exists, keep adding + 1 until key
>>>>>>>>>>>>> doesn't exist on S3
>>>>>>>>>>>>>
>>>>>>>>>>>>> But definitely sounds easier if a sink keeps track of files in
>>>>>>>>>>>>> a way that's guaranteed to be consistent.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>> Juho
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Aug 27, 2018 at 2:04 PM Andrey Zagrebin <
>>>>>>>>>>>>> and...@data-artisans.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> true, StreamingFileSink does not support s3 in 1.6.0, it is
>>>>>>>>>>>>>> planned for the next 1.7 release, sorry for confusion.
>>>>>>>>>>>>>> The old BucketingSink has in general problem with s3.
>>>>>>>>>>>>>> Internally BucketingSink queries s3 as a file system
>>>>>>>>>>>>>> to list already written file parts (batches) and determine
>>>>>>>>>>>>>> index of the next part to start. Due to eventual consistency of 
>>>>>>>>>>>>>> checking
>>>>>>>>>>>>>> file existence in s3 [1], the BucketingSink can rewrite the 
>>>>>>>>>>>>>> previously
>>>>>>>>>>>>>> written part and basically loose it. It should be fixed for
>>>>>>>>>>>>>> StreamingFileSink in 1.7 where Flink keeps its own track of 
>>>>>>>>>>>>>> written parts
>>>>>>>>>>>>>> and does not rely on s3 as a file system.
>>>>>>>>>>>>>> I also include Kostas, he might add more details.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Just to keep in mind this problem with s3 and exclude it for
>>>>>>>>>>>>>> sure  I would also check whether the size of missing events is 
>>>>>>>>>>>>>> around the
>>>>>>>>>>>>>> batch size of BucketingSink or not. You also wrote that the 
>>>>>>>>>>>>>> timestamps of
>>>>>>>>>>>>>> lost event are 'probably' around the time of the savepoint, if 
>>>>>>>>>>>>>> it is not
>>>>>>>>>>>>>> yet for sure I would also check it.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Have you already checked the log files of job manager and
>>>>>>>>>>>>>> task managers for the job running before and after the restore 
>>>>>>>>>>>>>> from the
>>>>>>>>>>>>>> check point? Is everything successful there, no errors, relevant 
>>>>>>>>>>>>>> warnings
>>>>>>>>>>>>>> or exceptions?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> As the next step, I would suggest to log all encountered
>>>>>>>>>>>>>> events in DistinctFunction.reduce if possible for production 
>>>>>>>>>>>>>> data and check
>>>>>>>>>>>>>> whether the missed events are eventually processed before or 
>>>>>>>>>>>>>> after the
>>>>>>>>>>>>>> savepoint. The following log message indicates a border between 
>>>>>>>>>>>>>> the events
>>>>>>>>>>>>>> that should be included into the savepoint (logged before) or 
>>>>>>>>>>>>>> not:
>>>>>>>>>>>>>> “{} ({}, synchronous part) in thread {} took {} ms” (template)
>>>>>>>>>>>>>> Also check if the savepoint has been overall completed:
>>>>>>>>>>>>>> "{} ({}, asynchronous part) in thread {} took {} ms."
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>> Andrey
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>> https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 24 Aug 2018, at 20:41, Juho Autio <juho.au...@rovio.com>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Using StreamingFileSink is not a convenient option for
>>>>>>>>>>>>>> production use for us as it doesn't support s3*. I could use
>>>>>>>>>>>>>> StreamingFileSink just to verify, but I don't see much point in 
>>>>>>>>>>>>>> doing so.
>>>>>>>>>>>>>> Please consider my previous comment:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> > I realized that BucketingSink must not play any role in
>>>>>>>>>>>>>> this problem. This is because only when the 24-hour window 
>>>>>>>>>>>>>> triggers,
>>>>>>>>>>>>>> BucketingSink gets a burst of input. Around the state restoring 
>>>>>>>>>>>>>> point
>>>>>>>>>>>>>> (middle of the day) it doesn't get any input, so it can't lose 
>>>>>>>>>>>>>> anything
>>>>>>>>>>>>>> either (right?).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I could also use a kafka sink instead, but I can't imagine
>>>>>>>>>>>>>> how there could be any difference. It's very real that the sink 
>>>>>>>>>>>>>> doesn't get
>>>>>>>>>>>>>> any input for a long time until the 24-hour window closes, and 
>>>>>>>>>>>>>> then it
>>>>>>>>>>>>>> quickly writes out everything because it's not that much data 
>>>>>>>>>>>>>> eventually
>>>>>>>>>>>>>> for the distinct values.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Any ideas for debugging what's happening around the savepoint
>>>>>>>>>>>>>> & restoration time?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> *) I actually implemented StreamingFileSink as an alternative
>>>>>>>>>>>>>> sink. This was before I came to realize that most likely the 
>>>>>>>>>>>>>> sink component
>>>>>>>>>>>>>> has nothing to do with the data loss problem. I tried it with 
>>>>>>>>>>>>>> s3n:// path
>>>>>>>>>>>>>> just to see an exception being thrown. In the source code I 
>>>>>>>>>>>>>> indeed then
>>>>>>>>>>>>>> found an explicit check for the target path scheme to be 
>>>>>>>>>>>>>> "hdfs://".
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Aug 24, 2018 at 7:49 PM Andrey Zagrebin <
>>>>>>>>>>>>>> and...@data-artisans.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Ok, I think before further debugging the window reduced
>>>>>>>>>>>>>>> state,
>>>>>>>>>>>>>>> could you try the new ‘StreamingFileSink’ [1] introduced in
>>>>>>>>>>>>>>> Flink 1.6.0 instead of the previous 'BucketingSink’?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>> Andrey
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On 24 Aug 2018, at 18:03, Juho Autio <juho.au...@rovio.com>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Yes, sorry for my confusing comment. I just meant that it
>>>>>>>>>>>>>>> seems like there's a bug somewhere now that the output is 
>>>>>>>>>>>>>>> missing some data.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> > I would wait and check the actual output in s3 because it
>>>>>>>>>>>>>>> is the main result of the job
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Yes, and that's what I have already done. There seems to be
>>>>>>>>>>>>>>> always some data loss with the production data volumes, if the 
>>>>>>>>>>>>>>> job has been
>>>>>>>>>>>>>>> restarted on that day.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Would you have any suggestions for how to debug this further?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Many thanks for stepping in.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, Aug 24, 2018 at 6:37 PM Andrey Zagrebin <
>>>>>>>>>>>>>>> and...@data-artisans.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> So it is a per key deduplication job.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Yes, I would wait and check the actual output in s3 because
>>>>>>>>>>>>>>>> it is the main result of the job and
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> > The late data around the time of taking savepoint might
>>>>>>>>>>>>>>>> be not included into the savepoint but it should be behind the 
>>>>>>>>>>>>>>>> snapshotted
>>>>>>>>>>>>>>>> offset in Kafka.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> is not a bug, it is a possible behaviour.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The savepoint is a snapshot of the data in transient which
>>>>>>>>>>>>>>>> is already consumed from Kafka.
>>>>>>>>>>>>>>>> Basically the full contents of the window result is split
>>>>>>>>>>>>>>>> between the savepoint and what can come after the savepoint'ed 
>>>>>>>>>>>>>>>> offset in
>>>>>>>>>>>>>>>> Kafka but before the window result is written into s3.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Allowed lateness should not affect it, I am just saying
>>>>>>>>>>>>>>>> that the final result in s3 should include all records after 
>>>>>>>>>>>>>>>> it.
>>>>>>>>>>>>>>>> This is what should be guaranteed but not the contents of
>>>>>>>>>>>>>>>> the intermediate savepoint.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>> Andrey
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On 24 Aug 2018, at 16:52, Juho Autio <juho.au...@rovio.com>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks for your answer!
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I check for the missed data from the final output on s3. So
>>>>>>>>>>>>>>>> I wait until the next day, then run the same thing 
>>>>>>>>>>>>>>>> re-implemented in batch,
>>>>>>>>>>>>>>>> and compare the output.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> > The late data around the time of taking savepoint might
>>>>>>>>>>>>>>>> be not included into the savepoint but it should be behind the 
>>>>>>>>>>>>>>>> snapshotted
>>>>>>>>>>>>>>>> offset in Kafka.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Yes, I would definitely expect that. It seems like there's
>>>>>>>>>>>>>>>> a bug somewhere.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> > Then it should just come later after the restore and
>>>>>>>>>>>>>>>> should be reduced within the allowed lateness into the final 
>>>>>>>>>>>>>>>> result which
>>>>>>>>>>>>>>>> is saved into s3.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Well, as far as I know, allowed lateness doesn't play any
>>>>>>>>>>>>>>>> role here, because I started running the job with 
>>>>>>>>>>>>>>>> allowedLateness=0, and
>>>>>>>>>>>>>>>> still get the data loss, while my late data output doesn't 
>>>>>>>>>>>>>>>> receive anything.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> > Also, is this `DistinctFunction.reduce` just an example
>>>>>>>>>>>>>>>> or the actual implementation, basically saving just one of 
>>>>>>>>>>>>>>>> records inside
>>>>>>>>>>>>>>>> the 24h window in s3? then what is missing there?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Yes, it's the actual implementation. Note that there's a
>>>>>>>>>>>>>>>> keyBy before the DistinctFunction. So there's one record for 
>>>>>>>>>>>>>>>> each key
>>>>>>>>>>>>>>>> (which is the combination of a couple of fields). In practice 
>>>>>>>>>>>>>>>> I've seen
>>>>>>>>>>>>>>>> that we're missing ~2000-4000 elements on each restore, and 
>>>>>>>>>>>>>>>> the total
>>>>>>>>>>>>>>>> output is obviously much more than that.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Here's the full code for the key selector:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> public class MapKeySelector implements
>>>>>>>>>>>>>>>> KeySelector<Map<String,String>, Object> {
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>     private final String[] fields;
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>     public MapKeySelector(String... fields) {
>>>>>>>>>>>>>>>>         this.fields = fields;
>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>>     public Object getKey(Map<String, String> event) throws
>>>>>>>>>>>>>>>> Exception {
>>>>>>>>>>>>>>>>         Tuple key =
>>>>>>>>>>>>>>>> Tuple.getTupleClass(fields.length).newInstance();
>>>>>>>>>>>>>>>>         for (int i = 0; i < fields.length; i++) {
>>>>>>>>>>>>>>>>             key.setField(event.getOrDefault(fields[i], ""),
>>>>>>>>>>>>>>>> i);
>>>>>>>>>>>>>>>>         }
>>>>>>>>>>>>>>>>         return key;
>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> And a more exact example on how it's used:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>                 .keyBy(new MapKeySelector("ID",
>>>>>>>>>>>>>>>> "PLAYER_ID", "FIELD", "KEY_NAME", "KEY_VALUE"))
>>>>>>>>>>>>>>>>                 .timeWindow(Time.days(1))
>>>>>>>>>>>>>>>>                 .reduce(new DistinctFunction())
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Fri, Aug 24, 2018 at 5:26 PM Andrey Zagrebin <
>>>>>>>>>>>>>>>> and...@data-artisans.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Where exactly does the data miss? When do you notice that?
>>>>>>>>>>>>>>>>> Do you check it:
>>>>>>>>>>>>>>>>> - debugging `DistinctFunction.reduce` right after resume
>>>>>>>>>>>>>>>>> in the middle of the day
>>>>>>>>>>>>>>>>> or
>>>>>>>>>>>>>>>>> - some distinct records miss in the final output
>>>>>>>>>>>>>>>>> of BucketingSink in s3 after window result is actually 
>>>>>>>>>>>>>>>>> triggered and saved
>>>>>>>>>>>>>>>>> into s3 at the end of the day? is this the main output?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The late data around the time of taking savepoint might be
>>>>>>>>>>>>>>>>> not included into the savepoint but it should be behind the 
>>>>>>>>>>>>>>>>> snapshotted
>>>>>>>>>>>>>>>>> offset in Kafka. Then it should just come later after the 
>>>>>>>>>>>>>>>>> restore and
>>>>>>>>>>>>>>>>> should be reduced within the allowed lateness into the final 
>>>>>>>>>>>>>>>>> result which
>>>>>>>>>>>>>>>>> is saved into s3.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Also, is this `DistinctFunction.reduce` just an example or
>>>>>>>>>>>>>>>>> the actual implementation, basically saving just one of 
>>>>>>>>>>>>>>>>> records inside the
>>>>>>>>>>>>>>>>> 24h window in s3? then what is missing there?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>> Andrey
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On 23 Aug 2018, at 15:42, Juho Autio <juho.au...@rovio.com>
>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I changed to allowedLateness=0, no change, still missing
>>>>>>>>>>>>>>>>> data when restoring from savepoint.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Tue, Aug 21, 2018 at 10:43 AM Juho Autio <
>>>>>>>>>>>>>>>>> juho.au...@rovio.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I realized that BucketingSink must not play any role in
>>>>>>>>>>>>>>>>>> this problem. This is because only when the 24-hour window 
>>>>>>>>>>>>>>>>>> triggers,
>>>>>>>>>>>>>>>>>> BucketinSink gets a burst of input. Around the state 
>>>>>>>>>>>>>>>>>> restoring point
>>>>>>>>>>>>>>>>>> (middle of the day) it doesn't get any input, so it can't 
>>>>>>>>>>>>>>>>>> lose anything
>>>>>>>>>>>>>>>>>> either (right?).
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I will next try removing the allowedLateness entirely
>>>>>>>>>>>>>>>>>> from the equation.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> In the meanwhile, please let me know if you have any
>>>>>>>>>>>>>>>>>> suggestions for debugging the lost data, for example what 
>>>>>>>>>>>>>>>>>> logs to enable.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> We use FlinkKafkaConsumer010 btw. Are there any known
>>>>>>>>>>>>>>>>>> issues with that, that could contribute to lost data when 
>>>>>>>>>>>>>>>>>> restoring a
>>>>>>>>>>>>>>>>>> savepoint?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 4:23 PM Juho Autio <
>>>>>>>>>>>>>>>>>> juho.au...@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Some data is silently lost on my Flink stream job when
>>>>>>>>>>>>>>>>>>> state is restored from a savepoint.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Do you have any debugging hints to find out where
>>>>>>>>>>>>>>>>>>> exactly the data gets dropped?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> My job gathers distinct values using a 24-hour window.
>>>>>>>>>>>>>>>>>>> It doesn't have any custom state management.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> When I cancel the job with savepoint and restore from
>>>>>>>>>>>>>>>>>>> that savepoint, some data is missed. It seems to be losing 
>>>>>>>>>>>>>>>>>>> just a small
>>>>>>>>>>>>>>>>>>> amount of data. The event time of lost data is probably 
>>>>>>>>>>>>>>>>>>> around the time of
>>>>>>>>>>>>>>>>>>> savepoint. In other words the rest of the time window is 
>>>>>>>>>>>>>>>>>>> not entirely
>>>>>>>>>>>>>>>>>>> missed – collection works correctly also for (most of the) 
>>>>>>>>>>>>>>>>>>> events that come
>>>>>>>>>>>>>>>>>>> in after restoring.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> When the job processes a full 24-hour window without
>>>>>>>>>>>>>>>>>>> interruptions it doesn't miss anything.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Usually the problem doesn't happen in test environments
>>>>>>>>>>>>>>>>>>> that have smaller parallelism and smaller data volumes. But 
>>>>>>>>>>>>>>>>>>> in production
>>>>>>>>>>>>>>>>>>> volumes the job seems to be consistently missing at least 
>>>>>>>>>>>>>>>>>>> something on
>>>>>>>>>>>>>>>>>>> every restore.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> This issue has consistently happened since the job was
>>>>>>>>>>>>>>>>>>> initially created. It was at first run on an older version 
>>>>>>>>>>>>>>>>>>> of Flink
>>>>>>>>>>>>>>>>>>> 1.5-SNAPSHOT and it still happens on both Flink 1.5.2 & 
>>>>>>>>>>>>>>>>>>> 1.6.0.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I'm wondering if this could be for example some
>>>>>>>>>>>>>>>>>>> synchronization issue between the kafka consumer offsets 
>>>>>>>>>>>>>>>>>>> vs. what's been
>>>>>>>>>>>>>>>>>>> written by BucketingSink?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 1. Job content, simplified
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>         kafkaStream
>>>>>>>>>>>>>>>>>>>                 .flatMap(new ExtractFieldsFunction())
>>>>>>>>>>>>>>>>>>>                 .keyBy(new MapKeySelector(1, 2, 3, 4))
>>>>>>>>>>>>>>>>>>>                 .timeWindow(Time.days(1))
>>>>>>>>>>>>>>>>>>>                 .allowedLateness(allowedLateness)
>>>>>>>>>>>>>>>>>>>                 .sideOutputLateData(lateDataTag)
>>>>>>>>>>>>>>>>>>>                 .reduce(new DistinctFunction())
>>>>>>>>>>>>>>>>>>>                 .addSink(sink)
>>>>>>>>>>>>>>>>>>>                 // use a fixed number of output
>>>>>>>>>>>>>>>>>>> partitions
>>>>>>>>>>>>>>>>>>>                 .setParallelism(8))
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> /**
>>>>>>>>>>>>>>>>>>>  * Usage: .keyBy("the", "distinct", "fields").reduce(new
>>>>>>>>>>>>>>>>>>> DistinctFunction())
>>>>>>>>>>>>>>>>>>>  */
>>>>>>>>>>>>>>>>>>> public class DistinctFunction implements
>>>>>>>>>>>>>>>>>>> ReduceFunction<java.util.Map<String, String>> {
>>>>>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>>>>>     public Map<String, String> reduce(Map<String,
>>>>>>>>>>>>>>>>>>> String> value1, Map<String, String> value2) {
>>>>>>>>>>>>>>>>>>>         return value1;
>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 2. State configuration
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> boolean enableIncrementalCheckpointing = true;
>>>>>>>>>>>>>>>>>>> String statePath = "s3n://bucket/savepoints";
>>>>>>>>>>>>>>>>>>> new RocksDBStateBackend(statePath,
>>>>>>>>>>>>>>>>>>> enableIncrementalCheckpointing);
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Checkpointing Mode Exactly Once
>>>>>>>>>>>>>>>>>>> Interval 1m 0s
>>>>>>>>>>>>>>>>>>> Timeout 10m 0s
>>>>>>>>>>>>>>>>>>> Minimum Pause Between Checkpoints 1m 0s
>>>>>>>>>>>>>>>>>>> Maximum Concurrent Checkpoints 1
>>>>>>>>>>>>>>>>>>> Persist Checkpoints Externally Enabled (retain on
>>>>>>>>>>>>>>>>>>> cancellation)
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 3. BucketingSink configuration
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> We use BucketingSink, I don't think there's anything
>>>>>>>>>>>>>>>>>>> special here, if not the fact that we're writing to S3.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>         String outputPath = "s3://bucket/output";
>>>>>>>>>>>>>>>>>>>         BucketingSink<Map<String, String>> sink = new
>>>>>>>>>>>>>>>>>>> BucketingSink<Map<String, String>>(outputPath)
>>>>>>>>>>>>>>>>>>>                 .setBucketer(new ProcessdateBucketer())
>>>>>>>>>>>>>>>>>>>                 .setBatchSize(batchSize)
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> .setInactiveBucketThreshold(inactiveBucketThreshold)
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> .setInactiveBucketCheckInterval(inactiveBucketCheckInterval);
>>>>>>>>>>>>>>>>>>>         sink.setWriter(new IdJsonWriter());
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 4. Kafka & event time
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> My flink job reads the data from Kafka, using a
>>>>>>>>>>>>>>>>>>> BoundedOutOfOrdernessTimestampExtractor on the kafka 
>>>>>>>>>>>>>>>>>>> consumer to
>>>>>>>>>>>>>>>>>>> synchronize watermarks accross all kafka partitions. We 
>>>>>>>>>>>>>>>>>>> also write late
>>>>>>>>>>>>>>>>>>> data to side output, but nothing is written there – if it 
>>>>>>>>>>>>>>>>>>> would, it could
>>>>>>>>>>>>>>>>>>> explain missed data in the main output (I'm also sure that 
>>>>>>>>>>>>>>>>>>> our late data
>>>>>>>>>>>>>>>>>>> writing works, because we previously had some actual late 
>>>>>>>>>>>>>>>>>>> data which ended
>>>>>>>>>>>>>>>>>>> up there).
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 5. allowedLateness
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> It may be or may not be relevant that I have also
>>>>>>>>>>>>>>>>>>> enabled allowedLateness with 1 minute lateness on the 
>>>>>>>>>>>>>>>>>>> 24-hour window:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> If that makes sense, I could try removing
>>>>>>>>>>>>>>>>>>> allowedLateness entirely? That would be just to rule out 
>>>>>>>>>>>>>>>>>>> that Flink doesn't
>>>>>>>>>>>>>>>>>>> have a bug that's related to restoring state in combination 
>>>>>>>>>>>>>>>>>>> with the
>>>>>>>>>>>>>>>>>>> allowedLateness feature. After all, all of our data should 
>>>>>>>>>>>>>>>>>>> be in a good
>>>>>>>>>>>>>>>>>>> enough order to not be late, given the max out of orderness 
>>>>>>>>>>>>>>>>>>> used on kafka
>>>>>>>>>>>>>>>>>>> consumer timestamp extractor.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thank you in advance!
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to