You said you cancel and restart the job. How do you then restart it? From a savepoint or from an externalised checkpoint? And do you also see missing data when restoring from an externalised checkpoint or a savepoint?

Best,
Aljoscha
> On 20. Jul 2017, at 16:15, Francisco Blaya <francisco.bl...@hivehome.com> wrote:
>
> Forgot to add that when a job gets cancelled via the UI (this is not the case when the Yarn session is killed), a part file ending in ".pending" does appear in S3, but it never seems to be promoted to a finished file upon restart of the job.
>
> On 20 July 2017 at 11:41, Francisco Blaya <francisco.bl...@hivehome.com> wrote:
>
> Hi,
>
> Thanks for your answers.
>
> @Fabian: I can see that Flink's consumer commits the offsets to Kafka, no problem there. However, I'd expect everything that gets read from Kafka to appear in S3 at some point, even if the job gets stopped/killed before flushing and is then restarted. That is what is not happening, so I do see data loss in S3.
>
> @Sihua: I assume that the fact that the DateTimeBucketer is configured as the sink of the stream means that its state gets snapshotted by Flink through the checkpoint mechanism.
>
> @Gordon: When I say acking I do indeed mean committing the offsets back to Kafka. I agree with you, the problem seems to be related to the state snapshotting of the bucketing sink, nothing to do with Kafka. Could you please clarify what you mean by "events are considered as committed in bucketed sinks when the Flink checkpoint it is part of is complete"? When you talk about uncommitted events of the bucket state, do you mean events that haven't been written to S3?
>
> Cheers,
> Fran
>
> On 20 July 2017 at 07:29, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>
> Hi,
>
> What Fabian mentioned is true. The Flink Kafka consumer's exactly-once guarantee relies on the offsets checkpointed as Flink state; it does not rely on the offsets committed back to Kafka.
>
>> What we found is that Flink acks Kafka immediately before even writing to S3.
>
> What you mean by "ack" here is the offset committing back to Kafka, correct? First of all, as mentioned, this behaviour is not related to exactly-once. In fact, in Flink 1.3 you can turn it off completely and still achieve exactly-once (with Flink checkpointing enabled). One other thing to mention is that offsets are committed to Kafka only when all sinks in the job have completed their state snapshot.
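>
> As an illustration, here is a minimal sketch of what I mean (not actual production code; I am assuming the Flink 1.3 Kafka 0.10 consumer here, and the topic name and properties are placeholders):
>
>     import java.util.Properties
>     import org.apache.flink.streaming.api.scala._
>     import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
>     import org.apache.flink.streaming.util.serialization.SimpleStringSchema
>
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     // Exactly-once recovery is driven by the offsets in Flink's checkpoints,
>     // not by the offsets committed back to Kafka.
>     env.enableCheckpointing(5000)
>
>     val props = new Properties()
>     props.setProperty("bootstrap.servers", "broker:9092") // placeholder
>     props.setProperty("group.id", "my-group")             // placeholder
>
>     val consumer = new FlinkKafkaConsumer010[String](
>       "my-topic", new SimpleStringSchema(), props)
>     // Committing offsets to Kafka is only useful for external monitoring tools;
>     // disabling it does not weaken the exactly-once guarantee.
>     consumer.setCommitOffsetsOnCheckpoints(false)
>
>     val stream = env.addSource(consumer)
>
> In other words, the committed offsets in Kafka are only a view for the outside world; on restore, the consumer always starts from the offsets in the restored checkpoint.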
>> What we found is that Flink acks Kafka immediately before even writing to S3. The consequence of this seems to be that if the job gets cancelled before the acked events are flushed to S3 then these are lost.
>
> So, at a first look at your description, this seems like a problem with the state snapshotting of the bucketing sink. It suggests that data is not properly flushed to S3 when `snapshotState` of the bucketing sink returns. I'm not entirely familiar with the bucketing sink, so this is just a superficial guess from what you described.
>
>> Flink doesn't seem to keep in its checkpointed state the fact that it acked those events but never flushed them to S3.
>
> Keep in mind that these are two separate pieces of state we're talking about here: 1) the offsets checkpointed as state of the Kafka consumer source, and 2) the bucket state (which keeps track of events that are uncommitted w.r.t. Flink's checkpoints; events are considered as committed in bucketed sinks when the Flink checkpoint they are part of is complete). For details on this I recommend checking out the class Javadoc of `BucketingSink`.
>
> @Sihua: the bucketing sink also manages bucket state to achieve exactly-once semantics.
>
> Cheers,
> Gordon
>
> On 20 July 2017 at 10:46:52 AM, 周思华 (summerle...@163.com) wrote:
>
>> Hi Fran,
>>
>> Does the DateTimeBucketer act like an in-memory buffer that is not managed by Flink's state? If so, I think the problem is not about Kafka but about the DateTimeBucketer: Flink won't take a snapshot of the DateTimeBucketer if it is not part of any state.
>>
>> Best,
>> Sihua Zhou
>>
>> At 2017-07-20 03:02:20, "Fabian Hueske" <fhue...@gmail.com> wrote:
>>
>> Hi Fran,
>>
>> did you observe actual data loss due to the problem you are describing, or are you discussing a possible issue based on your observations?
>>
>> AFAIK, Flink's Kafka consumer keeps track of the offsets itself and includes these in its checkpoints. In case of a recovery, it does not rely on the offsets that were committed back to Kafka but only on the offsets it checkpointed itself. Gordon (in CC) is familiar with all the details of Flink's Kafka consumer and can give a more detailed answer.
>>
>> Best, Fabian
>>
>> 2017-07-19 16:55 GMT+02:00 Francisco Blaya <francisco.bl...@hivehome.com>:
>>
>> Hi,
>>
>> We have a Flink job running on AWS EMR sourcing a Kafka topic and persisting the events to S3 through a DateTimeBucketer. We configured the bucketer to flush to S3 after an inactivity period of 5 mins. The rate at which events are written to Kafka in the first place is very low, so it is easy for us to investigate how the Flink job recovers with respect to Kafka offsets after the job gets cancelled or the Yarn session is killed.
>>
>> What we found is that Flink acks Kafka immediately, before even writing to S3. The consequence of this seems to be that if the job gets cancelled before the acked events are flushed to S3 then these events are lost: they don't get written when the job restarts. Flink doesn't seem to keep in its checkpointed state the fact that it acked those events but never flushed them to S3. Checkpoints are created every 5 seconds in S3.
>>
>> We've also tried to configure externalized checkpoints through the "state.checkpoints.dir" configuration key and "env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)" in the job, so that they don't automatically get cleaned up when the job gets cancelled or the Yarn session is killed. We can see the job uses a restored checkpoint upon restart, but we still get missing events in S3.
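>>
>> For reference, this is roughly how the job is wired up (a condensed sketch, not the actual code: the topic, bucket path, and properties are placeholders, the consumer class stands in for whichever Kafka connector version is used, and the inactive-bucket setters are assumed to be available in the connector version at hand):
>>
>>     import java.util.Properties
>>     import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
>>     import org.apache.flink.streaming.api.scala._
>>     import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}
>>     import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
>>     import org.apache.flink.streaming.util.serialization.SimpleStringSchema
>>
>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>     env.enableCheckpointing(5000) // checkpoint every 5 seconds
>>     // Keep checkpoints around when the job is cancelled, so we can restore from them.
>>     env.getCheckpointConfig.enableExternalizedCheckpoints(
>>       ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>>
>>     val props = new Properties()
>>     props.setProperty("bootstrap.servers", "broker:9092") // placeholder
>>     props.setProperty("group.id", "s3-persister")         // placeholder
>>
>>     val sink = new BucketingSink[String]("s3://bucket/events") // placeholder path
>>     sink.setBucketer(new DateTimeBucketer[String]())
>>     sink.setInactiveBucketThreshold(5 * 60 * 1000L)  // flush buckets after 5 mins of inactivity
>>     sink.setInactiveBucketCheckInterval(60 * 1000L)  // how often to check for inactive buckets
>>
>>     env
>>       .addSource(new FlinkKafkaConsumer010[String]("events", new SimpleStringSchema(), props))
>>       .addSink(sink)
>>
>>     env.execute("kafka-to-s3")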
>>
>> Has anyone come across this behaviour before? Are we assuming something wrong?
>>
>> We're using EMR 5.4.0 and Flink 1.2.0.
>>
>> Regards,
>> Fran