Andrey, thank you very much for the debugging suggestions, I'll try them. In the meantime, two more questions, please:
> Just to keep in mind this problem with s3 and exclude it for sure, I would also check whether the size of missing events is around the batch size of BucketingSink or not.

Fair enough, but I also want to focus on debugging the most probable suspect first. So what do you think about this – true or false: only when the 24-hour window triggers does BucketingSink get a burst of input. Around the state restoring point (middle of the day) it doesn't get any input, so it can't lose anything either. Isn't this true, or have I totally missed how Flink works in triggering window results? I would not expect there to be any optimization that speculatively triggers early results of a regular time window to the downstream operators.

> The old BucketingSink has in general a problem with s3. Internally BucketingSink queries s3 as a file system to list already written file parts (batches) and determine the index of the next part to start. Due to the eventual consistency of checking file existence in s3 [1], the BucketingSink can rewrite a previously written part and basically lose it.

I was wondering what S3's "read-after-write consistency" (mentioned on the page you linked) actually means. It seems that this might be possible:
- LIST keys, find current max index
- choose next index = max + 1
- HEAD next index: if it exists, keep adding +1 until the key doesn't exist on S3

But it definitely sounds easier if a sink keeps track of files in a way that's guaranteed to be consistent.

Cheers,
Juho

On Mon, Aug 27, 2018 at 2:04 PM Andrey Zagrebin <and...@data-artisans.com> wrote:

> Hi,
>
> True, StreamingFileSink does not support s3 in 1.6.0, it is planned for the next 1.7 release, sorry for the confusion.
>
> The old BucketingSink has in general a problem with s3. Internally BucketingSink queries s3 as a file system to list already written file parts (batches) and determine the index of the next part to start. Due to the eventual consistency of checking file existence in s3 [1], the BucketingSink can rewrite a previously written part and basically lose it. It should be fixed for StreamingFileSink in 1.7, where Flink keeps track of the written parts itself and does not rely on s3 as a file system.
>
> I also include Kostas, he might add more details.
>
> Just to keep in mind this problem with s3 and exclude it for sure, I would also check whether the size of missing events is around the batch size of BucketingSink or not. You also wrote that the timestamps of lost events are 'probably' around the time of the savepoint; if that is not yet certain, I would also check it.
>
> Have you already checked the log files of the job manager and task managers for the job running before and after the restore from the checkpoint? Is everything successful there, no errors, relevant warnings or exceptions?
>
> As the next step, I would suggest logging all encountered events in DistinctFunction.reduce, if possible for production data, and checking whether the missed events are eventually processed before or after the savepoint. The following log message indicates the border between the events that should be included in the savepoint (logged before) and those that should not:
> "{} ({}, synchronous part) in thread {} took {} ms" (template)
> Also check whether the savepoint has completed overall:
> "{} ({}, asynchronous part) in thread {} took {} ms."
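For illustration only, the LIST + HEAD probe sketched in the bullet list above could look roughly like the following in Java with the AWS SDK v1. This is not BucketingSink's actual implementation; the part-naming scheme and prefix are made up, and result pagination is ignored. Note also that S3's consistency documentation at the time carried a caveat: issuing a HEAD or GET on a key before the object exists makes subsequent reads of that key eventually consistent, so even this probe is not watertight.

    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.model.ListObjectsV2Request;
    import com.amazonaws.services.s3.model.S3ObjectSummary;

    public class NextPartIndexProbe {

        /** Returns the next free part index under a hypothetical s3://bucket/<prefix>part-<n> scheme. */
        public static int nextPartIndex(AmazonS3 s3, String bucket, String prefix) {
            // 1) LIST keys under the prefix and find the current max index (truncated listings ignored for brevity).
            int max = -1;
            for (S3ObjectSummary summary : s3.listObjectsV2(
                    new ListObjectsV2Request().withBucketName(bucket).withPrefix(prefix + "part-"))
                    .getObjectSummaries()) {
                String key = summary.getKey();
                max = Math.max(max, Integer.parseInt(key.substring(key.lastIndexOf('-') + 1)));
            }
            // 2) Choose next = max + 1, then 3) HEAD forward until the key does not exist,
            //    guarding against a stale (eventually consistent) LIST result.
            int next = max + 1;
            while (s3.doesObjectExist(bucket, prefix + "part-" + next)) {
                next++;
            }
            return next;
        }
    }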
>
> Best,
> Andrey
>
> [1] https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html
>
> On 24 Aug 2018, at 20:41, Juho Autio <juho.au...@rovio.com> wrote:
>
> Hi,
>
> Using StreamingFileSink is not a convenient option for production use for us, as it doesn't support s3*. I could use StreamingFileSink just to verify, but I don't see much point in doing so. Please consider my previous comment:
>
> > I realized that BucketingSink must not play any role in this problem. This is because only when the 24-hour window triggers, BucketingSink gets a burst of input. Around the state restoring point (middle of the day) it doesn't get any input, so it can't lose anything either (right?).
>
> I could also use a kafka sink instead, but I can't imagine how there could be any difference. It really is the case that the sink doesn't get any input for a long time until the 24-hour window closes, and then it quickly writes out everything, because in the end it's not that much data for the distinct values.
>
> Any ideas for debugging what's happening around the savepoint & restoration time?
>
> *) I actually implemented StreamingFileSink as an alternative sink. This was before I came to realize that most likely the sink component has nothing to do with the data loss problem. I tried it with an s3n:// path just to see an exception being thrown. In the source code I indeed then found an explicit check for the target path scheme to be "hdfs://".
>
> On Fri, Aug 24, 2018 at 7:49 PM Andrey Zagrebin <and...@data-artisans.com> wrote:
>
>> Ok, I think before further debugging the window's reduced state, could you try the new 'StreamingFileSink' [1] introduced in Flink 1.6.0 instead of the previous 'BucketingSink'?
>>
>> Cheers,
>> Andrey
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
>>
>> On 24 Aug 2018, at 18:03, Juho Autio <juho.au...@rovio.com> wrote:
>>
>> Yes, sorry for my confusing comment. I just meant that it seems like there's a bug somewhere, now that the output is missing some data.
>>
>> > I would wait and check the actual output in s3 because it is the main result of the job
>>
>> Yes, and that's what I have already done. There always seems to be some data loss with the production data volumes if the job has been restarted on that day.
>>
>> Would you have any suggestions for how to debug this further?
>>
>> Many thanks for stepping in.
>>
>> On Fri, Aug 24, 2018 at 6:37 PM Andrey Zagrebin <and...@data-artisans.com> wrote:
>>
>>> Hi Juho,
>>>
>>> So it is a per-key deduplication job.
>>>
>>> Yes, I would wait and check the actual output in s3, because it is the main result of the job, and
>>>
>>> > The late data around the time of taking the savepoint might not be included in the savepoint, but it should be behind the snapshotted offset in Kafka.
>>>
>>> is not a bug, it is a possible behaviour.
>>>
>>> The savepoint is a snapshot of the data in transit which has already been consumed from Kafka. Basically the full contents of the window result are split between the savepoint and what can come after the savepoint'ed offset in Kafka but before the window result is written into s3.
>>>
>>> Allowed lateness should not affect it, I am just saying that the final result in s3 should include all records after it. This is what should be guaranteed, but not the contents of the intermediate savepoint.
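For reference, a minimal row-format StreamingFileSink from the 1.6 API, as suggested above, might be wired up roughly like the sketch below. This is only a sketch: it assumes the Map records are first rendered as JSON strings (the jsonStream variable is an assumption), and, as noted earlier in the thread, the path has to be hdfs:// rather than s3 until 1.7.

    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

    // Row-format sink, one record per line; bucketing and rolling use the defaults here.
    StreamingFileSink<String> fileSink = StreamingFileSink
            .forRowFormat(new Path("hdfs:///tmp/distinct-output"), new SimpleStringEncoder<String>("UTF-8"))
            .build();

    // jsonStream is assumed to be the job's output already rendered as JSON strings.
    jsonStream.addSink(fileSink);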
>>>
>>> Cheers,
>>> Andrey
>>>
>>> On 24 Aug 2018, at 16:52, Juho Autio <juho.au...@rovio.com> wrote:
>>>
>>> Thanks for your answer!
>>>
>>> I check for the missed data from the final output on s3. So I wait until the next day, then run the same thing re-implemented in batch, and compare the output.
>>>
>>> > The late data around the time of taking the savepoint might not be included in the savepoint, but it should be behind the snapshotted offset in Kafka.
>>>
>>> Yes, I would definitely expect that. It seems like there's a bug somewhere.
>>>
>>> > Then it should just come later after the restore and should be reduced within the allowed lateness into the final result which is saved into s3.
>>>
>>> Well, as far as I know, allowed lateness doesn't play any role here, because I started running the job with allowedLateness=0 and still get the data loss, while my late data output doesn't receive anything.
>>>
>>> > Also, is this `DistinctFunction.reduce` just an example or the actual implementation, basically saving just one of the records inside the 24h window in s3? Then what is missing there?
>>>
>>> Yes, it's the actual implementation. Note that there's a keyBy before the DistinctFunction, so there's one record for each key (which is the combination of a couple of fields). In practice I've seen that we're missing ~2000-4000 elements on each restore, and the total output is obviously much more than that.
>>>
>>> Here's the full code for the key selector:
>>>
>>> public class MapKeySelector implements KeySelector<Map<String, String>, Object> {
>>>
>>>     private final String[] fields;
>>>
>>>     public MapKeySelector(String... fields) {
>>>         this.fields = fields;
>>>     }
>>>
>>>     @Override
>>>     public Object getKey(Map<String, String> event) throws Exception {
>>>         Tuple key = Tuple.getTupleClass(fields.length).newInstance();
>>>         for (int i = 0; i < fields.length; i++) {
>>>             key.setField(event.getOrDefault(fields[i], ""), i);
>>>         }
>>>         return key;
>>>     }
>>> }
>>>
>>> And a more exact example of how it's used:
>>>
>>>     .keyBy(new MapKeySelector("ID", "PLAYER_ID", "FIELD", "KEY_NAME", "KEY_VALUE"))
>>>     .timeWindow(Time.days(1))
>>>     .reduce(new DistinctFunction())
>>>
>>> On Fri, Aug 24, 2018 at 5:26 PM Andrey Zagrebin <and...@data-artisans.com> wrote:
>>>
>>>> Hi Juho,
>>>>
>>>> Where exactly does the data go missing? When do you notice it? Do you check it:
>>>> - debugging `DistinctFunction.reduce` right after resume in the middle of the day, or
>>>> - some distinct records missing in the final output of BucketingSink in s3 after the window result is actually triggered and saved into s3 at the end of the day? Is this the main output?
>>>>
>>>> The late data around the time of taking the savepoint might not be included in the savepoint, but it should be behind the snapshotted offset in Kafka. Then it should just come later after the restore and should be reduced within the allowed lateness into the final result which is saved into s3.
>>>>
>>>> Also, is this `DistinctFunction.reduce` just an example or the actual implementation, basically saving just one of the records inside the 24h window in s3? Then what is missing there?
>>>>
>>>> Cheers,
>>>> Andrey
>>>>
>>>> On 23 Aug 2018, at 15:42, Juho Autio <juho.au...@rovio.com> wrote:
>>>>
>>>> I changed to allowedLateness=0, no change, still missing data when restoring from savepoint.
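Since the thread keeps coming back to the late-data side output staying empty, one cheap sanity check is to read that side output stream back and inspect it directly. A rough sketch, reusing the lateDataTag, MapKeySelector and DistinctFunction shown in this thread; the events variable and the print sink are assumptions, not the production code.

    import java.util.Map;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.OutputTag;

    // Anonymous subclass so that the element type information is captured.
    final OutputTag<Map<String, String>> lateDataTag = new OutputTag<Map<String, String>>("late-data") {};

    SingleOutputStreamOperator<Map<String, String>> distinct = events
            .keyBy(new MapKeySelector("ID", "PLAYER_ID", "FIELD", "KEY_NAME", "KEY_VALUE"))
            .timeWindow(Time.days(1))
            .sideOutputLateData(lateDataTag)
            .reduce(new DistinctFunction());

    // Anything dropped as late ends up here; print (or count) it while debugging.
    DataStream<Map<String, String>> lateData = distinct.getSideOutput(lateDataTag);
    lateData.print();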
>>>>
>>>> On Tue, Aug 21, 2018 at 10:43 AM Juho Autio <juho.au...@rovio.com> wrote:
>>>>
>>>>> I realized that BucketingSink must not play any role in this problem. This is because only when the 24-hour window triggers, BucketingSink gets a burst of input. Around the state restoring point (middle of the day) it doesn't get any input, so it can't lose anything either (right?).
>>>>>
>>>>> I will next try removing the allowedLateness entirely from the equation.
>>>>>
>>>>> In the meantime, please let me know if you have any suggestions for debugging the lost data, for example which logs to enable.
>>>>>
>>>>> We use FlinkKafkaConsumer010, by the way. Are there any known issues with it that could contribute to lost data when restoring a savepoint?
>>>>>
>>>>> On Fri, Aug 17, 2018 at 4:23 PM Juho Autio <juho.au...@rovio.com> wrote:
>>>>>
>>>>>> Some data is silently lost on my Flink stream job when state is restored from a savepoint.
>>>>>>
>>>>>> Do you have any debugging hints to find out where exactly the data gets dropped?
>>>>>>
>>>>>> My job gathers distinct values using a 24-hour window. It doesn't have any custom state management.
>>>>>>
>>>>>> When I cancel the job with a savepoint and restore from that savepoint, some data is missed. It seems to be losing just a small amount of data. The event time of the lost data is probably around the time of the savepoint. In other words the rest of the time window is not entirely missed – collection also works correctly for (most of) the events that come in after restoring.
>>>>>>
>>>>>> When the job processes a full 24-hour window without interruptions it doesn't miss anything.
>>>>>>
>>>>>> Usually the problem doesn't happen in test environments that have smaller parallelism and smaller data volumes. But at production volumes the job seems to be consistently missing at least something on every restore.
>>>>>>
>>>>>> This issue has happened consistently since the job was initially created. It was at first run on an older Flink 1.5-SNAPSHOT version and it still happens on both Flink 1.5.2 & 1.6.0.
>>>>>>
>>>>>> I'm wondering if this could be, for example, some synchronization issue between the kafka consumer offsets vs. what's been written by BucketingSink?
>>>>>>
>>>>>> 1. Job content, simplified
>>>>>>
>>>>>>     kafkaStream
>>>>>>         .flatMap(new ExtractFieldsFunction())
>>>>>>         .keyBy(new MapKeySelector(1, 2, 3, 4))
>>>>>>         .timeWindow(Time.days(1))
>>>>>>         .allowedLateness(allowedLateness)
>>>>>>         .sideOutputLateData(lateDataTag)
>>>>>>         .reduce(new DistinctFunction())
>>>>>>         .addSink(sink)
>>>>>>         // use a fixed number of output partitions
>>>>>>         .setParallelism(8))
>>>>>>
>>>>>> /**
>>>>>>  * Usage: .keyBy("the", "distinct", "fields").reduce(new DistinctFunction())
>>>>>>  */
>>>>>> public class DistinctFunction implements ReduceFunction<java.util.Map<String, String>> {
>>>>>>     @Override
>>>>>>     public Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) {
>>>>>>         return value1;
>>>>>>     }
>>>>>> }
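Following Andrey's suggestion earlier in the thread to log all encountered events in DistinctFunction.reduce, a minimal temporary variant of the function above could look like the sketch below. The SLF4J logger is an assumption, and at production volumes this will be very verbose, so it is only meant for a short debugging window.

    import java.util.Map;
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class DistinctFunction implements ReduceFunction<Map<String, String>> {

        private static final Logger LOG = LoggerFactory.getLogger(DistinctFunction.class);

        @Override
        public Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) {
            // Temporary debug logging: record both the element that is kept and the one that is dropped,
            // so missed events can be correlated with the savepoint time in the task manager logs.
            LOG.info("reduce: keeping={}, dropping={}", value1, value2);
            return value1;
        }
    }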
>>>>>> 2. State configuration
>>>>>>
>>>>>>     boolean enableIncrementalCheckpointing = true;
>>>>>>     String statePath = "s3n://bucket/savepoints";
>>>>>>     new RocksDBStateBackend(statePath, enableIncrementalCheckpointing);
>>>>>>
>>>>>>     Checkpointing Mode: Exactly Once
>>>>>>     Interval: 1m 0s
>>>>>>     Timeout: 10m 0s
>>>>>>     Minimum Pause Between Checkpoints: 1m 0s
>>>>>>     Maximum Concurrent Checkpoints: 1
>>>>>>     Persist Checkpoints Externally: Enabled (retain on cancellation)
>>>>>>
>>>>>> 3. BucketingSink configuration
>>>>>>
>>>>>> We use BucketingSink; I don't think there's anything special here, other than the fact that we're writing to S3.
>>>>>>
>>>>>>     String outputPath = "s3://bucket/output";
>>>>>>     BucketingSink<Map<String, String>> sink = new BucketingSink<Map<String, String>>(outputPath)
>>>>>>             .setBucketer(new ProcessdateBucketer())
>>>>>>             .setBatchSize(batchSize)
>>>>>>             .setInactiveBucketThreshold(inactiveBucketThreshold)
>>>>>>             .setInactiveBucketCheckInterval(inactiveBucketCheckInterval);
>>>>>>     sink.setWriter(new IdJsonWriter());
>>>>>>
>>>>>> 4. Kafka & event time
>>>>>>
>>>>>> My Flink job reads the data from Kafka, using a BoundedOutOfOrdernessTimestampExtractor on the kafka consumer to synchronize watermarks across all kafka partitions. We also write late data to a side output, but nothing is written there – if anything were, it could explain missed data in the main output. (I'm also sure that our late data writing works, because we previously had some actual late data which ended up there.)
>>>>>>
>>>>>> 5. allowedLateness
>>>>>>
>>>>>> It may or may not be relevant that I have also enabled allowedLateness with 1 minute of lateness on the 24-hour window.
>>>>>>
>>>>>> If that makes sense, I could try removing allowedLateness entirely? That would be just to rule out a Flink bug related to restoring state in combination with the allowedLateness feature. After all, all of our data should be in good enough order not to be late, given the max out-of-orderness used on the kafka consumer's timestamp extractor.
>>>>>>
>>>>>> Thank you in advance!
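Section 4 above mentions assigning a BoundedOutOfOrdernessTimestampExtractor on the Kafka consumer so that watermarks are tracked per Kafka partition; wired up, that looks roughly like the sketch below. The 10-second bound, the "timestamp" field name and the helper class are illustrative assumptions; the thread does not show the job's actual extractor.

    import java.util.Map;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

    public class EventTimeSetup {

        /** Assigns periodic watermarks on the consumer itself, so they are tracked per Kafka partition. */
        public static void assignEventTime(FlinkKafkaConsumer010<Map<String, String>> consumer) {
            consumer.assignTimestampsAndWatermarks(
                    // Allow events to arrive up to 10 seconds out of order (assumed bound).
                    new BoundedOutOfOrdernessTimestampExtractor<Map<String, String>>(Time.seconds(10)) {
                        @Override
                        public long extractTimestamp(Map<String, String> event) {
                            // Hypothetical field name; the real job's timestamp field is not shown in the thread.
                            return Long.parseLong(event.get("timestamp"));
                        }
                    });
        }
    }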