
Sorry for getting back to this so late, but I think the underlying problem is 
that S3 does not behave the way the Bucketing Sink expects. See this message by 
Stephan on the topic: 

In there he also mentions a PR that we're trying to get in that should improve 
working with S3 as a sink.
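
For anyone following the thread, here is a minimal sketch of the pending/finished life cycle that is discussed further down. It is a toy model in plain Java and not Flink's `BucketingSink`: the class `ToyBucketingSink` and its in-memory "files" are hypothetical, and only the method names `snapshotState` and `notifyCheckpointComplete` are borrowed from Flink's checkpointing hooks as an analogy. It only illustrates the idea that closed part files stay pending until the checkpoint they belong to completes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of a bucketing file sink. Hypothetical; NOT the Flink BucketingSink API. */
final class ToyBucketingSink {
    // Events in the currently open ("in-progress") part file.
    private final List<String> inProgress = new ArrayList<>();
    // Closed part files that are not yet associated with a checkpoint.
    private final List<List<String>> pending = new ArrayList<>();
    // Closed part files waiting for a given checkpoint to complete.
    private final TreeMap<Long, List<List<String>>> pendingPerCheckpoint = new TreeMap<>();
    // Part files that are considered committed.
    private final List<List<String>> finished = new ArrayList<>();

    void write(String event) {
        inProgress.add(event);
    }

    /** Inactivity or size threshold reached: close the open file and mark it pending. */
    void rollFile() {
        if (!inProgress.isEmpty()) {
            pending.add(new ArrayList<>(inProgress));
            inProgress.clear();
        }
    }

    /** A checkpoint is taken: remember which pending files belong to it. */
    void snapshotState(long checkpointId) {
        pendingPerCheckpoint.put(checkpointId, new ArrayList<>(pending));
        pending.clear();
    }

    /** The checkpoint completed: promote pending files of this and earlier checkpoints. */
    void notifyCheckpointComplete(long checkpointId) {
        Map<Long, List<List<String>>> done = pendingPerCheckpoint.headMap(checkpointId, true);
        for (List<List<String>> files : done.values()) {
            finished.addAll(files);
        }
        done.clear(); // the view is backed by the map, so this removes the promoted entries
    }

    /** All events contained in finished (committed) part files, in write order. */
    List<String> finishedEvents() {
        List<String> events = new ArrayList<>();
        for (List<String> file : finished) {
            events.addAll(file);
        }
        return events;
    }

    public static void main(String[] args) {
        ToyBucketingSink sink = new ToyBucketingSink();
        sink.write("a");
        sink.write("b");
        sink.rollFile();
        sink.snapshotState(1L);
        System.out.println(sink.finishedEvents()); // prints [] (checkpoint 1 not yet complete)
        sink.notifyCheckpointComplete(1L);
        System.out.println(sink.finishedEvents()); // prints [a, b]
    }
}
```

In this model the promotion step is just a list operation. On a real file system it would be a rename of the part file, and the assumption that such a rename is cheap and immediately visible is the kind of expectation that may not hold on S3.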


> On 21. Jul 2017, at 16:04, Francisco Blaya <francisco.bl...@hivehome.com> 
> wrote:
> Hi Aljoscha,
> I've tried both. When we restart from a manually created savepoint we see 
> "restored from savepoint" in the Flink Dashboard. If we restart from an 
> externalized checkpoint we see "restored from checkpoint". In both scenarios 
> we lose data in S3.
> Cheers,
> Fran
> On 20 July 2017 at 17:54, Aljoscha Krettek <aljos...@apache.org> wrote:
> You said you cancel and restart the job. How do you then restart the Job? 
> From a savepoint or externalised checkpoint? Do you also see missing data 
> when using an externalised checkpoint or a savepoint?
> Best,
> Aljoscha
>> On 20. Jul 2017, at 16:15, Francisco Blaya <francisco.bl...@hivehome.com> wrote:
>> Forgot to add that when a job gets cancelled via the UI (this is not the 
>> case when the Yarn session is killed) a part file ending in ".pending" does 
>> appear in S3, but that never seems to be promoted to finished upon restart 
>> of the job.
>> On 20 July 2017 at 11:41, Francisco Blaya <francisco.bl...@hivehome.com> wrote:
>> Hi,
>> Thanks for your answers.
>> @Fabian. I can see that Flink's consumer commits the offsets to Kafka, no 
>> problem there. However I'd expect everything that gets read from Kafka to 
>> appear in S3 at some point, even if the job gets stopped/killed before 
>> flushing and then restarted. And that's what is not happening. So I see data 
>> loss in S3.
>> @Sihua. I assume that the fact that the DateTimeBucketer is configured as 
>> the sink of the stream means that its state gets snapshotted by Flink through 
>> the checkpoint mechanism.
>> @Gordon. When I say acking I mean indeed committing the offset back to 
>> Kafka. I agree with you, the problem seems to be related to the state 
>> snapshotting of the bucketing sink, nothing to do with Kafka. Could you 
>> please clarify what you mean by "events are considered as committed in 
>> bucketed sinks when the Flink checkpoint it is part of is complete"? When 
>> you talk about uncommitted events of the bucket state, do you mean events 
>> that haven't been written to S3?
>> Cheers,
>> Fran
>> On 20 July 2017 at 07:29, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>> Hi,
>> What Fabian mentioned is true. The Flink Kafka Consumer’s exactly-once 
>> guarantee relies on the offsets checkpointed as Flink state, and doesn’t rely 
>> on the committed offsets in Kafka.
>>> What we found is that Flink acks Kafka immediately before even writing to 
>>> S3.
>> What you mean by ack here is the offset committing back to Kafka, correct?
>> First of all, as mentioned, this behavior is not related to exactly-once. In 
>> fact, in Flink 1.3, you can completely turn this off and still achieve 
>> exactly-once (with Flink checkpointing enabled).
>> One other thing to mention is that offsets are committed to Kafka when all 
>> sinks in the job complete their state snapshot.
>>> What we found is that Flink acks Kafka immediately before even writing to 
>>> S3. The consequence of this seems to be that if the job gets cancelled 
>>> before the acked events are flushed to S3 then these are lost
>> So, at first glance from your description, this seems like a problem with the 
>> state snapshotting of the bucketing sink. It suggests that data is 
>> not flushed to S3 properly by the time `snapshotState` of the bucketing sink 
>> returns. I’m not entirely familiar with the bucketing sink, so this is just 
>> a superficial guess from what you described.
>>> Flink doesn't seem to keep in its checkpointed state the fact that it acked 
>>> those events but never flushed them to S3.
>> Keep in mind that these are two separate states we’re talking about here: 1) 
>> the offsets checkpointed as state of the Kafka consumer source, and 2) 
>> the bucket state (which should keep track of uncommitted events w.r.t. Flink’s 
>> checkpoints; events are considered as committed in bucketed sinks when the 
>> Flink checkpoint it is part of is complete). For details on this I recommend 
>> checking out the class Javadoc of `BucketingSink`.
>> @Sihua
>> the bucketing sink also manages bucket states to achieve exactly-once 
>> semantics.
>> Cheers,
>> Gordon
>> On 20 July 2017 at 10:46:52 AM, 周思华 (summerle...@163.com) wrote:
>>> Hi Fran,
>>> does the DateTimeBucketer act like a memory buffer that isn't managed by 
>>> Flink's state? If so, then I think the problem is not about Kafka, but 
>>> about the DateTimeBucketer. Flink won't take a snapshot of the 
>>> DateTimeBucketer if it is not part of any state.
>>> Best, 
>>> Sihua Zhou
>>> At 2017-07-20 03:02:20, "Fabian Hueske" <fhue...@gmail.com> wrote:
>>> Hi Fran,
>>> did you observe actual data loss due to the problem you are describing or 
>>> are you discussing a possible issue based on your observations?
>>> AFAIK, Flink's Kafka consumer keeps track of the offsets itself and 
>>> includes these in the checkpoints. In case of a recovery, it does not rely 
>>> on the offsets which were committed back to Kafka but only on the offsets 
>>> it checkpointed itself.
>>> Gordon (in CC) is familiar with all details of Flink's Kafka consumer and 
>>> can give a more detailed answer.
>>> Best, Fabian
>>> 2017-07-19 16:55 GMT+02:00 Francisco Blaya <francisco.bl...@hivehome.com>:
>>> Hi,
>>> We have a Flink job running on AWS EMR sourcing a Kafka topic and 
>>> persisting the events to S3 through a DateTimeBucketer. We configured the 
>>> bucketer to flush to S3 with an inactivity period of 5 mins. The rate at 
>>> which events are written to Kafka in the first place is very low so it is 
>>> easy for us to investigate how the Flink job would recover in respect to 
>>> Kafka offsets after the job gets cancelled or the Yarn session killed.
>>> What we found is that Flink acks Kafka immediately before even writing to 
>>> S3. The consequence of this seems to be that if the job gets cancelled 
>>> before the acked events are flushed to S3 then these are lost, they don't 
>>> get written when the job restarts. Flink doesn't seem to keep in its 
>>> checkpointed state the fact that it acked those events but never flushed 
>>> them to S3. Checkpoints are created every 5 seconds in S3.
>>> We've also tried to configure externalized checkpoints through the 
>>> "state.checkpoints.dir" configuration key and 
>>> "env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)"
>>> in the job so that they don't automatically get cleaned up when the job 
>>> gets cancelled or the Yarn session killed. We can see the job uses a 
>>> restored checkpoint upon restart but still we get missing events in S3.
>>> Has anyone come across this behaviour before? Are we assuming something 
>>> wrong?
>>> We're using EMR 5.4.0 and Flink 1.2.0.
>>> Regards,
>>> Fran
>>> hivehome.com <http://www.hivehome.com/>
>>> Hive | London | Cambridge | Houston | Toronto
>>> The information contained in or attached to this email is confidential and 
>>> intended only for the use of the individual(s) to which it is addressed. It 
>>> may contain information which is confidential and/or covered by legal 
>>> professional or other privilege. The views expressed in this email are not 
>>> necessarily the views of Centrica plc, and the company, its directors, 
>>> officers or employees make no representation or accept any liability for 
>>> their accuracy or completeness unless expressly stated to the contrary. 
>>> Centrica Connected Home Limited (company no: 5782908), registered in 
>>> England and Wales with its registered office at Millstream, Maidenhead 
>>> Road, Windsor, Berkshire SL4 5GD.
