Andrey, thank you very much for the debugging suggestions, I'll try them.

In the meanwhile two more questions, please:

> Just to keep in mind this problem with s3 and exclude it for sure. I
would also check whether the size of missing events is around the batch
size of BucketingSink or not.

Fair enough, but I also want to focus on debugging the most probable
subject first. So what do you think about this – true or false: only when
the 24-hour window triggers, BucketinSink gets a burst of input. Around the
state restoring point (middle of the day) it doesn't get any input, so it
can't lose anything either. Isn't this true, or have I totally missed how
Flink works in triggering window results? I would not expect there to be
any optimization that speculatively triggers early results of a regular
time window to the downstream operators.

> The old BucketingSink has in general problem with s3. Internally
BucketingSink queries s3 as a file system to list already written file
parts (batches) and determine index of the next part to start. Due to
eventual consistency of checking file existence in s3 [1], the
BucketingSink can rewrite the previously written part and basically loose
it.

I was wondering, what does S3's "read-after-write consistency" (mentioned
on the page you linked) actually mean. It seems that this might be possible:
- LIST keys, find current max index
- choose next index = max + 1
- HEAD next index: if it exists, keep adding + 1 until key doesn't exist on
S3

But definitely sounds easier if a sink keeps track of files in a way that's
guaranteed to be consistent.

Cheers,
Juho

On Mon, Aug 27, 2018 at 2:04 PM Andrey Zagrebin <and...@data-artisans.com>
wrote:

> Hi,
>
> true, StreamingFileSink does not support s3 in 1.6.0, it is planned for
> the next 1.7 release, sorry for confusion.
> The old BucketingSink has in general problem with s3.
> Internally BucketingSink queries s3 as a file system
> to list already written file parts (batches) and determine index of the
> next part to start. Due to eventual consistency of checking file existence
> in s3 [1], the BucketingSink can rewrite the previously written part and
> basically loose it. It should be fixed for StreamingFileSink in 1.7 where
> Flink keeps its own track of written parts and does not rely on s3 as a
> file system.
> I also include Kostas, he might add more details.
>
> Just to keep in mind this problem with s3 and exclude it for sure  I would
> also check whether the size of missing events is around the batch size of
> BucketingSink or not. You also wrote that the timestamps of lost event are
> 'probably' around the time of the savepoint, if it is not yet for sure I
> would also check it.
>
> Have you already checked the log files of job manager and task managers
> for the job running before and after the restore from the check point? Is
> everything successful there, no errors, relevant warnings or exceptions?
>
> As the next step, I would suggest to log all encountered events in
> DistinctFunction.reduce if possible for production data and check whether
> the missed events are eventually processed before or after the savepoint.
> The following log message indicates a border between the events that should
> be included into the savepoint (logged before) or not:
> “{} ({}, synchronous part) in thread {} took {} ms” (template)
> Also check if the savepoint has been overall completed:
> "{} ({}, asynchronous part) in thread {} took {} ms."
>
> Best,
> Andrey
>
> [1] https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html
>
> On 24 Aug 2018, at 20:41, Juho Autio <juho.au...@rovio.com> wrote:
>
> Hi,
>
> Using StreamingFileSink is not a convenient option for production use for
> us as it doesn't support s3*. I could use StreamingFileSink just to verify,
> but I don't see much point in doing so. Please consider my previous comment:
>
> > I realized that BucketingSink must not play any role in this problem.
> This is because only when the 24-hour window triggers, BucketingSink gets a
> burst of input. Around the state restoring point (middle of the day) it
> doesn't get any input, so it can't lose anything either (right?).
>
> I could also use a kafka sink instead, but I can't imagine how there could
> be any difference. It's very real that the sink doesn't get any input for a
> long time until the 24-hour window closes, and then it quickly writes out
> everything because it's not that much data eventually for the distinct
> values.
>
> Any ideas for debugging what's happening around the savepoint &
> restoration time?
>
> *) I actually implemented StreamingFileSink as an alternative sink. This
> was before I came to realize that most likely the sink component has
> nothing to do with the data loss problem. I tried it with s3n:// path just
> to see an exception being thrown. In the source code I indeed then found an
> explicit check for the target path scheme to be "hdfs://".
>
> On Fri, Aug 24, 2018 at 7:49 PM Andrey Zagrebin <and...@data-artisans.com>
> wrote:
>
>> Ok, I think before further debugging the window reduced state,
>> could you try the new ‘StreamingFileSink’ [1] introduced in Flink 1.6.0
>> instead of the previous 'BucketingSink’?
>>
>> Cheers,
>> Andrey
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
>>
>> On 24 Aug 2018, at 18:03, Juho Autio <juho.au...@rovio.com> wrote:
>>
>> Yes, sorry for my confusing comment. I just meant that it seems like
>> there's a bug somewhere now that the output is missing some data.
>>
>> > I would wait and check the actual output in s3 because it is the main
>> result of the job
>>
>> Yes, and that's what I have already done. There seems to be always some
>> data loss with the production data volumes, if the job has been restarted
>> on that day.
>>
>> Would you have any suggestions for how to debug this further?
>>
>> Many thanks for stepping in.
>>
>> On Fri, Aug 24, 2018 at 6:37 PM Andrey Zagrebin <and...@data-artisans.com>
>> wrote:
>>
>>> Hi Juho,
>>>
>>> So it is a per key deduplication job.
>>>
>>> Yes, I would wait and check the actual output in s3 because it is the
>>> main result of the job and
>>>
>>> > The late data around the time of taking savepoint might be not
>>> included into the savepoint but it should be behind the snapshotted offset
>>> in Kafka.
>>>
>>> is not a bug, it is a possible behaviour.
>>>
>>> The savepoint is a snapshot of the data in transient which is already
>>> consumed from Kafka.
>>> Basically the full contents of the window result is split between the
>>> savepoint and what can come after the savepoint'ed offset in Kafka but
>>> before the window result is written into s3.
>>>
>>> Allowed lateness should not affect it, I am just saying that the final
>>> result in s3 should include all records after it.
>>> This is what should be guaranteed but not the contents of the
>>> intermediate savepoint.
>>>
>>> Cheers,
>>> Andrey
>>>
>>> On 24 Aug 2018, at 16:52, Juho Autio <juho.au...@rovio.com> wrote:
>>>
>>> Thanks for your answer!
>>>
>>> I check for the missed data from the final output on s3. So I wait until
>>> the next day, then run the same thing re-implemented in batch, and compare
>>> the output.
>>>
>>> > The late data around the time of taking savepoint might be not
>>> included into the savepoint but it should be behind the snapshotted offset
>>> in Kafka.
>>>
>>> Yes, I would definitely expect that. It seems like there's a bug
>>> somewhere.
>>>
>>> > Then it should just come later after the restore and should be reduced
>>> within the allowed lateness into the final result which is saved into s3.
>>>
>>> Well, as far as I know, allowed lateness doesn't play any role here,
>>> because I started running the job with allowedLateness=0, and still get the
>>> data loss, while my late data output doesn't receive anything.
>>>
>>> > Also, is this `DistinctFunction.reduce` just an example or the actual
>>> implementation, basically saving just one of records inside the 24h window
>>> in s3? then what is missing there?
>>>
>>> Yes, it's the actual implementation. Note that there's a keyBy before
>>> the DistinctFunction. So there's one record for each key (which is the
>>> combination of a couple of fields). In practice I've seen that we're
>>> missing ~2000-4000 elements on each restore, and the total output is
>>> obviously much more than that.
>>>
>>> Here's the full code for the key selector:
>>>
>>> public class MapKeySelector implements KeySelector<Map<String,String>,
>>> Object> {
>>>
>>>     private final String[] fields;
>>>
>>>     public MapKeySelector(String... fields) {
>>>         this.fields = fields;
>>>     }
>>>
>>>     @Override
>>>     public Object getKey(Map<String, String> event) throws Exception {
>>>         Tuple key = Tuple.getTupleClass(fields.length).newInstance();
>>>         for (int i = 0; i < fields.length; i++) {
>>>             key.setField(event.getOrDefault(fields[i], ""), i);
>>>         }
>>>         return key;
>>>     }
>>> }
>>>
>>> And a more exact example on how it's used:
>>>
>>>                 .keyBy(new MapKeySelector("ID", "PLAYER_ID", "FIELD",
>>> "KEY_NAME", "KEY_VALUE"))
>>>                 .timeWindow(Time.days(1))
>>>                 .reduce(new DistinctFunction())
>>>
>>> On Fri, Aug 24, 2018 at 5:26 PM Andrey Zagrebin <
>>> and...@data-artisans.com> wrote:
>>>
>>>> Hi Juho,
>>>>
>>>> Where exactly does the data miss? When do you notice that?
>>>> Do you check it:
>>>> - debugging `DistinctFunction.reduce` right after resume in the middle
>>>> of the day
>>>> or
>>>> - some distinct records miss in the final output of BucketingSink in s3
>>>> after window result is actually triggered and saved into s3 at the end of
>>>> the day? is this the main output?
>>>>
>>>> The late data around the time of taking savepoint might be not included
>>>> into the savepoint but it should be behind the snapshotted offset in Kafka.
>>>> Then it should just come later after the restore and should be reduced
>>>> within the allowed lateness into the final result which is saved into s3.
>>>>
>>>> Also, is this `DistinctFunction.reduce` just an example or the actual
>>>> implementation, basically saving just one of records inside the 24h window
>>>> in s3? then what is missing there?
>>>>
>>>> Cheers,
>>>> Andrey
>>>>
>>>> On 23 Aug 2018, at 15:42, Juho Autio <juho.au...@rovio.com> wrote:
>>>>
>>>> I changed to allowedLateness=0, no change, still missing data when
>>>> restoring from savepoint.
>>>>
>>>> On Tue, Aug 21, 2018 at 10:43 AM Juho Autio <juho.au...@rovio.com>
>>>> wrote:
>>>>
>>>>> I realized that BucketingSink must not play any role in this problem.
>>>>> This is because only when the 24-hour window triggers, BucketinSink gets a
>>>>> burst of input. Around the state restoring point (middle of the day) it
>>>>> doesn't get any input, so it can't lose anything either (right?).
>>>>>
>>>>> I will next try removing the allowedLateness entirely from the
>>>>> equation.
>>>>>
>>>>> In the meanwhile, please let me know if you have any suggestions for
>>>>> debugging the lost data, for example what logs to enable.
>>>>>
>>>>> We use FlinkKafkaConsumer010 btw. Are there any known issues with
>>>>> that, that could contribute to lost data when restoring a savepoint?
>>>>>
>>>>> On Fri, Aug 17, 2018 at 4:23 PM Juho Autio <juho.au...@rovio.com>
>>>>> wrote:
>>>>>
>>>>>> Some data is silently lost on my Flink stream job when state is
>>>>>> restored from a savepoint.
>>>>>>
>>>>>> Do you have any debugging hints to find out where exactly the data
>>>>>> gets dropped?
>>>>>>
>>>>>> My job gathers distinct values using a 24-hour window. It doesn't
>>>>>> have any custom state management.
>>>>>>
>>>>>> When I cancel the job with savepoint and restore from that savepoint,
>>>>>> some data is missed. It seems to be losing just a small amount of data. 
>>>>>> The
>>>>>> event time of lost data is probably around the time of savepoint. In 
>>>>>> other
>>>>>> words the rest of the time window is not entirely missed – collection 
>>>>>> works
>>>>>> correctly also for (most of the) events that come in after restoring.
>>>>>>
>>>>>> When the job processes a full 24-hour window without interruptions it
>>>>>> doesn't miss anything.
>>>>>>
>>>>>> Usually the problem doesn't happen in test environments that have
>>>>>> smaller parallelism and smaller data volumes. But in production volumes 
>>>>>> the
>>>>>> job seems to be consistently missing at least something on every restore.
>>>>>>
>>>>>> This issue has consistently happened since the job was initially
>>>>>> created. It was at first run on an older version of Flink 1.5-SNAPSHOT 
>>>>>> and
>>>>>> it still happens on both Flink 1.5.2 & 1.6.0.
>>>>>>
>>>>>> I'm wondering if this could be for example some synchronization issue
>>>>>> between the kafka consumer offsets vs. what's been written by 
>>>>>> BucketingSink?
>>>>>>
>>>>>> 1. Job content, simplified
>>>>>>
>>>>>>         kafkaStream
>>>>>>                 .flatMap(new ExtractFieldsFunction())
>>>>>>                 .keyBy(new MapKeySelector(1, 2, 3, 4))
>>>>>>                 .timeWindow(Time.days(1))
>>>>>>                 .allowedLateness(allowedLateness)
>>>>>>                 .sideOutputLateData(lateDataTag)
>>>>>>                 .reduce(new DistinctFunction())
>>>>>>                 .addSink(sink)
>>>>>>                 // use a fixed number of output partitions
>>>>>>                 .setParallelism(8))
>>>>>>
>>>>>> /**
>>>>>>  * Usage: .keyBy("the", "distinct", "fields").reduce(new
>>>>>> DistinctFunction())
>>>>>>  */
>>>>>> public class DistinctFunction implements
>>>>>> ReduceFunction<java.util.Map<String, String>> {
>>>>>>     @Override
>>>>>>     public Map<String, String> reduce(Map<String, String> value1,
>>>>>> Map<String, String> value2) {
>>>>>>         return value1;
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> 2. State configuration
>>>>>>
>>>>>> boolean enableIncrementalCheckpointing = true;
>>>>>> String statePath = "s3n://bucket/savepoints";
>>>>>> new RocksDBStateBackend(statePath, enableIncrementalCheckpointing);
>>>>>>
>>>>>> Checkpointing Mode Exactly Once
>>>>>> Interval 1m 0s
>>>>>> Timeout 10m 0s
>>>>>> Minimum Pause Between Checkpoints 1m 0s
>>>>>> Maximum Concurrent Checkpoints 1
>>>>>> Persist Checkpoints Externally Enabled (retain on cancellation)
>>>>>>
>>>>>> 3. BucketingSink configuration
>>>>>>
>>>>>> We use BucketingSink, I don't think there's anything special here, if
>>>>>> not the fact that we're writing to S3.
>>>>>>
>>>>>>         String outputPath = "s3://bucket/output";
>>>>>>         BucketingSink<Map<String, String>> sink = new
>>>>>> BucketingSink<Map<String, String>>(outputPath)
>>>>>>                 .setBucketer(new ProcessdateBucketer())
>>>>>>                 .setBatchSize(batchSize)
>>>>>>                 .setInactiveBucketThreshold(inactiveBucketThreshold)
>>>>>>
>>>>>> .setInactiveBucketCheckInterval(inactiveBucketCheckInterval);
>>>>>>         sink.setWriter(new IdJsonWriter());
>>>>>>
>>>>>> 4. Kafka & event time
>>>>>>
>>>>>> My flink job reads the data from Kafka, using a
>>>>>> BoundedOutOfOrdernessTimestampExtractor on the kafka consumer to
>>>>>> synchronize watermarks accross all kafka partitions. We also write late
>>>>>> data to side output, but nothing is written there – if it would, it could
>>>>>> explain missed data in the main output (I'm also sure that our late data
>>>>>> writing works, because we previously had some actual late data which 
>>>>>> ended
>>>>>> up there).
>>>>>>
>>>>>> 5. allowedLateness
>>>>>>
>>>>>> It may be or may not be relevant that I have also enabled
>>>>>> allowedLateness with 1 minute lateness on the 24-hour window:
>>>>>>
>>>>>> If that makes sense, I could try removing allowedLateness entirely?
>>>>>> That would be just to rule out that Flink doesn't have a bug that's 
>>>>>> related
>>>>>> to restoring state in combination with the allowedLateness feature. After
>>>>>> all, all of our data should be in a good enough order to not be late, 
>>>>>> given
>>>>>> the max out of orderness used on kafka consumer timestamp extractor.
>>>>>>
>>>>>> Thank you in advance!
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>>
>>
>>
>
>

Reply via email to