Hi! Also note that although this eventual consistency may not seem good enough, in 99.99% of cases the job runs smoothly without failure, and in those cases the records are correct. Only in the 0.01% of cases where the job fails will users see inconsistency, and only for a short period of time (one checkpoint interval). If a user can tolerate this 0.01% chance of inconsistency, they get very low latency and mostly correct data.
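[Editor's note: the "checkpoint interval" latency discussed above is configurable. A minimal sketch for the SQL client, assuming Flink 1.13-era configuration key names; the one-minute value is purely illustrative, not a recommendation:]

```sql
-- Shorten the window of possible inconsistency by checkpointing more often.
-- Smaller intervals mean lower recovery latency but more checkpointing overhead.
SET 'execution.checkpointing.interval' = '1min';
SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
```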
Caizhi Weng <tsreape...@gmail.com> wrote on Thu, Jan 6, 2022 at 10:40:

> Hi!
>
> Flink guarantees *eventual* consistency for systems without transactions
> (by transaction I mean the ability to write a few records and then commit
> them), or for systems with transactions when users prefer latency over
> consistency. That is to say, everything produced by Flink before a
> checkpoint is "not secured" (if you need *strong* consistency). If a
> failure happens after a checkpoint C, then everything produced by Flink
> after C should be ignored. Only the success of checkpoint C + 1 guarantees
> that all these records are consistent (up to C + 1).
>
> If you prefer strong consistency and can tolerate a latency of a few
> minutes (the latency here is your checkpoint interval), you can try the
> Hive sink. When writing to the Hive sink in a streaming job, records only
> become visible in Hive after the next checkpoint completes (so it is OK to
> process a record several times, as long as its corresponding checkpoint
> hasn't completed).
>
> group-offsets does not help in your case. There is actually an option to
> commit offsets to Kafka during each checkpoint, but Flink also manages
> offsets in its own state. If there is no checkpoint, the group offsets
> won't change.
>
> Sharon Xie <sharon.xie...@gmail.com> wrote on Wed, Jan 5, 2022 at 13:18:
>
>> Hi Caizhi,
>>
>> Thank you for the quick response. Can you help me understand how
>> reprocessing the data with the earliest starting-offset ensures
>> exactly-once processing? First, the earliest offset could be well before
>> the 1st record in my example, since the first run of the job started from
>> the latest offset. Second, even if there are only two records in the
>> topic, the 1st record was already processed, so I'd think it would be
>> processed twice if the earliest offset is used.
>>
>> Another thing I found in the docs, under start reading position
>> <https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kafka/#start-reading-position>:
>>
>> >The default option value is group-offsets which indicates to consume
>> >from last committed offsets in ZK / Kafka brokers.
>>
>> It seems there is a way to resume processing from the "group-offsets",
>> whose value would be the offset of the 1st record in my scenario.
>> However, I can't make it work based on my test. I'm using
>> application-mode deployment, so my guess is that the 2nd job (in a new
>> cluster) internally uses a different Kafka consumer group id. Any ideas
>> on how to make it work?
>>
>> Thanks,
>> Sharon
>>
>> On Tue, Jan 4, 2022 at 6:06 PM Caizhi Weng <tsreape...@gmail.com> wrote:
>>
>>> Hi!
>>>
>>> This is a valid case. This starting-offset is the offset the Kafka
>>> source reads from when the job starts *without a checkpoint*. That is
>>> to say, if your job has been running for a while, completed several
>>> checkpoints and then restarted, the Kafka source won't read from
>>> starting-offset but from the offset recorded in the checkpoint.
>>>
>>> As you're only processing 2 records (which I guess takes less than a
>>> second), no checkpoint has been completed (if you didn't change any
>>> configuration, the default checkpointing interval is 10 minutes), so the
>>> next time you start the same job it will still read from
>>> starting-offset, which is the latest offset per your setting.
>>>
>>> If you would like to reprocess the 2nd record, you can set
>>> starting-offset to earliest (which is the default setting). The 1st
>>> record will also be reprocessed, but this is still valid because it
>>> just updates the result for the 1st record (to the same value as in
>>> your previous execution).
>>>
>>> Sharon Xie <sharon.xie...@gmail.com> wrote on Wed, Jan 5, 2022 at 02:56:
>>>
>>>> Can someone help me understand how Flink handles the following
>>>> scenario?
>>>>
>>>> I have a job that reads from a source Kafka topic (starting-offset:
>>>> latest) and writes to a sink Kafka topic with exactly-once execution.
>>>> Let's say I have 2 records in the source. The 1st one is processed
>>>> without issue, and the job fails when the 2nd record is processed due
>>>> to a parsing error. I want to update the job with a fix for the 2nd
>>>> record and resume processing from the offset of the 2nd record.
>>>>
>>>> However, I can't find a way to stop the job with a savepoint because
>>>> the job is in a failed state. If I just cancel the job without a
>>>> savepoint, it will start from the new "latest" offset the next time I
>>>> start it.
>>>>
>>>> Is this a valid case? If so, how can I handle it so that I can resume
>>>> processing from the 2nd record's offset after updating the job?
>>>>
>>>> Thanks,
>>>> Sharon
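[Editor's note: the starting position discussed throughout this thread is controlled by the Kafka table connector's `scan.startup.mode` option. A minimal DDL sketch, assuming Flink 1.13-era connector option names; the table name, columns, topic, and group id are hypothetical:]

```sql
CREATE TABLE source_t (
  id  BIGINT,
  msg STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'input-topic',                         -- hypothetical topic
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'my-flink-job',          -- needed for group-offsets
  'scan.startup.mode' = 'group-offsets',           -- or 'earliest-offset' / 'latest-offset'
  'format' = 'json'
);
```

[As Caizhi explains above, this option only applies when the job starts without a checkpoint; on restore from a checkpoint or savepoint, the offsets stored in Flink's own state take precedence.]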