Hi Stefan,

Bravo doesn't currently support reading reducing state. I gave it a try
but haven't got a working implementation yet. If anyone can offer
some insight on how to make this work, please share it on GitHub:
https://github.com/king/bravo/pull/11

Thanks.

On Tue, Oct 23, 2018 at 3:32 PM Juho Autio <juho.au...@rovio.com> wrote:

> I was glad to find that bravo has now been updated to support installing
> it to a local Maven repo.
>
> I was able to load a checkpoint created by my job, thanks to the example
> provided in bravo README, but I'm still missing the essential piece.
>
> My code was:
>
>         OperatorStateReader reader = new OperatorStateReader(
>                 env2, savepoint, "DistinctFunction");
>         DontKnowWhatTypeThisIs reducingState = reader.readKeyedStates(
>                 /* what should I put here? */);
>
> I don't know how to read the values collected from reduce() calls in the
> state. Is there a way to access the reducing state of the window with
> bravo? I'm a bit confused about how this works: when I check with a
> debugger, Flink internally uses a ReducingStateDescriptor with
> name=window-contents, yet reading operator state for "DistinctFunction"
> at least didn't throw an exception ("window-contents" did throw;
> obviously there's no operator by that name).
>
> Cheers,
> Juho
>
> On Mon, Oct 15, 2018 at 2:25 PM Juho Autio <juho.au...@rovio.com> wrote:
>
>> Hi Stefan,
>>
>> Sorry but it doesn't seem immediately clear to me what's a good way to
>> use https://github.com/king/bravo.
>>
>> How are people using it? Would you for example modify build.gradle
>> somehow to publish the bravo as a library locally/internally? Or add code
>> directly in the bravo project (locally) and run it from there (using an
>> IDE, for example)? Also it doesn't seem like the bravo gradle project
>> supports building a flink job jar, but if it does, how do I do it?
>>
>> Thanks.
>>
>> On Thu, Oct 4, 2018 at 9:30 PM Juho Autio <juho.au...@rovio.com> wrote:
>>
>>> Good then, I'll try to analyze the savepoints with Bravo. Thanks!
>>>
>>> > How would you assume that backpressure would influence your updates?
>>> Updates to each local state still happen event-by-event, in a single
>>> reader/writing thread.
>>>
>>> Sure, just an ignorant guess by me. I'm not familiar with most of
>>> Flink's internals. Anyway, high backpressure is not seen on this job
>>> after it has caught up with the lag, so I thought it would be worth
>>> mentioning.
>>>
>>> On Thu, Oct 4, 2018 at 6:24 PM Stefan Richter <
>>> s.rich...@data-artisans.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Am 04.10.2018 um 16:08 schrieb Juho Autio <juho.au...@rovio.com>:
>>>>
>>>> > you could take a look at Bravo [1] to query your savepoints and to
>>>> check if the state in the savepoint is complete w.r.t. your expectations
>>>>
>>>> Thanks. I'm not 100% sure if this is the case, but to me it seemed like the
>>>> missed ids were being logged by the reducer soon after the job had started
>>>> (after restoring a savepoint). But on the other hand, after that I also
>>>> made another savepoint & restored that, so what I could check is: does that
>>>> next savepoint have the missed ids that were logged (a couple of minutes
>>>> before the savepoint was created, so there should've been more than enough
>>>> time to add them to the state before the savepoint was triggered) or not.
>>>> Anyway, if I were able to verify with Bravo that the ids are missing
>>>> from the savepoint (even though the reducer logged that it saw them), would
>>>> that help in figuring out where they are lost? Is there some major
>>>> difference compared to just looking at the final output after the window
>>>> has been triggered?
>>>>
>>>>
>>>>
>>>> I think that makes a difference. For example, you can investigate if
>>>> there is a state loss or a problem with the windowing. In the savepoint you
>>>> could see which keys exist and to which windows they are assigned. Also
>>>> just to make sure there is no misunderstanding: only elements that are in
>>>> the state at the start of a savepoint are expected to be part of the
>>>> savepoint; all elements between start and completion of the savepoint are
>>>> not expected to be part of the savepoint.
>>>>
>>>>
>>>> > I also doubt that the problem is about backpressure after restore,
>>>> because the job will only continue running after the state restore is
>>>> already completed.
>>>>
>>>> Yes, I'm not suspecting that the state restoring would be the problem
>>>> either. My concern was about backpressure possibly messing with the updates
>>>> of reducing state? I would tend to suspect that updating the state
>>>> consistently is what fails, where heavy load / backpressure might be a
>>>> factor.
>>>>
>>>>
>>>>
>>>> How would you assume that backpressure would influence your updates?
>>>> Updates to each local state still happen event-by-event, in a single
>>>> reader/writing thread.
>>>>
>>>>
>>>> On Thu, Oct 4, 2018 at 4:18 PM Stefan Richter <
>>>> s.rich...@data-artisans.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> you could take a look at Bravo [1] to query your savepoints and to
>>>>> check if the state in the savepoint is complete w.r.t. your expectations. I
>>>>> somewhat doubt that there is a general problem with the state/savepoints
>>>>> because many users are successfully running it on a large state and I am
>>>>> not aware of any data loss problems, but nothing is impossible. What the
>>>>> savepoint does is also straightforward: iterate a db snapshot and write
>>>>> all key/value pairs to disk, so all data that was in the db at the time of
>>>>> the savepoint should show up. I also doubt that the problem is about
>>>>> backpressure after restore, because the job will only continue running
>>>>> after the state restore is already completed. Did you check if you are
>>>>> using exactly-once semantics or at-least-once semantics? Also did you check
>>>>> that the Kafka consumer start position is configured properly [2]? Are
>>>>> watermarks generated as expected after restore?
>>>>>
>>>>> One more unrelated high-level comment that I have: for a granularity
>>>>> of 24h windows, I wonder if it would not make sense to use a batch job
>>>>> instead?
>>>>>
>>>>> Best,
>>>>> Stefan
>>>>>
>>>>> [1] https://github.com/king/bravo
>>>>> [2]
>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
>>>>>
>>>>> Am 04.10.2018 um 14:53 schrieb Juho Autio <juho.au...@rovio.com>:
>>>>>
>>>>> Thanks for the suggestions!
>>>>>
>>>>> > In general, it would be tremendously helpful to have a minimal
>>>>> working example which allows to reproduce the problem.
>>>>>
>>>>> Definitely. The problem with reproducing has been that this only seems
>>>>> to happen in the bigger production data volumes.
>>>>>
>>>>> That's why I'm hoping to find a way to debug this with the production
>>>>> data. With that it seems to consistently cause some misses every time the
>>>>> job is killed/restored.
>>>>>
>>>>> > check if it happens for shorter windows, like 1h etc
>>>>>
>>>>> What would be the benefit of that compared to 24h window?
>>>>>
>>>>> > simplify the job to not use a reduce window but simply a time window
>>>>> which outputs the window events. Then counting the input and output events
>>>>> should allow you to verify the results. If you are not seeing missing
>>>>> events, then it could have something to do with the reducing state used in
>>>>> the reduce function.
>>>>>
>>>>> Hm, maybe, but not sure how useful that would be, because it wouldn't
>>>>> yet prove that it's related to reducing, because not having a reduce
>>>>> function could also mean smaller load on the job, which might alone be
>>>>> enough to make the problem not manifest.
>>>>>
>>>>> Is there a way to debug what goes into the reducing state (including
>>>>> what gets removed or overwritten and what is restored), if that makes
>>>>> sense? Maybe some suitable logging could be used to prove that the lost
>>>>> data is written to the reducing state (or at least asked to be written),
>>>>> but not found any more when the window closes and state is flushed?
>>>>>
>>>>> On configuration once more, we're using RocksDB state backend with
>>>>> asynchronous incremental checkpointing. The state is restored from
>>>>> savepoints though, we haven't been using those checkpoints in these tests
>>>>> (although they could be used in case of crashes – but we haven't had those
>>>>> now).
>>>>>
>>>>> On Thu, Oct 4, 2018 at 3:25 PM Till Rohrmann <trohrm...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi Juho,
>>>>>>
>>>>>> another idea to further narrow down the problem could be to simplify
>>>>>> the job to not use a reduce window but simply a time window which outputs
>>>>>> the window events. Then counting the input and output events should allow
>>>>>> you to verify the results. If you are not seeing missing events, then it
>>>>>> could have something to do with the reducing state used in the reduce
>>>>>> function.
>>>>>>
>>>>>> In general, it would be tremendously helpful to have a minimal
>>>>>> working example which allows to reproduce the problem.
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Thu, Oct 4, 2018 at 2:02 PM Andrey Zagrebin <
>>>>>> and...@data-artisans.com> wrote:
>>>>>>
>>>>>>> Hi Juho,
>>>>>>>
>>>>>>> can you try to reduce the job to a minimal reproducible example and
>>>>>>> share the job and input?
>>>>>>>
>>>>>>> For example:
>>>>>>> - some simple records as input, e.g. tuples of primitive types saved
>>>>>>> as csv
>>>>>>> - minimal deduplication job which processes them and misses records
>>>>>>> - check if it happens for shorter windows, like 1h etc
>>>>>>> - setup which you use for the job, ideally locally reproducible or
>>>>>>> cloud
>>>>>>>
>>>>>>> Best,
>>>>>>> Andrey
>>>>>>>
>>>>>>> On 4 Oct 2018, at 11:13, Juho Autio <juho.au...@rovio.com> wrote:
>>>>>>>
>>>>>>> Sorry to insist, but we seem to be blocked for any serious usage of
>>>>>>> state in Flink if we can't rely on it to not miss data in case of
>>>>>>> restore.
>>>>>>>
>>>>>>> Would anyone have suggestions for how to troubleshoot this? So far I
>>>>>>> have verified with DEBUG logs that our reduce function also gets to
>>>>>>> process the data that is missing from window output.
>>>>>>>
>>>>>>> On Mon, Oct 1, 2018 at 11:56 AM Juho Autio <juho.au...@rovio.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Andrey,
>>>>>>>>
>>>>>>>> To rule out for good any questions about sink behaviour, the job
>>>>>>>> was killed and started with an additional Kafka sink.
>>>>>>>>
>>>>>>>> The same number of ids were missed in both outputs: KafkaSink &
>>>>>>>> BucketingSink.
>>>>>>>>
>>>>>>>> I wonder what would be the next steps in debugging?
>>>>>>>>
>>>>>>>> On Fri, Sep 21, 2018 at 3:49 PM Juho Autio <juho.au...@rovio.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thanks, Andrey.
>>>>>>>>>
>>>>>>>>> > so it means that the savepoint does not lose at least some
>>>>>>>>> dropped records.
>>>>>>>>>
>>>>>>>>> I'm not sure what you mean by that? I mean, it was known from the
>>>>>>>>> beginning that not everything is lost before/after restoring a
>>>>>>>>> savepoint, just some records around the time of restoration. It's not
>>>>>>>>> 100% clear whether records are lost before making a savepoint or after
>>>>>>>>> restoring it. Although, based on the new DEBUG logs, it seems more like
>>>>>>>>> losing some records that are seen soon after restoring. It seems like
>>>>>>>>> Flink is somehow confused about the restored state vs. new inserts to
>>>>>>>>> state. This could also be somehow linked to the high back pressure on
>>>>>>>>> the Kafka source while the stream is catching up.
>>>>>>>>>
>>>>>>>>> > If it is feasible for your setup, I suggest to insert one more
>>>>>>>>> map function after reduce and before sink.
>>>>>>>>> > etc.
>>>>>>>>>
>>>>>>>>> Isn't that the same thing that we discussed before? Nothing is
>>>>>>>>> sent to BucketingSink before the window closes, so I don't see how it
>>>>>>>>> would make any difference if we replace the BucketingSink with a map
>>>>>>>>> function or another sink type. We don't create or restore savepoints
>>>>>>>>> during the time when BucketingSink gets input or has open buckets – that
>>>>>>>>> happens at a much later time of day. I would focus on figuring out why
>>>>>>>>> the records are lost while the window is open. But I don't know how to
>>>>>>>>> do that. Would you have any additional suggestions?
>>>>>>>>>
>>>>>>>>> On Fri, Sep 21, 2018 at 3:30 PM Andrey Zagrebin <
>>>>>>>>> and...@data-artisans.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Juho,
>>>>>>>>>>
>>>>>>>>>> so it means that the savepoint does not lose at least some
>>>>>>>>>> dropped records.
>>>>>>>>>>
>>>>>>>>>> If it is feasible for your setup, I suggest inserting one more
>>>>>>>>>> map function after reduce and before sink.
>>>>>>>>>> The map function should be called right after the window is triggered
>>>>>>>>>> but before flushing to s3.
>>>>>>>>>> The result of reduce (deduped record) could be logged there.
>>>>>>>>>> This should allow checking whether the processed distinct records
>>>>>>>>>> were buffered in the state after the restoration from the savepoint
>>>>>>>>>> or not. If they were buffered we should see that there was an attempt
>>>>>>>>>> to write them to the sink from the state.
>>>>>>>>>>
>>>>>>>>>> Another suggestion is to try to write records to some other sink
>>>>>>>>>> or to both.
>>>>>>>>>> E.g. if you can access file system of workers, maybe just into
>>>>>>>>>> local files and check whether the records are also dropped there.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Andrey
>>>>>>>>>>
>>>>>>>>>> On 20 Sep 2018, at 15:37, Juho Autio <juho.au...@rovio.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi Andrey!
>>>>>>>>>>
>>>>>>>>>> I was finally able to gather the DEBUG logs that you suggested.
>>>>>>>>>> In short, the reducer logged that it processed at least some of the
>>>>>>>>>> ids that were missing from the output.
>>>>>>>>>>
>>>>>>>>>> "At least some", because I didn't have the job running with DEBUG
>>>>>>>>>> logs for the full 24-hour window period. So I was only able to check
>>>>>>>>>> whether I could find *some* of the missing ids in the DEBUG logs. Which
>>>>>>>>>> I did indeed.
>>>>>>>>>>
>>>>>>>>>> I changed the DistinctFunction.java to do this:
>>>>>>>>>>
>>>>>>>>>>     @Override
>>>>>>>>>>     public Map<String, String> reduce(Map<String, String> value1,
>>>>>>>>>>                                       Map<String, String> value2) {
>>>>>>>>>>         LOG.debug("DistinctFunction.reduce returns: {}={}",
>>>>>>>>>>                 value1.get("field"), value1.get("id"));
>>>>>>>>>>         return value1;
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>> Then:
>>>>>>>>>>
>>>>>>>>>> vi flink-1.6.0/conf/log4j.properties
>>>>>>>>>>
>>>>>>>>>> log4j.logger.org.apache.flink.streaming.runtime.tasks.StreamTask=DEBUG
>>>>>>>>>> log4j.logger.com.rovio.ds.flink.uniqueid.DistinctFunction=DEBUG
>>>>>>>>>>
>>>>>>>>>> Then I ran the following kind of test:
>>>>>>>>>>
>>>>>>>>>> - Cancelled the on-going job with savepoint created at ~Sep 18
>>>>>>>>>> 08:35 UTC 2018
>>>>>>>>>> - Started a new cluster & job with DEBUG enabled at ~09:13,
>>>>>>>>>> restored from that previous cluster's savepoint
>>>>>>>>>> - Ran until caught up offsets
>>>>>>>>>> - Cancelled the job with a new savepoint
>>>>>>>>>> - Started a new job _without_ DEBUG, which restored the new
>>>>>>>>>> savepoint, let it keep running so that it will eventually write the
>>>>>>>>>> output
>>>>>>>>>>
>>>>>>>>>> Then on the next day, after results had been flushed when the
>>>>>>>>>> 24-hour window closed, I compared the results again with a batch
>>>>>>>>>> version's output, and found some missing ids as usual.
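The comparison of stream output against batch output described above amounts to a set difference. A minimal sketch, assuming both outputs have already been loaded into sets of id strings; the class name, method name, and sample ids other than AN12345 are hypothetical and not part of the actual job:

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper for illustration only; not code from the job in this thread.
class MissingIdsSketch {

    // Returns the ids present in the batch output but absent from the stream output.
    static Set<String> missingIds(Set<String> batchIds, Set<String> streamIds) {
        Set<String> missing = new TreeSet<>(batchIds); // copy, so the input is not modified
        missing.removeAll(streamIds);
        return missing;
    }

    public static void main(String[] args) {
        // Made-up sample data; AN12345 is the placeholder id used in this thread.
        Set<String> batch = new TreeSet<>(Arrays.asList("AN12345", "AN2", "AN3"));
        Set<String> stream = new TreeSet<>(Arrays.asList("AN2", "AN3"));
        System.out.println(missingIds(batch, stream));
    }
}
```

Each id returned this way can then be looked up in the DEBUG logs to see whether the reducer ever processed it.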
>>>>>>>>>>
>>>>>>>>>> I drilled down to one specific missing id (I'm replacing the
>>>>>>>>>> actual value with AN12345 below), which was not found in the stream
>>>>>>>>>> output, but was found in batch output & flink DEBUG logs.
>>>>>>>>>>
>>>>>>>>>> Related to that id, I gathered the following information:
>>>>>>>>>>
>>>>>>>>>> 2018-09-18~09:13:21,000 job started & savepoint is restored
>>>>>>>>>>
>>>>>>>>>> 2018-09-18 09:14:29,085 missing id is processed for the first
>>>>>>>>>> time, proved by this log line:
>>>>>>>>>> 2018-09-18 09:14:29,085 DEBUG
>>>>>>>>>> com.rovio.ds.flink.uniqueid.DistinctFunction                  -
>>>>>>>>>> DistinctFunction.reduce returns: s.aid1=AN12345
>>>>>>>>>>
>>>>>>>>>> 2018-09-18 09:15:14,264 first synchronous part of checkpoint
>>>>>>>>>> 2018-09-18 09:15:16,544 first asynchronous part of checkpoint
>>>>>>>>>>
>>>>>>>>>> (
>>>>>>>>>> more occurrences of checkpoints (~1 min checkpointing time + ~1
>>>>>>>>>> min delay before next)
>>>>>>>>>> /
>>>>>>>>>> more occurrences of DistinctFunction.reduce
>>>>>>>>>> )
>>>>>>>>>>
>>>>>>>>>> 2018-09-18 09:23:45,053 missing id is processed for the last time
>>>>>>>>>>
>>>>>>>>>> 2018-09-18~10:20:00,000 savepoint created & job cancelled
>>>>>>>>>>
>>>>>>>>>> To be noted, there was high backpressure after restoring from
>>>>>>>>>> savepoint until the stream caught up with the kafka offsets.
>>>>>>>>>> Although, our job assigns timestamps & watermarks on the flink kafka
>>>>>>>>>> consumer itself, so event time of all partitions is synchronized. As
>>>>>>>>>> expected, we don't get any late data in the late data side output.
>>>>>>>>>>
>>>>>>>>>> From this we can see that the missing ids are processed by the
>>>>>>>>>> reducer, but they must get lost somewhere before the 24-hour window
>>>>>>>>>> is triggered.
>>>>>>>>>>
>>>>>>>>>> I think it's worth mentioning once more that the stream doesn't
>>>>>>>>>> miss any ids if we let it run without interruptions / state
>>>>>>>>>> restoring.
>>>>>>>>>>
>>>>>>>>>> What's next?
>>>>>>>>>>
>>>>>>>>>> On Wed, Aug 29, 2018 at 3:49 PM Andrey Zagrebin <
>>>>>>>>>> and...@data-artisans.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>
>>>>>>>>>>> > only when the 24-hour window triggers, BucketingSink gets a
>>>>>>>>>>> burst of input
>>>>>>>>>>>
>>>>>>>>>>> This is of course totally true, my understanding is the same. We
>>>>>>>>>>> cannot exclude a problem there for sure, it's just that savepoints are
>>>>>>>>>>> used a lot w/o problem reports and BucketingSink is known to be
>>>>>>>>>>> problematic with s3. That is why I asked you:
>>>>>>>>>>>
>>>>>>>>>>> > You also wrote that the timestamps of lost event are
>>>>>>>>>>> 'probably' around the time of the savepoint, if it is not yet for
>>>>>>>>>>> sure I would also check it.
>>>>>>>>>>>
>>>>>>>>>>> Although, bucketing sink might lose any data at the end of the
>>>>>>>>>>> day (also from the middle). The fact that it is always around the
>>>>>>>>>>> time of taking a savepoint and not random is surely suspicious, and
>>>>>>>>>>> possible savepoint failures need to be investigated.
>>>>>>>>>>>
>>>>>>>>>>> Regarding the s3 problem, s3 doc says:
>>>>>>>>>>>
>>>>>>>>>>> > The caveat is that if you make a HEAD or GET request to the
>>>>>>>>>>> key name (to find if the object exists) before creating the object,
>>>>>>>>>>> Amazon S3 provides 'eventual consistency' for read-after-write.
>>>>>>>>>>>
>>>>>>>>>>> The algorithm you suggest is how it is roughly implemented now
>>>>>>>>>>> (BucketingSink.openNewPartFile). My understanding is that
>>>>>>>>>>> 'eventual consistency' means that even if you just created a file
>>>>>>>>>>> (its name is the key), it can be that you do not get it in the list or
>>>>>>>>>>> exists (HEAD) returns false, and you risk rewriting the previous part.
>>>>>>>>>>>
>>>>>>>>>>> The BucketingSink was designed for a standard file system. s3 is
>>>>>>>>>>> used over a file system wrapper atm but does not always provide
>>>>>>>>>>> normal file system guarantees. See also last example in [1].
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>> Andrey
>>>>>>>>>>>
>>>>>>>>>>> [1]
>>>>>>>>>>> https://codeburst.io/quick-explanation-of-the-s3-consistency-model-6c9f325e3f82
>>>>>>>>>>>
>>>>>>>>>>> On 29 Aug 2018, at 12:11, Juho Autio <juho.au...@rovio.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Andrey, thank you very much for the debugging suggestions, I'll
>>>>>>>>>>> try them.
>>>>>>>>>>>
>>>>>>>>>>> In the meanwhile two more questions, please:
>>>>>>>>>>>
>>>>>>>>>>> > Just to keep in mind this problem with s3 and exclude it for
>>>>>>>>>>> sure. I would also check whether the size of missing events is
>>>>>>>>>>> around the batch size of BucketingSink or not.
>>>>>>>>>>>
>>>>>>>>>>> Fair enough, but I also want to focus on debugging the most
>>>>>>>>>>> probable subject first. So what do you think about this – true or
>>>>>>>>>>> false: only when the 24-hour window triggers, BucketingSink gets a
>>>>>>>>>>> burst of input. Around the state restoring point (middle of the day)
>>>>>>>>>>> it doesn't get any input, so it can't lose anything either. Isn't this
>>>>>>>>>>> true, or have I totally missed how Flink works in triggering window
>>>>>>>>>>> results? I would not expect there to be any optimization that
>>>>>>>>>>> speculatively triggers early results of a regular time window to the
>>>>>>>>>>> downstream operators.
>>>>>>>>>>>
>>>>>>>>>>> > The old BucketingSink has in general problem with s3.
>>>>>>>>>>> Internally BucketingSink queries s3 as a file system to list already
>>>>>>>>>>> written file parts (batches) and determine index of the next part to
>>>>>>>>>>> start. Due to eventual consistency of checking file existence in s3
>>>>>>>>>>> [1], the BucketingSink can rewrite the previously written part and
>>>>>>>>>>> basically lose it.
>>>>>>>>>>>
>>>>>>>>>>> I was wondering what S3's "read-after-write consistency"
>>>>>>>>>>> (mentioned on the page you linked) actually means. It seems that
>>>>>>>>>>> this might be possible:
>>>>>>>>>>> - LIST keys, find current max index
>>>>>>>>>>> - choose next index = max + 1
>>>>>>>>>>> - HEAD next index: if it exists, keep adding + 1 until key
>>>>>>>>>>> doesn't exist on S3
>>>>>>>>>>>
>>>>>>>>>>> But definitely sounds easier if a sink keeps track of files in a
>>>>>>>>>>> way that's guaranteed to be consistent.
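The LIST / HEAD probing idea above can be sketched as follows. This is only an illustration under stated assumptions: two in-memory sets stand in for S3 (one for what a possibly stale LIST returns, one for what HEAD reports as existing), the "part-N" key naming is made up, and HEAD is assumed to answer correctly, which is exactly the guarantee in question. It is not how BucketingSink is implemented.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch; the sets simulate S3 responses, no real S3 client is used.
class PartIndexSketch {

    // Assumed naming scheme for part files, for illustration only.
    static String partKey(int index) {
        return "part-" + index;
    }

    // Step 1 (LIST): find the largest part index among the listed keys, or -1 if none.
    static int maxListedIndex(Set<String> listedKeys) {
        int max = -1;
        for (String key : listedKeys) {
            if (key.startsWith("part-")) {
                max = Math.max(max, Integer.parseInt(key.substring("part-".length())));
            }
        }
        return max;
    }

    // Steps 2 and 3: start at max + 1, then keep adding 1 while HEAD says the key exists.
    static int nextPartIndex(Set<String> listedKeys, Set<String> headVisibleKeys) {
        int candidate = maxListedIndex(listedKeys) + 1;
        while (headVisibleKeys.contains(partKey(candidate))) {
            candidate++;
        }
        return candidate;
    }

    public static void main(String[] args) {
        Set<String> listed = new HashSet<>();
        listed.add(partKey(0));
        listed.add(partKey(1)); // stale listing: part-2 is not visible yet

        Set<String> headVisible = new HashSet<>(listed);
        headVisible.add(partKey(2)); // but HEAD already sees part-2

        System.out.println(nextPartIndex(listed, headVisible));
    }
}
```

If HEAD itself can return a stale "not found" for a key that was just written, the loop stops too early and the sketch would still overwrite an existing part.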
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>> Juho
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Aug 27, 2018 at 2:04 PM Andrey Zagrebin <
>>>>>>>>>>> and...@data-artisans.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> true, StreamingFileSink does not support s3 in 1.6.0, it is
>>>>>>>>>>>> planned for the next 1.7 release, sorry for confusion.
>>>>>>>>>>>> The old BucketingSink has in general problem with s3.
>>>>>>>>>>>> Internally BucketingSink queries s3 as a file system
>>>>>>>>>>>> to list already written file parts (batches) and determine
>>>>>>>>>>>> index of the next part to start. Due to eventual consistency of
>>>>>>>>>>>> checking file existence in s3 [1], the BucketingSink can rewrite the
>>>>>>>>>>>> previously written part and basically lose it. It should be fixed for
>>>>>>>>>>>> StreamingFileSink in 1.7 where Flink keeps its own track of
>>>>>>>>>>>> written parts and does not rely on s3 as a file system.
>>>>>>>>>>>> I also include Kostas, he might add more details.
>>>>>>>>>>>>
>>>>>>>>>>>> Just to keep in mind this problem with s3 and exclude it for
>>>>>>>>>>>> sure, I would also check whether the size of missing events is
>>>>>>>>>>>> around the batch size of BucketingSink or not. You also wrote that the
>>>>>>>>>>>> timestamps of lost event are 'probably' around the time of the
>>>>>>>>>>>> savepoint, if it is not yet for sure I would also check it.
>>>>>>>>>>>>
>>>>>>>>>>>> Have you already checked the log files of job manager and task
>>>>>>>>>>>> managers for the job running before and after the restore from the
>>>>>>>>>>>> checkpoint? Is everything successful there, no errors, relevant
>>>>>>>>>>>> warnings or exceptions?
>>>>>>>>>>>>
>>>>>>>>>>>> As the next step, I would suggest logging all encountered events
>>>>>>>>>>>> in DistinctFunction.reduce if possible for production data and
>>>>>>>>>>>> checking whether the missed events are eventually processed before or
>>>>>>>>>>>> after the savepoint. The following log message indicates a border
>>>>>>>>>>>> between the events that should be included into the savepoint (logged
>>>>>>>>>>>> before) or not:
>>>>>>>>>>>> "{} ({}, synchronous part) in thread {} took {} ms" (template)
>>>>>>>>>>>> Also check if the savepoint has been overall completed:
>>>>>>>>>>>> "{} ({}, asynchronous part) in thread {} took {} ms."
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Andrey
>>>>>>>>>>>>
>>>>>>>>>>>> [1]
>>>>>>>>>>>> https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html
>>>>>>>>>>>>
>>>>>>>>>>>> On 24 Aug 2018, at 20:41, Juho Autio <juho.au...@rovio.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> Using StreamingFileSink is not a convenient option for
>>>>>>>>>>>> production use for us as it doesn't support s3*. I could use
>>>>>>>>>>>> StreamingFileSink just to verify, but I don't see much point in
>>>>>>>>>>>> doing so. Please consider my previous comment:
>>>>>>>>>>>>
>>>>>>>>>>>> > I realized that BucketingSink must not play any role in this
>>>>>>>>>>>> problem. This is because only when the 24-hour window triggers,
>>>>>>>>>>>> BucketingSink gets a burst of input. Around the state restoring
>>>>>>>>>>>> point (middle of the day) it doesn't get any input, so it can't lose
>>>>>>>>>>>> anything either (right?).
>>>>>>>>>>>>
>>>>>>>>>>>> I could also use a kafka sink instead, but I can't imagine how
>>>>>>>>>>>> there could be any difference. It's very real that the sink doesn't
>>>>>>>>>>>> get any input for a long time until the 24-hour window closes, and
>>>>>>>>>>>> then it quickly writes out everything because it's not that much data
>>>>>>>>>>>> eventually for the distinct values.
>>>>>>>>>>>>
>>>>>>>>>>>> Any ideas for debugging what's happening around the savepoint &
>>>>>>>>>>>> restoration time?
>>>>>>>>>>>>
>>>>>>>>>>>> *) I actually implemented StreamingFileSink as an alternative
>>>>>>>>>>>> sink. This was before I came to realize that most likely the sink
>>>>>>>>>>>> component has nothing to do with the data loss problem. I tried it
>>>>>>>>>>>> with s3n:// path just to see an exception being thrown. In the source
>>>>>>>>>>>> code I indeed then found an explicit check for the target path scheme
>>>>>>>>>>>> to be "hdfs://".
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Aug 24, 2018 at 7:49 PM Andrey Zagrebin <
>>>>>>>>>>>> and...@data-artisans.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Ok, I think before further debugging the window reduced state,
>>>>>>>>>>>>> could you try the new 'StreamingFileSink' [1] introduced in
>>>>>>>>>>>>> Flink 1.6.0 instead of the previous 'BucketingSink'?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>> Andrey
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1]
>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 24 Aug 2018, at 18:03, Juho Autio <juho.au...@rovio.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Yes, sorry for my confusing comment. I just meant that it
>>>>>>>>>>>>> seems like there's a bug somewhere now that the output is missing
>>>>>>>>>>>>> some data.
>>>>>>>>>>>>>
>>>>>>>>>>>>> > I would wait and check the actual output in s3 because it is
>>>>>>>>>>>>> the main result of the job
>>>>>>>>>>>>>
>>>>>>>>>>>>> Yes, and that's what I have already done. There seems to be
>>>>>>>>>>>>> always some data loss with the production data volumes, if the job
>>>>>>>>>>>>> has been restarted on that day.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Would you have any suggestions for how to debug this further?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Many thanks for stepping in.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Aug 24, 2018 at 6:37 PM Andrey Zagrebin <
>>>>>>>>>>>>> and...@data-artisans.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> So it is a per key deduplication job.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Yes, I would wait and check the actual output in s3 because
>>>>>>>>>>>>>> it is the main result of the job and
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> > The late data around the time of taking savepoint might be
>>>>>>>>>>>>>> not included into the savepoint but it should be behind the
>>>>>>>>>>>>>> snapshotted offset in Kafka.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> is not a bug, it is a possible behaviour.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The savepoint is a snapshot of the data in transit which is
>>>>>>>>>>>>>> already consumed from Kafka.
>>>>>>>>>>>>>> Basically the full contents of the window result is split
>>>>>>>>>>>>>> between the savepoint and what can come after the savepoint'ed
>>>>>>>>>>>>>> offset in Kafka but before the window result is written into s3.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Allowed lateness should not affect it, I am just saying that
>>>>>>>>>>>>>> the final result in s3 should include all records after it.
>>>>>>>>>>>>>> This is what should be guaranteed but not the contents of the
>>>>>>>>>>>>>> intermediate savepoint.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>> Andrey
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 24 Aug 2018, at 16:52, Juho Autio <juho.au...@rovio.com>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for your answer!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I check for the missed data from the final output on s3. So I
>>>>>>>>>>>>>> wait until the next day, then run the same thing re-implemented in
>>>>>>>>>>>>>> batch, and compare the output.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> > The late data around the time of taking savepoint might be
>>>>>>>>>>>>>> not included into the savepoint but it should be behind the
>>>>>>>>>>>>>> snapshotted offset in Kafka.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Yes, I would definitely expect that. It seems like there's a
>>>>>>>>>>>>>> bug somewhere.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> > Then it should just come later after the restore and should
>>>>>>>>>>>>>> be reduced within the allowed lateness into the final result which
>>>>>>>>>>>>>> is saved into s3.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Well, as far as I know, allowed lateness doesn't play any
>>>>>>>>>>>>>> role here, because I started running the job with
>>>>>>>>>>>>>> allowedLateness=0, and still get the data loss, while my late data
>>>>>>>>>>>>>> output doesn't receive anything.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> > Also, is this `DistinctFunction.reduce` just an example or
>>>>>>>>>>>>>> the actual implementation, basically saving just one of records
>>>>>>>>>>>>>> inside the 24h window in s3? then what is missing there?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Yes, it's the actual implementation. Note that there's a
>>>>>>>>>>>>>> keyBy before the DistinctFunction. So there's one record for each
>>>>>>>>>>>>>> key (which is the combination of a couple of fields). In practice
>>>>>>>>>>>>>> I've seen that we're missing ~2000-4000 elements on each restore,
>>>>>>>>>>>>>> and the total output is obviously much more than that.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Here's the full code for the key selector:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> public class MapKeySelector implements
>>>>>>>>>>>>>> KeySelector<Map<String,String>, Object> {
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     private final String[] fields;
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     public MapKeySelector(String... fields) {
>>>>>>>>>>>>>>         this.fields = fields;
>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>     public Object getKey(Map<String, String> event) throws Exception {
>>>>>>>>>>>>>>         Tuple key = Tuple.getTupleClass(fields.length).newInstance();
>>>>>>>>>>>>>>         for (int i = 0; i < fields.length; i++) {
>>>>>>>>>>>>>>             key.setField(event.getOrDefault(fields[i], ""), i);
>>>>>>>>>>>>>>         }
>>>>>>>>>>>>>>         return key;
>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> And a more exact example on how it's used:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>                 .keyBy(new MapKeySelector("ID", "PLAYER_ID", "FIELD", "KEY_NAME", "KEY_VALUE"))
>>>>>>>>>>>>>>                 .timeWindow(Time.days(1))
>>>>>>>>>>>>>>                 .reduce(new DistinctFunction())
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Aug 24, 2018 at 5:26 PM Andrey Zagrebin <
>>>>>>>>>>>>>> and...@data-artisans.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Where exactly does the data go missing? When do you notice that?
>>>>>>>>>>>>>>> Do you check it by:
>>>>>>>>>>>>>>> - debugging `DistinctFunction.reduce` right after resume in
>>>>>>>>>>>>>>> the middle of the day
>>>>>>>>>>>>>>> or
>>>>>>>>>>>>>>> - finding that some distinct records are missing from the final output
>>>>>>>>>>>>>>> of BucketingSink in s3 after the window result is actually triggered
>>>>>>>>>>>>>>> and saved into s3 at the end of the day? Is this the main output?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The late data around the time of taking savepoint might not be
>>>>>>>>>>>>>>> included in the savepoint, but it should be behind the snapshotted
>>>>>>>>>>>>>>> offset in Kafka. Then it should just come later after the restore and
>>>>>>>>>>>>>>> should be reduced within the allowed lateness into the final result,
>>>>>>>>>>>>>>> which is saved into s3.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Also, is this `DistinctFunction.reduce` just an example or
>>>>>>>>>>>>>>> the actual implementation, basically saving just one of the records
>>>>>>>>>>>>>>> inside the 24h window in s3? Then what is missing there?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>> Andrey
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On 23 Aug 2018, at 15:42, Juho Autio <juho.au...@rovio.com>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I changed to allowedLateness=0, no change, still missing
>>>>>>>>>>>>>>> data when restoring from savepoint.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, Aug 21, 2018 at 10:43 AM Juho Autio <
>>>>>>>>>>>>>>> juho.au...@rovio.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I realized that BucketingSink must not play any role in
>>>>>>>>>>>>>>>> this problem. This is because BucketingSink only gets a burst of
>>>>>>>>>>>>>>>> input when the 24-hour window triggers. Around the state restoring
>>>>>>>>>>>>>>>> point (middle of the day) it doesn't get any input, so it can't lose
>>>>>>>>>>>>>>>> anything either (right?).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I will next try removing the allowedLateness entirely from
>>>>>>>>>>>>>>>> the equation.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> In the meantime, please let me know if you have any
>>>>>>>>>>>>>>>> suggestions for debugging the lost data, for example what logs to
>>>>>>>>>>>>>>>> enable.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> We use FlinkKafkaConsumer010 btw. Are there any known
>>>>>>>>>>>>>>>> issues with that, that could contribute to lost data when 
>>>>>>>>>>>>>>>> restoring a
>>>>>>>>>>>>>>>> savepoint?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 4:23 PM Juho Autio <
>>>>>>>>>>>>>>>> juho.au...@rovio.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Some data is silently lost on my Flink stream job when
>>>>>>>>>>>>>>>>> state is restored from a savepoint.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Do you have any debugging hints to find out where exactly
>>>>>>>>>>>>>>>>> the data gets dropped?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> My job gathers distinct values using a 24-hour window. It
>>>>>>>>>>>>>>>>> doesn't have any custom state management.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> When I cancel the job with savepoint and restore from that
>>>>>>>>>>>>>>>>> savepoint, some data is missed. It seems to be losing just a 
>>>>>>>>>>>>>>>>> small amount
>>>>>>>>>>>>>>>>> of data. The event time of lost data is probably around the 
>>>>>>>>>>>>>>>>> time of
>>>>>>>>>>>>>>>>> savepoint. In other words the rest of the time window is not 
>>>>>>>>>>>>>>>>> entirely
>>>>>>>>>>>>>>>>> missed – collection works correctly also for (most of the) 
>>>>>>>>>>>>>>>>> events that come
>>>>>>>>>>>>>>>>> in after restoring.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> When the job processes a full 24-hour window without
>>>>>>>>>>>>>>>>> interruptions it doesn't miss anything.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Usually the problem doesn't happen in test environments
>>>>>>>>>>>>>>>>> that have smaller parallelism and smaller data volumes. But 
>>>>>>>>>>>>>>>>> in production
>>>>>>>>>>>>>>>>> volumes the job seems to be consistently missing at least 
>>>>>>>>>>>>>>>>> something on
>>>>>>>>>>>>>>>>> every restore.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> This issue has consistently happened since the job was
>>>>>>>>>>>>>>>>> initially created. It was at first run on an older version of 
>>>>>>>>>>>>>>>>> Flink
>>>>>>>>>>>>>>>>> 1.5-SNAPSHOT and it still happens on both Flink 1.5.2 & 1.6.0.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I'm wondering if this could be for example some
>>>>>>>>>>>>>>>>> synchronization issue between the kafka consumer offsets vs. 
>>>>>>>>>>>>>>>>> what's been
>>>>>>>>>>>>>>>>> written by BucketingSink?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 1. Job content, simplified
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>         kafkaStream
>>>>>>>>>>>>>>>>>                 .flatMap(new ExtractFieldsFunction())
>>>>>>>>>>>>>>>>>                 .keyBy(new MapKeySelector(1, 2, 3, 4))
>>>>>>>>>>>>>>>>>                 .timeWindow(Time.days(1))
>>>>>>>>>>>>>>>>>                 .allowedLateness(allowedLateness)
>>>>>>>>>>>>>>>>>                 .sideOutputLateData(lateDataTag)
>>>>>>>>>>>>>>>>>                 .reduce(new DistinctFunction())
>>>>>>>>>>>>>>>>>                 .addSink(sink)
>>>>>>>>>>>>>>>>>                 // use a fixed number of output partitions
>>>>>>>>>>>>>>>>>                 .setParallelism(8);
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> /**
>>>>>>>>>>>>>>>>>  * Usage: .keyBy("the", "distinct", "fields").reduce(new DistinctFunction())
>>>>>>>>>>>>>>>>>  */
>>>>>>>>>>>>>>>>> public class DistinctFunction implements ReduceFunction<java.util.Map<String, String>> {
>>>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>>>     public Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) {
>>>>>>>>>>>>>>>>>         return value1;
>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 2. State configuration
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> boolean enableIncrementalCheckpointing = true;
>>>>>>>>>>>>>>>>> String statePath = "s3n://bucket/savepoints";
>>>>>>>>>>>>>>>>> new RocksDBStateBackend(statePath, enableIncrementalCheckpointing);
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Checkpointing Mode: Exactly Once
>>>>>>>>>>>>>>>>> Interval: 1m 0s
>>>>>>>>>>>>>>>>> Timeout: 10m 0s
>>>>>>>>>>>>>>>>> Minimum Pause Between Checkpoints: 1m 0s
>>>>>>>>>>>>>>>>> Maximum Concurrent Checkpoints: 1
>>>>>>>>>>>>>>>>> Persist Checkpoints Externally: Enabled (retain on cancellation)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 3. BucketingSink configuration
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> We use BucketingSink. I don't think there's anything
>>>>>>>>>>>>>>>>> special here, apart from the fact that we're writing to S3.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>         String outputPath = "s3://bucket/output";
>>>>>>>>>>>>>>>>>         BucketingSink<Map<String, String>> sink = new BucketingSink<Map<String, String>>(outputPath)
>>>>>>>>>>>>>>>>>                 .setBucketer(new ProcessdateBucketer())
>>>>>>>>>>>>>>>>>                 .setBatchSize(batchSize)
>>>>>>>>>>>>>>>>>                 .setInactiveBucketThreshold(inactiveBucketThreshold)
>>>>>>>>>>>>>>>>>                 .setInactiveBucketCheckInterval(inactiveBucketCheckInterval);
>>>>>>>>>>>>>>>>>         sink.setWriter(new IdJsonWriter());
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 4. Kafka & event time
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> My flink job reads the data from Kafka, using a
>>>>>>>>>>>>>>>>> BoundedOutOfOrdernessTimestampExtractor on the kafka consumer to
>>>>>>>>>>>>>>>>> synchronize watermarks across all kafka partitions. We also write late
>>>>>>>>>>>>>>>>> data to side output, but nothing is written there. If anything were
>>>>>>>>>>>>>>>>> written there, it could explain missed data in the main output. (I'm
>>>>>>>>>>>>>>>>> also sure that our late data writing works, because we previously had
>>>>>>>>>>>>>>>>> some actual late data which ended up there.)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 5. allowedLateness
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> It may or may not be relevant that I have also enabled
>>>>>>>>>>>>>>>>> allowedLateness with 1 minute lateness on the 24-hour window.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> If that makes sense, I could try removing allowedLateness
>>>>>>>>>>>>>>>>> entirely? That would be just to rule out a Flink bug related to
>>>>>>>>>>>>>>>>> restoring state in combination with the allowedLateness feature.
>>>>>>>>>>>>>>>>> After all, all of our data should be in a good enough order to not
>>>>>>>>>>>>>>>>> be late, given the max out-of-orderness used on the kafka consumer
>>>>>>>>>>>>>>>>> timestamp extractor.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thank you in advance!
>>>>>>>>>>>>>>>>>
