OK, so when the sink fails on the 5th record, there's no chance that the
checkpoint can be at the 6th event, right?
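
To make the scenario quoted below concrete, here is a minimal sketch of the
restart behaviour in plain Java. It is not Flink code and it makes several
simplifying assumptions: a single Kafka partition, a pipeline with
parallelism 1 that processes records strictly in order, and a checkpoint that
completes after every N records (real Flink checkpoints are triggered on a
time interval). The class and method names are made up for illustration.
Under those assumptions a checkpoint can only complete once the sink has
handled every record before it, so the restored offset can never be past the
record that failed.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of checkpoint-based recovery. NOT Flink code; all names are hypothetical.
final class CheckpointReplaySketch {

    /** Outcome of one simulated run. */
    static final class Result {
        final long restartOffset;    // offset the job resumed from, or -1 if it never failed
        final List<Long> sinkWrites; // every offset the sink wrote successfully, in order

        Result(long restartOffset, List<Long> sinkWrites) {
            this.restartOffset = restartOffset;
            this.sinkWrites = sinkWrites;
        }
    }

    /**
     * Simulates one partition read in order. checkpointEvery must be positive.
     * The sink fails exactly once, the first time it sees failAtOffset.
     */
    static Result simulate(int totalRecords, int checkpointEvery, long failAtOffset) {
        List<Long> sinkWrites = new ArrayList<>();
        long checkpointedOffset = 0; // next offset to read, as saved by the last completed checkpoint
        long restartOffset = -1;
        boolean failedOnce = false;
        long offset = 0;

        while (offset < totalRecords) {
            if (offset == failAtOffset && !failedOnce) {
                // Sink failure: the job restarts and the source rewinds to the checkpoint.
                failedOnce = true;
                restartOffset = checkpointedOffset;
                offset = checkpointedOffset;
                continue;
            }
            sinkWrites.add(offset);
            offset++;
            if (offset % checkpointEvery == 0) {
                // Assumption: a checkpoint completes here, after the sink has seen all earlier records.
                checkpointedOffset = offset;
            }
        }
        return new Result(restartOffset, sinkWrites);
    }

    public static void main(String[] args) {
        // 10 records, a checkpoint every 3 records, failure on the 5th record (offset 4).
        Result r = simulate(10, 3, 4);
        System.out.println("restarted from offset " + r.restartOffset);
        System.out.println("sink writes: " + r.sinkWrites);
    }
}
```

With a checkpoint every 3 records and a failure on the 5th record (offset 4),
the sketch resumes at offset 3, so the 4th record reaches the sink twice. That
duplicate is what at-least-once means in practice. With several partitions or
higher parallelism the ordering argument only holds per partition.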

On Tue, 9 Jul 2019 at 13:51, Konstantin Knauf <konstan...@ververica.com>
wrote:

> Hi John,
>
> This depends on your checkpoint interval. When enabled, checkpoints are
> triggered periodically [1].
>
> Cheers,
>
> Konstantin
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/checkpointing.html
>
>
>
> On Tue, Jul 9, 2019 at 7:30 PM John Smith <java.dev....@gmail.com> wrote:
>
>> OK, so just to be clear, let's say we started at day 0...
>>
>> 1- Producer inserted 10 records into Kafka.
>> 2- Kafka Flink Consumer consumed 5 records.
>> 3- Some transformations applied to those records.
>> 4- 4 records written to the sink, 1 failed.
>> 5- Flink job restarts because of the above failure.
>>
>> When does the checkpoint happen above?
>> And does it mean that in the above case it will start back at 0, or will
>> it start at the 4th record and continue, or from wherever the checkpoint
>> happened, for example the 3rd record?
>> My stored proc will be idempotent, and I understand what to do if messages
>> get replayed.
>> Just want to try to understand when and where the checkpointing will
>> happen.
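
Since replays after a restart are expected, the idempotent stored procedure
mentioned above is what keeps the database consistent. Here is a small sketch
of that idea in plain Java, assuming each event carries a stable unique id
(the `eventId` parameter is hypothetical, as are the class and method names).
An in-memory map stands in for the table; a real sink would call the stored
procedure over JDBC instead.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory stand-in for a table written through an idempotent stored procedure.
// All names are hypothetical; a real sink would issue a JDBC call instead.
final class IdempotentSinkSketch {
    private final Map<String, String> rowsByEventId = new HashMap<>();
    private int calls = 0;

    /** Upsert keyed on the event id: applying the same event twice leaves one row. */
    void upsert(String eventId, String payload) {
        calls++;
        rowsByEventId.put(eventId, payload);
    }

    int rowCount() {
        return rowsByEventId.size();
    }

    int callCount() {
        return calls;
    }

    public static void main(String[] args) {
        IdempotentSinkSketch sink = new IdempotentSinkSketch();
        // First attempt: four events reach the sink before the job fails.
        for (String id : new String[] {"e1", "e2", "e3", "e4"}) {
            sink.upsert(id, "payload-" + id);
        }
        // After the restart, everything since the last checkpoint is replayed.
        for (String id : new String[] {"e3", "e4", "e5"}) {
            sink.upsert(id, "payload-" + id);
        }
        System.out.println(sink.callCount() + " calls, " + sink.rowCount() + " rows");
    }
}
```

The sink is called seven times but the table ends up with five rows, one per
distinct event, which is the property that makes an at-least-once sink safe
to use.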
>>
>> On Mon, 8 Jul 2019 at 22:23, Rong Rong <walter...@gmail.com> wrote:
>>
>>> Hi John,
>>>
>>> I think what Konstantin is trying to say is: Flink's Kafka consumer does
>>> not start consuming from the Kafka committed offset when the consumer
>>> starts; it starts from the offset that was last checkpointed to the
>>> external DFS (i.e. the consumer's starting point is unrelated to the
>>> Kafka committed offset if checkpointing is enabled).
>>>
>>> This is to quote:
>>> "*the Flink Kafka Consumer does only commit offsets back to Kafka on a
>>> best-effort basis after every checkpoint. Internally Flink "commits" the
>>> [checkpoints]->[current Kafka offset] as part of its periodic checkpoints.*
>>> "
>>>
>>> However, if you do not enable checkpointing, I think your consumer will
>>> by default restart from the default Kafka offset (which I think is your
>>> committed group offset).
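
The precedence described above can be written down as a few lines of plain
Java. This is only a rough sketch of the rule as stated in this thread, not
the connector's actual code: the class, method, and parameter names are
hypothetical, and the `fallbackOffset` parameter stands in for whatever start
position applies when neither offset exists, a case the thread does not cover.

```java
import java.util.OptionalLong;

// Sketch of the start-offset precedence described in the thread. Hypothetical names.
final class StartOffsetSketch {

    /**
     * Picks the offset to resume from:
     * 1. the offset restored from the last checkpoint, if there is one;
     * 2. otherwise the offset committed to Kafka for the consumer group;
     * 3. otherwise the supplied fallback (an assumption, see the lead-in).
     */
    static long resolveStartOffset(OptionalLong checkpointedOffset,
                                   OptionalLong committedGroupOffset,
                                   long fallbackOffset) {
        if (checkpointedOffset.isPresent()) {
            return checkpointedOffset.getAsLong();
        }
        if (committedGroupOffset.isPresent()) {
            return committedGroupOffset.getAsLong();
        }
        return fallbackOffset;
    }

    public static void main(String[] args) {
        // Checkpoint says 3, Kafka group offset says 5: the checkpoint wins.
        System.out.println(resolveStartOffset(OptionalLong.of(3), OptionalLong.of(5), 0));
        // No checkpoint: the committed group offset is used.
        System.out.println(resolveStartOffset(OptionalLong.empty(), OptionalLong.of(5), 0));
    }
}
```

The point of the ordering is that the offset committed back to Kafka is only
best-effort bookkeeping, so it is consulted only when no checkpoint is
available to restore from.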
>>>
>>> --
>>> Rong
>>>
>>>
>>> On Mon, Jul 8, 2019 at 6:39 AM John Smith <java.dev....@gmail.com>
>>> wrote:
>>>
>>>> So when we say a sink is at-least-once, it's because internally it's
>>>> not checking any kind of state and it sends what it has regardless,
>>>> correct? Because I will build a sink that calls stored procedures.
>>>>
>>>> On Sun., Jul. 7, 2019, 4:03 p.m. Konstantin Knauf, <
>>>> konstan...@ververica.com> wrote:
>>>>
>>>>> Hi John,
>>>>>
>>>>> In case of a failure (e.g. in the SQL sink), the Flink job will be
>>>>> restarted from the last checkpoint. This means the offsets of all Kafka
>>>>> partitions will be reset to that point in the stream, along with the
>>>>> state of all operators. To enable checkpointing you need to call
>>>>> StreamExecutionEnvironment#enableCheckpointing(). If you are using the
>>>>> JDBCSinkFunction (which is an at-least-once sink), the output will be
>>>>> duplicated in the case of failures.
>>>>>
>>>>> To answer your questions:
>>>>>
>>>>> * For this the FlinkKafkaConsumer handles the offsets manually (no
>>>>> auto-commit).
>>>>> * No, the Flink Kafka Consumer does only commit offsets back to Kafka
>>>>> on a best-effort basis after every checkpoint. Internally Flink "commits"
>>>>> the checkpoints as part of its periodic checkpoints.
>>>>> * Yes, along with all other events between the last checkpoint and the
>>>>> failure.
>>>>> * It will continue from the last checkpoint.
>>>>>
>>>>> Hope this helps.
>>>>>
>>>>> Cheers,
>>>>>
>>>>> Konstantin
>>>>>
>>>>> On Fri, Jul 5, 2019 at 8:37 PM John Smith <java.dev....@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi, I'm using Apache Flink 1.8.0.
>>>>>>
>>>>>> I'm consuming events from Kafka using nothing fancy...
>>>>>>
>>>>>> Properties props = new Properties();
>>>>>> props.setProperty("bootstrap.servers", kafkaAddress);
>>>>>> props.setProperty("group.id", kafkaGroup);
>>>>>>
>>>>>> FlinkKafkaConsumer<String> consumer =
>>>>>>     new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);
>>>>>>
>>>>>>
>>>>>> I do some JSON transforms and then push to my SQL database using JDBC
>>>>>> and a stored procedure. Let's assume the SQL sink fails.
>>>>>>
>>>>>> We know that Kafka offsets can be committed either periodically or
>>>>>> manually, based on the consumer's logic.
>>>>>>
>>>>>> - How are the source Kafka consumer's offsets handled?
>>>>>> - Does the Flink Kafka consumer commit the offset per event/record?
>>>>>> - Will the single event that failed be retried?
>>>>>> - So if we had 5 incoming events and, say, the 3rd one failed,
>>>>>> will it continue from the 3rd, or will the job restart and retry all 5
>>>>>> events?
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> Konstantin Knauf | Solutions Architect
>>>>>
>>>>> +49 160 91394525
>>>>>
>>>>>
>>>>> Planned Absences: 10.08.2019 - 31.08.2019, 05.09. - 06.09.2010
>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>>>
>>>>> --
>>>>>
>>>>> Ververica GmbH
>>>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>>>> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
>>>>>
>>>>
>
>
