Hi,

Using StreamingFileSink is not a convenient option for production use for us, as it doesn't support s3*. I could use StreamingFileSink just to verify, but I don't see much point in doing so. Please consider my previous comment:

> I realized that BucketingSink must not play any role in this problem. This is because only when the 24-hour window triggers, BucketingSink gets a burst of input. Around the state restoring point (middle of the day) it doesn't get any input, so it can't lose anything either (right?).

I could also use a Kafka sink instead, but I can't imagine how there could be any difference. It really is the case that the sink doesn't get any input for a long time until the 24-hour window closes, and then it quickly writes out everything, because in the end it's not that much data for the distinct values.

Any ideas for debugging what's happening around the savepoint & restoration time?

*) I actually implemented StreamingFileSink as an alternative sink. This was before I came to realize that most likely the sink component has nothing to do with the data loss problem. I tried it with an s3n:// path just to see an exception being thrown. In the source code I then indeed found an explicit check that the target path scheme must be "hdfs://".
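For reference, a hedged sketch (not the thread's actual code) of what the StreamingFileSink alternative mentioned in the footnote might look like in Flink 1.6.0, assuming a row-format writer; the target path, encoder, and the `reducedStream` variable are placeholders, and per the footnote above the 1.6.0 sink rejects s3-style schemes:

    // Sketch only: row-format StreamingFileSink in place of BucketingSink.
    StreamingFileSink<Map<String, String>> fileSink = StreamingFileSink
            .forRowFormat(
                    new Path("hdfs://namenode/tmp/distinct-output"),    // placeholder target; s3n:// fails the scheme check
                    new SimpleStringEncoder<Map<String, String>>())     // placeholder encoder: writes each record's toString()
            .build();

    // reducedStream stands for the output of .reduce(new DistinctFunction())
    reducedStream.addSink(fileSink).setParallelism(8);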
On Fri, Aug 24, 2018 at 7:49 PM Andrey Zagrebin <and...@data-artisans.com> wrote:

> Ok, I think before further debugging the window reduced state, could you try the new 'StreamingFileSink' [1] introduced in Flink 1.6.0 instead of the previous 'BucketingSink'?
>
> Cheers,
> Andrey
>
> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
>
> On 24 Aug 2018, at 18:03, Juho Autio <juho.au...@rovio.com> wrote:
>
> Yes, sorry for my confusing comment. I just meant that it seems like there's a bug somewhere, now that the output is missing some data.
>
> > I would wait and check the actual output in s3 because it is the main result of the job
>
> Yes, and that's what I have already done. There seems to be always some data loss with the production data volumes, if the job has been restarted on that day.
>
> Would you have any suggestions for how to debug this further?
>
> Many thanks for stepping in.
>
> On Fri, Aug 24, 2018 at 6:37 PM Andrey Zagrebin <and...@data-artisans.com> wrote:
>
>> Hi Juho,
>>
>> So it is a per-key deduplication job.
>>
>> Yes, I would wait and check the actual output in s3 because it is the main result of the job, and
>>
>> > The late data around the time of taking savepoint might be not included into the savepoint but it should be behind the snapshotted offset in Kafka.
>>
>> is not a bug, it is a possible behaviour.
>>
>> The savepoint is a snapshot of the data in transit that has already been consumed from Kafka. Basically the full contents of the window result are split between the savepoint and what can come after the savepointed offset in Kafka but before the window result is written into s3.
>>
>> Allowed lateness should not affect it, I am just saying that the final result in s3 should include all records after it. This is what should be guaranteed, but not the contents of the intermediate savepoint.
>>
>> Cheers,
>> Andrey
>>
>> On 24 Aug 2018, at 16:52, Juho Autio <juho.au...@rovio.com> wrote:
>>
>> Thanks for your answer!
>>
>> I check for the missed data from the final output on s3. So I wait until the next day, then run the same thing re-implemented in batch, and compare the output.
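As a purely hypothetical illustration of the batch cross-check described just above (nothing here is from the thread: the input path, the JSON parsing step, and the output path are placeholders), the same deduplication could be recomputed with the DataSet API and its output diffed against the streaming job's s3 files:

    // Sketch: recompute the day's distinct keys in batch for comparison.
    ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<String> distinctKeys = batchEnv
            .readTextFile("s3://bucket/raw-events/2018-08-24/")   // placeholder input
            .map(new MapFunction<String, String>() {
                @Override
                public String map(String line) {
                    Map<String, String> event = parseJson(line);  // placeholder parser
                    // Same key fields as the streaming job, joined into one string.
                    return event.getOrDefault("ID", "") + "|" + event.getOrDefault("PLAYER_ID", "") + "|"
                            + event.getOrDefault("FIELD", "") + "|" + event.getOrDefault("KEY_NAME", "") + "|"
                            + event.getOrDefault("KEY_VALUE", "");
                }
            })
            .distinct();                                          // one record per key, like the 24h window + DistinctFunction

    distinctKeys.writeAsText("s3://bucket/batch-distinct/2018-08-24/"); // placeholder output to diff against
    batchEnv.execute("distinct-batch-check");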
>> > The late data around the time of taking savepoint might be not included into the savepoint but it should be behind the snapshotted offset in Kafka.
>>
>> Yes, I would definitely expect that. It seems like there's a bug somewhere.
>>
>> > Then it should just come later after the restore and should be reduced within the allowed lateness into the final result which is saved into s3.
>>
>> Well, as far as I know, allowed lateness doesn't play any role here, because I started running the job with allowedLateness=0 and still get the data loss, while my late data output doesn't receive anything.
>>
>> > Also, is this `DistinctFunction.reduce` just an example or the actual implementation, basically saving just one of the records inside the 24h window in s3? Then what is missing there?
>>
>> Yes, it's the actual implementation. Note that there's a keyBy before the DistinctFunction, so there's one record for each key (which is the combination of a couple of fields). In practice I've seen that we're missing ~2000-4000 elements on each restore, and the total output is obviously much more than that.
>>
>> Here's the full code for the key selector:
>>
>> public class MapKeySelector implements KeySelector<Map<String, String>, Object> {
>>
>>     private final String[] fields;
>>
>>     public MapKeySelector(String... fields) {
>>         this.fields = fields;
>>     }
>>
>>     @Override
>>     public Object getKey(Map<String, String> event) throws Exception {
>>         Tuple key = Tuple.getTupleClass(fields.length).newInstance();
>>         for (int i = 0; i < fields.length; i++) {
>>             key.setField(event.getOrDefault(fields[i], ""), i);
>>         }
>>         return key;
>>     }
>> }
>>
>> And a more exact example of how it's used:
>>
>>     .keyBy(new MapKeySelector("ID", "PLAYER_ID", "FIELD", "KEY_NAME", "KEY_VALUE"))
>>     .timeWindow(Time.days(1))
>>     .reduce(new DistinctFunction())
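A quick illustration of what that key selector produces, with made-up field values (not from the thread): a missing field and an empty-string field yield the same key, since getOrDefault falls back to "".

    // Illustration only; values are invented.
    Map<String, String> event = new HashMap<>();
    event.put("ID", "42");
    event.put("PLAYER_ID", "p-1");
    event.put("FIELD", "level");
    event.put("KEY_NAME", "score");
    // "KEY_VALUE" intentionally absent

    Object key = new MapKeySelector("ID", "PLAYER_ID", "FIELD", "KEY_NAME", "KEY_VALUE").getKey(event);
    // key is a Tuple5 of the field values: ("42", "p-1", "level", "score", "")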
>> On Fri, Aug 24, 2018 at 5:26 PM Andrey Zagrebin <and...@data-artisans.com> wrote:
>>
>>> Hi Juho,
>>>
>>> Where exactly does the data miss? When do you notice that? Do you check it:
>>> - debugging `DistinctFunction.reduce` right after resume in the middle of the day, or
>>> - some distinct records missing in the final output of BucketingSink in s3 after the window result is actually triggered and saved into s3 at the end of the day? Is this the main output?
>>>
>>> The late data around the time of taking savepoint might be not included into the savepoint but it should be behind the snapshotted offset in Kafka. Then it should just come later after the restore and should be reduced within the allowed lateness into the final result which is saved into s3.
>>>
>>> Also, is this `DistinctFunction.reduce` just an example or the actual implementation, basically saving just one of the records inside the 24h window in s3? Then what is missing there?
>>>
>>> Cheers,
>>> Andrey
>>>
>>> On 23 Aug 2018, at 15:42, Juho Autio <juho.au...@rovio.com> wrote:
>>>
>>> I changed to allowedLateness=0; no change, still missing data when restoring from savepoint.
>>>
>>> On Tue, Aug 21, 2018 at 10:43 AM Juho Autio <juho.au...@rovio.com> wrote:
>>>
>>>> I realized that BucketingSink must not play any role in this problem. This is because only when the 24-hour window triggers, BucketingSink gets a burst of input. Around the state restoring point (middle of the day) it doesn't get any input, so it can't lose anything either (right?).
>>>>
>>>> I will next try removing the allowedLateness entirely from the equation.
>>>>
>>>> In the meanwhile, please let me know if you have any suggestions for debugging the lost data, for example what logs to enable.
>>>>
>>>> We use FlinkKafkaConsumer010 btw. Are there any known issues with that which could contribute to lost data when restoring a savepoint?
>>>>
>>>> On Fri, Aug 17, 2018 at 4:23 PM Juho Autio <juho.au...@rovio.com> wrote:
>>>>
>>>>> Some data is silently lost on my Flink stream job when state is restored from a savepoint.
>>>>>
>>>>> Do you have any debugging hints to find out where exactly the data gets dropped?
>>>>>
>>>>> My job gathers distinct values using a 24-hour window. It doesn't have any custom state management.
>>>>>
>>>>> When I cancel the job with savepoint and restore from that savepoint, some data is missed. It seems to be losing just a small amount of data. The event time of the lost data is probably around the time of the savepoint. In other words the rest of the time window is not entirely missed – collection works correctly also for (most of the) events that come in after restoring.
>>>>>
>>>>> When the job processes a full 24-hour window without interruptions it doesn't miss anything.
>>>>>
>>>>> Usually the problem doesn't happen in test environments that have smaller parallelism and smaller data volumes. But at production volumes the job seems to be consistently missing at least something on every restore.
>>>>>
>>>>> This issue has happened consistently since the job was initially created. It was at first run on an older version of Flink 1.5-SNAPSHOT and it still happens on both Flink 1.5.2 & 1.6.0.
>>>>>
>>>>> I'm wondering if this could be, for example, some synchronization issue between the Kafka consumer offsets vs. what's been written by BucketingSink?
>>>>>
>>>>> 1. Job content, simplified
>>>>>
>>>>> kafkaStream
>>>>>     .flatMap(new ExtractFieldsFunction())
>>>>>     .keyBy(new MapKeySelector(1, 2, 3, 4))
>>>>>     .timeWindow(Time.days(1))
>>>>>     .allowedLateness(allowedLateness)
>>>>>     .sideOutputLateData(lateDataTag)
>>>>>     .reduce(new DistinctFunction())
>>>>>     .addSink(sink)
>>>>>     // use a fixed number of output partitions
>>>>>     .setParallelism(8);
>>>>>
>>>>> /**
>>>>>  * Usage: .keyBy("the", "distinct", "fields").reduce(new DistinctFunction())
>>>>>  */
>>>>> public class DistinctFunction implements ReduceFunction<java.util.Map<String, String>> {
>>>>>     @Override
>>>>>     public Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) {
>>>>>         return value1;
>>>>>     }
>>>>> }
>>>>>
>>>>> 2. State configuration
>>>>>
>>>>> boolean enableIncrementalCheckpointing = true;
>>>>> String statePath = "s3n://bucket/savepoints";
>>>>> new RocksDBStateBackend(statePath, enableIncrementalCheckpointing);
>>>>>
>>>>> Checkpointing Mode: Exactly Once
>>>>> Interval: 1m 0s
>>>>> Timeout: 10m 0s
>>>>> Minimum Pause Between Checkpoints: 1m 0s
>>>>> Maximum Concurrent Checkpoints: 1
>>>>> Persist Checkpoints Externally: Enabled (retain on cancellation)
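For reference, a sketch of how the checkpointing settings listed in section 2 are typically applied in code; the values come from that list, while the variable names and exact wiring are assumptions:

    // Sketch matching the section 2 settings.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    boolean enableIncrementalCheckpointing = true;
    String statePath = "s3n://bucket/savepoints";
    env.setStateBackend(new RocksDBStateBackend(statePath, enableIncrementalCheckpointing));

    env.enableCheckpointing(60 * 1000, CheckpointingMode.EXACTLY_ONCE);   // Interval 1m, Exactly Once
    CheckpointConfig checkpointConfig = env.getCheckpointConfig();
    checkpointConfig.setCheckpointTimeout(10 * 60 * 1000);                // Timeout 10m
    checkpointConfig.setMinPauseBetweenCheckpoints(60 * 1000);            // Minimum pause 1m
    checkpointConfig.setMaxConcurrentCheckpoints(1);
    checkpointConfig.enableExternalizedCheckpoints(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // retain on cancellation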
>>>>> 3. BucketingSink configuration
>>>>>
>>>>> We use BucketingSink; I don't think there's anything special here, other than the fact that we're writing to S3.
>>>>>
>>>>> String outputPath = "s3://bucket/output";
>>>>> BucketingSink<Map<String, String>> sink = new BucketingSink<Map<String, String>>(outputPath)
>>>>>     .setBucketer(new ProcessdateBucketer())
>>>>>     .setBatchSize(batchSize)
>>>>>     .setInactiveBucketThreshold(inactiveBucketThreshold)
>>>>>     .setInactiveBucketCheckInterval(inactiveBucketCheckInterval);
>>>>> sink.setWriter(new IdJsonWriter());
>>>>>
>>>>> 4. Kafka & event time
>>>>>
>>>>> My Flink job reads the data from Kafka, using a BoundedOutOfOrdernessTimestampExtractor on the Kafka consumer to synchronize watermarks across all Kafka partitions. We also write late data to a side output, but nothing is written there – if it were, it could explain missed data in the main output (I'm also sure that our late data writing works, because we previously had some actual late data which ended up there).
>>>>>
>>>>> 5. allowedLateness
>>>>>
>>>>> It may or may not be relevant that I have also enabled allowedLateness with 1 minute lateness on the 24-hour window.
>>>>>
>>>>> If that makes sense, I could try removing allowedLateness entirely? That would be just to rule out a Flink bug related to restoring state in combination with the allowedLateness feature. After all, all of our data should be in a good enough order not to be late, given the max out-of-orderness used on the Kafka consumer timestamp extractor.
>>>>>
>>>>> Thank you in advance!
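To make sections 1, 4 and 5 concrete, here is a hedged sketch of how such a Kafka source, watermark extractor, and late-data side output are commonly wired together. The topic name, deserialization schema, timestamp field, the 10-second out-of-orderness bound, and the late-data sink are placeholder assumptions; ExtractFieldsFunction, MapKeySelector, DistinctFunction, lateDataTag, and the main sink are the names used in the thread.

    // Placeholder tag type; anonymous subclass keeps the generic type.
    final OutputTag<Map<String, String>> lateDataTag = new OutputTag<Map<String, String>>("late-data") {};

    FlinkKafkaConsumer010<Map<String, String>> consumer =
            new FlinkKafkaConsumer010<>("events", new EventMapDeserializationSchema(), kafkaProperties); // placeholders
    consumer.assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor<Map<String, String>>(Time.seconds(10)) {  // assumed bound
                @Override
                public long extractTimestamp(Map<String, String> event) {
                    return Long.parseLong(event.get("timestamp"));  // assumed timestamp field
                }
            });

    // env = the StreamExecutionEnvironment configured above.
    SingleOutputStreamOperator<Map<String, String>> distinct = env.addSource(consumer)
            .flatMap(new ExtractFieldsFunction())
            .keyBy(new MapKeySelector("ID", "PLAYER_ID", "FIELD", "KEY_NAME", "KEY_VALUE"))
            .timeWindow(Time.days(1))
            .allowedLateness(Time.minutes(1))
            .sideOutputLateData(lateDataTag)
            .reduce(new DistinctFunction());

    distinct.addSink(sink).setParallelism(8);                       // main output (BucketingSink above)
    distinct.getSideOutput(lateDataTag).addSink(lateDataSink);      // placeholder late-data sink, expected to stay empty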