Sorry, not posting to the mailing list was my mistake :/

On Wed, 13 Feb 2019 at 15:01, Juho Autio <juho.au...@rovio.com> wrote:

> Thanks for stepping in, did you post outside of the mailing list on
> purpose btw?
>
> This I did a long time ago:
>
>> To rule out for good any questions about sink behaviour, the job was
>> killed and started with an additional Kafka sink.
>> The same number of ids were missed in both outputs: KafkaSink &
>> BucketingSink.
>
>
> (I wrote about that on Oct 1, 2018 in this email thread)
>
> After that I did the savepoint analysis with Bravo.
>
> Currently I'm indeed trying to get suggestions on how to debug further, for
> example, where to add additional Kafka output to catch where the data gets
> lost. That would probably be somewhere in Flink's internals.
>
> I could try to share the full code also, but IMHO the problem has been
> quite well narrowed down, considering that data can be found in savepoint,
> savepoint is successfully restored, and after restoring the data doesn't go
> to "user code" (like the reducer) any more.
>
> On Wed, Feb 13, 2019 at 3:47 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
>
>> Hi Juho!
>> I think the reason you are not getting many answers here is that it is
>> very hard to debug this problem remotely.
>> Seemingly you do very normal operations, the state contains all the
>> required data and nobody else has hit a similar problem for ages.
>>
>> My best guess would be some bug in the deduplication or output writing
>> logic, but without a complete code example it's very hard to say anything
>> useful.
>> Did you try writing it to Kafka to see if the output is there? (that way
>> we could rule out the dedup problem)
>>
>> Cheers,
>> Gyula
>>
>> On Wed, Feb 13, 2019 at 2:37 PM Juho Autio <juho.au...@rovio.com> wrote:
>>
>>> Stefan (or anyone!), please, could I have some feedback on the findings
>>> that I reported on Dec 21, 2018? This is still a major blocker..
>>>
>>> On Thu, Jan 31, 2019 at 11:46 AM Juho Autio <juho.au...@rovio.com>
>>> wrote:
>>>
>>>> Hello, is there anyone that could help with this?
>>>>
>>>> On Fri, Jan 11, 2019 at 8:14 AM Juho Autio <juho.au...@rovio.com>
>>>> wrote:
>>>>
>>>>> Stefan, would you have time to comment?
>>>>>
>>>>> On Wednesday, January 2, 2019, Juho Autio <juho.au...@rovio.com>
>>>>> wrote:
>>>>>
>>>>>> Bump – does anyone know if Stefan will be available to comment on the
>>>>>> latest findings? Thanks.
>>>>>>
>>>>>> On Fri, Dec 21, 2018 at 2:33 PM Juho Autio <juho.au...@rovio.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Stefan, I managed to analyze savepoint with bravo. It seems that the
>>>>>>> data that's missing from output *is* found in savepoint.
>>>>>>>
>>>>>>> I simplified my test case to the following:
>>>>>>>
>>>>>>> - job 1 has been running for ~10 days
>>>>>>> - savepoint X created & job 1 cancelled
>>>>>>> - job 2 started with restore from savepoint X
>>>>>>>
>>>>>>> Then I waited until the next day so that job 2 has triggered the 24
>>>>>>> hour window.
>>>>>>>
>>>>>>> Then I analyzed the output & savepoint:
>>>>>>>
>>>>>>> - compare job 2 output with the output of a batch pyspark script =>
>>>>>>> find 4223 missing rows
>>>>>>> - pick one of the missing rows (say, id Z)
>>>>>>> - read savepoint X with bravo, filter for id Z => Z was found in the
>>>>>>> savepoint!
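A minimal sketch of the comparison step above, assuming both outputs have already been loaded as sets of ids. The class and method names are hypothetical; this is plain Java, not the pyspark script or bravo.

```java
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper for illustration; not part of Flink or bravo. */
final class MissingIds {
    /** Returns the ids present in the batch (reference) output but absent from the stream output. */
    static Set<String> missing(Set<String> batchIds, Set<String> streamIds) {
        Set<String> result = new TreeSet<>(batchIds); // sorted copy, inputs stay untouched
        result.removeAll(streamIds);
        return result;
    }

    public static void main(String[] args) {
        Set<String> batch = Set.of("AN1", "AN2", "AN3");  // made-up ids
        Set<String> stream = Set.of("AN1", "AN3");
        // Each id printed here is a candidate to look up in the savepoint next.
        System.out.println(missing(batch, stream));
    }
}
```

Any id returned this way can then be searched for in the savepoint, as was done for id Z.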
>>>>>>>
>>>>>>> How can it be possible that the value is in state but doesn't end up
>>>>>>> in output after state has been restored & window is eventually 
>>>>>>> triggered?
>>>>>>>
>>>>>>> I also did a similar analysis on the previous case where I savepointed
>>>>>>> & restored the job multiple times (5) within the same 24-hour window. A
>>>>>>> missing id that I drilled down to was found in all of those savepoints,
>>>>>>> yet missing from the output that gets written at the end of the day.
>>>>>>> This is even more surprising: the missing ID was written to the new
>>>>>>> savepoints also after restoring. Is the reducer state somehow decoupled
>>>>>>> from the window contents?
>>>>>>>
>>>>>>> Big thanks to bravo-developer Gyula for guiding me through to be
>>>>>>> able to read the reducer state! https://github.com/king/bravo/pull/11
>>>>>>>
>>>>>>> Gyula also had an idea for how to troubleshoot the missing data in a
>>>>>>> scalable way: I could add some "side effect kafka output" on individual
>>>>>>> operators. This should allow tracking more closely at which point the
>>>>>>> data gets lost. However, maybe this would have to be in some of Flink's
>>>>>>> internal components, and I'm not sure which those would be.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Juho
>>>>>>>
>>>>>>> On Mon, Nov 19, 2018 at 11:52 AM Juho Autio <juho.au...@rovio.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>> Hi Stefan,
>>>>>>>>
>>>>>>>> Bravo doesn't currently support reading a reducer state. I gave it
>>>>>>>> a try but couldn't get to a working implementation yet. If anyone can
>>>>>>>> provide some insight on how to make this work, please share at github:
>>>>>>>> https://github.com/king/bravo/pull/11
>>>>>>>>
>>>>>>>> Thanks.
>>>>>>>>
>>>>>>>> On Tue, Oct 23, 2018 at 3:32 PM Juho Autio <juho.au...@rovio.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I was glad to find that bravo had now been updated to support
>>>>>>>>> installing bravo to a local maven repo.
>>>>>>>>>
>>>>>>>>> I was able to load a checkpoint created by my job, thanks to the
>>>>>>>>> example provided in the bravo README, but I'm still missing the
>>>>>>>>> essential piece.
>>>>>>>>>
>>>>>>>>> My code was:
>>>>>>>>>
>>>>>>>>>         OperatorStateReader reader = new OperatorStateReader(env2, savepoint, "DistinctFunction");
>>>>>>>>>         DontKnowWhatTypeThisIs reducingState = reader.readKeyedStates(what should I put here?);
>>>>>>>>>
>>>>>>>>> I don't know how to read the values collected from reduce() calls
>>>>>>>>> in the state. Is there a way to access the reducing state of the
>>>>>>>>> window with bravo? I'm a bit confused about how this works, because
>>>>>>>>> when I check with a debugger, Flink internally uses a
>>>>>>>>> ReducingStateDescriptor with name=window-contents, but still, reading
>>>>>>>>> operator state for "DistinctFunction" at least didn't throw an
>>>>>>>>> exception ("window-contents" threw – obviously there's no operator by
>>>>>>>>> that name).
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Juho
>>>>>>>>>
>>>>>>>>> On Mon, Oct 15, 2018 at 2:25 PM Juho Autio <juho.au...@rovio.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Stefan,
>>>>>>>>>>
>>>>>>>>>> Sorry, but it isn't immediately clear to me what's a good
>>>>>>>>>> way to use https://github.com/king/bravo.
>>>>>>>>>>
>>>>>>>>>> How are people using it? Would you for example modify
>>>>>>>>>> build.gradle somehow to publish bravo as a library
>>>>>>>>>> locally/internally? Or add code directly in the bravo project
>>>>>>>>>> (locally) and run it from there (using an IDE, for example)? Also,
>>>>>>>>>> it doesn't seem like the bravo gradle project supports building a
>>>>>>>>>> Flink job jar, but if it does, how do I do it?
>>>>>>>>>>
>>>>>>>>>> Thanks.
>>>>>>>>>>
>>>>>>>>>> On Thu, Oct 4, 2018 at 9:30 PM Juho Autio <juho.au...@rovio.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Good then, I'll try to analyze the savepoints with Bravo. Thanks!
>>>>>>>>>>>
>>>>>>>>>>> > How would you assume that backpressure would influence your
>>>>>>>>>>> updates? Updates to each local state still happen event-by-event,
>>>>>>>>>>> in a single reader/writing thread.
>>>>>>>>>>>
>>>>>>>>>>> Sure, just an ignorant guess by me. I'm not familiar with most
>>>>>>>>>>> of Flink's internals. Anyway, high backpressure is not seen on
>>>>>>>>>>> this job after it has caught up with the lag, so I thought it would
>>>>>>>>>>> be worth mentioning.
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Oct 4, 2018 at 6:24 PM Stefan Richter <
>>>>>>>>>>> s.rich...@data-artisans.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> On 4 Oct 2018, at 16:08, Juho Autio <juho.au...@rovio.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> > you could take a look at Bravo [1] to query your savepoints
>>>>>>>>>>>> and to check if the state in the savepoint is complete w.r.t. your
>>>>>>>>>>>> expectations
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks. I'm not 100% sure if this is the case, but to me it seemed
>>>>>>>>>>>> like the missed ids were being logged by the reducer soon after
>>>>>>>>>>>> the job had started (after restoring a savepoint). But on the other
>>>>>>>>>>>> hand, after that I also made another savepoint & restored that, so
>>>>>>>>>>>> what I could check is: does that next savepoint have the missed ids
>>>>>>>>>>>> that were logged (a couple of minutes before the savepoint was
>>>>>>>>>>>> created, so there should've been more than enough time to add them
>>>>>>>>>>>> to the state before the savepoint was triggered) or not. Anyway, if
>>>>>>>>>>>> I were able to verify with Bravo that the ids are missing from the
>>>>>>>>>>>> savepoint (even though the reducer logged that it saw them), would
>>>>>>>>>>>> that help in figuring out where they are lost? Is there some major
>>>>>>>>>>>> difference compared to just looking at the final output after the
>>>>>>>>>>>> window has been triggered?
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> I think that makes a difference. For example, you can
>>>>>>>>>>>> investigate if there is a state loss or a problem with the
>>>>>>>>>>>> windowing. In the savepoint you could see which keys exist and to
>>>>>>>>>>>> which windows they are assigned. Also, just to make sure there is no
>>>>>>>>>>>> misunderstanding: only elements that are in the state at the start
>>>>>>>>>>>> of a savepoint are expected to be part of the savepoint; all
>>>>>>>>>>>> elements between start and completion of the savepoint are not
>>>>>>>>>>>> expected to be part of the savepoint.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> > I also doubt that the problem is about backpressure after
>>>>>>>>>>>> restore, because the job will only continue running after the
>>>>>>>>>>>> state restore is already completed.
>>>>>>>>>>>>
>>>>>>>>>>>> Yes, I'm not suspecting that the state restoring would be the
>>>>>>>>>>>> problem either. My concern was about backpressure possibly messing
>>>>>>>>>>>> with the updates of reducing state? I would tend to suspect that
>>>>>>>>>>>> updating the state consistently is what fails, where heavy load /
>>>>>>>>>>>> backpressure might be a factor.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> How would you assume that backpressure would influence your
>>>>>>>>>>>> updates? Updates to each local state still happen event-by-event,
>>>>>>>>>>>> in a single reader/writing thread.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Oct 4, 2018 at 4:18 PM Stefan Richter <
>>>>>>>>>>>> s.rich...@data-artisans.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>
>>>>>>>>>>>>> you could take a look at Bravo [1] to query your savepoints
>>>>>>>>>>>>> and to check if the state in the savepoint is complete w.r.t. your
>>>>>>>>>>>>> expectations. I somewhat doubt that there is a general problem with
>>>>>>>>>>>>> the state/savepoints because many users are successfully running it
>>>>>>>>>>>>> on a large state and I am not aware of any data loss problems, but
>>>>>>>>>>>>> nothing is impossible. What the savepoint does is also
>>>>>>>>>>>>> straightforward: iterate a db snapshot and write all key/value pairs
>>>>>>>>>>>>> to disk, so all data that was in the db at the time of the savepoint
>>>>>>>>>>>>> should show up. I also doubt that the problem is about backpressure
>>>>>>>>>>>>> after restore, because the job will only continue running after the
>>>>>>>>>>>>> state restore is already completed. Did you check if you are using
>>>>>>>>>>>>> exactly-once semantics or at-least-once semantics? Also, did you
>>>>>>>>>>>>> check that the Kafka consumer start position is configured properly
>>>>>>>>>>>>> [2]? Are watermarks generated as expected after restore?
>>>>>>>>>>>>>
>>>>>>>>>>>>> One more unrelated high-level comment that I have: for a
>>>>>>>>>>>>> granularity of 24h windows, I wonder if it would not make sense to
>>>>>>>>>>>>> use a batch job instead?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Stefan
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1] https://github.com/king/bravo
>>>>>>>>>>>>> [2]
>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 4 Oct 2018, at 14:53, Juho Autio <juho.au...@rovio.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for the suggestions!
>>>>>>>>>>>>>
>>>>>>>>>>>>> > In general, it would be tremendously helpful to have a
>>>>>>>>>>>>> minimal working example which allows reproducing the problem.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Definitely. The problem with reproducing has been that this
>>>>>>>>>>>>> only seems to happen with the bigger production data volumes.
>>>>>>>>>>>>>
>>>>>>>>>>>>> That's why I'm hoping to find a way to debug this with the
>>>>>>>>>>>>> production data. With that it seems to consistently cause some
>>>>>>>>>>>>> misses every time the job is killed/restored.
>>>>>>>>>>>>>
>>>>>>>>>>>>> > check if it happens for shorter windows, like 1h etc
>>>>>>>>>>>>>
>>>>>>>>>>>>> What would be the benefit of that compared to 24h window?
>>>>>>>>>>>>>
>>>>>>>>>>>>> > simplify the job to not use a reduce window but simply a
>>>>>>>>>>>>> time window which outputs the window events. Then counting the
>>>>>>>>>>>>> input and output events should allow you to verify the results. If
>>>>>>>>>>>>> you are not seeing missing events, then it could have something to
>>>>>>>>>>>>> do with the reducing state used in the reduce function.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hm, maybe, but I'm not sure how useful that would be, because it
>>>>>>>>>>>>> wouldn't yet prove that it's related to reducing: not having a
>>>>>>>>>>>>> reduce function could also mean smaller load on the job, which
>>>>>>>>>>>>> might alone be enough to make the problem not manifest.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Is there a way to debug what goes into the reducing state
>>>>>>>>>>>>> (including what gets removed or overwritten and what restored), if
>>>>>>>>>>>>> that makes sense..? Maybe some suitable logging could be used to
>>>>>>>>>>>>> prove that the lost data is written to the reducing state (or at
>>>>>>>>>>>>> least asked to be written), but not found any more when the window
>>>>>>>>>>>>> closes and state is flushed?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On configuration once more, we're using RocksDB state backend
>>>>>>>>>>>>> with asynchronous incremental checkpointing. The state is restored
>>>>>>>>>>>>> from savepoints though, we haven't been using those checkpoints in
>>>>>>>>>>>>> these tests (although they could be used in case of crashes – but
>>>>>>>>>>>>> we haven't had those now).
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Oct 4, 2018 at 3:25 PM Till Rohrmann <
>>>>>>>>>>>>> trohrm...@apache.org> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> another idea to further narrow down the problem could be to
>>>>>>>>>>>>>> simplify the job to not use a reduce window but simply a time
>>>>>>>>>>>>>> window which outputs the window events. Then counting the input and
>>>>>>>>>>>>>> output events should allow you to verify the results. If you are
>>>>>>>>>>>>>> not seeing missing events, then it could have something to do with
>>>>>>>>>>>>>> the reducing state used in the reduce function.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> In general, it would be tremendously helpful to have a
>>>>>>>>>>>>>> minimal working example which allows reproducing the problem.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>> Till
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Oct 4, 2018 at 2:02 PM Andrey Zagrebin <
>>>>>>>>>>>>>> and...@data-artisans.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> can you try to reduce the job to a minimal reproducible
>>>>>>>>>>>>>>> example and share the job and input?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> For example:
>>>>>>>>>>>>>>> - some simple records as input, e.g. tuples of primitive
>>>>>>>>>>>>>>> types saved as csv
>>>>>>>>>>>>>>> - minimal deduplication job which processes them and misses
>>>>>>>>>>>>>>> records
>>>>>>>>>>>>>>> - check if it happens for shorter windows, like 1h etc
>>>>>>>>>>>>>>> - setup which you use for the job, ideally
>>>>>>>>>>>>>>> locally reproducible or cloud
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>> Andrey
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On 4 Oct 2018, at 11:13, Juho Autio <juho.au...@rovio.com>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Sorry to insist, but we seem to be blocked for any serious
>>>>>>>>>>>>>>> usage of state in Flink if we can't rely on it to not miss data
>>>>>>>>>>>>>>> in case of restore.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Would anyone have suggestions for how to troubleshoot this?
>>>>>>>>>>>>>>> So far I have verified with DEBUG logs that our reduce function
>>>>>>>>>>>>>>> also gets to process the data that is missing from window output.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Mon, Oct 1, 2018 at 11:56 AM Juho Autio <
>>>>>>>>>>>>>>> juho.au...@rovio.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Andrey,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> To rule out for good any questions about sink behaviour,
>>>>>>>>>>>>>>>> the job was killed and started with an additional Kafka sink.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The same number of ids were missed in both outputs:
>>>>>>>>>>>>>>>> KafkaSink & BucketingSink.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I wonder what would be the next steps in debugging?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Fri, Sep 21, 2018 at 3:49 PM Juho Autio <
>>>>>>>>>>>>>>>> juho.au...@rovio.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks, Andrey.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> > so it means that the savepoint does not lose at least
>>>>>>>>>>>>>>>>> some dropped records.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I'm not sure what you mean by that? I mean, it was known
>>>>>>>>>>>>>>>>> from the beginning that not everything is lost before/after
>>>>>>>>>>>>>>>>> restoring a savepoint, just some records around the time of
>>>>>>>>>>>>>>>>> restoration. It's not 100% clear whether records are lost before
>>>>>>>>>>>>>>>>> making a savepoint or after restoring it. Although, based on the
>>>>>>>>>>>>>>>>> new DEBUG logs it seems more like losing some records that are seen
>>>>>>>>>>>>>>>>> ~soon after restoring. It seems like Flink would be somehow
>>>>>>>>>>>>>>>>> confused about the restored state vs. new inserts to state.
>>>>>>>>>>>>>>>>> This could also be somehow linked to the high back pressure on the
>>>>>>>>>>>>>>>>> Kafka source while the stream is catching up.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> > If it is feasible for your setup, I suggest to insert
>>>>>>>>>>>>>>>>> one more map function after reduce and before sink.
>>>>>>>>>>>>>>>>> > etc.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Isn't that the same thing that we discussed before?
>>>>>>>>>>>>>>>>> Nothing is sent to BucketingSink before the window closes, so I
>>>>>>>>>>>>>>>>> don't see how it would make any difference if we replace the
>>>>>>>>>>>>>>>>> BucketingSink with a map function or another sink type. We don't
>>>>>>>>>>>>>>>>> create or restore savepoints during the time when BucketingSink
>>>>>>>>>>>>>>>>> gets input or has open buckets – that happens at a much later time
>>>>>>>>>>>>>>>>> of day. I would focus on figuring out why the records are lost
>>>>>>>>>>>>>>>>> while the window is open. But I don't know how to do that. Would
>>>>>>>>>>>>>>>>> you have any additional suggestions?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Fri, Sep 21, 2018 at 3:30 PM Andrey Zagrebin <
>>>>>>>>>>>>>>>>> and...@data-artisans.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> so it means that the savepoint does not lose at least
>>>>>>>>>>>>>>>>>> some dropped records.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> If it is feasible for your setup, I suggest inserting one
>>>>>>>>>>>>>>>>>> more map function after reduce and before sink.
>>>>>>>>>>>>>>>>>> The map function should be called right after window is
>>>>>>>>>>>>>>>>>> triggered but before flushing to s3.
>>>>>>>>>>>>>>>>>> The result of reduce (deduped record) could be logged
>>>>>>>>>>>>>>>>>> there.
>>>>>>>>>>>>>>>>>> This should allow checking whether the processed distinct
>>>>>>>>>>>>>>>>>> records were buffered in the state after the restoration from the
>>>>>>>>>>>>>>>>>> savepoint or not. If they were buffered we should see that there
>>>>>>>>>>>>>>>>>> was an attempt to write them to the sink from the state.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Another suggestion is to try to write records to some
>>>>>>>>>>>>>>>>>> other sink or to both.
>>>>>>>>>>>>>>>>>> E.g. if you can access file system of workers, maybe just
>>>>>>>>>>>>>>>>>> into local files and check whether the records are also dropped
>>>>>>>>>>>>>>>>>> there.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>> Andrey
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On 20 Sep 2018, at 15:37, Juho Autio <
>>>>>>>>>>>>>>>>>> juho.au...@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi Andrey!
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I was finally able to gather the DEBUG logs that you
>>>>>>>>>>>>>>>>>> suggested. In short, the reducer logged that it processed at least
>>>>>>>>>>>>>>>>>> some of the ids that were missing from the output.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> "At least some", because I didn't have the job running
>>>>>>>>>>>>>>>>>> with DEBUG logs for the full 24-hour window period. So I was only
>>>>>>>>>>>>>>>>>> able to look up if I can find *some* of the missing ids in the
>>>>>>>>>>>>>>>>>> DEBUG logs. Which I did indeed.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I changed the DistinctFunction.java to do this:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>>>>     public Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) {
>>>>>>>>>>>>>>>>>>         LOG.debug("DistinctFunction.reduce returns: {}={}", value1.get("field"), value1.get("id"));
>>>>>>>>>>>>>>>>>>         return value1;
>>>>>>>>>>>>>>>>>>     }
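
For illustration, a plain-Java sketch of what a keep-first reduce like the one above does to a series of events: only the first value seen per key survives. It does not use Flink. Keying by the "id" field and the class name are assumptions made for the example, not details confirmed in this thread.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for illustration; the real job runs the reduce inside a Flink window. */
final class KeepFirstReduce {
    /** Same shape as the reduce above: always keep the first value. */
    static Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) {
        return value1;
    }

    /** Folds events into one record per key. Using the "id" field as the key is an assumption. */
    static Map<String, Map<String, String>> dedupe(List<Map<String, String>> events) {
        Map<String, Map<String, String>> state = new LinkedHashMap<>();
        for (Map<String, String> event : events) {
            // merge() stores the event if the key is new, otherwise applies reduce(old, event).
            state.merge(event.get("id"), event, KeepFirstReduce::reduce);
        }
        return state;
    }

    public static void main(String[] args) {
        List<Map<String, String>> events = List.of(
                Map.of("id", "AN12345", "seq", "1"),
                Map.of("id", "AN12345", "seq", "2"),   // duplicate id, dropped by the reduce
                Map.of("id", "AN67890", "seq", "3"));
        System.out.println(dedupe(events).keySet());
    }
}
```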
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Then:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> vi flink-1.6.0/conf/log4j.properties
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> log4j.logger.org.apache.flink.streaming.runtime.tasks.StreamTask=DEBUG
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> log4j.logger.com.rovio.ds.flink.uniqueid.DistinctFunction=DEBUG
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Then I ran the following kind of test:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> - Cancelled the on-going job with savepoint created at
>>>>>>>>>>>>>>>>>> ~Sep 18 08:35 UTC 2018
>>>>>>>>>>>>>>>>>> - Started a new cluster & job with DEBUG enabled at
>>>>>>>>>>>>>>>>>> ~09:13, restored from that previous cluster's savepoint
>>>>>>>>>>>>>>>>>> - Ran until caught up offsets
>>>>>>>>>>>>>>>>>> - Cancelled the job with a new savepoint
>>>>>>>>>>>>>>>>>> - Started a new job _without_ DEBUG, which restored the
>>>>>>>>>>>>>>>>>> new savepoint, let it keep running so that it will eventually
>>>>>>>>>>>>>>>>>> write the output
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Then on the next day, after results had been flushed when
>>>>>>>>>>>>>>>>>> the 24-hour window closed, I compared the results again with a
>>>>>>>>>>>>>>>>>> batch version's output. And found some missing ids as usual.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I drilled down to one specific missing id (I'm replacing
>>>>>>>>>>>>>>>>>> the actual value with AN12345 below), which was not found in the
>>>>>>>>>>>>>>>>>> stream output, but was found in batch output & flink DEBUG logs.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Related to that id, I gathered the following information:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> 2018-09-18~09:13:21,000 job started & savepoint is
>>>>>>>>>>>>>>>>>> restored
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> 2018-09-18 09:14:29,085 missing id is processed for the
>>>>>>>>>>>>>>>>>> first time, proved by this log line:
>>>>>>>>>>>>>>>>>> 2018-09-18 09:14:29,085 DEBUG com.rovio.ds.flink.uniqueid.DistinctFunction - DistinctFunction.reduce returns: s.aid1=AN12345
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> 2018-09-18 09:15:14,264 first synchronous part of
>>>>>>>>>>>>>>>>>> checkpoint
>>>>>>>>>>>>>>>>>> 2018-09-18 09:15:16,544 first asynchronous part of
>>>>>>>>>>>>>>>>>> checkpoint
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> (
>>>>>>>>>>>>>>>>>> more occurrences of checkpoints (~1 min checkpointing
>>>>>>>>>>>>>>>>>> time + ~1 min delay before next)
>>>>>>>>>>>>>>>>>> /
>>>>>>>>>>>>>>>>>> more occurrences of DistinctFunction.reduce
>>>>>>>>>>>>>>>>>> )
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> 2018-09-18 09:23:45,053 missing id is processed for the
>>>>>>>>>>>>>>>>>> last time
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> 2018-09-18~10:20:00,000 savepoint created & job cancelled
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> To be noted, there was high backpressure after restoring
>>>>>>>>>>>>>>>>>> from savepoint until the stream caught up with the Kafka offsets.
>>>>>>>>>>>>>>>>>> However, our job assigns timestamps & watermarks on the Flink Kafka
>>>>>>>>>>>>>>>>>> consumer itself, so event time of all partitions is synchronized.
>>>>>>>>>>>>>>>>>> As expected, we don't get any late data in the late data side
>>>>>>>>>>>>>>>>>> output.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> From this we can see that the missing ids are processed
>>>>>>>>>>>>>>>>>> by the reducer, but they must get lost somewhere before the 24-hour
>>>>>>>>>>>>>>>>>> window is triggered.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I think it's worth mentioning once more that the stream
>>>>>>>>>>>>>>>>>> doesn't miss any ids if we let it run without interruptions /
>>>>>>>>>>>>>>>>>> state restoring.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> What's next?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Wed, Aug 29, 2018 at 3:49 PM Andrey Zagrebin <
>>>>>>>>>>>>>>>>>> and...@data-artisans.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> > only when the 24-hour window
>>>>>>>>>>>>>>>>>>> triggers, BucketingSink gets a burst of input
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> This is of course totally true, my understanding is the
>>>>>>>>>>>>>>>>>>> same. We cannot exclude a problem there for sure, it's just that
>>>>>>>>>>>>>>>>>>> savepoints are used a lot w/o problem reports and BucketingSink is
>>>>>>>>>>>>>>>>>>> known to be problematic with s3. That is why I asked you:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> > You also wrote that the timestamps of lost event are
>>>>>>>>>>>>>>>>>>> 'probably' around the time of the savepoint, if it is not yet for
>>>>>>>>>>>>>>>>>>> sure I would also check it.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Although, bucketing sink might lose any data at the end
>>>>>>>>>>>>>>>>>>> of the day (also from the middle). The fact that it is always
>>>>>>>>>>>>>>>>>>> around the time of taking a savepoint and not random is surely
>>>>>>>>>>>>>>>>>>> suspicious and possible savepoint failures need to be investigated.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Regarding the s3 problem, s3 doc says:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> > The caveat is that if you make a HEAD or GET request
>>>>>>>>>>>>>>>>>>> to the key name (to find if the object exists) before creating the
>>>>>>>>>>>>>>>>>>> object, Amazon S3 provides 'eventual consistency' for
>>>>>>>>>>>>>>>>>>> read-after-write.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> The algorithm you suggest is roughly how it is
>>>>>>>>>>>>>>>>>>> implemented now (BucketingSink.openNewPartFile). My understanding
>>>>>>>>>>>>>>>>>>> is that 'eventual consistency' means that even if you just created
>>>>>>>>>>>>>>>>>>> a file (its name is the key) it can be that you do not get it in
>>>>>>>>>>>>>>>>>>> the list, or exists (HEAD) returns false, and you risk overwriting
>>>>>>>>>>>>>>>>>>> the previous part.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> The BucketingSink was designed for a standard file
>>>>>>>>>>>>>>>>>>> system. s3 is used over a file system wrapper atm but does not
>>>>>>>>>>>>>>>>>>> always provide normal file system guarantees. See also the last
>>>>>>>>>>>>>>>>>>> example in [1].
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>> Andrey
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>>>>>> https://codeburst.io/quick-explanation-of-the-s3-consistency-model-6c9f325e3f82
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On 29 Aug 2018, at 12:11, Juho Autio <
>>>>>>>>>>>>>>>>>>> juho.au...@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Andrey, thank you very much for the debugging
>>>>>>>>>>>>>>>>>>> suggestions, I'll try them.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> In the meanwhile two more questions, please:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> > Just to keep in mind this problem with s3 and exclude
>>>>>>>>>>>>>>>>>>> it for sure. I would also check whether the size of missing events
>>>>>>>>>>>>>>>>>>> is around the batch size of BucketingSink or not.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Fair enough, but I also want to focus on debugging the
>>>>>>>>>>>>>>>>>>> most probable subject first. So what do you think about this – true
>>>>>>>>>>>>>>>>>>> or false: only when the 24-hour window triggers, BucketingSink gets
>>>>>>>>>>>>>>>>>>> a burst of input. Around the state restoring point (middle of the
>>>>>>>>>>>>>>>>>>> day) it doesn't get any input, so it can't lose anything either.
>>>>>>>>>>>>>>>>>>> Isn't this true, or have I totally missed how Flink works in
>>>>>>>>>>>>>>>>>>> triggering window results? I would not expect there to be any
>>>>>>>>>>>>>>>>>>> optimization that speculatively triggers early results of a regular
>>>>>>>>>>>>>>>>>>> time window to the downstream operators.
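
A rough sketch of why a regular 24-hour event-time window emits nothing mid-day, assuming tumbling windows aligned to UTC midnight, the default event-time trigger, and no custom offset or early-firing trigger. The class is hypothetical and only mirrors the arithmetic; it is not Flink code.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical illustration of tumbling event-time window assignment and firing. */
final class DailyWindow {
    static final long SIZE_MS = Duration.ofHours(24).toMillis();

    /** Start of the 24-hour window (aligned to UTC midnight) that contains the timestamp. */
    static long windowStart(long timestampMs) {
        return timestampMs - Math.floorMod(timestampMs, SIZE_MS);
    }

    /** Assumed rule: the window fires once the watermark reaches its last millisecond. */
    static boolean canFire(long windowStartMs, long watermarkMs) {
        return watermarkMs >= windowStartMs + SIZE_MS - 1;
    }

    public static void main(String[] args) {
        long event = Instant.parse("2018-09-18T09:14:29.085Z").toEpochMilli();
        long start = windowStart(event);
        System.out.println("window start: " + Instant.ofEpochMilli(start));
        System.out.println("window end:   " + Instant.ofEpochMilli(start + SIZE_MS));
        // A mid-day watermark is far from the end of the window, so nothing is emitted yet.
        System.out.println("fires mid-day: " + canFire(start, event));
    }
}
```

Under these assumptions the sink sees no records from the window until the watermark passes the end of the day, which matches the expectation stated above.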
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> > The old BucketingSink has a general problem with s3.
>>>>>>>>>>>>>>>>>>> Internally BucketingSink queries s3 as a file system to list already
>>>>>>>>>>>>>>>>>>> written file parts (batches) and determine the index of the next part to
>>>>>>>>>>>>>>>>>>> start. Due to eventual consistency of checking file existence in s3 [1],
>>>>>>>>>>>>>>>>>>> the BucketingSink can rewrite the previously written part and basically
>>>>>>>>>>>>>>>>>>> lose it.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I was wondering what S3's "read-after-write
>>>>>>>>>>>>>>>>>>> consistency" (mentioned on the page you linked) actually means. It seems
>>>>>>>>>>>>>>>>>>> that this might be possible:
>>>>>>>>>>>>>>>>>>> - LIST keys, find current max index
>>>>>>>>>>>>>>>>>>> - choose next index = max + 1
>>>>>>>>>>>>>>>>>>> - HEAD next index: if it exists, keep adding + 1 until
>>>>>>>>>>>>>>>>>>> key doesn't exist on S3
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> But definitely sounds easier if a sink keeps track of
>>>>>>>>>>>>>>>>>>> files in a way that's guaranteed to be consistent.
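The LIST-then-HEAD probing idea above could be sketched roughly as follows, in plain Java. This does not talk to S3 at all: the two `Set<Integer>` arguments are hypothetical stand-ins for what a (possibly stale) LIST call returns and for what per-key HEAD requests would report, and the class and method names are made up for illustration. Whether HEAD is itself consistent in this situation is exactly the open question, so this only shows the control flow.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;

public class NextPartIndex {

    /**
     * Picks the next part index: start from max(listed) + 1, then keep adding 1
     * until the existence check reports that the index is free.
     *
     * @param listedIndexes   part indexes returned by a (possibly stale) LIST call
     * @param existingIndexes stand-in for per-key HEAD requests
     */
    static int nextPartIndex(Set<Integer> listedIndexes, Set<Integer> existingIndexes) {
        int candidate = listedIndexes.isEmpty() ? 0 : Collections.max(listedIndexes) + 1;
        while (existingIndexes.contains(candidate)) {
            candidate++;
        }
        return candidate;
    }

    public static void main(String[] args) {
        // LIST is stale and only shows parts 0..2, although parts 3 and 4 were already written.
        Set<Integer> listed = new TreeSet<>(Arrays.asList(0, 1, 2));
        Set<Integer> actual = new TreeSet<>(Arrays.asList(0, 1, 2, 3, 4));
        System.out.println(nextPartIndex(listed, actual)); // prints 5
    }
}
```

Without the probing loop the stale listing would yield index 3 and overwrite an existing part, which is the failure mode described above.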
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>> Juho
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Mon, Aug 27, 2018 at 2:04 PM Andrey Zagrebin <
>>>>>>>>>>>>>>>>>>> and...@data-artisans.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> true, StreamingFileSink does not support s3 in 1.6.0,
>>>>>>>>>>>>>>>>>>>> it is planned for the next 1.7 release, sorry for 
>>>>>>>>>>>>>>>>>>>> confusion.
>>>>>>>>>>>>>>>>>>>> The old BucketingSink has a general problem with s3.
>>>>>>>>>>>>>>>>>>>> Internally BucketingSink queries s3 as a file system
>>>>>>>>>>>>>>>>>>>> to list already written file parts (batches) and
>>>>>>>>>>>>>>>>>>>> determine the index of the next part to start. Due to eventual
>>>>>>>>>>>>>>>>>>>> consistency of checking file existence in s3 [1], the BucketingSink can
>>>>>>>>>>>>>>>>>>>> rewrite the previously written part and basically lose it. It should be
>>>>>>>>>>>>>>>>>>>> fixed for StreamingFileSink in 1.7 where Flink keeps its own track of
>>>>>>>>>>>>>>>>>>>> written parts and does not rely on s3 as a file system.
>>>>>>>>>>>>>>>>>>>> I also include Kostas, he might add more details.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Just to keep this problem with s3 in mind and exclude
>>>>>>>>>>>>>>>>>>>> it for sure, I would also check whether the size of missing events is
>>>>>>>>>>>>>>>>>>>> around the batch size of BucketingSink or not. You also wrote that the
>>>>>>>>>>>>>>>>>>>> timestamps of lost events are 'probably' around the time of the savepoint;
>>>>>>>>>>>>>>>>>>>> if that is not yet certain, I would also check it.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Have you already checked the log files of the job manager
>>>>>>>>>>>>>>>>>>>> and task managers for the job running before and after the restore from
>>>>>>>>>>>>>>>>>>>> the checkpoint? Is everything successful there, no errors, relevant
>>>>>>>>>>>>>>>>>>>> warnings or exceptions?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> As the next step, I would suggest logging all events
>>>>>>>>>>>>>>>>>>>> encountered in DistinctFunction.reduce, if that is possible for production
>>>>>>>>>>>>>>>>>>>> data, and checking whether the missed events are eventually processed
>>>>>>>>>>>>>>>>>>>> before or after the savepoint. The following log message indicates a
>>>>>>>>>>>>>>>>>>>> border between the events that should be included into the savepoint
>>>>>>>>>>>>>>>>>>>> (logged before) or not:
>>>>>>>>>>>>>>>>>>>> "{} ({}, synchronous part) in thread {} took {} ms"
>>>>>>>>>>>>>>>>>>>> (template)
>>>>>>>>>>>>>>>>>>>> Also check if the savepoint has been overall completed:
>>>>>>>>>>>>>>>>>>>> "{} ({}, asynchronous part) in thread {} took {} ms."
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>> Andrey
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>>>>>>> https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On 24 Aug 2018, at 20:41, Juho Autio <
>>>>>>>>>>>>>>>>>>>> juho.au...@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Using StreamingFileSink is not a convenient option for
>>>>>>>>>>>>>>>>>>>> production use for us as it doesn't support s3*. I could 
>>>>>>>>>>>>>>>>>>>> use
>>>>>>>>>>>>>>>>>>>> StreamingFileSink just to verify, but I don't see much 
>>>>>>>>>>>>>>>>>>>> point in doing so.
>>>>>>>>>>>>>>>>>>>> Please consider my previous comment:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> > I realized that BucketingSink cannot play any role
>>>>>>>>>>>>>>>>>>>> in this problem. This is because BucketingSink gets a burst of input only
>>>>>>>>>>>>>>>>>>>> when the 24-hour window triggers. Around the state restoring point
>>>>>>>>>>>>>>>>>>>> (middle of the day) it doesn't get any input, so it can't lose anything
>>>>>>>>>>>>>>>>>>>> either (right?).
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I could also use a kafka sink instead, but I can't
>>>>>>>>>>>>>>>>>>>> imagine how there could be any difference. It's very real 
>>>>>>>>>>>>>>>>>>>> that the sink
>>>>>>>>>>>>>>>>>>>> doesn't get any input for a long time until the 24-hour 
>>>>>>>>>>>>>>>>>>>> window closes, and
>>>>>>>>>>>>>>>>>>>> then it quickly writes out everything because it's not 
>>>>>>>>>>>>>>>>>>>> that much data
>>>>>>>>>>>>>>>>>>>> eventually for the distinct values.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Any ideas for debugging what's happening around the
>>>>>>>>>>>>>>>>>>>> savepoint & restoration time?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> *) I actually implemented StreamingFileSink as an
>>>>>>>>>>>>>>>>>>>> alternative sink. This was before I came to realize that 
>>>>>>>>>>>>>>>>>>>> most likely the
>>>>>>>>>>>>>>>>>>>> sink component has nothing to do with the data loss 
>>>>>>>>>>>>>>>>>>>> problem. I tried it
>>>>>>>>>>>>>>>>>>>> with s3n:// path just to see an exception being thrown. In 
>>>>>>>>>>>>>>>>>>>> the source code
>>>>>>>>>>>>>>>>>>>> I indeed then found an explicit check for the target path 
>>>>>>>>>>>>>>>>>>>> scheme to be
>>>>>>>>>>>>>>>>>>>> "hdfs://".
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Fri, Aug 24, 2018 at 7:49 PM Andrey Zagrebin <
>>>>>>>>>>>>>>>>>>>> and...@data-artisans.com> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Ok, I think before further debugging the window
>>>>>>>>>>>>>>>>>>>>> reduced state,
>>>>>>>>>>>>>>>>>>>>> could you try the new 'StreamingFileSink' [1]
>>>>>>>>>>>>>>>>>>>>> introduced in Flink 1.6.0 instead of the previous 'BucketingSink'?
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>> Andrey
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On 24 Aug 2018, at 18:03, Juho Autio <
>>>>>>>>>>>>>>>>>>>>> juho.au...@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Yes, sorry for my confusing comment. I just meant that
>>>>>>>>>>>>>>>>>>>>> it seems like there's a bug somewhere now that the output 
>>>>>>>>>>>>>>>>>>>>> is missing some
>>>>>>>>>>>>>>>>>>>>> data.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> > I would wait and check the actual output in s3
>>>>>>>>>>>>>>>>>>>>> because it is the main result of the job
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Yes, and that's what I have already done. There seems
>>>>>>>>>>>>>>>>>>>>> to be always some data loss with the production data 
>>>>>>>>>>>>>>>>>>>>> volumes, if the job
>>>>>>>>>>>>>>>>>>>>> has been restarted on that day.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Would you have any suggestions for how to debug this
>>>>>>>>>>>>>>>>>>>>> further?
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Many thanks for stepping in.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Fri, Aug 24, 2018 at 6:37 PM Andrey Zagrebin <
>>>>>>>>>>>>>>>>>>>>> and...@data-artisans.com> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> So it is a per key deduplication job.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Yes, I would wait and check the actual output in s3
>>>>>>>>>>>>>>>>>>>>>> because it is the main result of the job and
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> > The late data around the time of taking savepoint
>>>>>>>>>>>>>>>>>>>>>> might be not included into the savepoint but it should 
>>>>>>>>>>>>>>>>>>>>>> be behind the
>>>>>>>>>>>>>>>>>>>>>> snapshotted offset in Kafka.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> is not a bug, it is a possible behaviour.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> The savepoint is a snapshot of the data in transit
>>>>>>>>>>>>>>>>>>>>>> which is already consumed from Kafka.
>>>>>>>>>>>>>>>>>>>>>> Basically the full contents of the window result are
>>>>>>>>>>>>>>>>>>>>>> split between the savepoint and what can come after the savepoint'ed
>>>>>>>>>>>>>>>>>>>>>> offset in Kafka but before the window result is written into s3.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Allowed lateness should not affect it, I am just
>>>>>>>>>>>>>>>>>>>>>> saying that the final result in s3 should include all 
>>>>>>>>>>>>>>>>>>>>>> records after it.
>>>>>>>>>>>>>>>>>>>>>> This is what should be guaranteed but not the
>>>>>>>>>>>>>>>>>>>>>> contents of the intermediate savepoint.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>> Andrey
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On 24 Aug 2018, at 16:52, Juho Autio <
>>>>>>>>>>>>>>>>>>>>>> juho.au...@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Thanks for your answer!
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> I check for the missed data from the final output on
>>>>>>>>>>>>>>>>>>>>>> s3. So I wait until the next day, then run the same 
>>>>>>>>>>>>>>>>>>>>>> thing re-implemented in
>>>>>>>>>>>>>>>>>>>>>> batch, and compare the output.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> > The late data around the time of taking savepoint
>>>>>>>>>>>>>>>>>>>>>> might be not included into the savepoint but it should 
>>>>>>>>>>>>>>>>>>>>>> be behind the
>>>>>>>>>>>>>>>>>>>>>> snapshotted offset in Kafka.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Yes, I would definitely expect that. It seems like
>>>>>>>>>>>>>>>>>>>>>> there's a bug somewhere.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> > Then it should just come later after the restore
>>>>>>>>>>>>>>>>>>>>>> and should be reduced within the allowed lateness into 
>>>>>>>>>>>>>>>>>>>>>> the final result
>>>>>>>>>>>>>>>>>>>>>> which is saved into s3.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Well, as far as I know, allowed lateness doesn't play
>>>>>>>>>>>>>>>>>>>>>> any role here, because I started running the job with 
>>>>>>>>>>>>>>>>>>>>>> allowedLateness=0,
>>>>>>>>>>>>>>>>>>>>>> and still get the data loss, while my late data output 
>>>>>>>>>>>>>>>>>>>>>> doesn't receive
>>>>>>>>>>>>>>>>>>>>>> anything.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> > Also, is this `DistinctFunction.reduce` just an
>>>>>>>>>>>>>>>>>>>>>> example or the actual implementation, basically saving 
>>>>>>>>>>>>>>>>>>>>>> just one of records
>>>>>>>>>>>>>>>>>>>>>> inside the 24h window in s3? then what is missing there?
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Yes, it's the actual implementation. Note that
>>>>>>>>>>>>>>>>>>>>>> there's a keyBy before the DistinctFunction. So there's 
>>>>>>>>>>>>>>>>>>>>>> one record for each
>>>>>>>>>>>>>>>>>>>>>> key (which is the combination of a couple of fields). In 
>>>>>>>>>>>>>>>>>>>>>> practice I've seen
>>>>>>>>>>>>>>>>>>>>>> that we're missing ~2000-4000 elements on each restore, 
>>>>>>>>>>>>>>>>>>>>>> and the total
>>>>>>>>>>>>>>>>>>>>>> output is obviously much more than that.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Here's the full code for the key selector:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> public class MapKeySelector implements KeySelector<Map<String,String>, Object> {
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>     private final String[] fields;
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>     public MapKeySelector(String... fields) {
>>>>>>>>>>>>>>>>>>>>>>         this.fields = fields;
>>>>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>>>>>>>>     public Object getKey(Map<String, String> event) throws Exception {
>>>>>>>>>>>>>>>>>>>>>>         Tuple key = Tuple.getTupleClass(fields.length).newInstance();
>>>>>>>>>>>>>>>>>>>>>>         for (int i = 0; i < fields.length; i++) {
>>>>>>>>>>>>>>>>>>>>>>             key.setField(event.getOrDefault(fields[i], ""), i);
>>>>>>>>>>>>>>>>>>>>>>         }
>>>>>>>>>>>>>>>>>>>>>>         return key;
>>>>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> And a more exact example on how it's used:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>                 .keyBy(new MapKeySelector("ID", "PLAYER_ID", "FIELD", "KEY_NAME", "KEY_VALUE"))
>>>>>>>>>>>>>>>>>>>>>>                 .timeWindow(Time.days(1))
>>>>>>>>>>>>>>>>>>>>>>                 .reduce(new DistinctFunction())
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Fri, Aug 24, 2018 at 5:26 PM Andrey Zagrebin <
>>>>>>>>>>>>>>>>>>>>>> and...@data-artisans.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Where exactly does the data go missing? When do you notice
>>>>>>>>>>>>>>>>>>>>>>> that?
>>>>>>>>>>>>>>>>>>>>>>> Do you check it:
>>>>>>>>>>>>>>>>>>>>>>> - debugging `DistinctFunction.reduce` right after
>>>>>>>>>>>>>>>>>>>>>>> resume in the middle of the day
>>>>>>>>>>>>>>>>>>>>>>> or
>>>>>>>>>>>>>>>>>>>>>>> - some distinct records are missing in the final output
>>>>>>>>>>>>>>>>>>>>>>> of BucketingSink in s3 after the window result is actually triggered and
>>>>>>>>>>>>>>>>>>>>>>> saved into s3 at the end of the day? Is this the main output?
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> The late data around the time of taking savepoint
>>>>>>>>>>>>>>>>>>>>>>> might be not included into the savepoint but it should 
>>>>>>>>>>>>>>>>>>>>>>> be behind the
>>>>>>>>>>>>>>>>>>>>>>> snapshotted offset in Kafka. Then it should just come 
>>>>>>>>>>>>>>>>>>>>>>> later after the
>>>>>>>>>>>>>>>>>>>>>>> restore and should be reduced within the allowed 
>>>>>>>>>>>>>>>>>>>>>>> lateness into the final
>>>>>>>>>>>>>>>>>>>>>>> result which is saved into s3.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Also, is this `DistinctFunction.reduce` just an
>>>>>>>>>>>>>>>>>>>>>>> example or the actual implementation, basically saving 
>>>>>>>>>>>>>>>>>>>>>>> just one of records
>>>>>>>>>>>>>>>>>>>>>>> inside the 24h window in s3? then what is missing there?
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>>> Andrey
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> On 23 Aug 2018, at 15:42, Juho Autio <
>>>>>>>>>>>>>>>>>>>>>>> juho.au...@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> I changed to allowedLateness=0, no change, still
>>>>>>>>>>>>>>>>>>>>>>> missing data when restoring from savepoint.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> On Tue, Aug 21, 2018 at 10:43 AM Juho Autio <
>>>>>>>>>>>>>>>>>>>>>>> juho.au...@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> I realized that BucketingSink cannot play any
>>>>>>>>>>>>>>>>>>>>>>>> role in this problem. This is because BucketingSink gets a burst of
>>>>>>>>>>>>>>>>>>>>>>>> input only when the 24-hour window triggers. Around the state restoring
>>>>>>>>>>>>>>>>>>>>>>>> point (middle of the day) it doesn't get any input, so it can't lose
>>>>>>>>>>>>>>>>>>>>>>>> anything either (right?).
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> I will next try removing the allowedLateness
>>>>>>>>>>>>>>>>>>>>>>>> entirely from the equation.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> In the meanwhile, please let me know if you have
>>>>>>>>>>>>>>>>>>>>>>>> any suggestions for debugging the lost data, for 
>>>>>>>>>>>>>>>>>>>>>>>> example what logs to
>>>>>>>>>>>>>>>>>>>>>>>> enable.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> We use FlinkKafkaConsumer010 btw. Are there any
>>>>>>>>>>>>>>>>>>>>>>>> known issues with that, that could contribute to lost 
>>>>>>>>>>>>>>>>>>>>>>>> data when restoring a
>>>>>>>>>>>>>>>>>>>>>>>> savepoint?
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 4:23 PM Juho Autio <
>>>>>>>>>>>>>>>>>>>>>>>> juho.au...@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Some data is silently lost on my Flink stream job
>>>>>>>>>>>>>>>>>>>>>>>>> when state is restored from a savepoint.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Do you have any debugging hints to find out where
>>>>>>>>>>>>>>>>>>>>>>>>> exactly the data gets dropped?
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> My job gathers distinct values using a 24-hour
>>>>>>>>>>>>>>>>>>>>>>>>> window. It doesn't have any custom state management.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> When I cancel the job with savepoint and restore
>>>>>>>>>>>>>>>>>>>>>>>>> from that savepoint, some data is missed. It seems to 
>>>>>>>>>>>>>>>>>>>>>>>>> be losing just a
>>>>>>>>>>>>>>>>>>>>>>>>> small amount of data. The event time of lost data is 
>>>>>>>>>>>>>>>>>>>>>>>>> probably around the
>>>>>>>>>>>>>>>>>>>>>>>>> time of savepoint. In other words the rest of the 
>>>>>>>>>>>>>>>>>>>>>>>>> time window is not
>>>>>>>>>>>>>>>>>>>>>>>>> entirely missed – collection works correctly also for 
>>>>>>>>>>>>>>>>>>>>>>>>> (most of the) events
>>>>>>>>>>>>>>>>>>>>>>>>> that come in after restoring.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> When the job processes a full 24-hour window
>>>>>>>>>>>>>>>>>>>>>>>>> without interruptions it doesn't miss anything.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Usually the problem doesn't happen in test
>>>>>>>>>>>>>>>>>>>>>>>>> environments that have smaller parallelism and 
>>>>>>>>>>>>>>>>>>>>>>>>> smaller data volumes. But in
>>>>>>>>>>>>>>>>>>>>>>>>> production volumes the job seems to be consistently 
>>>>>>>>>>>>>>>>>>>>>>>>> missing at least
>>>>>>>>>>>>>>>>>>>>>>>>> something on every restore.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> This issue has consistently happened since the job
>>>>>>>>>>>>>>>>>>>>>>>>> was initially created. It was at first run on an 
>>>>>>>>>>>>>>>>>>>>>>>>> older version of Flink
>>>>>>>>>>>>>>>>>>>>>>>>> 1.5-SNAPSHOT and it still happens on both Flink 1.5.2 
>>>>>>>>>>>>>>>>>>>>>>>>> & 1.6.0.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> I'm wondering if this could be for example some
>>>>>>>>>>>>>>>>>>>>>>>>> synchronization issue between the kafka consumer 
>>>>>>>>>>>>>>>>>>>>>>>>> offsets vs. what's been
>>>>>>>>>>>>>>>>>>>>>>>>> written by BucketingSink?
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> 1. Job content, simplified
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>         kafkaStream
>>>>>>>>>>>>>>>>>>>>>>>>>                 .flatMap(new ExtractFieldsFunction())
>>>>>>>>>>>>>>>>>>>>>>>>>                 .keyBy(new MapKeySelector(1, 2, 3, 4))
>>>>>>>>>>>>>>>>>>>>>>>>>                 .timeWindow(Time.days(1))
>>>>>>>>>>>>>>>>>>>>>>>>>                 .allowedLateness(allowedLateness)
>>>>>>>>>>>>>>>>>>>>>>>>>                 .sideOutputLateData(lateDataTag)
>>>>>>>>>>>>>>>>>>>>>>>>>                 .reduce(new DistinctFunction())
>>>>>>>>>>>>>>>>>>>>>>>>>                 .addSink(sink)
>>>>>>>>>>>>>>>>>>>>>>>>>                 // use a fixed number of output partitions
>>>>>>>>>>>>>>>>>>>>>>>>>                 .setParallelism(8);
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> /**
>>>>>>>>>>>>>>>>>>>>>>>>>  * Usage: .keyBy("the", "distinct", "fields").reduce(new DistinctFunction())
>>>>>>>>>>>>>>>>>>>>>>>>>  */
>>>>>>>>>>>>>>>>>>>>>>>>> public class DistinctFunction implements ReduceFunction<java.util.Map<String, String>> {
>>>>>>>>>>>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>>>>>>>>>>>     public Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) {
>>>>>>>>>>>>>>>>>>>>>>>>>         return value1;
>>>>>>>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>>>>>>> }
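To illustrate why a reducer that always returns its first argument yields one record per key, here is a minimal plain-Java sketch. It deliberately avoids Flink: keying and windowing are simulated with a `LinkedHashMap`, and the class name, method name and sample events are hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

public class DistinctSketch {

    /** Same contract as the reducer above: always keep the first value seen for a key. */
    static final BinaryOperator<String> KEEP_FIRST = (value1, value2) -> value1;

    /** Reduces all events that share a key pairwise, leaving exactly one event per key. */
    static Map<String, String> distinctByKey(List<String[]> keyedEvents) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String[] keyAndEvent : keyedEvents) {
            // merge() stores the event if the key is new, otherwise applies KEEP_FIRST
            result.merge(keyAndEvent[0], keyAndEvent[1], KEEP_FIRST);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String[]> events = Arrays.asList(
                new String[] {"a", "a-1"},
                new String[] {"b", "b-1"},
                new String[] {"a", "a-2"});
        System.out.println(distinctByKey(events)); // prints {a=a-1, b=b-1}
    }
}
```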
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> 2. State configuration
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> boolean enableIncrementalCheckpointing = true;
>>>>>>>>>>>>>>>>>>>>>>>>> String statePath = "s3n://bucket/savepoints";
>>>>>>>>>>>>>>>>>>>>>>>>> new RocksDBStateBackend(statePath, enableIncrementalCheckpointing);
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Checkpointing Mode: Exactly Once
>>>>>>>>>>>>>>>>>>>>>>>>> Interval: 1m 0s
>>>>>>>>>>>>>>>>>>>>>>>>> Timeout: 10m 0s
>>>>>>>>>>>>>>>>>>>>>>>>> Minimum Pause Between Checkpoints: 1m 0s
>>>>>>>>>>>>>>>>>>>>>>>>> Maximum Concurrent Checkpoints: 1
>>>>>>>>>>>>>>>>>>>>>>>>> Persist Checkpoints Externally: Enabled (retain on cancellation)
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> 3. BucketingSink configuration
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> We use BucketingSink. I don't think there's
>>>>>>>>>>>>>>>>>>>>>>>>> anything special here, apart from the fact that we're writing to S3.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>         String outputPath = "s3://bucket/output";
>>>>>>>>>>>>>>>>>>>>>>>>>         BucketingSink<Map<String, String>> sink = new BucketingSink<Map<String, String>>(outputPath)
>>>>>>>>>>>>>>>>>>>>>>>>>                 .setBucketer(new ProcessdateBucketer())
>>>>>>>>>>>>>>>>>>>>>>>>>                 .setBatchSize(batchSize)
>>>>>>>>>>>>>>>>>>>>>>>>>                 .setInactiveBucketThreshold(inactiveBucketThreshold)
>>>>>>>>>>>>>>>>>>>>>>>>>                 .setInactiveBucketCheckInterval(inactiveBucketCheckInterval);
>>>>>>>>>>>>>>>>>>>>>>>>>         sink.setWriter(new IdJsonWriter());
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> 4. Kafka & event time
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> My flink job reads the data from Kafka, using a
>>>>>>>>>>>>>>>>>>>>>>>>> BoundedOutOfOrdernessTimestampExtractor on the kafka consumer to
>>>>>>>>>>>>>>>>>>>>>>>>> synchronize watermarks across all kafka partitions. We also write late
>>>>>>>>>>>>>>>>>>>>>>>>> data to side output, but nothing is written there – if it were, that
>>>>>>>>>>>>>>>>>>>>>>>>> could explain missed data in the main output (I'm also sure that our
>>>>>>>>>>>>>>>>>>>>>>>>> late data writing works, because we previously had some actual late
>>>>>>>>>>>>>>>>>>>>>>>>> data which ended up there).
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> 5. allowedLateness
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> It may or may not be relevant that I have also
>>>>>>>>>>>>>>>>>>>>>>>>> enabled allowedLateness with 1 minute lateness on the 24-hour window:
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> If that makes sense, I could try removing
>>>>>>>>>>>>>>>>>>>>>>>>> allowedLateness entirely? That would be just to rule 
>>>>>>>>>>>>>>>>>>>>>>>>> out that Flink doesn't
>>>>>>>>>>>>>>>>>>>>>>>>> have a bug that's related to restoring state in 
>>>>>>>>>>>>>>>>>>>>>>>>> combination with the
>>>>>>>>>>>>>>>>>>>>>>>>> allowedLateness feature. After all, all of our data 
>>>>>>>>>>>>>>>>>>>>>>>>> should be in a good
>>>>>>>>>>>>>>>>>>>>>>>>> enough order to not be late, given the max out of 
>>>>>>>>>>>>>>>>>>>>>>>>> orderness used on kafka
>>>>>>>>>>>>>>>>>>>>>>>>> consumer timestamp extractor.
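The interplay between the out-of-orderness bound and allowedLateness can be sketched in plain Java as below. This reflects a common reading of Flink's event-time semantics and is an assumption, not taken from this thread: the watermark trails the highest timestamp seen by the bound, and an element counts as late once the watermark has reached the window's last timestamp plus the allowed lateness. The class name, method names and the 10-second bound are hypothetical.

```java
public class LatenessSketch {

    /** Watermark of a bounded-out-of-orderness extractor: highest timestamp seen minus the bound. */
    static long watermark(long maxTimestampSeenMs, long maxOutOfOrdernessMs) {
        return maxTimestampSeenMs - maxOutOfOrdernessMs;
    }

    /** True if an element for the window ending at windowEndMs would be treated as late. */
    static boolean isLate(long windowEndMs, long allowedLatenessMs, long watermarkMs) {
        long windowMaxTimestamp = windowEndMs - 1; // last timestamp that belongs to the window
        return windowMaxTimestamp + allowedLatenessMs <= watermarkMs;
    }

    public static void main(String[] args) {
        long dayMs = 24L * 60 * 60 * 1000;           // end of the first 24-hour window
        long wm = watermark(dayMs + 50_000, 10_000); // 50 s into the next day, 10 s bound
        System.out.println(isLate(dayMs, 60_000, wm)); // 1 minute allowed lateness: false
        System.out.println(isLate(dayMs, 0, wm));      // allowedLateness = 0: true
    }
}
```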
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Thank you in advance!
>>>>>>>>>>>>>>>>>>>>>>>>>
> --
> *Juho Autio*
> Senior Data Engineer
>
> Data Engineering, Games
> Rovio Entertainment Corporation
> Mobile: + 358 (0)45 313 0122
> juho.au...@rovio.com
> www.rovio.com
>
