Hi!

Also note that although this eventual consistency may not seem good enough,
99.99% of the time the job runs smoothly without failure, and in that case
the records are correct. Only in the 0.01% of cases where the job fails will
the user see inconsistency, and only for a short period of time (one
checkpoint interval). If the user can tolerate this 0.01% chance of
inconsistency, they get very low latency and mostly correct data.
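
To make the trade-off concrete, here is a minimal sketch in plain Python. It
is not the Flink API: `run`, `checkpoint_every`, `fail_at` and `transactional`
are made-up names for illustration. It assumes a sink that either exposes
records immediately or holds them back until the next checkpoint completes,
and a single failure that rewinds the source to the last checkpointed offset.

```python
def run(records, checkpoint_every, fail_at=None, transactional=False):
    """Replay `records` through a toy sink; return what a reader of the sink sees."""
    visible = []        # records a downstream reader can see
    pending = []        # transactional sink only: written but not yet committed
    checkpointed = 0    # source offset stored in the last completed checkpoint
    offset = 0
    failed = False
    while offset < len(records):
        if fail_at is not None and offset == fail_at and not failed:
            # Simulated crash: drop uncommitted output, rewind to the checkpoint.
            failed = True
            pending.clear()
            offset = checkpointed
            continue
        (pending if transactional else visible).append(records[offset])
        offset += 1
        if offset % checkpoint_every == 0:
            # Checkpoint completes: commit pending output, remember the offset.
            visible.extend(pending)
            pending.clear()
            checkpointed = offset
    visible.extend(pending)  # final commit when the bounded input ends
    return visible


records = list("abcdefgh")
low_latency = run(records, checkpoint_every=4, fail_at=6)
strong = run(records, checkpoint_every=4, fail_at=6, transactional=True)
print(low_latency)  # "e" and "f" show up twice after the restart
print(strong)       # each record shows up once, but only after a checkpoint
```

In the non-transactional run a reader sees "e" and "f" twice until the replay
catches up; if the sink upserts by key, the replayed records simply overwrite
themselves and the result converges. In the transactional run nothing is
duplicated, at the cost of waiting for a checkpoint before records are visible.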

On Thu, Jan 6, 2022 at 10:40 Caizhi Weng <tsreape...@gmail.com> wrote:

> Hi!
>
> Flink guarantees *eventual* consistency for systems without transactions
> (by transaction I mean that the system supports writing a few records and
> then committing them), or for systems with transactions where users prefer
> low latency over consistency. That is to say, everything Flink produces
> before a checkpoint completes is "not secured" (if you need *strong*
> consistency). If a failure happens after a checkpoint C, then everything
> produced by Flink after C should be ignored. Only the success of C + 1
> guarantees that all these records are now consistent (up to C + 1).
>
> If you prefer strong consistency and can tolerate a latency of a few minutes
> (the latency here is your checkpoint interval), you can try the Hive sink.
> When writing to the Hive sink in a streaming job, records only become
> visible in Hive after the next checkpoint completes (so it is OK to process
> a record several times, as long as its corresponding checkpoint hasn't
> completed).
>
> group-offsets does not help in your case. There is actually an option to
> commit offsets to Kafka on each checkpoint, but Flink also manages offsets
> in its own state. If no checkpoint completes, the group offsets won't
> change.
>
> On Wed, Jan 5, 2022 at 13:18 Sharon Xie <sharon.xie...@gmail.com> wrote:
>
>> Hi Caizhi,
>>
>> Thank you for the quick response. Can you help me understand how
>> reprocessing the data with the earliest starting-offset ensures
>> exactly-once processing? First, the earliest offset could be way before
>> the 1st record in my example, since the first time the job started from
>> the latest offset. Second, even if there are only two records in the
>> topic, the 1st record was already processed, so I'd think it would be
>> processed twice if the earliest offset is used.
>>
>> Another thing I found in the doc, under "start reading position"
>> <https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kafka/#start-reading-position>
>>
>> >The default option value is group-offsets which indicates to consume
>> from last committed offsets in ZK / Kafka brokers.
>>
>> It seems that there is a way to resume processing from the
>> "group-offsets", whose value would be the offset of the 1st record in my
>> scenario. However, I can't make it work in my tests. I'm using
>> application mode deployment, so my guess is that the 2nd job (in a new
>> cluster) internally has a different Kafka consumer group id. Any ideas on
>> how to make it work?
>>
>>
>> Thanks,
>> Sharon
>>
>> On Tue, Jan 4, 2022 at 6:06 PM Caizhi Weng <tsreape...@gmail.com> wrote:
>>
>>> Hi!
>>>
>>> This is a valid case. The starting-offset is the offset the Kafka
>>> source reads from when the job starts *without a checkpoint*. That is
>>> to say, if your job has been running for a while, has completed several
>>> checkpoints and is then restarted, the Kafka source won't read from
>>> starting-offset, but from the offset recorded in the checkpoint.
>>>
>>> As you're only processing 2 records (which I guess takes less than a
>>> second) no checkpoint has been completed (if you didn't change any
>>> configurations the default checkpointing interval is 10 minutes), so the
>>> next time you start the same job it will still read from starting-offset,
>>> which is the latest offset by your setting.
>>>
>>> If you would like to reprocess the second record you can set the
>>> starting-offset to earliest (which is the default setting). The first
>>> record will also be reprocessed but this is still valid because it is just
>>> updating the result for the first record (which is the same as your
>>> previous execution).
>>>
>>> On Wed, Jan 5, 2022 at 02:56 Sharon Xie <sharon.xie...@gmail.com> wrote:
>>>
>>>> Can someone help me understand how Flink deals with the following
>>>> scenario?
>>>>
>>>> I have a job that reads from a source Kafka (starting-offset: latest)
>>>> and writes to a sink Kafka with exactly-once execution. Let's say that I
>>>> have 2 records in the source. The 1st one is processed without issue and
>>>> the job fails when the 2nd record is processed due to a parsing error. I
>>>> want to update the job with a fix for the 2nd record and resume processing
>>>> from the offset of the 2nd record.
>>>>
>>>> However, I can't find a way to stop the job with a savepoint because
>>>> the job is in a failed state. If I just cancel the job without a savepoint,
>>>> the job will start from the new "latest" offset next time I start it.
>>>>
>>>> Is this a valid case? If so, how do I handle it so that I can
>>>> resume processing from the 2nd record's offset after I update the job?
>>>>
>>>>
>>>> Thanks,
>>>> Sharon
>>>>
>>>>
