Hi,

Sorry for getting back to this so late, but I think the underlying problem is that S3 does not behave as the BucketingSink expects. See this message by Stephan on the topic:
https://lists.apache.org/thread.html/34b8ede3affb965c7b5ec1e404918b39c282c258809dfd7a6c257a61@%3Cuser.flink.apache.org%3E
In there he also mentions a PR that we are trying to get in, which should improve working with S3 as a sink.

Best,
Aljoscha

> On 21. Jul 2017, at 16:04, Francisco Blaya <francisco.bl...@hivehome.com> wrote:
>
> Hi Aljoscha,
>
> I've tried both. When we restart from a manually created savepoint we see "restored from savepoint" in the Flink Dashboard. If we restart from an externalized checkpoint we see "restored from checkpoint". In both scenarios we lose data in S3.
>
> Cheers,
> Fran
>
> On 20 July 2017 at 17:54, Aljoscha Krettek <aljos...@apache.org> wrote:
> You said you cancel and restart the job. How do you then restart the job? From a savepoint or an externalised checkpoint? Do you also see missing data when using an externalised checkpoint or a savepoint?
>
> Best,
> Aljoscha
>
>> On 20. Jul 2017, at 16:15, Francisco Blaya <francisco.bl...@hivehome.com> wrote:
>>
>> Forgot to add that when a job gets cancelled via the UI (this is not the case when the Yarn session is killed), a part file ending in ".pending" does appear in S3, but it never seems to be promoted to finished upon restart of the job.
>>
>> On 20 July 2017 at 11:41, Francisco Blaya <francisco.bl...@hivehome.com> wrote:
>> Hi,
>>
>> Thanks for your answers.
>>
>> @Fabian: I can see that Flink's consumer commits the offsets to Kafka, no problem there. However, I'd expect everything that gets read from Kafka to appear in S3 at some point, even if the job gets stopped/killed before flushing and then restarted. That is what is not happening, so I see data loss in S3.
>>
>> @Sihua: I assume that the fact that the DateTimeBucketer is configured as the sink of the stream means that its state gets snapshotted by Flink through the checkpoint mechanism.
>>
>> @Gordon: When I say acking I do mean committing the offset back to Kafka. I agree with you, the problem seems to be related to the state snapshotting of the bucketing sink, nothing to do with Kafka. Could you please clarify what you mean by "events are considered as committed in bucketed sinks when the Flink checkpoint it is part of is complete"? When you talk about uncommitted events in the bucket state, do you mean events that haven't been written to S3?
>>
>> Cheers,
>> Fran
>>
>> On 20 July 2017 at 07:29, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>> Hi,
>>
>> What Fabian mentioned is true. The Flink Kafka consumer's exactly-once guarantee relies on offsets checkpointed as Flink state; it does not rely on the offsets committed back to Kafka.
>>
>>> What we found is that Flink acks Kafka immediately before even writing to S3.
>>
>> What you mean by "ack" here is committing the offset back to Kafka, correct? First of all, as mentioned, this behavior is not related to exactly-once. In fact, in Flink 1.3 you can turn it off completely and still achieve exactly-once (with Flink checkpointing enabled). One other thing to mention is that offsets are only committed to Kafka once all sinks in the job have completed their state snapshot.
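>>
>> To illustrate, a minimal sketch of what turning that commit off looks like on the 1.3 consumer (assuming a Scala job and the Kafka 0.10 connector; the broker, group, and topic names are made up):
>>
>>     import java.util.Properties
>>     import org.apache.flink.streaming.api.scala._
>>     import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
>>     import org.apache.flink.streaming.util.serialization.SimpleStringSchema
>>
>>     val props = new Properties()
>>     props.setProperty("bootstrap.servers", "broker:9092") // placeholder
>>     props.setProperty("group.id", "example-group")        // placeholder
>>
>>     val consumer = new FlinkKafkaConsumer010[String]("example-topic", new SimpleStringSchema(), props)
>>     // The offsets used on restore come from Flink's own checkpoints; the commit back to
>>     // Kafka is only visible to external monitoring tools, so it can be disabled (Flink 1.3+).
>>     consumer.setCommitOffsetsOnCheckpoints(false)
>>
>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>     env.enableCheckpointing(5000) // exactly-once still holds via Flink's checkpointed offsets
>>     val stream = env.addSource(consumer)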
>>> What we found is that Flink acks Kafka immediately before even writing to S3. The consequence of this seems to be that if the job gets cancelled before the acked events are flushed to S3 then these are lost.
>>
>> So, at a first look at your description, this seems like a problem with the state snapshotting of the bucketing sink. It suggests that data is not flushed to S3 properly when `snapshotState` of the bucketing sink returns. I'm not entirely familiar with the bucketing sink, so this is just a superficial guess from what you described.
>>
>>> Flink doesn't seem to keep in its checkpointed state the fact that it acked those events but never flushed them to S3.
>>
>> Keep in mind that these are two separate pieces of state we're talking about here: 1) the offsets checkpointed as state of the Kafka consumer source, and 2) the bucket state (which should keep track of uncommitted events w.r.t. Flink's checkpoints; events are considered committed in bucketing sinks once the Flink checkpoint they are part of completes). For details on this I recommend checking out the class Javadoc of `BucketingSink`.
>>
>> @Sihua: the bucketing sink also manages bucket state to achieve exactly-once semantics.
>>
>> Cheers,
>> Gordon
>>
>> On 20 July 2017 at 10:46:52 AM, 周思华 (summerle...@163.com) wrote:
>>
>>> Hi Fran,
>>>
>>> Does the DateTimeBucketer act like a memory buffer that is not managed by Flink's state? If so, then I think the problem is not about Kafka but about the DateTimeBucketer: Flink won't take a snapshot of the DateTimeBucketer if it is not part of any state.
>>>
>>> Best,
>>> Sihua Zhou
>>>
>>> At 2017-07-20 03:02:20, "Fabian Hueske" <fhue...@gmail.com> wrote:
>>> Hi Fran,
>>>
>>> Did you observe actual data loss due to the problem you are describing, or are you discussing a possible issue based on your observations?
>>>
>>> AFAIK, Flink's Kafka consumer keeps track of the offsets itself and includes these in the checkpoints. In case of a recovery, it does not rely on the offsets which were committed back to Kafka but only on the offsets it checkpointed itself.
>>> Gordon (in CC) is familiar with all the details of Flink's Kafka consumer and can give a more detailed answer.
>>>
>>> Best,
>>> Fabian
>>>
>>> 2017-07-19 16:55 GMT+02:00 Francisco Blaya <francisco.bl...@hivehome.com>:
>>> Hi,
>>>
>>> We have a Flink job running on AWS EMR sourcing a Kafka topic and persisting the events to S3 through a DateTimeBucketer. We configured the bucketer to flush to S3 after an inactivity period of 5 minutes. The rate at which events are written to Kafka in the first place is very low, so it is easy for us to investigate how the Flink job recovers with respect to Kafka offsets after the job gets cancelled or the Yarn session is killed.
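>>>
>>> The sink setup looks roughly like the following (a simplified sketch rather than our actual job; the bucket path, the date format, and the socket source standing in for our Kafka source are placeholders):
>>>
>>>     import org.apache.flink.streaming.api.scala._
>>>     import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}
>>>
>>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>     env.enableCheckpointing(5000) // checkpoint every 5 seconds
>>>
>>>     // stand-in for the Kafka source
>>>     val events: DataStream[String] = env.socketTextStream("localhost", 9999)
>>>
>>>     val sink = new BucketingSink[String]("s3://example-bucket/events") // placeholder path
>>>     sink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HH"))
>>>     sink.setInactiveBucketThreshold(5 * 60 * 1000L)  // flush buckets that were idle for 5 minutes
>>>     sink.setInactiveBucketCheckInterval(60 * 1000L)  // how often inactivity is checked
>>>
>>>     events.addSink(sink)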
>>> What we found is that Flink acks Kafka immediately, before even writing to S3. The consequence of this seems to be that if the job gets cancelled before the acked events are flushed to S3, those events are lost: they don't get written when the job restarts. Flink doesn't seem to keep in its checkpointed state the fact that it acked those events but never flushed them to S3. Checkpoints are created in S3 every 5 seconds.
>>>
>>> We've also tried to configure externalized checkpoints through the "state.checkpoints.dir" configuration key and "env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)" in the job, so that they don't automatically get cleaned up when the job gets cancelled or the Yarn session is killed (roughly as sketched at the end of this mail). We can see the job uses a restored checkpoint upon restart, but we still get missing events in S3.
>>>
>>> Has anyone come across this behaviour before? Are we assuming something wrong?
>>>
>>> We're using EMR 5.4.0 and Flink 1.2.0.
>>>
>>> Regards,
>>> Fran
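>>>
>>> P.S. A minimal sketch of that externalized-checkpoint setup in the job (Scala; it assumes "state.checkpoints.dir" points at an S3 path in flink-conf.yaml, and the 5-second interval shown is just our checkpoint interval):
>>>
>>>     import org.apache.flink.streaming.api.scala._
>>>     import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
>>>
>>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>     env.enableCheckpointing(5000) // checkpoint every 5 seconds
>>>     // keep checkpoints around when the job is cancelled, so we can restore from them later
>>>     env.getCheckpointConfig.enableExternalizedCheckpoints(
>>>       ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)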