Hi Stefan, Bravo doesn't currently support reading reducer state. I gave it a try but couldn't get a working implementation yet. If anyone can provide some insight into how to make this work, please share on GitHub: https://github.com/king/bravo/pull/11
Thanks. On Tue, Oct 23, 2018 at 3:32 PM Juho Autio <juho.au...@rovio.com> wrote: > I was glad to find that bravo had now been updated to support installing > bravo to a local maven repo. > > I was able to load a checkpoint created by my job, thanks to the example > provided in bravo README, but I'm still missing the essential piece. > > My code was: > > OperatorStateReader reader = new OperatorStateReader(env2, > savepoint, "DistinctFunction"); > DontKnowWhatTypeThisIs reducingState = reader.readKeyedStates(what > should I put here?); > > I don't know how to read the values collected from reduce() calls in the > state. Is there a way to access the reducing state of the window with > bravo? I'm a bit confused how this works, because when I check with > debugger, flink internally uses a ReducingStateDescriptor > with name=window-contents, but still reading operator state for > "DistinctFunction" didn't at least throw an exception ("window-contents" > threw – obviously there's no operator by that name). > > Cheers, > Juho > > On Mon, Oct 15, 2018 at 2:25 PM Juho Autio <juho.au...@rovio.com> wrote: > >> Hi Stefan, >> >> Sorry but it doesn't seem immediately clear to me what's a good way to >> use https://github.com/king/bravo. >> >> How are people using it? Would you for example modify build.gradle >> somehow to publish the bravo as a library locally/internally? Or add code >> directly in the bravo project (locally) and run it from there (using an >> IDE, for example)? Also it doesn't seem like the bravo gradle project >> supports building a flink job jar, but if it does, how do I do it? >> >> Thanks. >> >> On Thu, Oct 4, 2018 at 9:30 PM Juho Autio <juho.au...@rovio.com> wrote: >> >>> Good then, I'll try to analyze the savepoints with Bravo. Thanks! >>> >>> > How would you assume that backpressure would influence your updates? >>> Updates to each local state still happen event-by-event, in a single >>> reader/writing thread. >>> >>> Sure, just an ignorant guess by me. I'm not familiar with most of >>> Flink's internals. Any way high backpressure is not a seen on this job >>> after it has caught up the lag, so at I thought it would be worth >>> mentioning. >>> >>> On Thu, Oct 4, 2018 at 6:24 PM Stefan Richter < >>> s.rich...@data-artisans.com> wrote: >>> >>>> Hi, >>>> >>>> Am 04.10.2018 um 16:08 schrieb Juho Autio <juho.au...@rovio.com>: >>>> >>>> > you could take a look at Bravo [1] to query your savepoints and to >>>> check if the state in the savepoint complete w.r.t your expectations >>>> >>>> Thanks. I'm not 100% if this is the case, but to me it seemed like the >>>> missed ids were being logged by the reducer soon after the job had started >>>> (after restoring a savepoint). But on the other hand, after that I also >>>> made another savepoint & restored that, so what I could check is: does that >>>> next savepoint have the missed ids that were logged (a couple of minutes >>>> before the savepoint was created, so there should've been more than enough >>>> time to add them to the state before the savepoint was triggered) or not. >>>> Any way, if I would be able to verify with Bravo that the ids are missing >>>> from the savepoint (even though reduced logged that it saw them), would >>>> that help in figuring out where they are lost? Is there some major >>>> difference compared to just looking at the final output after window has >>>> been triggered? >>>> >>>> >>>> >>>> I think that makes a difference. 
For example, you can investigate if >>>> there is a state loss or a problem with the windowing. In the savepoint you >>>> could see which keys exists and to which windows they are assigned. Also >>>> just to make sure there is no misunderstanding: only elements that are in >>>> the state at the start of a savepoint are expected to be part of the >>>> savepoint; all elements between start and completion of the savepoint are >>>> not expected to be part of the savepoint. >>>> >>>> >>>> > I also doubt that the problem is about backpressure after restore, >>>> because the job will only continue running after the state restore is >>>> already completed. >>>> >>>> Yes, I'm not suspecting that the state restoring would be the problem >>>> either. My concern was about backpressure possibly messing with the updates >>>> of reducing state? I would tend to suspect that updating the state >>>> consistently is what fails, where heavy load / backpressure might be a >>>> factor. >>>> >>>> >>>> >>>> How would you assume that backpressure would influence your updates? >>>> Updates to each local state still happen event-by-event, in a single >>>> reader/writing thread. >>>> >>>> >>>> On Thu, Oct 4, 2018 at 4:18 PM Stefan Richter < >>>> s.rich...@data-artisans.com> wrote: >>>> >>>>> Hi, >>>>> >>>>> you could take a look at Bravo [1] to query your savepoints and to >>>>> check if the state in the savepoint complete w.r.t your expectations. I >>>>> somewhat doubt that there is a general problem with the state/savepoints >>>>> because many users are successfully running it on a large state and I am >>>>> not aware of any data loss problems, but nothing is impossible. What the >>>>> savepoint does is also straight forward: iterate a db snapshot and write >>>>> all key/value pairs to disk, so all data that was in the db at the time of >>>>> the savepoint, should show up. I also doubt that the problem is about >>>>> backpressure after restore, because the job will only continue running >>>>> after the state restore is already completed. Did you check if you are >>>>> using exactly-one-semantics or at-least-once semantics? Also did you check >>>>> that the kafka consumer start position is configured properly [2]? Are >>>>> watermarks generated as expected after restore? >>>>> >>>>> One more unrelated high-level comment that I have: for a granularity >>>>> of 24h windows, I wonder if it would not make sense to use a batch job >>>>> instead? >>>>> >>>>> Best, >>>>> Stefan >>>>> >>>>> [1] https://github.com/king/bravo >>>>> [2] >>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration >>>>> >>>>> Am 04.10.2018 um 14:53 schrieb Juho Autio <juho.au...@rovio.com>: >>>>> >>>>> Thanks for the suggestions! >>>>> >>>>> > In general, it would be tremendously helpful to have a minimal >>>>> working example which allows to reproduce the problem. >>>>> >>>>> Definitely. The problem with reproducing has been that this only seems >>>>> to happen in the bigger production data volumes. >>>>> >>>>> That's why I'm hoping to find a way to debug this with the production >>>>> data. With that it seems to consistently cause some misses every time the >>>>> job is killed/restored. >>>>> >>>>> > check if it happens for shorter windows, like 1h etc >>>>> >>>>> What would be the benefit of that compared to 24h window? >>>>> >>>>> > simplify the job to not use a reduce window but simply a time window >>>>> which outputs the window events. 
Then counting the input and output events >>>>> should allow you to verify the results. If you are not seeing missing >>>>> events, then it could have something to do with the reducing state used in >>>>> the reduce function. >>>>> >>>>> Hm, maybe, but not sure how useful that would be, because it wouldn't >>>>> yet prove that it's related to reducing, because not having a reduce >>>>> function could also mean smaller load on the job, which might alone be >>>>> enough to make the problem not manifest. >>>>> >>>>> Is there a way to debug what goes into the reducing state (including >>>>> what gets removed or overwritten and what restored), if that makes >>>>> sense..? >>>>> Maybe some suitable logging could be used to prove that the lost data is >>>>> written to the reducing state (or at least asked to be written), but not >>>>> found any more when the window closes and state is flushed? >>>>> >>>>> On configuration once more, we're using RocksDB state backend with >>>>> asynchronous incremental checkpointing. The state is restored from >>>>> savepoints though, we haven't been using those checkpoints in these tests >>>>> (although they could be used in case of crashes – but we haven't had those >>>>> now). >>>>> >>>>> On Thu, Oct 4, 2018 at 3:25 PM Till Rohrmann <trohrm...@apache.org> >>>>> wrote: >>>>> >>>>>> Hi Juho, >>>>>> >>>>>> another idea to further narrow down the problem could be to simplify >>>>>> the job to not use a reduce window but simply a time window which outputs >>>>>> the window events. Then counting the input and output events should allow >>>>>> you to verify the results. If you are not seeing missing events, then it >>>>>> could have something to do with the reducing state used in the reduce >>>>>> function. >>>>>> >>>>>> In general, it would be tremendously helpful to have a minimal >>>>>> working example which allows to reproduce the problem. >>>>>> >>>>>> Cheers, >>>>>> Till >>>>>> >>>>>> On Thu, Oct 4, 2018 at 2:02 PM Andrey Zagrebin < >>>>>> and...@data-artisans.com> wrote: >>>>>> >>>>>>> Hi Juho, >>>>>>> >>>>>>> can you try to reduce the job to minimal reproducible example and >>>>>>> share the job and input? >>>>>>> >>>>>>> For example: >>>>>>> - some simple records as input, e.g. tuples of primitive types saved >>>>>>> as cvs >>>>>>> - minimal deduplication job which processes them and misses records >>>>>>> - check if it happens for shorter windows, like 1h etc >>>>>>> - setup which you use for the job, ideally locally reproducible or >>>>>>> cloud >>>>>>> >>>>>>> Best, >>>>>>> Andrey >>>>>>> >>>>>>> On 4 Oct 2018, at 11:13, Juho Autio <juho.au...@rovio.com> wrote: >>>>>>> >>>>>>> Sorry to insist, but we seem to be blocked for any serious usage of >>>>>>> state in Flink if we can't rely on it to not miss data in case of >>>>>>> restore. >>>>>>> >>>>>>> Would anyone have suggestions for how to troubleshoot this? So far I >>>>>>> have verified with DEBUG logs that our reduce function gets to process >>>>>>> also >>>>>>> the data that is missing from window output. >>>>>>> >>>>>>> On Mon, Oct 1, 2018 at 11:56 AM Juho Autio <juho.au...@rovio.com> >>>>>>> wrote: >>>>>>> >>>>>>>> Hi Andrey, >>>>>>>> >>>>>>>> To rule out for good any questions about sink behaviour, the job >>>>>>>> was killed and started with an additional Kafka sink. >>>>>>>> >>>>>>>> The same number of ids were missed in both outputs: KafkaSink & >>>>>>>> BucketingSink. >>>>>>>> >>>>>>>> I wonder what would be the next steps in debugging? 
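To make Till's suggestion quoted above concrete (replace the reduce with a window function that simply emits everything it buffered, so input and output counts can be compared), a minimal sketch could look like the following. This is only an illustration assuming the Flink 1.6 DataStream API; the class name and the idea of comparing counters are mine, not something from the thread:

import java.util.Map;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/** Pass-through window function: emits every element the window buffered for a key. */
public class PassThroughWindowFunction
        extends ProcessWindowFunction<Map<String, String>, Map<String, String>, Object, TimeWindow> {

    @Override
    public void process(Object key,
                        Context context,
                        Iterable<Map<String, String>> elements,
                        Collector<Map<String, String>> out) {
        for (Map<String, String> element : elements) {
            out.collect(element); // no reducing state involved, just forward what was buffered
        }
    }
}

// Used in place of .reduce(new DistinctFunction()):
//   .keyBy(new MapKeySelector("ID", "PLAYER_ID", "FIELD", "KEY_NAME", "KEY_VALUE"))
//   .timeWindow(Time.days(1))
//   .process(new PassThroughWindowFunction())
//   .addSink(sink)

Counting the records flatMapped into the window versus the records collected here (for example with two metrics counters) would show whether anything disappears inside the window itself, without a reducing state being involved.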
>>>>>>>> >>>>>>>> On Fri, Sep 21, 2018 at 3:49 PM Juho Autio <juho.au...@rovio.com> >>>>>>>> wrote: >>>>>>>> >>>>>>>>> Thanks, Andrey. >>>>>>>>> >>>>>>>>> > so it means that the savepoint does not loose at least some >>>>>>>>> dropped records. >>>>>>>>> >>>>>>>>> I'm not sure what you mean by that? I mean, it was known from the >>>>>>>>> beginning, that not everything is lost before/after restoring a >>>>>>>>> savepoint, >>>>>>>>> just some records around the time of restoration. It's not 100% clear >>>>>>>>> whether records are lost before making a savepoint or after restoring >>>>>>>>> it. >>>>>>>>> Although, based on the new DEBUG logs it seems more like losing some >>>>>>>>> records that are seen ~soon after restoring. It seems like Flink >>>>>>>>> would be >>>>>>>>> somehow confused either about the restored state vs. new inserts to >>>>>>>>> state. >>>>>>>>> This could also be somehow linked to the high back pressure on the >>>>>>>>> kafka >>>>>>>>> source while the stream is catching up. >>>>>>>>> >>>>>>>>> > If it is feasible for your setup, I suggest to insert one more >>>>>>>>> map function after reduce and before sink. >>>>>>>>> > etc. >>>>>>>>> >>>>>>>>> Isn't that the same thing that we discussed before? Nothing is >>>>>>>>> sent to BucketingSink before the window closes, so I don't see how it >>>>>>>>> would >>>>>>>>> make any difference if we replace the BucketingSink with a map >>>>>>>>> function or >>>>>>>>> another sink type. We don't create or restore savepoints during the >>>>>>>>> time >>>>>>>>> when BucketingSink gets input or has open buckets – that happens at a >>>>>>>>> much >>>>>>>>> later time of day. I would focus on figuring out why the records are >>>>>>>>> lost >>>>>>>>> while the window is open. But I don't know how to do that. Would you >>>>>>>>> have >>>>>>>>> any additional suggestions? >>>>>>>>> >>>>>>>>> On Fri, Sep 21, 2018 at 3:30 PM Andrey Zagrebin < >>>>>>>>> and...@data-artisans.com> wrote: >>>>>>>>> >>>>>>>>>> Hi Juho, >>>>>>>>>> >>>>>>>>>> so it means that the savepoint does not loose at least some >>>>>>>>>> dropped records. >>>>>>>>>> >>>>>>>>>> If it is feasible for your setup, I suggest to insert one more >>>>>>>>>> map function after reduce and before sink. >>>>>>>>>> The map function should be called right after window is triggered >>>>>>>>>> but before flushing to s3. >>>>>>>>>> The result of reduce (deduped record) could be logged there. >>>>>>>>>> This should allow to check whether the processed distinct records >>>>>>>>>> were buffered in the state after the restoration from the savepoint >>>>>>>>>> or not. >>>>>>>>>> If they were buffered we should see that there was an attempt to >>>>>>>>>> write them >>>>>>>>>> to the sink from the state. >>>>>>>>>> >>>>>>>>>> Another suggestion is to try to write records to some other sink >>>>>>>>>> or to both. >>>>>>>>>> E.g. if you can access file system of workers, maybe just into >>>>>>>>>> local files and check whether the records are also dropped there. >>>>>>>>>> >>>>>>>>>> Best, >>>>>>>>>> Andrey >>>>>>>>>> >>>>>>>>>> On 20 Sep 2018, at 15:37, Juho Autio <juho.au...@rovio.com> >>>>>>>>>> wrote: >>>>>>>>>> >>>>>>>>>> Hi Andrey! >>>>>>>>>> >>>>>>>>>> I was finally able to gather the DEBUG logs that you suggested. >>>>>>>>>> In short, the reducer logged that it processed at least some of the >>>>>>>>>> ids >>>>>>>>>> that were missing from the output. >>>>>>>>>> >>>>>>>>>> "At least some", because I didn't have the job running with DEBUG >>>>>>>>>> logs for the full 24-hour window period. 
So I was only able to look >>>>>>>>>> up if I >>>>>>>>>> can find *some* of the missing ids in the DEBUG logs. Which I >>>>>>>>>> did indeed. >>>>>>>>>> >>>>>>>>>> I changed the DistinctFunction.java to do this: >>>>>>>>>> >>>>>>>>>> @Override >>>>>>>>>> public Map<String, String> reduce(Map<String, String> value1, >>>>>>>>>> Map<String, String> value2) { >>>>>>>>>> LOG.debug("DistinctFunction.reduce returns: {}={}", >>>>>>>>>> value1.get("field"), value1.get("id")); >>>>>>>>>> return value1; >>>>>>>>>> } >>>>>>>>>> >>>>>>>>>> Then: >>>>>>>>>> >>>>>>>>>> vi flink-1.6.0/conf/log4j.properties >>>>>>>>>> >>>>>>>>>> log4j.logger.org.apache.flink.streaming.runtime.tasks.StreamTask=DEBUG >>>>>>>>>> log4j.logger.com.rovio.ds.flink.uniqueid.DistinctFunction=DEBUG >>>>>>>>>> >>>>>>>>>> Then I ran the following kind of test: >>>>>>>>>> >>>>>>>>>> - Cancelled the on-going job with savepoint created at ~Sep 18 >>>>>>>>>> 08:35 UTC 2018 >>>>>>>>>> - Started a new cluster & job with DEBUG enabled at ~09:13, >>>>>>>>>> restored from that previous cluster's savepoint >>>>>>>>>> - Ran until caught up offsets >>>>>>>>>> - Cancelled the job with a new savepoint >>>>>>>>>> - Started a new job _without_ DEBUG, which restored the new >>>>>>>>>> savepoint, let it keep running so that it will eventually write the >>>>>>>>>> output >>>>>>>>>> >>>>>>>>>> Then on the next day, after results had been flushed when the >>>>>>>>>> 24-hour window closed, I compared the results again with a batch >>>>>>>>>> version's >>>>>>>>>> output. And found some missing ids as usual. >>>>>>>>>> >>>>>>>>>> I drilled down to one specific missing id (I'm replacing the >>>>>>>>>> actual value with AN12345 below), which was not found in the stream >>>>>>>>>> output, >>>>>>>>>> but was found in batch output & flink DEBUG logs. >>>>>>>>>> >>>>>>>>>> Related to that id, I gathered the following information: >>>>>>>>>> >>>>>>>>>> 2018-09-18~09:13:21,000 job started & savepoint is restored >>>>>>>>>> >>>>>>>>>> 2018-09-18 09:14:29,085 missing id is processed for the first >>>>>>>>>> time, proved by this log line: >>>>>>>>>> 2018-09-18 09:14:29,085 DEBUG >>>>>>>>>> com.rovio.ds.flink.uniqueid.DistinctFunction - >>>>>>>>>> DistinctFunction.reduce returns: s.aid1=AN12345 >>>>>>>>>> >>>>>>>>>> 2018-09-18 09:15:14,264 first synchronous part of checkpoint >>>>>>>>>> 2018-09-18 09:15:16,544 first asynchronous part of checkpoint >>>>>>>>>> >>>>>>>>>> ( >>>>>>>>>> more occurrences of checkpoints (~1 min checkpointing time + ~1 >>>>>>>>>> min delay before next) >>>>>>>>>> / >>>>>>>>>> more occurrences of DistinctFunction.reduce >>>>>>>>>> ) >>>>>>>>>> >>>>>>>>>> 2018-09-18 09:23:45,053 missing id is processed for the last time >>>>>>>>>> >>>>>>>>>> 2018-09-18~10:20:00,000 savepoint created & job cancelled >>>>>>>>>> >>>>>>>>>> To be noted, there was high backpressure after restoring from >>>>>>>>>> savepoint until the stream caught up with the kafka offsets. >>>>>>>>>> Although, our >>>>>>>>>> job uses assign timestamps & watermarks on the flink kafka consumer >>>>>>>>>> itself, >>>>>>>>>> so event time of all partitions is synchronized. As expected, we >>>>>>>>>> don't get >>>>>>>>>> any late data in the late data side output. >>>>>>>>>> >>>>>>>>>> From this we can see that the missing ids are processed by the >>>>>>>>>> reducer, but they must get lost somewhere before the 24-hour window >>>>>>>>>> is >>>>>>>>>> triggered. 
>>>>>>>>>> >>>>>>>>>> I think it's worth mentioning once more that the stream doesn't >>>>>>>>>> miss any ids if we let it's running without interruptions / state >>>>>>>>>> restoring. >>>>>>>>>> >>>>>>>>>> What's next? >>>>>>>>>> >>>>>>>>>> On Wed, Aug 29, 2018 at 3:49 PM Andrey Zagrebin < >>>>>>>>>> and...@data-artisans.com> wrote: >>>>>>>>>> >>>>>>>>>>> Hi Juho, >>>>>>>>>>> >>>>>>>>>>> > only when the 24-hour window triggers, BucketingSink gets a >>>>>>>>>>> burst of input >>>>>>>>>>> >>>>>>>>>>> This is of course totally true, my understanding is the same. We >>>>>>>>>>> cannot exclude problem there for sure, just savepoints are used a >>>>>>>>>>> lot w/o >>>>>>>>>>> problem reports and BucketingSink is known to be problematic with >>>>>>>>>>> s3. That >>>>>>>>>>> is why, I asked you: >>>>>>>>>>> >>>>>>>>>>> > You also wrote that the timestamps of lost event are >>>>>>>>>>> 'probably' around the time of the savepoint, if it is not yet for >>>>>>>>>>> sure I >>>>>>>>>>> would also check it. >>>>>>>>>>> >>>>>>>>>>> Although, bucketing sink might loose any data at the end of the >>>>>>>>>>> day (also from the middle). The fact, that it is always around the >>>>>>>>>>> time of >>>>>>>>>>> taking a savepoint and not random, is surely suspicious and possible >>>>>>>>>>> savepoint failures need to be investigated. >>>>>>>>>>> >>>>>>>>>>> Regarding the s3 problem, s3 doc says: >>>>>>>>>>> >>>>>>>>>>> > The caveat is that if you make a HEAD or GET request to the >>>>>>>>>>> key name (to find if the object exists) before creating the object, >>>>>>>>>>> Amazon >>>>>>>>>>> S3 provides 'eventual consistency' for read-after-write. >>>>>>>>>>> >>>>>>>>>>> The algorithm you suggest is how it is roughly implemented now >>>>>>>>>>> (BucketingSink.openNewPartFile). My understanding is that >>>>>>>>>>> 'eventual consistency’ means that even if you just created file >>>>>>>>>>> (its name >>>>>>>>>>> is key) it can be that you do not get it in the list or exists >>>>>>>>>>> (HEAD) >>>>>>>>>>> returns false and you risk to rewrite the previous part. >>>>>>>>>>> >>>>>>>>>>> The BucketingSink was designed for a standard file system. s3 is >>>>>>>>>>> used over a file system wrapper atm but does not always provide >>>>>>>>>>> normal file >>>>>>>>>>> system guarantees. See also last example in [1]. >>>>>>>>>>> >>>>>>>>>>> Cheers, >>>>>>>>>>> Andrey >>>>>>>>>>> >>>>>>>>>>> [1] >>>>>>>>>>> https://codeburst.io/quick-explanation-of-the-s3-consistency-model-6c9f325e3f82 >>>>>>>>>>> >>>>>>>>>>> On 29 Aug 2018, at 12:11, Juho Autio <juho.au...@rovio.com> >>>>>>>>>>> wrote: >>>>>>>>>>> >>>>>>>>>>> Andrey, thank you very much for the debugging suggestions, I'll >>>>>>>>>>> try them. >>>>>>>>>>> >>>>>>>>>>> In the meanwhile two more questions, please: >>>>>>>>>>> >>>>>>>>>>> > Just to keep in mind this problem with s3 and exclude it for >>>>>>>>>>> sure. I would also check whether the size of missing events is >>>>>>>>>>> around the >>>>>>>>>>> batch size of BucketingSink or not. >>>>>>>>>>> >>>>>>>>>>> Fair enough, but I also want to focus on debugging the most >>>>>>>>>>> probable subject first. So what do you think about this – true or >>>>>>>>>>> false: >>>>>>>>>>> only when the 24-hour window triggers, BucketinSink gets a burst of >>>>>>>>>>> input. >>>>>>>>>>> Around the state restoring point (middle of the day) it doesn't get >>>>>>>>>>> any >>>>>>>>>>> input, so it can't lose anything either. Isn't this true, or have I >>>>>>>>>>> totally >>>>>>>>>>> missed how Flink works in triggering window results? 
I would not >>>>>>>>>>> expect >>>>>>>>>>> there to be any optimization that speculatively triggers early >>>>>>>>>>> results of a >>>>>>>>>>> regular time window to the downstream operators. >>>>>>>>>>> >>>>>>>>>>> > The old BucketingSink has in general problem with s3. >>>>>>>>>>> Internally BucketingSink queries s3 as a file system to list already >>>>>>>>>>> written file parts (batches) and determine index of the next part >>>>>>>>>>> to start. >>>>>>>>>>> Due to eventual consistency of checking file existence in s3 [1], >>>>>>>>>>> the >>>>>>>>>>> BucketingSink can rewrite the previously written part and basically >>>>>>>>>>> loose >>>>>>>>>>> it. >>>>>>>>>>> >>>>>>>>>>> I was wondering, what does S3's "read-after-write consistency" >>>>>>>>>>> (mentioned on the page you linked) actually mean. It seems that >>>>>>>>>>> this might >>>>>>>>>>> be possible: >>>>>>>>>>> - LIST keys, find current max index >>>>>>>>>>> - choose next index = max + 1 >>>>>>>>>>> - HEAD next index: if it exists, keep adding + 1 until key >>>>>>>>>>> doesn't exist on S3 >>>>>>>>>>> >>>>>>>>>>> But definitely sounds easier if a sink keeps track of files in a >>>>>>>>>>> way that's guaranteed to be consistent. >>>>>>>>>>> >>>>>>>>>>> Cheers, >>>>>>>>>>> Juho >>>>>>>>>>> >>>>>>>>>>> On Mon, Aug 27, 2018 at 2:04 PM Andrey Zagrebin < >>>>>>>>>>> and...@data-artisans.com> wrote: >>>>>>>>>>> >>>>>>>>>>>> Hi, >>>>>>>>>>>> >>>>>>>>>>>> true, StreamingFileSink does not support s3 in 1.6.0, it is >>>>>>>>>>>> planned for the next 1.7 release, sorry for confusion. >>>>>>>>>>>> The old BucketingSink has in general problem with s3. >>>>>>>>>>>> Internally BucketingSink queries s3 as a file system >>>>>>>>>>>> to list already written file parts (batches) and determine >>>>>>>>>>>> index of the next part to start. Due to eventual consistency of >>>>>>>>>>>> checking >>>>>>>>>>>> file existence in s3 [1], the BucketingSink can rewrite the >>>>>>>>>>>> previously >>>>>>>>>>>> written part and basically loose it. It should be fixed for >>>>>>>>>>>> StreamingFileSink in 1.7 where Flink keeps its own track of >>>>>>>>>>>> written parts >>>>>>>>>>>> and does not rely on s3 as a file system. >>>>>>>>>>>> I also include Kostas, he might add more details. >>>>>>>>>>>> >>>>>>>>>>>> Just to keep in mind this problem with s3 and exclude it for >>>>>>>>>>>> sure I would also check whether the size of missing events is >>>>>>>>>>>> around the >>>>>>>>>>>> batch size of BucketingSink or not. You also wrote that the >>>>>>>>>>>> timestamps of >>>>>>>>>>>> lost event are 'probably' around the time of the savepoint, if it >>>>>>>>>>>> is not >>>>>>>>>>>> yet for sure I would also check it. >>>>>>>>>>>> >>>>>>>>>>>> Have you already checked the log files of job manager and task >>>>>>>>>>>> managers for the job running before and after the restore from the >>>>>>>>>>>> check >>>>>>>>>>>> point? Is everything successful there, no errors, relevant >>>>>>>>>>>> warnings or >>>>>>>>>>>> exceptions? >>>>>>>>>>>> >>>>>>>>>>>> As the next step, I would suggest to log all encountered events >>>>>>>>>>>> in DistinctFunction.reduce if possible for production data and >>>>>>>>>>>> check >>>>>>>>>>>> whether the missed events are eventually processed before or after >>>>>>>>>>>> the >>>>>>>>>>>> savepoint. 
The following log message indicates a border between >>>>>>>>>>>> the events >>>>>>>>>>>> that should be included into the savepoint (logged before) or not: >>>>>>>>>>>> “{} ({}, synchronous part) in thread {} took {} ms” (template) >>>>>>>>>>>> Also check if the savepoint has been overall completed: >>>>>>>>>>>> "{} ({}, asynchronous part) in thread {} took {} ms." >>>>>>>>>>>> >>>>>>>>>>>> Best, >>>>>>>>>>>> Andrey >>>>>>>>>>>> >>>>>>>>>>>> [1] >>>>>>>>>>>> https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html >>>>>>>>>>>> >>>>>>>>>>>> On 24 Aug 2018, at 20:41, Juho Autio <juho.au...@rovio.com> >>>>>>>>>>>> wrote: >>>>>>>>>>>> >>>>>>>>>>>> Hi, >>>>>>>>>>>> >>>>>>>>>>>> Using StreamingFileSink is not a convenient option for >>>>>>>>>>>> production use for us as it doesn't support s3*. I could use >>>>>>>>>>>> StreamingFileSink just to verify, but I don't see much point in >>>>>>>>>>>> doing so. >>>>>>>>>>>> Please consider my previous comment: >>>>>>>>>>>> >>>>>>>>>>>> > I realized that BucketingSink must not play any role in this >>>>>>>>>>>> problem. This is because only when the 24-hour window triggers, >>>>>>>>>>>> BucketingSink gets a burst of input. Around the state restoring >>>>>>>>>>>> point >>>>>>>>>>>> (middle of the day) it doesn't get any input, so it can't lose >>>>>>>>>>>> anything >>>>>>>>>>>> either (right?). >>>>>>>>>>>> >>>>>>>>>>>> I could also use a kafka sink instead, but I can't imagine how >>>>>>>>>>>> there could be any difference. It's very real that the sink >>>>>>>>>>>> doesn't get any >>>>>>>>>>>> input for a long time until the 24-hour window closes, and then it >>>>>>>>>>>> quickly >>>>>>>>>>>> writes out everything because it's not that much data eventually >>>>>>>>>>>> for the >>>>>>>>>>>> distinct values. >>>>>>>>>>>> >>>>>>>>>>>> Any ideas for debugging what's happening around the savepoint & >>>>>>>>>>>> restoration time? >>>>>>>>>>>> >>>>>>>>>>>> *) I actually implemented StreamingFileSink as an alternative >>>>>>>>>>>> sink. This was before I came to realize that most likely the sink >>>>>>>>>>>> component >>>>>>>>>>>> has nothing to do with the data loss problem. I tried it with >>>>>>>>>>>> s3n:// path >>>>>>>>>>>> just to see an exception being thrown. In the source code I indeed >>>>>>>>>>>> then >>>>>>>>>>>> found an explicit check for the target path scheme to be "hdfs://". >>>>>>>>>>>> >>>>>>>>>>>> On Fri, Aug 24, 2018 at 7:49 PM Andrey Zagrebin < >>>>>>>>>>>> and...@data-artisans.com> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> Ok, I think before further debugging the window reduced state, >>>>>>>>>>>>> could you try the new ‘StreamingFileSink’ [1] introduced in >>>>>>>>>>>>> Flink 1.6.0 instead of the previous 'BucketingSink’? >>>>>>>>>>>>> >>>>>>>>>>>>> Cheers, >>>>>>>>>>>>> Andrey >>>>>>>>>>>>> >>>>>>>>>>>>> [1] >>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html >>>>>>>>>>>>> >>>>>>>>>>>>> On 24 Aug 2018, at 18:03, Juho Autio <juho.au...@rovio.com> >>>>>>>>>>>>> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>> Yes, sorry for my confusing comment. I just meant that it >>>>>>>>>>>>> seems like there's a bug somewhere now that the output is missing >>>>>>>>>>>>> some data. >>>>>>>>>>>>> >>>>>>>>>>>>> > I would wait and check the actual output in s3 because it is >>>>>>>>>>>>> the main result of the job >>>>>>>>>>>>> >>>>>>>>>>>>> Yes, and that's what I have already done. 
There seems to be >>>>>>>>>>>>> always some data loss with the production data volumes, if the >>>>>>>>>>>>> job has been >>>>>>>>>>>>> restarted on that day. >>>>>>>>>>>>> >>>>>>>>>>>>> Would you have any suggestions for how to debug this further? >>>>>>>>>>>>> >>>>>>>>>>>>> Many thanks for stepping in. >>>>>>>>>>>>> >>>>>>>>>>>>> On Fri, Aug 24, 2018 at 6:37 PM Andrey Zagrebin < >>>>>>>>>>>>> and...@data-artisans.com> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>>> Hi Juho, >>>>>>>>>>>>>> >>>>>>>>>>>>>> So it is a per key deduplication job. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Yes, I would wait and check the actual output in s3 because >>>>>>>>>>>>>> it is the main result of the job and >>>>>>>>>>>>>> >>>>>>>>>>>>>> > The late data around the time of taking savepoint might be >>>>>>>>>>>>>> not included into the savepoint but it should be behind the >>>>>>>>>>>>>> snapshotted >>>>>>>>>>>>>> offset in Kafka. >>>>>>>>>>>>>> >>>>>>>>>>>>>> is not a bug, it is a possible behaviour. >>>>>>>>>>>>>> >>>>>>>>>>>>>> The savepoint is a snapshot of the data in transient which is >>>>>>>>>>>>>> already consumed from Kafka. >>>>>>>>>>>>>> Basically the full contents of the window result is split >>>>>>>>>>>>>> between the savepoint and what can come after the savepoint'ed >>>>>>>>>>>>>> offset in >>>>>>>>>>>>>> Kafka but before the window result is written into s3. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Allowed lateness should not affect it, I am just saying that >>>>>>>>>>>>>> the final result in s3 should include all records after it. >>>>>>>>>>>>>> This is what should be guaranteed but not the contents of the >>>>>>>>>>>>>> intermediate savepoint. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>> Andrey >>>>>>>>>>>>>> >>>>>>>>>>>>>> On 24 Aug 2018, at 16:52, Juho Autio <juho.au...@rovio.com> >>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>> Thanks for your answer! >>>>>>>>>>>>>> >>>>>>>>>>>>>> I check for the missed data from the final output on s3. So I >>>>>>>>>>>>>> wait until the next day, then run the same thing re-implemented >>>>>>>>>>>>>> in batch, >>>>>>>>>>>>>> and compare the output. >>>>>>>>>>>>>> >>>>>>>>>>>>>> > The late data around the time of taking savepoint might be >>>>>>>>>>>>>> not included into the savepoint but it should be behind the >>>>>>>>>>>>>> snapshotted >>>>>>>>>>>>>> offset in Kafka. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Yes, I would definitely expect that. It seems like there's a >>>>>>>>>>>>>> bug somewhere. >>>>>>>>>>>>>> >>>>>>>>>>>>>> > Then it should just come later after the restore and should >>>>>>>>>>>>>> be reduced within the allowed lateness into the final result >>>>>>>>>>>>>> which is saved >>>>>>>>>>>>>> into s3. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Well, as far as I know, allowed lateness doesn't play any >>>>>>>>>>>>>> role here, because I started running the job with >>>>>>>>>>>>>> allowedLateness=0, and >>>>>>>>>>>>>> still get the data loss, while my late data output doesn't >>>>>>>>>>>>>> receive anything. >>>>>>>>>>>>>> >>>>>>>>>>>>>> > Also, is this `DistinctFunction.reduce` just an example or >>>>>>>>>>>>>> the actual implementation, basically saving just one of records >>>>>>>>>>>>>> inside the >>>>>>>>>>>>>> 24h window in s3? then what is missing there? >>>>>>>>>>>>>> >>>>>>>>>>>>>> Yes, it's the actual implementation. Note that there's a >>>>>>>>>>>>>> keyBy before the DistinctFunction. So there's one record for >>>>>>>>>>>>>> each key >>>>>>>>>>>>>> (which is the combination of a couple of fields). 
In practice >>>>>>>>>>>>>> I've seen >>>>>>>>>>>>>> that we're missing ~2000-4000 elements on each restore, and the >>>>>>>>>>>>>> total >>>>>>>>>>>>>> output is obviously much more than that. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Here's the full code for the key selector: >>>>>>>>>>>>>> >>>>>>>>>>>>>> public class MapKeySelector implements >>>>>>>>>>>>>> KeySelector<Map<String,String>, Object> { >>>>>>>>>>>>>> >>>>>>>>>>>>>> private final String[] fields; >>>>>>>>>>>>>> >>>>>>>>>>>>>> public MapKeySelector(String... fields) { >>>>>>>>>>>>>> this.fields = fields; >>>>>>>>>>>>>> } >>>>>>>>>>>>>> >>>>>>>>>>>>>> @Override >>>>>>>>>>>>>> public Object getKey(Map<String, String> event) throws >>>>>>>>>>>>>> Exception { >>>>>>>>>>>>>> Tuple key = >>>>>>>>>>>>>> Tuple.getTupleClass(fields.length).newInstance(); >>>>>>>>>>>>>> for (int i = 0; i < fields.length; i++) { >>>>>>>>>>>>>> key.setField(event.getOrDefault(fields[i], ""), >>>>>>>>>>>>>> i); >>>>>>>>>>>>>> } >>>>>>>>>>>>>> return key; >>>>>>>>>>>>>> } >>>>>>>>>>>>>> } >>>>>>>>>>>>>> >>>>>>>>>>>>>> And a more exact example on how it's used: >>>>>>>>>>>>>> >>>>>>>>>>>>>> .keyBy(new MapKeySelector("ID", "PLAYER_ID", >>>>>>>>>>>>>> "FIELD", "KEY_NAME", "KEY_VALUE")) >>>>>>>>>>>>>> .timeWindow(Time.days(1)) >>>>>>>>>>>>>> .reduce(new DistinctFunction()) >>>>>>>>>>>>>> >>>>>>>>>>>>>> On Fri, Aug 24, 2018 at 5:26 PM Andrey Zagrebin < >>>>>>>>>>>>>> and...@data-artisans.com> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>>> Hi Juho, >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Where exactly does the data miss? When do you notice that? >>>>>>>>>>>>>>> Do you check it: >>>>>>>>>>>>>>> - debugging `DistinctFunction.reduce` right after resume in >>>>>>>>>>>>>>> the middle of the day >>>>>>>>>>>>>>> or >>>>>>>>>>>>>>> - some distinct records miss in the final output >>>>>>>>>>>>>>> of BucketingSink in s3 after window result is actually >>>>>>>>>>>>>>> triggered and saved >>>>>>>>>>>>>>> into s3 at the end of the day? is this the main output? >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> The late data around the time of taking savepoint might be >>>>>>>>>>>>>>> not included into the savepoint but it should be behind the >>>>>>>>>>>>>>> snapshotted >>>>>>>>>>>>>>> offset in Kafka. Then it should just come later after the >>>>>>>>>>>>>>> restore and >>>>>>>>>>>>>>> should be reduced within the allowed lateness into the final >>>>>>>>>>>>>>> result which >>>>>>>>>>>>>>> is saved into s3. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Also, is this `DistinctFunction.reduce` just an example or >>>>>>>>>>>>>>> the actual implementation, basically saving just one of records >>>>>>>>>>>>>>> inside the >>>>>>>>>>>>>>> 24h window in s3? then what is missing there? >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>>> Andrey >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On 23 Aug 2018, at 15:42, Juho Autio <juho.au...@rovio.com> >>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I changed to allowedLateness=0, no change, still missing >>>>>>>>>>>>>>> data when restoring from savepoint. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On Tue, Aug 21, 2018 at 10:43 AM Juho Autio < >>>>>>>>>>>>>>> juho.au...@rovio.com> wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> I realized that BucketingSink must not play any role in >>>>>>>>>>>>>>>> this problem. This is because only when the 24-hour window >>>>>>>>>>>>>>>> triggers, >>>>>>>>>>>>>>>> BucketinSink gets a burst of input. Around the state restoring >>>>>>>>>>>>>>>> point >>>>>>>>>>>>>>>> (middle of the day) it doesn't get any input, so it can't lose >>>>>>>>>>>>>>>> anything >>>>>>>>>>>>>>>> either (right?). 
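On the "write to a second sink as well" idea discussed earlier in this thread (an extra Kafka output was in fact added later, see above), a rough sketch of that dual-sink wiring is below. The broker address, topic name, and the toString serialization are placeholders of mine, and the usual imports (MapFunction, FlinkKafkaProducer010, SimpleStringSchema) are omitted:

DataStream<Map<String, String>> reduced = kafkaStream
        .flatMap(new ExtractFieldsFunction())
        .keyBy(new MapKeySelector("ID", "PLAYER_ID", "FIELD", "KEY_NAME", "KEY_VALUE"))
        .timeWindow(Time.days(1))
        .reduce(new DistinctFunction());

// Main output, as before.
reduced.addSink(sink).setParallelism(8);

// Additional debug output; both sinks consume the same reduced stream,
// so after the window fires their contents can be compared record by record.
reduced
        .map(new MapFunction<Map<String, String>, String>() {
            @Override
            public String map(Map<String, String> value) {
                return value.toString(); // placeholder serialization, a real job would emit JSON
            }
        })
        .addSink(new FlinkKafkaProducer010<>("broker1:9092", "distinct-output-debug", new SimpleStringSchema()));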
>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> I will next try removing the allowedLateness entirely from >>>>>>>>>>>>>>>> the equation. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> In the meanwhile, please let me know if you have any >>>>>>>>>>>>>>>> suggestions for debugging the lost data, for example what logs >>>>>>>>>>>>>>>> to enable. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> We use FlinkKafkaConsumer010 btw. Are there any known >>>>>>>>>>>>>>>> issues with that, that could contribute to lost data when >>>>>>>>>>>>>>>> restoring a >>>>>>>>>>>>>>>> savepoint? >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 4:23 PM Juho Autio < >>>>>>>>>>>>>>>> juho.au...@rovio.com> wrote: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Some data is silently lost on my Flink stream job when >>>>>>>>>>>>>>>>> state is restored from a savepoint. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Do you have any debugging hints to find out where exactly >>>>>>>>>>>>>>>>> the data gets dropped? >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> My job gathers distinct values using a 24-hour window. It >>>>>>>>>>>>>>>>> doesn't have any custom state management. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> When I cancel the job with savepoint and restore from that >>>>>>>>>>>>>>>>> savepoint, some data is missed. It seems to be losing just a >>>>>>>>>>>>>>>>> small amount >>>>>>>>>>>>>>>>> of data. The event time of lost data is probably around the >>>>>>>>>>>>>>>>> time of >>>>>>>>>>>>>>>>> savepoint. In other words the rest of the time window is not >>>>>>>>>>>>>>>>> entirely >>>>>>>>>>>>>>>>> missed – collection works correctly also for (most of the) >>>>>>>>>>>>>>>>> events that come >>>>>>>>>>>>>>>>> in after restoring. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> When the job processes a full 24-hour window without >>>>>>>>>>>>>>>>> interruptions it doesn't miss anything. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Usually the problem doesn't happen in test environments >>>>>>>>>>>>>>>>> that have smaller parallelism and smaller data volumes. But >>>>>>>>>>>>>>>>> in production >>>>>>>>>>>>>>>>> volumes the job seems to be consistently missing at least >>>>>>>>>>>>>>>>> something on >>>>>>>>>>>>>>>>> every restore. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> This issue has consistently happened since the job was >>>>>>>>>>>>>>>>> initially created. It was at first run on an older version of >>>>>>>>>>>>>>>>> Flink >>>>>>>>>>>>>>>>> 1.5-SNAPSHOT and it still happens on both Flink 1.5.2 & 1.6.0. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> I'm wondering if this could be for example some >>>>>>>>>>>>>>>>> synchronization issue between the kafka consumer offsets vs. >>>>>>>>>>>>>>>>> what's been >>>>>>>>>>>>>>>>> written by BucketingSink? >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> 1. 
Job content, simplified >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> kafkaStream >>>>>>>>>>>>>>>>> .flatMap(new ExtractFieldsFunction()) >>>>>>>>>>>>>>>>> .keyBy(new MapKeySelector(1, 2, 3, 4)) >>>>>>>>>>>>>>>>> .timeWindow(Time.days(1)) >>>>>>>>>>>>>>>>> .allowedLateness(allowedLateness) >>>>>>>>>>>>>>>>> .sideOutputLateData(lateDataTag) >>>>>>>>>>>>>>>>> .reduce(new DistinctFunction()) >>>>>>>>>>>>>>>>> .addSink(sink) >>>>>>>>>>>>>>>>> // use a fixed number of output partitions >>>>>>>>>>>>>>>>> .setParallelism(8)) >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> /** >>>>>>>>>>>>>>>>> * Usage: .keyBy("the", "distinct", "fields").reduce(new >>>>>>>>>>>>>>>>> DistinctFunction()) >>>>>>>>>>>>>>>>> */ >>>>>>>>>>>>>>>>> public class DistinctFunction implements >>>>>>>>>>>>>>>>> ReduceFunction<java.util.Map<String, String>> { >>>>>>>>>>>>>>>>> @Override >>>>>>>>>>>>>>>>> public Map<String, String> reduce(Map<String, String> >>>>>>>>>>>>>>>>> value1, Map<String, String> value2) { >>>>>>>>>>>>>>>>> return value1; >>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> 2. State configuration >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> boolean enableIncrementalCheckpointing = true; >>>>>>>>>>>>>>>>> String statePath = "s3n://bucket/savepoints"; >>>>>>>>>>>>>>>>> new RocksDBStateBackend(statePath, >>>>>>>>>>>>>>>>> enableIncrementalCheckpointing); >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Checkpointing Mode Exactly Once >>>>>>>>>>>>>>>>> Interval 1m 0s >>>>>>>>>>>>>>>>> Timeout 10m 0s >>>>>>>>>>>>>>>>> Minimum Pause Between Checkpoints 1m 0s >>>>>>>>>>>>>>>>> Maximum Concurrent Checkpoints 1 >>>>>>>>>>>>>>>>> Persist Checkpoints Externally Enabled (retain on >>>>>>>>>>>>>>>>> cancellation) >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> 3. BucketingSink configuration >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> We use BucketingSink, I don't think there's anything >>>>>>>>>>>>>>>>> special here, if not the fact that we're writing to S3. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> String outputPath = "s3://bucket/output"; >>>>>>>>>>>>>>>>> BucketingSink<Map<String, String>> sink = new >>>>>>>>>>>>>>>>> BucketingSink<Map<String, String>>(outputPath) >>>>>>>>>>>>>>>>> .setBucketer(new ProcessdateBucketer()) >>>>>>>>>>>>>>>>> .setBatchSize(batchSize) >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> .setInactiveBucketThreshold(inactiveBucketThreshold) >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> .setInactiveBucketCheckInterval(inactiveBucketCheckInterval); >>>>>>>>>>>>>>>>> sink.setWriter(new IdJsonWriter()); >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> 4. Kafka & event time >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> My flink job reads the data from Kafka, using a >>>>>>>>>>>>>>>>> BoundedOutOfOrdernessTimestampExtractor on the kafka consumer >>>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>>> synchronize watermarks accross all kafka partitions. We also >>>>>>>>>>>>>>>>> write late >>>>>>>>>>>>>>>>> data to side output, but nothing is written there – if it >>>>>>>>>>>>>>>>> would, it could >>>>>>>>>>>>>>>>> explain missed data in the main output (I'm also sure that >>>>>>>>>>>>>>>>> our late data >>>>>>>>>>>>>>>>> writing works, because we previously had some actual late >>>>>>>>>>>>>>>>> data which ended >>>>>>>>>>>>>>>>> up there). >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> 5. allowedLateness >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> It may be or may not be relevant that I have also enabled >>>>>>>>>>>>>>>>> allowedLateness with 1 minute lateness on the 24-hour window: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> If that makes sense, I could try removing allowedLateness >>>>>>>>>>>>>>>>> entirely? 
That would be just to rule out that Flink doesn't >>>>>>>>>>>>>>>>> have a bug >>>>>>>>>>>>>>>>> that's related to restoring state in combination with the >>>>>>>>>>>>>>>>> allowedLateness >>>>>>>>>>>>>>>>> feature. After all, all of our data should be in a good >>>>>>>>>>>>>>>>> enough order to not >>>>>>>>>>>>>>>>> be late, given the max out of orderness used on kafka >>>>>>>>>>>>>>>>> consumer timestamp >>>>>>>>>>>>>>>>> extractor. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Thank you in advance!
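As a closing reference for section 4 of the original message (assigning timestamps and watermarks on the Kafka consumer itself so that watermarks are synchronized across partitions), a minimal sketch is below. The topic name, properties, deserialization schema, timestamp field, and the 1-minute bound are all assumptions, not values from the job:

FlinkKafkaConsumer010<Map<String, String>> consumer =
        new FlinkKafkaConsumer010<>("events", new MapDeserializationSchema(), kafkaProperties);

// Per-partition watermarking: the extractor runs inside the consumer, so watermarks
// are derived per Kafka partition and merged, instead of being assigned downstream.
consumer.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Map<String, String>>(Time.minutes(1)) {
            @Override
            public long extractTimestamp(Map<String, String> event) {
                return Long.parseLong(event.get("timestamp")); // assumed field name
            }
        });

DataStream<Map<String, String>> kafkaStream = env.addSource(consumer);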