Hi John,

let's say Flink performed a checkpoint after the 2nd record (by injecting a checkpoint barrier into the data flow) and the sink fails on the 5th record. When Flink restarts the application, it resets the offset to just after the 2nd record (it will read the 3rd record first). Hence, the 3rd and 4th records will be emitted again.
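Editor's aside: the replay behaviour described above can be simulated in a few lines of plain Java (this is not Flink code; a "checkpoint" here is just a saved source position, and all names are hypothetical):

```java
import java.util.ArrayList;
import java.util.List;

public class ReplaySimulation {

    // Runs the "pipeline" once, failing at index failAt on the first
    // attempt, then restarting from the last checkpointed offset.
    static List<String> runWithReplay(List<String> source, int checkpointAfter, int failAt) {
        List<String> emitted = new ArrayList<>();
        int checkpointedOffset = 0;

        // First attempt: the sink fails on record failAt (0-based).
        for (int pos = 0; pos < source.size(); pos++) {
            if (pos == failAt) {
                break; // sink failure: this record is not emitted
            }
            emitted.add(source.get(pos));
            if (pos + 1 == checkpointAfter) {
                checkpointedOffset = pos + 1; // checkpoint taken here
            }
        }

        // Restart: the offset is reset to the last checkpoint, so the
        // records between the checkpoint and the failure are replayed.
        for (int pos = checkpointedOffset; pos < source.size(); pos++) {
            emitted.add(source.get(pos));
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> records = List.of("r1", "r2", "r3", "r4", "r5");
        // Checkpoint after the 2nd record, sink fails on the 5th (index 4):
        System.out.println(runWithReplay(records, 2, 4));
        // -> [r1, r2, r3, r4, r3, r4, r5]: r3 and r4 are emitted twice
    }
}
```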
Best,
Fabian

On Tue, 9 Jul 2019 at 21:11, John Smith <java.dev....@gmail.com> wrote:

> Ok, so when the sink fails on the 5th record, then there's no chance that
> the checkpoint can be at the 6th event, right?
>
> On Tue, 9 Jul 2019 at 13:51, Konstantin Knauf <konstan...@ververica.com> wrote:
>
>> Hi John,
>>
>> this depends on your checkpoint interval. When enabled, checkpoints are
>> triggered periodically [1].
>>
>> Cheers,
>>
>> Konstantin
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/checkpointing.html
>>
>> On Tue, Jul 9, 2019 at 7:30 PM John Smith <java.dev....@gmail.com> wrote:
>>
>>> Ok, so just to be clear, let's say we started at day 0...
>>>
>>> 1- Producer inserted 10 records into Kafka.
>>> 2- Kafka Flink Consumer consumed 5 records.
>>> 3- Some transformations applied to those records.
>>> 4- 4 records were written by the sink, 1 failed.
>>>
>>> 5- Flink job restarts because of the above failure.
>>>
>>> When does the checkpoint happen above?
>>> And does it mean in the above case that it will start back at 0, or will
>>> it start at the 4th record and continue, or wherever the checkpoint
>>> happened, e.g. the 3rd record?
>>> My stored proc will be idempotent, and I understand what to do if
>>> messages get replayed.
>>> I just want to understand when and where the checkpointing will happen.
>>>
>>> On Mon, 8 Jul 2019 at 22:23, Rong Rong <walter...@gmail.com> wrote:
>>>
>>>> Hi John,
>>>>
>>>> I think what Konstantin is trying to say is: Flink's Kafka consumer
>>>> does not start consuming from the Kafka committed offset when starting;
>>>> it actually starts with the offset that was last checkpointed to the
>>>> external DFS. (I.e., the starting point of the consumer has no relation
>>>> to the Kafka committed offset whatsoever - if checkpointing is enabled.)
>>>>
>>>> This is to quote:
>>>> "*the Flink Kafka Consumer only commits offsets back to Kafka on a
>>>> best-effort basis after every checkpoint. Internally Flink "commits" the
>>>> [checkpoints]->[current Kafka offset] as part of its periodic checkpoints.*"
>>>>
>>>> However, if you do not enable checkpointing, I think your consumer will
>>>> by default restart from the default Kafka offset (which I think is your
>>>> committed group offset).
>>>>
>>>> --
>>>> Rong
>>>>
>>>> On Mon, Jul 8, 2019 at 6:39 AM John Smith <java.dev....@gmail.com> wrote:
>>>>
>>>>> So when we say a sink is at-least-once, it's because internally it's
>>>>> not checking any kind of state and it sends what it has regardless,
>>>>> correct? Because I will build a sink that calls stored procedures.
>>>>>
>>>>> On Sun., Jul. 7, 2019, 4:03 p.m. Konstantin Knauf <konstan...@ververica.com> wrote:
>>>>>
>>>>>> Hi John,
>>>>>>
>>>>>> in case of a failure (e.g. in the SQL sink) the Flink job will be
>>>>>> restarted from the last checkpoint. This means the offsets of all Kafka
>>>>>> partitions will be reset to that point in the stream, along with the
>>>>>> state of all operators. To enable checkpointing you need to call
>>>>>> StreamExecutionEnvironment#enableCheckpointing(). If you are using the
>>>>>> JDBCSinkFunction (which is an at-least-once sink), the output will be
>>>>>> duplicated in the case of failures.
>>>>>>
>>>>>> To answer your questions:
>>>>>>
>>>>>> * For this, the FlinkKafkaConsumer handles the offsets manually (no
>>>>>> auto-commit).
>>>>>> * No, the Flink Kafka Consumer only commits offsets back to Kafka on
>>>>>> a best-effort basis after every checkpoint. Internally, Flink "commits"
>>>>>> the current Kafka offsets as part of its periodic checkpoints.
>>>>>> * Yes, along with all other events between the last checkpoint and
>>>>>> the failure.
>>>>>> * It will continue from the last checkpoint.
>>>>>>
>>>>>> Hope this helps.
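Editor's aside: as the answer above notes, an at-least-once sink can write duplicates after a restart; replayed writes are harmless if the write itself is idempotent. A minimal plain-Java sketch of an idempotent "upsert by key" write (no Flink or JDBC involved; the class, the map standing in for a database table, and all names are hypothetical):

```java
import java.util.HashMap;
import java.util.Map;

public class IdempotentSinkSketch {

    // Stand-in for a database table keyed by a unique event id.
    final Map<String, String> table = new HashMap<>();

    // Idempotent write: applying it twice leaves the same state as once.
    void upsert(String eventId, String payload) {
        table.put(eventId, payload); // keyed insert-or-update, no duplicate rows
    }

    public static void main(String[] args) {
        IdempotentSinkSketch sink = new IdempotentSinkSketch();
        sink.upsert("evt-3", "record 3");
        sink.upsert("evt-4", "record 4");
        // After a restart, the at-least-once source replays both records:
        sink.upsert("evt-3", "record 3");
        sink.upsert("evt-4", "record 4");
        System.out.println(sink.table.size()); // -> 2, duplicates were absorbed
    }
}
```

In a real stored procedure the same effect is usually achieved with an insert-or-update keyed on a unique event id, so replayed records overwrite rather than duplicate.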
>>>>>>
>>>>>> Cheers,
>>>>>>
>>>>>> Konstantin
>>>>>>
>>>>>> On Fri, Jul 5, 2019 at 8:37 PM John Smith <java.dev....@gmail.com> wrote:
>>>>>>
>>>>>>> Hi, using Apache Flink 1.8.0.
>>>>>>>
>>>>>>> I'm consuming events from Kafka using nothing fancy...
>>>>>>>
>>>>>>> Properties props = new Properties();
>>>>>>> props.setProperty("bootstrap.servers", kafkaAddress);
>>>>>>> props.setProperty("group.id", kafkaGroup);
>>>>>>>
>>>>>>> FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(topic,
>>>>>>>         new SimpleStringSchema(), props);
>>>>>>>
>>>>>>> I do some JSON transforms and then push to my SQL database using JDBC
>>>>>>> and a stored procedure. Let's assume the SQL sink fails.
>>>>>>>
>>>>>>> We know that Kafka can either periodically commit offsets, or it can
>>>>>>> be done manually based on the consumer's logic.
>>>>>>>
>>>>>>> - How are the source Kafka consumer offsets handled?
>>>>>>> - Does the Flink Kafka consumer commit the offset per event/record?
>>>>>>> - Will that single event that failed be retried?
>>>>>>> - If we had 5 incoming events and say it failed on the 3rd one, will
>>>>>>> it continue at the 3rd, or will the job restart and retry all 5
>>>>>>> events?
>>>>>>
>>>>>> --
>>>>>>
>>>>>> Konstantin Knauf | Solutions Architect
>>>>>>
>>>>>> +49 160 91394525
>>>>>>
>>>>>> Planned Absences: 10.08.2019 - 31.08.2019, 05.09. - 06.09.2019
>>>>>>
>>>>>> --
>>>>>>
>>>>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>>>>
>>>>>> Ververica GmbH
>>>>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>>>>> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
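Editor's aside: putting the thread together, the checkpoint-enabled setup discussed above could look roughly like the following sketch against the Flink 1.8 DataStream API. The checkpoint interval, topic name, `transform` function, and `MyStoredProcSink` are placeholders, not real APIs; only `enableCheckpointing`, `FlinkKafkaConsumer`, and the source/sink wiring come from the thread.

```java
Properties props = new Properties();
props.setProperty("bootstrap.servers", kafkaAddress);
props.setProperty("group.id", kafkaGroup);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Without this call there are no checkpoints, and on failure the consumer
// falls back to the committed group offset, as Rong described.
env.enableCheckpointing(10_000); // checkpoint every 10 s (placeholder interval)

FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);

env.addSource(consumer)
   .map(value -> transform(value))   // JSON transforms (hypothetical helper)
   .addSink(new MyStoredProcSink()); // user-defined at-least-once JDBC sink

env.execute("kafka-to-sql");
```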