Hi Khachatryan,

Thank you for the reply. Is this a problem that can be fixed? If so, is the fix on the roadmap? Thanks!
Best,
Lu

On Tue, Aug 4, 2020 at 1:24 PM Khachatryan Roman <khachatryan.ro...@gmail.com> wrote:

> Hi Lu,
>
> Yes, this error indicates data loss (unless there were no records in the
> transactions).
>
> Regards,
> Roman
>
>
> On Mon, Aug 3, 2020 at 9:14 PM Lu Niu <qqib...@gmail.com> wrote:
>
>> Hi,
>>
>> We are using end-to-end exactly-once Flink + Kafka and encountered the
>> following exception, which usually comes after checkpoint failures:
>> ```
>> Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
>> 2020-07-28 16:27:51,633 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job xxx (f08fc4b1edceb3705e2cb134a8ece73d) switched from state RUNNING to FAILING.
>> java.lang.RuntimeException: Error while confirming checkpoint
>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1219)
>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure
>>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:295)
>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:842)
>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1214)
>>     ... 5 more
>> ```
>> We did some end-to-end tests and noticed that whenever this happens,
>> there is data loss.
>>
>> Referring to several related questions, I understand I need to increase
>> `transaction.timeout.ms` because:
>> ```
>> Semantic.EXACTLY_ONCE mode relies on the ability to commit transactions
>> that were started before taking a checkpoint, after recovering from the
>> said checkpoint. If the time between Flink application crash and completed
>> restart is larger than Kafka's transaction timeout there will be data loss
>> (Kafka will automatically abort transactions that exceeded timeout time).
>> ```
>>
>> But I want to confirm with the community: does an exception like this
>> always lead to data loss?
>>
>> I ask because we sometimes get this exception even when the checkpoint
>> succeeds.
>>
>> Setup:
>> Flink 1.9.1
>>
>> Best,
>> Lu
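For anyone following along, here is a minimal sketch of the configuration change discussed in the quoted thread (raising `transaction.timeout.ms` for the exactly-once Kafka sink). It assumes the universal Kafka connector shipped with Flink 1.9 and a plain String stream; the broker address, topic name, source, and timeout values are placeholders, not our actual setup.

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

public class ExactlyOnceSinkSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Transactions are only committed when a checkpoint completes, so checkpointing must be enabled.
        env.enableCheckpointing(60_000);

        DataStream<String> events = env.socketTextStream("localhost", 9999); // placeholder source

        Properties producerProps = new Properties();
        producerProps.setProperty("bootstrap.servers", "kafka-broker:9092"); // placeholder broker

        // Let in-flight transactions survive a crash-and-restart window longer than the
        // default. The broker-side cap, transaction.max.timeout.ms (15 minutes by default),
        // must be raised to at least this value, otherwise the producer refuses to start.
        producerProps.setProperty("transaction.timeout.ms", String.valueOf(60 * 60 * 1000)); // 1 hour

        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
                "output-topic",                                                  // placeholder topic
                new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), // value-only serialization
                producerProps,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

        events.addSink(producer);
        env.execute("exactly-once kafka sink sketch");
    }
}
```

Note that the broker-side `transaction.max.timeout.ms` is the usual stumbling block: the producer's `transaction.timeout.ms` cannot exceed it, so both sides have to be raised together.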