You said you cancel and restart the job. How exactly do you restart it: from 
a savepoint or from an externalised checkpoint? Do you also see missing data 
when using an externalised checkpoint or a savepoint?

Best,
Aljoscha

> On 20. Jul 2017, at 16:15, Francisco Blaya <francisco.bl...@hivehome.com> 
> wrote:
> 
> Forgot to add that when a job gets cancelled via the UI (this is not the case 
> when the Yarn session is killed) a part file ending in ".pending" does appear 
> in S3, but it never seems to be promoted to finished when the job restarts.
> 
> On 20 July 2017 at 11:41, Francisco Blaya <francisco.bl...@hivehome.com 
> <mailto:francisco.bl...@hivehome.com>> wrote:
> Hi,
> 
> Thanks for your answers.
> 
> @Fabian. I can see that Flink's consumer commits the offsets to Kafka, no 
> problem there. However I'd expect everything that gets read from Kafka to 
> appear in S3 at some point, even if the job gets stopped/killed before 
> flushing and then restarted. And that's what is not happening. So I see data 
> loss in S3.
> 
> @Sihua. I assume that the fact that the DateTimeBucketer is configured as the 
> sink of the stream means that its state gets snapshotted by Flink through the 
> checkpoint mechanism.
> 
> @Gordon. When I say acking I do indeed mean committing the offset back to 
> Kafka. I agree with you, the problem seems to be related to the state 
> snapshotting of the bucketing sink, nothing to do with Kafka. Could you please 
> clarify what you mean by "events are considered as committed in bucketed sinks 
> when the Flink checkpoint it is part of is complete"? When you talk about 
> uncommitted events of the bucket state, do you mean events that haven't been 
> written to S3?
> 
> Cheers,
> Fran
> 
> On 20 July 2017 at 07:29, Tzu-Li (Gordon) Tai <tzuli...@apache.org 
> <mailto:tzuli...@apache.org>> wrote:
> Hi,
> 
> What Fabian mentioned is true. The Flink Kafka Consumer’s exactly-once 
> guarantee relies on the offsets checkpointed as Flink state, and doesn’t rely 
> on the offsets committed to Kafka.
> 
>> What we found is that Flink acks Kafka immediately before even writing to S3.
> 
> 
> What you mean by ack here is the offset committing back to Kafka, correct?
> First of all, as mentioned, this behavior is not related to exactly-once. In 
> fact, in Flink 1.3, you can completely turn this off and still achieve 
> exactly-once (with Flink checkpointing enabled).
> One other thing to mention is that offsets are committed to Kafka when all 
> sinks in the job complete their state snapshot.
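
(Inline note: the point above can be illustrated with a toy model. This is only a sketch under stated assumptions. `ToyOffsetSource` and all of its methods are hypothetical names and are not Flink or Kafka APIs. It shows a source that rewinds to the offset stored in its last completed checkpoint and never consults the offset committed back to the broker, so committing can be switched off without affecting recovery.)

```java
import java.util.Arrays;
import java.util.List;

// Toy model only: illustrative names, not Flink or Kafka APIs.
class ToyOffsetSource {
    private final List<String> partition;      // stand-in for one Kafka partition
    private final boolean commitOnCheckpoint;  // also commit offsets back to "Kafka"?
    private int position = 0;                  // next offset to read
    private int checkpointedOffset = 0;        // offset stored in the last completed checkpoint
    private int committedOffset = 0;           // offset committed back to "Kafka"

    ToyOffsetSource(List<String> partition, boolean commitOnCheckpoint) {
        this.partition = partition;
        this.commitOnCheckpoint = commitOnCheckpoint;
    }

    // Reads the next record, or returns null when the partition is exhausted.
    String poll() {
        return position < partition.size() ? partition.get(position++) : null;
    }

    // Models a completed checkpoint: the read position becomes checkpointed state.
    void completeCheckpoint() {
        checkpointedOffset = position;
        if (commitOnCheckpoint) {
            committedOffset = position;
        }
    }

    // Models recovery: rewind to the checkpointed offset; committedOffset is ignored.
    void restore() {
        position = checkpointedOffset;
    }

    int committedOffset() {
        return committedOffset;
    }
}

class ToyOffsetDemo {
    public static void main(String[] args) {
        ToyOffsetSource source =
                new ToyOffsetSource(Arrays.asList("e0", "e1", "e2", "e3"), false);
        source.poll();
        source.poll();                      // e0 and e1 are read
        source.completeCheckpoint();        // the checkpoint covers e0 and e1
        source.poll();                      // e2 is read, then the job fails
        source.restore();
        System.out.println(source.poll());  // e2 is read again after recovery
    }
}
```

In this model nothing is ever committed to the broker, yet the event read after the last checkpoint is replayed on recovery.
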
> 
>> What we found is that Flink acks Kafka immediately before even writing to 
>> S3. The consequence of this seems to be that if the job gets cancelled 
>> before the acked events are flushed to S3 then these are lost
> 
> 
> So, at first glance, from your description this seems like a problem with the 
> state snapshotting of the bucketing sink. It suggests that data is not 
> flushed to S3 properly when `snapshotState` of the bucketing sink returns. 
> I’m not entirely familiar with the bucketing sink, so this is just a 
> superficial guess from what you described.
> 
>> Flink doesn't seem to keep in its checkpointed state the fact that it acked 
>> those events but never flushed them to S3.
> 
> 
> Keep in mind that these are two separate states we’re talking about here. 1) 
> the offsets checkpointed as state of the Kafka consumer source, and 2) bucket 
> state (which should keep track of uncommitted events w.r.t. Flink’s 
> checkpoints; events are considered as committed in bucketed sinks when the 
> Flink checkpoint it is part of is complete). For details on this I recommend 
> checking out the class Javadoc of `BucketingSink`.
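
(Inline note: a toy sketch of "committed when the checkpoint completes", under stated assumptions. `ToyBucketSink` and its methods are hypothetical names. This is not how `BucketingSink` is implemented, and it deliberately ignores part files and everything specific to S3. It only models the rule that events written to the sink count as committed once the checkpoint that captured them has completed, and that anything uncommitted has to come back through replay from the source's checkpointed offsets.)

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: illustrative names, not the BucketingSink implementation.
class ToyBucketSink {
    private final List<String> inProgress = new ArrayList<>(); // written, not yet in any snapshot
    private final List<String> pending = new ArrayList<>();    // in a snapshot, checkpoint not complete
    private final List<String> finished = new ArrayList<>();   // committed: checkpoint completed

    void write(String event) {
        inProgress.add(event);
    }

    // Snapshot: everything written so far becomes pending for the ongoing checkpoint.
    void snapshotState() {
        pending.addAll(inProgress);
        inProgress.clear();
    }

    // Only when the checkpoint completes do the pending events count as committed.
    void notifyCheckpointComplete() {
        finished.addAll(pending);
        pending.clear();
    }

    // Failure before the checkpoint completes: uncommitted events are dropped here
    // and are expected to arrive again when the source replays them.
    void failAndRestore() {
        inProgress.clear();
        pending.clear();
    }

    List<String> finished() {
        return finished;
    }
}

class ToyBucketDemo {
    public static void main(String[] args) {
        ToyBucketSink sink = new ToyBucketSink();
        sink.write("a");
        sink.write("b");
        sink.snapshotState();
        sink.notifyCheckpointComplete();     // a and b are now committed
        sink.write("c");
        sink.snapshotState();
        sink.failAndRestore();               // failure before the checkpoint completed
        System.out.println(sink.finished()); // [a, b]
    }
}
```

In this model "c" is neither lost nor committed after the failure: it is simply not part of any completed checkpoint, so the source is expected to deliver it again.
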
> 
> @Sihua
> the bucketing sink also manages bucket states to achieve exactly-once 
> semantics.
> 
> Cheers,
> Gordon
> 
> On 20 July 2017 at 10:46:52 AM, 周思华 (summerle...@163.com 
> <mailto:summerle...@163.com>) wrote:
> 
>> Hi Fran,
>> 
>> does the DateTimeBucketer act like a memory buffer that isn't managed by 
>> Flink's state? If so, then I think the problem is not about Kafka, but about 
>> the DateTimeBucketer. Flink won't take a snapshot of the DateTimeBucketer if 
>> it is not part of any state.
>> 
>> Best, 
>> Sihua Zhou
>> 
>> 
>> At 2017-07-20 03:02:20, "Fabian Hueske" <fhue...@gmail.com 
>> <mailto:fhue...@gmail.com>> wrote:
>> Hi Fran,
>> 
>> did you observe actual data loss due to the problem you are describing or 
>> are you discussing a possible issue based on your observations?
>> 
>> AFAIK, Flink's Kafka consumer keeps track of the offsets itself and includes 
>> these in the checkpoints. In case of a recovery, it does not rely on the 
>> offsets which were committed back to Kafka but only on the offsets it 
>> checkpointed itself.
>> Gordon (in CC) is familiar with all details of Flink's Kafka consumer and 
>> can give a more detailed answer.
>> 
>> Best, Fabian
>> 
>> 2017-07-19 16:55 GMT+02:00 Francisco Blaya <francisco.bl...@hivehome.com 
>> <mailto:francisco.bl...@hivehome.com>>:
>> Hi,
>> 
>> We have a Flink job running on AWS EMR sourcing a Kafka topic and persisting 
>> the events to S3 through a DateTimeBucketer. We configured the bucketer to 
>> flush to S3 with an inactivity period of 5 mins. The rate at which events are 
>> written to Kafka in the first place is very low so it is easy for us to 
>> investigate how the Flink job would recover with respect to Kafka offsets 
>> after the job gets cancelled or the Yarn session killed.
>> 
>> What we found is that Flink acks Kafka immediately before even writing to 
>> S3. The consequence of this seems to be that if the job gets cancelled 
>> before the acked events are flushed to S3 then these are lost, they don't 
>> get written when the job restarts. Flink doesn't seem to keep in its 
>> checkpointed state the fact that it acked those events but never flushed 
>> them to S3. Checkpoints are created every 5 seconds in S3.
>> 
>> We've also tried to configure externalized checkpoints through the 
>> "state.checkpoints.dir" configuration key and 
>> "env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)"
>>  in the job so that they don't automatically get cleaned up when the job 
>> gets cancelled or the Yarn session killed. We can see the job uses a 
>> restored checkpoint upon restart but still we get missing events in S3.
>> 
>> Has anyone come across this behaviour before? Are we assuming something 
>> wrong?
>> 
>> We're using EMR 5.4.0 and Flink 1.2.0.
>> 
>> Regards,
>> Fran
>> 
>> hivehome.com <http://www.hivehome.com/>
>> 
>> 
>> 
>> 
>> Hive | London | Cambridge | Houston | Toronto
>> The information contained in or attached to this email is confidential and 
>> intended only for the use of the individual(s) to which it is addressed. It 
>> may contain information which is confidential and/or covered by legal 
>> professional or other privilege. The views expressed in this email are not 
>> necessarily the views of Centrica plc, and the company, its directors, 
>> officers or employees make no representation or accept any liability for 
>> their accuracy or completeness unless expressly stated to the contrary. 
>> Centrica Connected Home Limited (company no: 5782908), registered in England 
>> and Wales with its registered office at Millstream, Maidenhead Road, 
>> Windsor, Berkshire SL4 5GD.
