Ok cool. I will try to make my stored proc idempotent. So there's no chance that a checkpoint happens after the 5th record and the 5th record is missed?
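For reference, this is roughly the idempotent write I have in mind - just a sketch; the table, columns, and the ON DUPLICATE KEY syntax assume MySQL (MERGE or INSERT ... ON CONFLICT would be the equivalent elsewhere), and the proc body would effectively do this upsert:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Keyed on a unique event id, so a record replayed after a restore
// rewrites the same row instead of inserting a duplicate.
void upsert(Connection conn, String eventId, String payload) throws SQLException {
    try (PreparedStatement stmt = conn.prepareStatement(
            "INSERT INTO events (event_id, payload) VALUES (?, ?) " +
            "ON DUPLICATE KEY UPDATE payload = VALUES(payload)")) {
        stmt.setString(1, eventId);
        stmt.setString(2, payload);
        stmt.executeUpdate();
    }
}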
On Thu, 11 Jul 2019 at 05:20, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi John,
>
> let's say Flink performed a checkpoint after the 2nd record (by injecting
> a checkpoint marker into the data flow) and the sink fails on the 5th
> record.
> When Flink restarts the application, it resets the offset to just after
> the 2nd record (it will read the 3rd record first). Hence, the 3rd and
> 4th record will be emitted again.
>
> Best, Fabian
>
> On Tue, 9 Jul 2019 at 21:11, John Smith <java.dev....@gmail.com> wrote:
>
>> Ok so when the sink fails on the 5th record, there's no chance that the
>> checkpoint can be at the 6th event, right?
>>
>> On Tue, 9 Jul 2019 at 13:51, Konstantin Knauf <konstan...@ververica.com> wrote:
>>
>>> Hi John,
>>>
>>> this depends on your checkpoint interval. When enabled, checkpoints
>>> are triggered periodically [1].
>>>
>>> Cheers,
>>>
>>> Konstantin
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/checkpointing.html
>>>
>>> On Tue, Jul 9, 2019 at 7:30 PM John Smith <java.dev....@gmail.com> wrote:
>>>
>>>> Ok so just to be clear. Let's say we started at day 0...
>>>>
>>>> 1- Producer inserted 10 records into Kafka.
>>>> 2- Kafka Flink Consumer consumed 5 records.
>>>> 3- Some transformations applied to those records.
>>>> 4- 4 records sinked, 1 failed.
>>>> 5- Flink Job restarts because of the above failure.
>>>>
>>>> When does the checkpoint happen above?
>>>> And does it mean in the above case that it will start back at 0, or
>>>> will it start at the 4th record and continue, or wherever the
>>>> checkpoint happened, e.g. the 3rd record?
>>>> My stored proc will be idempotent and I understand what to do if
>>>> messages get replayed.
>>>> Just want to understand when and where the checkpointing will happen.
>>>>
>>>> On Mon, 8 Jul 2019 at 22:23, Rong Rong <walter...@gmail.com> wrote:
>>>>
>>>>> Hi John,
>>>>>
>>>>> I think what Konstantin is trying to say is: Flink's Kafka consumer
>>>>> does not start consuming from the Kafka committed offset when the
>>>>> consumer starts; it actually starts from the offset that was last
>>>>> checkpointed to the external DFS. (I.e. the starting point of the
>>>>> consumer has no relevance to the Kafka committed offset whatsoever -
>>>>> if checkpointing is enabled.)
>>>>>
>>>>> This is to quote:
>>>>> "*the Flink Kafka Consumer does only commit offsets back to Kafka on
>>>>> a best-effort basis after every checkpoint. Internally Flink
>>>>> "commits" the [checkpoints]->[current Kafka offset] as part of its
>>>>> periodic checkpoints.*"
>>>>>
>>>>> However, if you do not enable checkpointing, I think your consumer
>>>>> will by default restart from the default Kafka offset (which I think
>>>>> is your committed group offset).
>>>>>
>>>>> --
>>>>> Rong
>>>>>
>>>>> On Mon, Jul 8, 2019 at 6:39 AM John Smith <java.dev....@gmail.com> wrote:
>>>>>
>>>>>> So when we say a sink is at least once, it's because internally it's
>>>>>> not checking any kind of state and it sends what it has regardless,
>>>>>> correct? Cause I will build a sink that calls stored procedures.
>>>>>>
>>>>>> On Sun., Jul. 7, 2019, 4:03 p.m. Konstantin Knauf <konstan...@ververica.com> wrote:
>>>>>>
>>>>>>> Hi John,
>>>>>>>
>>>>>>> in case of a failure (e.g. in the SQL Sink) the Flink Job will be
>>>>>>> restarted from the last checkpoint. This means the offsets of all
>>>>>>> Kafka partitions will be reset to that point in the stream, along
>>>>>>> with the state of all operators.
>>>>>>> To enable checkpointing you need to call
>>>>>>> StreamExecutionEnvironment#enableCheckpointing(). If you are using
>>>>>>> the JDBCSinkFunction (which is an at-least-once sink), the output
>>>>>>> will be duplicated in the case of failures.
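>>>>>>>
>>>>>>> As a minimal sketch of enabling it (the 10 second interval is just
>>>>>>> an example value; pick one that fits your latency/overhead
>>>>>>> trade-off):
>>>>>>>
>>>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>>>
>>>>>>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>> // Checkpoint all operator state (incl. Kafka offsets) every 10 seconds.
>>>>>>> env.enableCheckpointing(10000);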
>>>>>>>
>>>>>>> To answer your questions:
>>>>>>>
>>>>>>> * For this the FlinkKafkaConsumer handles the offsets manually (no
>>>>>>> auto-commit).
>>>>>>> * No, the Flink Kafka Consumer does only commit offsets back to
>>>>>>> Kafka on a best-effort basis after every checkpoint. Internally
>>>>>>> Flink "commits" the checkpoints as part of its periodic
>>>>>>> checkpoints.
>>>>>>> * Yes, along with all other events between the last checkpoint and
>>>>>>> the failure.
>>>>>>> * It will continue from the last checkpoint.
>>>>>>>
>>>>>>> Hope this helps.
>>>>>>>
>>>>>>> Cheers,
>>>>>>>
>>>>>>> Konstantin
>>>>>>>
>>>>>>> On Fri, Jul 5, 2019 at 8:37 PM John Smith <java.dev....@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi, I'm using Apache Flink 1.8.0.
>>>>>>>>
>>>>>>>> I'm consuming events from Kafka using nothing fancy...
>>>>>>>>
>>>>>>>> Properties props = new Properties();
>>>>>>>> props.setProperty("bootstrap.servers", kafkaAddress);
>>>>>>>> props.setProperty("group.id", kafkaGroup);
>>>>>>>>
>>>>>>>> FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(topic,
>>>>>>>> new SimpleStringSchema(), props);
>>>>>>>>
>>>>>>>> I do some JSON transforms and then push to my SQL database using
>>>>>>>> JDBC and a stored procedure. Let's assume the SQL sink fails.
>>>>>>>>
>>>>>>>> We know that Kafka can either periodically commit offsets or it
>>>>>>>> can be done manually based on the consumer's logic.
>>>>>>>>
>>>>>>>> - How are the source Kafka consumer offsets handled?
>>>>>>>> - Does the Flink Kafka consumer commit the offset per
>>>>>>>> event/record?
>>>>>>>> - Will that single event that failed be retried?
>>>>>>>> - So if we had 5 incoming events and say it failed on the 3rd one,
>>>>>>>> will it continue on the 3rd or will the job restart and retry all
>>>>>>>> 5 events?
>>>>>>>
>>>>>>> --
>>>>>>>
>>>>>>> Konstantin Knauf | Solutions Architect
>>>>>>>
>>>>>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>
>>> --
>>>
>>> Konstantin Knauf | Solutions Architect
>>>
>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
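PS: In case it's useful context, below is a rough sketch of the stored
procedure sink I plan to build. It keeps no state of its own, so it is only
at-least-once - which is exactly why the proc has to be idempotent. The JDBC
URL, credentials, and procedure name (upsert_event) are placeholders.

import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DriverManager;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class StoredProcSink extends RichSinkFunction<String> {

    private transient Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Placeholder connection details; in practice these come from config.
        connection = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/mydb", "user", "pass");
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        // Nothing is checkpointed here, so records between the last
        // checkpoint and a failure will be replayed into this call.
        // The idempotent stored proc makes that replay harmless.
        try (CallableStatement call = connection.prepareCall("{call upsert_event(?)}")) {
            call.setString(1, value);
            call.execute();
        }
    }

    @Override
    public void close() throws Exception {
        if (connection != null) {
            connection.close();
        }
    }
}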