Hi John,

I think what Konstantin is trying to say is: Flink's Kafka consumer does not start consuming from the Kafka committed offset when the consumer starts; it actually starts from the offset that was last checkpointed to the external DFS. (i.e. the starting point of the consumer has no relevance to the Kafka committed offset whatsoever - if checkpointing is enabled.)
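To make that concrete, here is a minimal sketch of running a consumer like yours with checkpointing enabled (the checkpoint interval, topic, group and bootstrap strings are placeholders I made up, modeled on your snippet quoted below):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaCheckpointExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 60s; on failure the job restarts from the last completed
        // checkpoint, and the Kafka offsets stored in that checkpoint are restored too.
        env.enableCheckpointing(60_000);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "my-group");                // placeholder

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);

        // Only used when there is no checkpoint/savepoint to restore from; on a
        // restore, the offsets from the checkpoint win over the committed group offsets.
        consumer.setStartFromGroupOffsets();

        env.addSource(consumer).print(); // replace print() with your JSON transform + JDBC sink
        env.execute("kafka-checkpoint-example");
    }
}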
This is to quote (with a correction in brackets): "*the Flink Kafka Consumer only commits offsets back to Kafka on a best-effort basis after every checkpoint. Internally Flink "commits" the [checkpoints]->[current Kafka offset] as part of its periodic checkpoints.*"

However, if you do not enable checkpointing, I think your consumer will by default restart from the default Kafka offset (which I think is your committed group offset).

-- Rong

On Mon, Jul 8, 2019 at 6:39 AM John Smith <java.dev....@gmail.com> wrote:

> So when we say a sink is at least once, it's because internally it's not
> checking any kind of state and it sends what it has regardless, correct?
> Because I will build a sink that calls stored procedures.
>
> On Sun., Jul. 7, 2019, 4:03 p.m. Konstantin Knauf, <
> konstan...@ververica.com> wrote:
>
>> Hi John,
>>
>> In case of a failure (e.g. in the SQL sink) the Flink job will be
>> restarted from the last checkpoint. This means the offsets of all Kafka
>> partitions will be reset to that point in the stream, along with the
>> state of all operators. To enable checkpointing you need to call
>> StreamExecutionEnvironment#enableCheckpointing(). If you are using the
>> JDBCSinkFunction (which is an at-least-once sink), the output will be
>> duplicated in the case of failures.
>>
>> To answer your questions:
>>
>> * For this, the FlinkKafkaConsumer handles the offsets manually (no
>> auto-commit).
>> * No, the Flink Kafka Consumer only commits offsets back to Kafka on
>> a best-effort basis after every checkpoint. Internally Flink "commits"
>> the checkpoints as part of its periodic checkpoints.
>> * Yes, along with all other events between the last checkpoint and the
>> failure.
>> * It will continue from the last checkpoint.
>>
>> Hope this helps.
>>
>> Cheers,
>>
>> Konstantin
>>
>> On Fri, Jul 5, 2019 at 8:37 PM John Smith <java.dev....@gmail.com> wrote:
>>
>>> Hi, I am using Apache Flink 1.8.0.
>>>
>>> I'm consuming events from Kafka using nothing fancy...
>>>
>>> Properties props = new Properties();
>>> props.setProperty("bootstrap.servers", kafkaAddress);
>>> props.setProperty("group.id", kafkaGroup);
>>>
>>> FlinkKafkaConsumer<String> consumer =
>>>         new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);
>>>
>>> I do some JSON transforms and then push to my SQL database using JDBC
>>> and a stored procedure. Let's assume the SQL sink fails.
>>>
>>> We know that Kafka can either periodically commit offsets or it can be
>>> done manually based on the consumer's logic.
>>>
>>> - How are the source Kafka consumer offsets handled?
>>> - Does the Flink Kafka consumer commit the offset per event/record?
>>> - Will that single event that failed be retried?
>>> - So if we had 5 incoming events and say on the 3rd one it failed, will
>>> it continue on the 3rd or will the job restart and try those 5 events?
>>
>> --
>>
>> Konstantin Knauf | Solutions Architect
>>
>> +49 160 91394525
>>
>> Planned Absences: 10.08.2019 - 31.08.2019, 05.09. - 06.09.2019
>>
>> --
>>
>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>
>> --
>>
>> Ververica GmbH
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
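P.S. Since you mentioned building a sink that calls stored procedures: below is a rough, untested sketch of such a sink. The procedure name, JDBC URL and credentials are all placeholders, and this is a hand-rolled RichSinkFunction rather than the JDBCSinkFunction Konstantin mentioned. Even with checkpointing enabled it is still only at-least-once, so it helps if the procedure is an upsert that tolerates replayed records.

import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DriverManager;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class StoredProcSink extends RichSinkFunction<String> {

    private transient Connection connection;
    private transient CallableStatement statement;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Placeholder connection details.
        connection = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/mydb", "user", "password");
        // "upsert_event" is a made-up procedure name; an upsert keeps replays harmless.
        statement = connection.prepareCall("{call upsert_event(?)}");
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        statement.setString(1, value);
        statement.execute();
        // If this throws, the job fails and restarts from the last checkpoint, so every
        // record since that checkpoint is replayed through this sink (at-least-once).
    }

    @Override
    public void close() throws Exception {
        if (statement != null) statement.close();
        if (connection != null) connection.close();
    }
}

You would attach it with stream.addSink(new StoredProcSink()).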