I changed allowedLateness to 0. No change: data is still missing when
restoring from a savepoint.
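
To narrow down which rows go missing, one option is to diff the restored
run's output against a reference set. The Java sketch below is only an
illustration under assumptions: MissingRowFinder, missingIds and
missingPerHour are hypothetical names (nothing from Flink), each output row
is assumed to carry a unique id, and the reference map (id to event time in
epoch millis) is assumed to come from a run that processed the same 24-hour
window without interruption. It lists the ids that are absent after a
restore and counts them per UTC hour of event time, which should show
whether the losses really cluster around the savepoint.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/**
 * Hypothetical debugging helper (not part of Flink): diffs the distinct ids of a
 * reference output against the output of a run that was restored from a savepoint.
 */
final class MissingRowFinder {

    /** Ids present in the reference output but absent from the restored run's output. */
    static Set<String> missingIds(Map<String, Long> reference, Set<String> restored) {
        Set<String> missing = new TreeSet<>(reference.keySet());
        missing.removeAll(restored);
        return missing;
    }

    /** Counts the missing ids per UTC hour of their event time (epoch millis). */
    static Map<Integer, Integer> missingPerHour(Map<String, Long> reference, Set<String> restored) {
        Map<Integer, Integer> perHour = new TreeMap<>();
        for (String id : missingIds(reference, restored)) {
            int hour = Instant.ofEpochMilli(reference.get(id)).atZone(ZoneOffset.UTC).getHour();
            perHour.merge(hour, 1, Integer::sum);
        }
        return perHour;
    }
}
```

If missingPerHour(reference, restoredIds) came back with entries only for
the hour of the savepoint, that would support the suspicion that the loss
happens right around the cancel/restore point rather than elsewhere in the
window.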

On Tue, Aug 21, 2018 at 10:43 AM Juho Autio <juho.au...@rovio.com> wrote:

> I realized that BucketingSink cannot play any role in this problem. This
> is because BucketingSink gets a burst of input only when the 24-hour window
> triggers. Around the state restoring point (middle of the day) it doesn't
> get any input, so it can't lose anything either (right?).
>
> I will next try removing the allowedLateness entirely from the equation.
>
> In the meantime, please let me know if you have any suggestions for
> debugging the lost data, for example which logs to enable.
>
> We use FlinkKafkaConsumer010, by the way. Are there any known issues with
> it that could contribute to lost data when restoring a savepoint?
>
> On Fri, Aug 17, 2018 at 4:23 PM Juho Autio <juho.au...@rovio.com> wrote:
>
>> Some data is silently lost on my Flink stream job when state is restored
>> from a savepoint.
>>
>> Do you have any debugging hints to find out where exactly the data gets
>> dropped?
>>
>> My job gathers distinct values using a 24-hour window. It doesn't have
>> any custom state management.
>>
>> When I cancel the job with a savepoint and restore from that savepoint,
>> some data is missed. It seems to lose just a small amount of data. The
>> event time of the lost data is probably around the time of the savepoint.
>> In other words, the rest of the time window is not entirely missed;
>> collection also works correctly for (most of the) events that come in
>> after restoring.
>>
>> When the job processes a full 24-hour window without interruptions it
>> doesn't miss anything.
>>
>> Usually the problem doesn't happen in test environments that have smaller
>> parallelism and smaller data volumes. But at production volumes the job
>> seems to consistently miss at least something on every restore.
>>
>> This issue has happened consistently since the job was first created.
>> It initially ran on an older Flink 1.5-SNAPSHOT build, and it still
>> happens on both Flink 1.5.2 and 1.6.0.
>>
>> I'm wondering if this could be, for example, some synchronization issue
>> between the Kafka consumer offsets and what has been written by
>> BucketingSink?
>>
>> 1. Job content, simplified
>>
>>         kafkaStream
>>                 .flatMap(new ExtractFieldsFunction())
>>                 .keyBy(new MapKeySelector(1, 2, 3, 4))
>>                 .timeWindow(Time.days(1))
>>                 .allowedLateness(allowedLateness)
>>                 .sideOutputLateData(lateDataTag)
>>                 .reduce(new DistinctFunction())
>>                 .addSink(sink)
>>                 // use a fixed number of output partitions
>>                 .setParallelism(8);
>>
>> /**
>>  * Usage: .keyBy("the", "distinct", "fields").reduce(new DistinctFunction())
>>  */
>> public class DistinctFunction implements ReduceFunction<java.util.Map<String, String>> {
>>     @Override
>>     public Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) {
>>         // Keep the first value seen for each key; later duplicates are discarded.
>>         return value1;
>>     }
>> }
>>
>> 2. State configuration
>>
>> boolean enableIncrementalCheckpointing = true;
>> String statePath = "s3n://bucket/savepoints";
>> new RocksDBStateBackend(statePath, enableIncrementalCheckpointing);
>>
>> Checkpointing Mode: Exactly Once
>> Interval: 1m 0s
>> Timeout: 10m 0s
>> Minimum Pause Between Checkpoints: 1m 0s
>> Maximum Concurrent Checkpoints: 1
>> Persist Checkpoints Externally: Enabled (retain on cancellation)
>>
>> 3. BucketingSink configuration
>>
>> We use BucketingSink. I don't think there's anything special here, apart
>> from the fact that we're writing to S3.
>>
>>         String outputPath = "s3://bucket/output";
>>         BucketingSink<Map<String, String>> sink = new BucketingSink<Map<String, String>>(outputPath)
>>                 .setBucketer(new ProcessdateBucketer())
>>                 .setBatchSize(batchSize)
>>                 .setInactiveBucketThreshold(inactiveBucketThreshold)
>>                 .setInactiveBucketCheckInterval(inactiveBucketCheckInterval);
>>         sink.setWriter(new IdJsonWriter());
>>
>> 4. Kafka & event time
>>
>> My Flink job reads the data from Kafka, using a
>> BoundedOutOfOrdernessTimestampExtractor on the Kafka consumer to
>> synchronize watermarks across all Kafka partitions. We also write late
>> data to a side output, but nothing is written there. If something were, it
>> could explain the missing data in the main output (I'm also sure that our
>> late data writing works, because we previously had some actual late data
>> which ended up there).
>>
>> 5. allowedLateness
>>
>> It may or may not be relevant that I have also enabled allowedLateness
>> with 1 minute lateness on the 24-hour window:
>>
>> If that makes sense, I could try removing allowedLateness entirely? That
>> would be just to rule out a Flink bug related to restoring state in
>> combination with the allowedLateness feature. After all, all of our data
>> should be in good enough order not to be late, given the max
>> out-of-orderness used in the Kafka consumer's timestamp extractor.
>>
>> Thank you in advance!
>>
>
>
