Hi,

What Fabian mentioned is true. The Flink Kafka Consumer's exactly-once guarantee relies on the offsets it checkpoints as Flink state, and doesn't rely on the offsets committed back to Kafka.

What we found is that Flink acks Kafka immediately before even writing to S3.

By "ack" here, you mean the offsets being committed back to Kafka, correct?
First of all, as mentioned, that committing behavior is not related to exactly-once. In fact, in Flink 1.3 you can turn it off completely and still achieve exactly-once (as long as Flink checkpointing is enabled); see the sketch below.
One other thing to mention is that offsets are only committed to Kafka once all sinks in the job have completed their state snapshot.
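
To be concrete, here is a minimal sketch of what I mean (Java, Flink 1.3; the topic, schema, and properties are placeholders, and I'm assuming the Kafka 0.10 connector):

Properties props = new Properties();
props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder
props.setProperty("group.id", "my-group");            // placeholder

FlinkKafkaConsumer010<String> consumer =
    new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), props);

// Offsets are still snapshotted as Flink state on every checkpoint; this only
// disables the commit back to Kafka, which Flink never relies on for recovery.
consumer.setCommitOffsetsOnCheckpoints(false);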

What we found is that Flink acks Kafka immediately before even writing to S3. 
The consequence of this seems to be that if the job gets cancelled before the 
acked events are flushed to S3 then these are lost

So, at first glance at your description, this seems like a problem with the state snapshotting of the bucketing sink. It suggests that data is not properly flushed to S3 by the time `snapshotState` of the bucketing sink returns. I'm not entirely familiar with the bucketing sink, so this is just a superficial guess based on what you described.
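
Roughly, this is the contract a checkpointed sink is expected to honour (an illustrative Java sketch only, not the actual `BucketingSink` implementation):

import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

// Illustrative sketch -- not the real BucketingSink code.
public class SketchedBucketingSink implements CheckpointedFunction {

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // 1. Flush any records still buffered in memory to the target (e.g. S3),
        //    so everything covered by this checkpoint is physically written.
        // 2. Record pending / in-progress files in operator state, so they can
        //    be finalized (or truncated) consistently on restore.
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // On restore: read the bucket state back and clean up or finalize files
        // belonging to the restored checkpoint.
    }
}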

Flink doesn't seem to keep in its checkpointed state the fact that it acked 
those events but never flushed them to S3.

Keep in mind that there are two separate pieces of state we're talking about here: 1) the offsets checkpointed as state of the Kafka consumer source, and 2) the bucket state (which should keep track of uncommitted events w.r.t. Flink's checkpoints; events are considered committed in the bucketing sink only once the Flink checkpoint they are part of completes). For details on this, I recommend checking out the class Javadoc of `BucketingSink`.

@Sihua
the bucketing sink also manages bucket states to achieve exactly-once semantics.

Cheers,
Gordon

On 20 July 2017 at 10:46:52 AM, 周思华 (summerle...@163.com) wrote:

Hi Fran,

Does the DateTimeBucketer act like a memory buffer that isn't managed by Flink's state? If so, then I think the problem is not about Kafka, but about the DateTimeBucketer: Flink won't take a snapshot of the DateTimeBucketer's buffered data if it isn't held in any state.

Best, 
Sihua Zhou


At 2017-07-20 03:02:20, "Fabian Hueske" <fhue...@gmail.com> wrote:
Hi Fran,

did you observe actual data loss due to the problem you are describing or are 
you discussing a possible issue based on your observations?

AFAIK, Flink's Kafka consumer keeps track of the offsets itself and includes 
these in the checkpoints. In case of a recovery, it does not rely on the 
offsets which were committed back to Kafka but only on the offsets it 
checkpointed itself.
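
To illustrate (a minimal Java sketch; the 5 second interval is just an example):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// With checkpointing enabled, the Kafka consumer snapshots its partition offsets
// as part of every checkpoint; on recovery, Flink restores from these snapshots
// rather than from the offsets committed back to Kafka.
env.enableCheckpointing(5000);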
Gordon (in CC) is familiar with all details of Flink's Kafka consumer and can 
give a more detailed answer.

Best, Fabian

2017-07-19 16:55 GMT+02:00 Francisco Blaya <francisco.bl...@hivehome.com>:
Hi,

We have a Flink job running on AWS EMR sourcing a Kafka topic and persisting the events to S3 through a DateTimeBucketer. We configured the bucketer to flush to S3 with an inactivity period of 5 mins. The rate at which events are written to Kafka in the first place is very low, so it is easy for us to investigate how the Flink job recovers with respect to Kafka offsets after the job gets cancelled or the Yarn session is killed.

What we found is that Flink acks Kafka immediately before even writing to S3. The consequence of this seems to be that if the job gets cancelled before the acked events are flushed to S3 then these are lost; they don't get written when the job restarts. Flink doesn't seem to keep in its checkpointed state the fact that it acked those events but never flushed them to S3. Checkpoints are created every 5 seconds in S3.
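
For reference, a simplified sketch of the kind of job we're running (Java; the bucket path, date format, topic, Kafka connector version and properties below are placeholders rather than our exact values):

import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaToS3Job {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000); // checkpoint every 5 seconds

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder
        props.setProperty("group.id", "s3-writer");           // placeholder

        BucketingSink<String> sink = new BucketingSink<>("s3://my-bucket/events"); // placeholder path
        sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HH"));
        sink.setInactiveBucketThreshold(5 * 60 * 1000L); // flush buckets after 5 minutes of inactivity
        sink.setInactiveBucketCheckInterval(60 * 1000L); // how often to check for inactive buckets

        env.addSource(new FlinkKafkaConsumer010<>("events", new SimpleStringSchema(), props)) // placeholder topic
           .addSink(sink);

        env.execute("kafka-to-s3");
    }
}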

We've also tried to configure externalized checkpoints through the "state.checkpoints.dir" configuration key and 
"env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)"
in the job, so that they don't automatically get cleaned up when the job gets cancelled or the Yarn session is killed. We can see the job uses a restored checkpoint upon restart, but we still get missing events in S3.
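
For completeness, roughly what that looks like on our side (the paths below are placeholders):

# in flink-conf.yaml
state.checkpoints.dir: s3://my-bucket/flink-checkpoints

# restarting the job from the retained checkpoint after cancellation
bin/flink run -s <path-to-retained-checkpoint-metadata> our-job.jar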

Has anyone come across this behaviour before? Are we assuming something wrong?

We're using EMR 5.4.0 and Flink 1.2.0.

Regards,
Fran
