Ok, so just to be clear. Let's say we started at day 0...
1- The producer inserted 10 records into Kafka.
2- The Flink Kafka consumer consumed 5 records.
3- Some transformations were applied to those records.
4- 4 records were written to the sink, 1 failed.
5- The Flink job restarts because of the above failure.
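(For reference, periodic checkpointing is enabled roughly like this - the 10 second
interval is only an example value. Checkpoints are triggered on that timer, not per
record, and on failure the job rewinds to the last completed checkpoint.)

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Take a checkpoint every 10 seconds; the Kafka offsets stored in a checkpoint
// are what the source rewinds to after a restart.
env.enableCheckpointing(10000);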
When does the checkpoint happen above? And does that mean in the above case that it
will start back at 0, or will it start at the 4th record and continue, or wherever
the checkpoint happened, e.g. at the 3rd record? My stored proc will be idempotent
and I understand what to do if messages get replayed. I just want to understand when
and where the checkpointing will happen.

On Mon, 8 Jul 2019 at 22:23, Rong Rong <walter...@gmail.com> wrote:

> Hi John,
>
> I think what Konstantin is trying to say is: Flink's Kafka consumer does not
> start consuming from the Kafka committed offset when starting the consumer; it
> actually starts from the offset that was last checkpointed to the external DFS.
> (I.e. the starting point of the consumer has no relevance to the Kafka committed
> offset whatsoever - if checkpointing is enabled.)
>
> This is to quote:
> "*the Flink Kafka Consumer only commits offsets back to Kafka on a best-effort
> basis after every checkpoint. Internally Flink "commits" the
> [checkpoints]->[current Kafka offsets] as part of its periodic checkpoints.*"
>
> However, if you do not enable checkpointing, I think your consumer will by
> default restart from the default Kafka offset (which I think is your committed
> group offset).
>
> --
> Rong
>
>
> On Mon, Jul 8, 2019 at 6:39 AM John Smith <java.dev....@gmail.com> wrote:
>
>> So when we say a sink is at-least-once, it's because internally it's not
>> checking any kind of state and it sends what it has regardless, correct?
>> I ask because I will build a sink that calls stored procedures.
>>
>> On Sun., Jul. 7, 2019, 4:03 p.m. Konstantin Knauf, <
>> konstan...@ververica.com> wrote:
>>
>>> Hi John,
>>>
>>> in case of a failure (e.g. in the SQL sink) the Flink job will be restarted
>>> from the last checkpoint. This means the offsets of all Kafka partitions
>>> will be reset to that point in the stream, along with the state of all
>>> operators. To enable checkpointing you need to call
>>> StreamExecutionEnvironment#enableCheckpointing(). If you are using the
>>> JDBCSinkFunction (which is an at-least-once sink), the output will be
>>> duplicated in the case of failures.
>>>
>>> To answer your questions:
>>>
>>> * For this, the FlinkKafkaConsumer handles the offsets manually (no
>>> auto-commit).
>>> * No, the Flink Kafka Consumer only commits offsets back to Kafka on a
>>> best-effort basis after every checkpoint. Internally Flink "commits" the
>>> checkpoints as part of its periodic checkpoints.
>>> * Yes, along with all other events between the last checkpoint and the
>>> failure.
>>> * It will continue from the last checkpoint.
>>>
>>> Hope this helps.
>>>
>>> Cheers,
>>>
>>> Konstantin
>>>
>>> On Fri, Jul 5, 2019 at 8:37 PM John Smith <java.dev....@gmail.com> wrote:
>>>
>>>> Hi, I'm using Apache Flink 1.8.0.
>>>>
>>>> I'm consuming events from Kafka using nothing fancy...
>>>>
>>>> Properties props = new Properties();
>>>> props.setProperty("bootstrap.servers", kafkaAddress);
>>>> props.setProperty("group.id", kafkaGroup);
>>>>
>>>> FlinkKafkaConsumer<String> consumer =
>>>>     new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);
>>>>
>>>> I do some JSON transforms and then push to my SQL database using JDBC and
>>>> a stored procedure. Let's assume the SQL sink fails.
>>>>
>>>> We know that Kafka can either periodically commit offsets or it can be
>>>> done manually based on the consumer's logic.
>>>>
>>>> - How are the source Kafka consumer offsets handled?
>>>> - Does the Flink Kafka consumer commit the offset per event/record?
>>>> - Will that single event that failed be retried?
>>>> - So if we had 5 incoming events and it failed on, say, the 3rd one, will
>>>> it continue from the 3rd or will the job restart and retry all 5 events?
>>>>
>>>
>>> --
>>>
>>> Konstantin Knauf | Solutions Architect
>>>
>>> +49 160 91394525
>>>
>>> Planned Absences: 10.08.2019 - 31.08.2019, 05.09. - 06.09.2019
>>>
>>> --
>>>
>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>
>>> --
>>>
>>> Ververica GmbH
>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
>>
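For illustration, here is a rough sketch of the kind of idempotent stored-procedure
sink discussed above. It is only a sketch under assumptions: the class name, the
(id, payload) event shape, the JDBC connection settings, and the upsert_event
procedure are all placeholders, not Flink or project APIs; only RichSinkFunction
itself is a real Flink class. Because every write is an upsert keyed on the event
id, replaying events after a restore from a checkpoint does not create duplicates.

import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DriverManager;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Sink for (id, payload) pairs; the stored procedure name and schema are assumptions.
public class StoredProcSink extends RichSinkFunction<Tuple2<String, String>> {

    private final String jdbcUrl;
    private final String user;
    private final String password;

    private transient Connection connection;
    private transient CallableStatement statement;

    public StoredProcSink(String jdbcUrl, String user, String password) {
        this.jdbcUrl = jdbcUrl;
        this.user = user;
        this.password = password;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        connection = DriverManager.getConnection(jdbcUrl, user, password);
        // upsert_event is a hypothetical stored procedure that inserts or updates by id.
        statement = connection.prepareCall("{call upsert_event(?, ?)}");
    }

    @Override
    public void invoke(Tuple2<String, String> event, Context context) throws Exception {
        // At-least-once: the same event may arrive again after a restart from a
        // checkpoint, but since upsert_event is keyed on the id, a replayed call
        // simply overwrites the same row instead of inserting a duplicate.
        statement.setString(1, event.f0);
        statement.setString(2, event.f1);
        statement.execute();
    }

    @Override
    public void close() throws Exception {
        if (statement != null) statement.close();
        if (connection != null) connection.close();
    }
}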