Ok, so when the sink fails on the 5th record, there's no chance that the checkpoint can be at the 6th event, right?
On Tue, 9 Jul 2019 at 13:51, Konstantin Knauf <konstan...@ververica.com> wrote:

> Hi John,
>
> this depends on your checkpoint interval. When enabled, checkpoints are
> triggered periodically [1].
>
> Cheers,
>
> Konstantin
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/checkpointing.html
>
> On Tue, Jul 9, 2019 at 7:30 PM John Smith <java.dev....@gmail.com> wrote:
>
>> Ok, so just to be clear. Let's say we started at day 0...
>>
>> 1- Producer inserted 10 records into Kafka.
>> 2- Kafka Flink Consumer consumed 5 records.
>> 3- Some transformations applied to those records.
>> 4- 4 records sinked, 1 failed.
>> 5- Flink job restarts because of the above failure.
>>
>> When does the checkpoint happen above?
>> And does it mean in the above case that it will start back at 0, or will
>> it start at the 4th record and continue, or wherever the checkpoint
>> happened, e.g. the 3rd record?
>> My stored proc will be idempotent, and I understand what to do if
>> messages get replayed.
>> Just want to understand when and where the checkpointing will happen.
>>
>> On Mon, 8 Jul 2019 at 22:23, Rong Rong <walter...@gmail.com> wrote:
>>
>>> Hi John,
>>>
>>> I think what Konstantin is trying to say is: Flink's Kafka consumer does
>>> not start consuming from the Kafka committed offset when starting the
>>> consumer; it actually starts with the offset that was last checkpointed
>>> to the external DFS. (I.e. the starting point of the consumer has no
>>> relevance to the Kafka committed offset whatsoever, if checkpointing is
>>> enabled.)
>>>
>>> To quote:
>>> "*the Flink Kafka Consumer only commits offsets back to Kafka on a
>>> best-effort basis after every checkpoint. Internally Flink "commits" the
>>> [checkpoints]->[current Kafka offset] as part of its periodic
>>> checkpoints.*"
>>>
>>> However, if you do not enable checkpointing, I think your consumer will
>>> by default restart from the default Kafka offset (which I think is your
>>> committed group offset).
>>>
>>> --
>>> Rong
>>>
>>> On Mon, Jul 8, 2019 at 6:39 AM John Smith <java.dev....@gmail.com> wrote:
>>>
>>>> So when we say a sink is at least once, it's because internally it's
>>>> not checking any kind of state and it sends what it has regardless,
>>>> correct? Cause I will build a sink that calls stored procedures.
>>>>
>>>> On Sun., Jul. 7, 2019, 4:03 p.m. Konstantin Knauf, <konstan...@ververica.com> wrote:
>>>>
>>>>> Hi John,
>>>>>
>>>>> in case of a failure (e.g. in the SQL sink) the Flink job will be
>>>>> restarted from the last checkpoint. This means the offsets of all
>>>>> Kafka partitions will be reset to that point in the stream, along with
>>>>> the state of all operators. To enable checkpointing you need to call
>>>>> StreamExecutionEnvironment#enableCheckpointing(). If you are using the
>>>>> JDBCSinkFunction (which is an at-least-once sink), the output will be
>>>>> duplicated in the case of failures.
>>>>>
>>>>> To answer your questions:
>>>>>
>>>>> * For this, the FlinkKafkaConsumer handles the offsets manually (no
>>>>> auto-commit).
>>>>> * No, the Flink Kafka Consumer only commits offsets back to Kafka on a
>>>>> best-effort basis after every checkpoint. Internally Flink "commits"
>>>>> the offsets as part of its periodic checkpoints.
>>>>> * Yes, along with all other events between the last checkpoint and the
>>>>> failure.
>>>>> * It will continue from the last checkpoint.
>>>>>
>>>>> Hope this helps.
>>>>>
>>>>> Cheers,
>>>>>
>>>>> Konstantin
>>>>>
>>>>> On Fri, Jul 5, 2019 at 8:37 PM John Smith <java.dev....@gmail.com> wrote:
>>>>>
>>>>>> Hi, using Apache Flink 1.8.0.
>>>>>>
>>>>>> I'm consuming events from Kafka using nothing fancy...
>>>>>>
>>>>>> Properties props = new Properties();
>>>>>> props.setProperty("bootstrap.servers", kafkaAddress);
>>>>>> props.setProperty("group.id", kafkaGroup);
>>>>>>
>>>>>> FlinkKafkaConsumer<String> consumer =
>>>>>>     new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);
>>>>>>
>>>>>> I do some JSON transforms and then push to my SQL database using JDBC
>>>>>> and a stored procedure. Let's assume the SQL sink fails.
>>>>>>
>>>>>> We know that Kafka can either periodically commit offsets, or it can
>>>>>> be done manually based on the consumer's logic.
>>>>>>
>>>>>> - How are the source Kafka consumer offsets handled?
>>>>>> - Does the Flink Kafka consumer commit the offset per event/record?
>>>>>> - Will that single event that failed be retried?
>>>>>> - So if we had 5 incoming events and say on the 3rd one it failed,
>>>>>> will it continue on the 3rd, or will the job restart and retry those
>>>>>> 5 events?
>>>>>
>>>>> --
>>>>>
>>>>> Konstantin Knauf | Solutions Architect
>>>>>
>>>>> +49 160 91394525
>>>>>
>>>>> Planned Absences: 10.08.2019 - 31.08.2019, 05.09. - 06.09.2010
>>>>>
>>>>> --
>>>>>
>>>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>>>
>>>>> --
>>>>>
>>>>> Ververica GmbH
>>>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>>>> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
>
> --
>
> Konstantin Knauf | Solutions Architect
>
> +49 160 91394525
>
> Planned Absences: 10.08.2019 - 31.08.2019, 05.09. - 06.09.2010
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
>
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
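To make the checkpointing behaviour discussed above concrete, here is a minimal sketch (not taken from the thread) that wires the consumer from John's snippet into a job with checkpointing enabled. The broker address, group id, topic name and checkpoint interval are placeholder assumptions, and the map/print steps stand in for the JSON transforms and the JDBC sink.

import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class CheckpointedKafkaJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 10 seconds. Without this call, checkpointing is off and a
        // restarted job falls back to the Kafka committed group offsets.
        env.enableCheckpointing(10_000L, CheckpointingMode.EXACTLY_ONCE);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "my-group");                // placeholder

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);

        env.addSource(consumer)
                // stand-in for the JSON transforms in the original job
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) {
                        return value.trim();
                    }
                })
                // stand-in for the JDBC / stored-procedure sink
                .print();

        env.execute("checkpointed-kafka-job");
    }
}

Because enableCheckpointing() is called, a failure in the sink restarts the job from the last completed checkpoint and the Kafka partition offsets are rolled back with it, so everything between that checkpoint and the failure is replayed.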
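For the kind of sink John describes (a stored procedure called over JDBC), a hedged sketch of an at-least-once RichSinkFunction is below. The JDBC URL, credentials, the procedure name upsert_event and the Event fields are invented for illustration; since events between the last checkpoint and a failure are replayed on restore, the stored procedure is assumed to be idempotent, e.g. an upsert keyed on the event id.

import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DriverManager;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class StoredProcSink extends RichSinkFunction<Event> {

    private transient Connection connection;
    private transient CallableStatement call;

    @Override
    public void open(Configuration parameters) throws Exception {
        // JDBC URL and credentials are placeholders.
        connection = DriverManager.getConnection(
                "jdbc:postgresql://localhost:5432/demo", "user", "pass");
        // "upsert_event" is an invented stored procedure, assumed idempotent
        // (e.g. an upsert keyed on the event id).
        call = connection.prepareCall("{call upsert_event(?, ?)}");
    }

    @Override
    public void invoke(Event event, Context context) throws Exception {
        // If this throws, the job restarts from the last checkpoint and every event
        // since that checkpoint is sent to this sink again (at-least-once).
        call.setString(1, event.id);
        call.setString(2, event.payload);
        call.execute();
    }

    @Override
    public void close() throws Exception {
        if (call != null) {
            call.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}

// Minimal event POJO for the example.
class Event {
    public String id;
    public String payload;
}

A replayed event then simply re-executes the same upsert, which is what makes the at-least-once duplication acceptable in this setup.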