Ok cool. I will try to make my stored proc idempotent. So there no chance
that there's a checkpoint happens after the 5th record and the 5th record
is missed?

On Thu, 11 Jul 2019 at 05:20, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi John,
>
> let's say Flink performed a checkpoint after the 2nd record (by injecting
> a checkpoint marker into the data flow) and the sink fails on the 5th
> record.
> When Flink restarts the application, it resets the offset after the 2nd
> record (it will read the 3rd record first). Hence, the 3rd and 4th record
> will be emitted again.
>
> Best, Fabian
>
>
> Am Di., 9. Juli 2019 um 21:11 Uhr schrieb John Smith <
> java.dev....@gmail.com>:
>
>> Ok so when the sink fails on the 5th record then there's no chance that
>> the checkpoint can be at 6th event right?
>>
>> On Tue, 9 Jul 2019 at 13:51, Konstantin Knauf <konstan...@ververica.com>
>> wrote:
>>
>>> Hi John,
>>>
>>> this depends on your checkpoint interval. When enabled checkpoints are
>>> triggered periodically [1].
>>>
>>> Cheers,
>>>
>>> Konstantin
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/checkpointing.html
>>>
>>>
>>>
>>> On Tue, Jul 9, 2019 at 7:30 PM John Smith <java.dev....@gmail.com>
>>> wrote:
>>>
>>>> Ok so just to be clear. Let's say we started at day 0...
>>>>
>>>> 1- Producer inserted 10 records into Kafka.
>>>> 2- Kafka Flink Consumer consumed 5 records.
>>>> 3- Some transformations applied to those records.
>>>> 4- 4 records sinked, 1 failed.
>>>> 5- Flink Job restarts because of above failure.
>>>>
>>>> When does the checkpoint happen above?
>>>> And does it mean in the above case that it will start back at 0 or will
>>>> it start at the 4th record and continue or where ever the checkpoint
>>>> happend. Example 3rd record?
>>>> My stored proc will be idempotent and I understand if messages get
>>>> replayed what to do.
>>>> Just want to try to understand when and where the checkpointing will
>>>> happen.
>>>>
>>>> On Mon, 8 Jul 2019 at 22:23, Rong Rong <walter...@gmail.com> wrote:
>>>>
>>>>> Hi John,
>>>>>
>>>>> I think what Konstantin is trying to say is: Flink's Kafka consumer
>>>>> does not start consuming from the Kafka commit offset when starting the
>>>>> consumer, it would actually start with the offset that's last checkpointed
>>>>> to external DFS. (e.g. the starting point of the consumer has no relevance
>>>>> with Kafka committed offset whatsoever - if checkpoint is enabled.)
>>>>>
>>>>> This is to quote:
>>>>> "*the Flink Kafka Consumer does only commit offsets back to Kafka on
>>>>> a best-effort basis after every checkpoint. Internally Flink "commits" the
>>>>> [checkpoints]->[current Kafka offset] as part of its periodic 
>>>>> checkpoints.*
>>>>> "
>>>>>
>>>>> However if you do not enable checkpointing, I think your consumer will
>>>>> by-default restart from the default kafka offset (which I think is your
>>>>> committed group offset).
>>>>>
>>>>> --
>>>>> Rong
>>>>>
>>>>>
>>>>> On Mon, Jul 8, 2019 at 6:39 AM John Smith <java.dev....@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> So when we say a sink is at least once. It's because internally it's
>>>>>> not checking any kind of state and it sends what it has regardless,
>>>>>> correct? Cause I willl build a sink that calls stored procedures.
>>>>>>
>>>>>> On Sun., Jul. 7, 2019, 4:03 p.m. Konstantin Knauf, <
>>>>>> konstan...@ververica.com> wrote:
>>>>>>
>>>>>>> Hi John,
>>>>>>>
>>>>>>> in case of a failure (e.g. in the SQL Sink) the Flink Job will be
>>>>>>> restarted from the last checkpoint. This means the offset of all Kafka
>>>>>>> partitions will be reset to that point in the stream along with state of
>>>>>>> all operators. To enable checkpointing you need to call
>>>>>>> StreamExecutionEnvironment#enableCheckpointing(). If you using the
>>>>>>> JDBCSinkFunction (which is an at-least-once sink), the output will be
>>>>>>> duplicated in the case of failures.
>>>>>>>
>>>>>>> To answer your questions:
>>>>>>>
>>>>>>> * For this the FlinkKafkaConsumer handles the offsets manually (no
>>>>>>> auto-commit).
>>>>>>> * No, the Flink Kafka Consumer does only commit offsets back to
>>>>>>> Kafka on a best-effort basis after every checkpoint. Internally Flink
>>>>>>> "commits" the checkpoints as part of its periodic checkpoints.
>>>>>>> * Yes, along with all other events between the last checkpoint and
>>>>>>> the failure.
>>>>>>> * It will continue from the last checkpoint.
>>>>>>>
>>>>>>> Hope this helps.
>>>>>>>
>>>>>>> Cheers,
>>>>>>>
>>>>>>> Konstantin
>>>>>>>
>>>>>>> On Fri, Jul 5, 2019 at 8:37 PM John Smith <java.dev....@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi using Apache Flink 1.8.0
>>>>>>>>
>>>>>>>> I'm consuming events from Kafka using nothing fancy...
>>>>>>>>
>>>>>>>> Properties props = new Properties();
>>>>>>>> props.setProperty("bootstrap.servers", kafkaAddress);
>>>>>>>> props.setProperty("group.id",kafkaGroup);
>>>>>>>>
>>>>>>>> FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(topic, 
>>>>>>>> new SimpleStringSchema(),props);
>>>>>>>>
>>>>>>>>
>>>>>>>> Do some JSON transforms and then push to my SQL database using JDBC
>>>>>>>> and stored procedure. Let's assume the SQL sink fails.
>>>>>>>>
>>>>>>>> We know that Kafka can either periodically commit offsets or it can
>>>>>>>> be done manually based on consumers logic.
>>>>>>>>
>>>>>>>> - How is the source Kafka consumer offsets handled?
>>>>>>>> - Does the Flink Kafka consumer commit the offset to per
>>>>>>>> event/record?
>>>>>>>> - Will that single event that failed be retried?
>>>>>>>> - So if we had 5 incoming events and say on the 3rd one it failed,
>>>>>>>> will it continue on the 3rd or will the job restart and try those 5 
>>>>>>>> events.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>>
>>>>>>> Konstantin Knauf | Solutions Architect
>>>>>>>
>>>>>>> +49 160 91394525
>>>>>>>
>>>>>>>
>>>>>>> Planned Absences: 10.08.2019 - 31.08.2019, 05.09. - 06.09.2010
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>>
>>>>>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>>>>>
>>>>>>> --
>>>>>>>
>>>>>>> Ververica GmbH
>>>>>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>>>>>> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
>>>>>>>
>>>>>>
>>>
>>> --
>>>
>>> Konstantin Knauf | Solutions Architect
>>>
>>> +49 160 91394525
>>>
>>>
>>> Planned Absences: 10.08.2019 - 31.08.2019, 05.09. - 06.09.2010
>>>
>>>
>>> --
>>>
>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>
>>> --
>>>
>>> Ververica GmbH
>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
>>>
>>

Reply via email to