Hi Lu,

Yes, this error indicates data loss (unless there were no records in the transactions).
Regards,
Roman

On Mon, Aug 3, 2020 at 9:14 PM Lu Niu <qqib...@gmail.com> wrote:

> Hi,
>
> We are using end-to-end exactly-once Flink + Kafka and encountered the
> exception below, which usually came after checkpoint failures:
> ```
> Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an
> operation with an old epoch. Either there is a newer producer with the same transactionalId,
> or the producer's transaction has been expired by the broker.
> 2020-07-28 16:27:51,633 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job xxx
> (f08fc4b1edceb3705e2cb134a8ece73d) switched from state RUNNING to FAILING.
> java.lang.RuntimeException: Error while confirming checkpoint
>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1219)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed,
> logging first encountered failure
>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:295)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:842)
>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1214)
>     ... 5 more
> ```
> We did some end-to-end tests and noticed that whenever this happens,
> there is data loss.
>
> Referring to several related questions, I understand I need to increase
> `transaction.timeout.ms` because:
> ```
> Semantic.EXACTLY_ONCE mode relies on the ability to commit transactions
> that were started before taking a checkpoint, after recovering from the
> said checkpoint. If the time between Flink application crash and completed
> restart is larger than Kafka’s transaction timeout there will be data loss
> (Kafka will automatically abort transactions that exceeded timeout time).
> ```
>
> But I want to confirm with the community:
> Will an exception like this always lead to data loss?
>
> I ask because we sometimes get this exception even when the checkpoint
> succeeds.
>
> Setup:
> Flink 1.9.1
>
> Best
> Lu
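
For reference, a minimal sketch of raising `transaction.timeout.ms` on an exactly-once Kafka sink with the Flink 1.9 universal connector. The broker address, topic name, checkpoint interval, and timeout value are placeholders, not from this thread, and need to be adapted to your setup:

```
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

public class ExactlyOnceKafkaSinkExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // EXACTLY_ONCE sinks only commit Kafka transactions when a checkpoint completes.
        env.enableCheckpointing(60_000);

        Properties producerProps = new Properties();
        producerProps.setProperty("bootstrap.servers", "kafka:9092"); // placeholder
        // Producer-side transaction timeout. It must cover the worst-case gap between a job
        // failure and a completed restart, and must not exceed the broker's
        // transaction.max.timeout.ms (15 minutes by default), or producer initialization fails.
        producerProps.setProperty("transaction.timeout.ms", "900000"); // 15 minutes, placeholder

        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
                "output-topic", // placeholder topic
                new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
                producerProps,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

        env.fromElements("a", "b", "c") // stand-in for the real pipeline
           .addSink(producer);

        env.execute("exactly-once-kafka-sink");
    }
}
```

Pick the timeout so it comfortably covers the longest expected downtime between a crash and a restore from the last checkpoint; transactions still open past that window are aborted by the broker and their records are lost.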