I changed to allowedLateness=0; no change, data is still missing when restoring from a savepoint.
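For the record, the savepoint cycle we run is the standard CLI flow, roughly like this (a simplified sketch; the job id, savepoint directory, savepoint path and jar are placeholders, not our actual values):

    # cancel the running job and write a savepoint to the given directory
    flink cancel -s <savepointDir> <jobId>

    # resubmit the job, restoring from the savepoint path printed by the cancel command
    flink run -d -s <savepointPath> <jobJar>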
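And for completeness, the event time / late data wiring described in section 4 of the quoted thread below looks roughly like this. This is a simplified sketch, not our exact production code: the topic name, deserialization schema, timestamp field, the 10-second out-of-orderness bound, the OutputTag definition and lateDataSink are placeholders, and env / kafkaProperties are assumed to be defined elsewhere.

    // Assign the timestamp extractor on the consumer itself so that watermarks
    // are tracked per Kafka partition before the partition streams are merged.
    FlinkKafkaConsumer010<Map<String, String>> consumer =
        new FlinkKafkaConsumer010<>("events", new MapDeserializationSchema(), kafkaProperties);
    consumer.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Map<String, String>>(Time.seconds(10)) {
            @Override
            public long extractTimestamp(Map<String, String> event) {
                return Long.parseLong(event.get("timestamp"));
            }
        });
    DataStream<Map<String, String>> kafkaStream = env.addSource(consumer);

    // Late data is tagged in the window operator (.sideOutputLateData(lateDataTag)
    // in section 1 below) and read back from the side output after the reduce:
    final OutputTag<Map<String, String>> lateDataTag =
        new OutputTag<Map<String, String>>("late-data") {};
    SingleOutputStreamOperator<Map<String, String>> distinctStream = kafkaStream
        .flatMap(new ExtractFieldsFunction())
        .keyBy(new MapKeySelector(1, 2, 3, 4))
        .timeWindow(Time.days(1))
        .allowedLateness(allowedLateness)
        .sideOutputLateData(lateDataTag)
        .reduce(new DistinctFunction());
    // This sink has stayed empty around the restores, so lateness alone doesn't
    // seem to explain the missing records.
    distinctStream.getSideOutput(lateDataTag).addSink(lateDataSink);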
On Tue, Aug 21, 2018 at 10:43 AM Juho Autio <juho.au...@rovio.com> wrote:

> I realized that BucketingSink can't play any role in this problem. This is
> because BucketingSink only gets a burst of input when the 24-hour window
> triggers. Around the state restoring point (middle of the day) it doesn't
> get any input, so it can't lose anything either (right?).
>
> I will next try removing the allowedLateness entirely from the equation.
>
> In the meanwhile, please let me know if you have any suggestions for
> debugging the lost data, for example which logs to enable.
>
> We use FlinkKafkaConsumer010, btw. Are there any known issues with it that
> could contribute to lost data when restoring a savepoint?
>
> On Fri, Aug 17, 2018 at 4:23 PM Juho Autio <juho.au...@rovio.com> wrote:
>
>> Some data is silently lost on my Flink stream job when state is restored
>> from a savepoint.
>>
>> Do you have any debugging hints to find out where exactly the data gets
>> dropped?
>>
>> My job gathers distinct values using a 24-hour window. It doesn't have
>> any custom state management.
>>
>> When I cancel the job with a savepoint and restore from that savepoint,
>> some data is missed. It seems to be losing just a small amount of data.
>> The event time of the lost data is probably around the time of the
>> savepoint. In other words, the rest of the time window is not entirely
>> missed; collection also works correctly for (most of) the events that
>> come in after restoring.
>>
>> When the job processes a full 24-hour window without interruptions it
>> doesn't miss anything.
>>
>> Usually the problem doesn't happen in test environments that have smaller
>> parallelism and smaller data volumes, but with production volumes the job
>> seems to consistently miss at least something on every restore.
>>
>> This issue has happened consistently since the job was initially created.
>> It first ran on an older Flink 1.5-SNAPSHOT build, and it still happens on
>> both Flink 1.5.2 and 1.6.0.
>>
>> I'm wondering if this could be, for example, some synchronization issue
>> between the Kafka consumer offsets and what has been written by the
>> BucketingSink?
>>
>> 1. Job content, simplified
>>
>> kafkaStream
>>     .flatMap(new ExtractFieldsFunction())
>>     .keyBy(new MapKeySelector(1, 2, 3, 4))
>>     .timeWindow(Time.days(1))
>>     .allowedLateness(allowedLateness)
>>     .sideOutputLateData(lateDataTag)
>>     .reduce(new DistinctFunction())
>>     .addSink(sink)
>>     // use a fixed number of output partitions
>>     .setParallelism(8);
>>
>> /**
>>  * Usage: .keyBy("the", "distinct", "fields").reduce(new DistinctFunction())
>>  */
>> public class DistinctFunction
>>         implements ReduceFunction<java.util.Map<String, String>> {
>>     @Override
>>     public Map<String, String> reduce(Map<String, String> value1,
>>                                       Map<String, String> value2) {
>>         return value1;
>>     }
>> }
>>
>> 2. State configuration
>>
>> boolean enableIncrementalCheckpointing = true;
>> String statePath = "s3n://bucket/savepoints";
>> new RocksDBStateBackend(statePath, enableIncrementalCheckpointing);
>>
>> Checkpointing Mode: Exactly Once
>> Interval: 1m 0s
>> Timeout: 10m 0s
>> Minimum Pause Between Checkpoints: 1m 0s
>> Maximum Concurrent Checkpoints: 1
>> Persist Checkpoints Externally: Enabled (retain on cancellation)
>>
>> 3. BucketingSink configuration
>>
>> We use BucketingSink. I don't think there's anything special here, apart
>> from the fact that we're writing to S3.
>>
>> String outputPath = "s3://bucket/output";
>> BucketingSink<Map<String, String>> sink =
>>     new BucketingSink<Map<String, String>>(outputPath)
>>         .setBucketer(new ProcessdateBucketer())
>>         .setBatchSize(batchSize)
>>         .setInactiveBucketThreshold(inactiveBucketThreshold)
>>         .setInactiveBucketCheckInterval(inactiveBucketCheckInterval);
>> sink.setWriter(new IdJsonWriter());
>>
>> 4. Kafka & event time
>>
>> My Flink job reads the data from Kafka, using a
>> BoundedOutOfOrdernessTimestampExtractor on the Kafka consumer to
>> synchronize watermarks across all Kafka partitions. We also write late
>> data to a side output, but nothing is written there. If anything were, it
>> could explain missed data in the main output. (I'm also sure that our late
>> data writing works, because we previously had some actual late data which
>> ended up there.)
>>
>> 5. allowedLateness
>>
>> It may or may not be relevant that I have also enabled allowedLateness
>> with 1 minute of lateness on the 24-hour window.
>>
>> If it makes sense, I could try removing allowedLateness entirely? That
>> would just be to rule out a Flink bug related to restoring state in
>> combination with the allowedLateness feature. After all, all of our data
>> should arrive in a good enough order not to be late, given the max
>> out-of-orderness configured on the Kafka consumer's timestamp extractor.
>>
>> Thank you in advance!
>>