Hi Lu,

In this case, judging from the rather fragmented log/error message you posted, the job has failed, so Flink indeed detected some issue, and that probably means data loss in Kafka. (In such a case you could probably recover some of the lost records by reading from Kafka in `read_uncommitted` mode, but that can lead to data duplication; a sketch follows below.)
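For illustration, a minimal sketch of such an inspection consumer, assuming the plain kafka-clients Java API (the broker address, group id, and topic name are placeholders, not anything from your job):

```
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class UncommittedInspector {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");  // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "loss-inspection");       // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // read_uncommitted also returns records from open and aborted transactions,
        // which is exactly why replaying them can introduce duplicates.
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("output-topic"));  // placeholder
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition=%d offset=%d value=%s%n",
                        record.partition(), record.offset(), record.value());
            }
        }
    }
}
```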
However, a very similar error can be logged by Flink as a WARN during recovery. In that case it can mean either:

- data loss because of timeouts (keep in mind that the Kafka transaction timeout must cover: checkpoint interval + downtime during the failure + time to restart and recover the Flink job), or
- the transaction was already committed just before the failure happened.

Unfortunately, there is no way using the Kafka API to distinguish those two cases.
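As a rough sketch of that timeout arithmetic (the figures are made-up assumptions, not recommendations; see also Roman's note below about transaction.max.timeout.ms on the brokers):

```
import java.util.Properties;

public class TransactionTimeoutSketch {
    public static void main(String[] args) {
        // Assumed figures, for illustration only:
        long checkpointIntervalMs = 60_000;       // checkpoint once a minute
        long worstCaseDowntimeMs  = 10 * 60_000;  // failure + restart + recovery budget
        long safetyMarginMs       = 4 * 60_000;

        Properties producerProps = new Properties();
        // Must cover checkpoint interval + downtime + recovery time, and must
        // not exceed the broker-side transaction.max.timeout.ms cap.
        producerProps.setProperty("transaction.timeout.ms",
                String.valueOf(checkpointIntervalMs + worstCaseDowntimeMs + safetyMarginMs));
        System.out.println(producerProps);

        // Broker side (server.properties, not a producer property); the broker
        // rejects producer transaction timeouts above this cap:
        // transaction.max.timeout.ms=900000
    }
}
```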
Piotrek

On Wed, Aug 5, 2020 at 10:17 Khachatryan Roman <khachatryan.ro...@gmail.com> wrote:

> Hi Lu,
>
> AFAIK, it's not going to be fixed. As you mentioned in the first email,
> Kafka should be configured so that its transaction timeout is greater than
> your max checkpoint duration.
>
> However, you should not only change transaction.timeout.ms in the producer
> but also transaction.max.timeout.ms on your brokers.
> Please refer to
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html#caveats
>
> Regards,
> Roman
>
> On Wed, Aug 5, 2020 at 12:24 AM Lu Niu <qqib...@gmail.com> wrote:
>
>> Hi Khachatryan,
>>
>> Thank you for the reply. Is this a problem that can be fixed? If so, is
>> the fix on the roadmap? Thanks!
>>
>> Best,
>> Lu
>>
>> On Tue, Aug 4, 2020 at 1:24 PM Khachatryan Roman <
>> khachatryan.ro...@gmail.com> wrote:
>>
>>> Hi Lu,
>>>
>>> Yes, this error indicates data loss (unless there were no records in the
>>> transactions).
>>>
>>> Regards,
>>> Roman
>>>
>>> On Mon, Aug 3, 2020 at 9:14 PM Lu Niu <qqib...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> We are using end-to-end exactly-once Flink + Kafka and encountered the
>>>> following exception, which usually came after checkpoint failures:
>>>> ```
>>>> Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
>>>>
>>>> 2020-07-28 16:27:51,633 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job xxx (f08fc4b1edceb3705e2cb134a8ece73d) switched from state RUNNING to FAILING.
>>>> java.lang.RuntimeException: Error while confirming checkpoint
>>>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1219)
>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure
>>>>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:295)
>>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:842)
>>>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1214)
>>>>     ... 5 more
>>>> ```
>>>> We did some end-to-end tests and noticed that whenever such a thing
>>>> happens, there is data loss.
>>>>
>>>> Referring to several related questions, I understand I need to increase
>>>> `transaction.timeout.ms` because:
>>>> ```
>>>> Semantic.EXACTLY_ONCE mode relies on the ability to commit transactions
>>>> that were started before taking a checkpoint, after recovering from the
>>>> said checkpoint. If the time between Flink application crash and
>>>> completed restart is larger than Kafka's transaction timeout there will
>>>> be data loss (Kafka will automatically abort transactions that exceeded
>>>> timeout time).
>>>> ```
>>>>
>>>> But I want to confirm with the community:
>>>> *Will an exception like this always lead to data loss?*
>>>>
>>>> I ask because we sometimes get this exception even when the checkpoint
>>>> succeeds.
>>>>
>>>> Setup:
>>>> Flink 1.9.1
>>>>
>>>> Best,
>>>> Lu
>>>
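PS: For completeness, a minimal sketch of how these pieces fit together in a Flink 1.9 job, assuming the universal FlinkKafkaProducer (the broker address, topic name, and timeout values are placeholders, not your exact setup):

```
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

public class ExactlyOnceJobSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);  // example: checkpoint every minute

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092");  // placeholder
        // Keep this well above checkpoint interval + worst-case recovery time,
        // and at or below the broker's transaction.max.timeout.ms.
        props.setProperty("transaction.timeout.ms", String.valueOf(15 * 60 * 1000));

        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
                "output-topic",  // placeholder
                new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
                props,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

        env.fromElements("a", "b", "c")
                .addSink(producer);

        env.execute("exactly-once sketch");
    }
}
```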