Hi Lu,

Yes, this error indicates data loss (unless there were no records in the
transactions).
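
As the documentation excerpt you quoted says, the usual mitigation is to raise the
producer's transaction.timeout.ms (and, if needed, the broker-side cap
transaction.max.timeout.ms) well above the worst-case time between a crash and a
completed restart. A minimal sketch, assuming the universal FlinkKafkaProducer from
flink-connector-kafka and hypothetical broker/topic names:

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

public class ExactlyOnceSinkSketch {

    public static FlinkKafkaProducer<String> buildSink() {
        Properties props = new Properties();
        // Hypothetical broker address.
        props.setProperty("bootstrap.servers", "broker:9092");

        // Raise the producer-side transaction timeout well above the expected
        // time between an application crash and a completed restart
        // (15 minutes here). The broker's transaction.max.timeout.ms needs to
        // be at least as large.
        props.setProperty("transaction.timeout.ms", String.valueOf(15 * 60 * 1000));

        // EXACTLY_ONCE makes the sink use Kafka transactions that are committed
        // on notifyCheckpointComplete, which is where the exception you posted
        // was thrown.
        return new FlinkKafkaProducer<>(
                "output-topic", // hypothetical topic name
                new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
                props,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
    }
}
```

If the broker aborts a transaction before the restarted job can commit it, the
records in that transaction are gone, which matches the data loss you observed.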

Regards,
Roman


On Mon, Aug 3, 2020 at 9:14 PM Lu Niu <qqib...@gmail.com> wrote:

> Hi,
>
> We are using end-to-end exactly-once Flink + Kafka and encountered the
> exception below, which usually appears after checkpoint failures:
> ```
> Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer
> attempted an operation with an old epoch. Either there is a newer producer
> with the same transactionalId, or the producer's transaction has been
> expired by the broker.
>
> 2020-07-28 16:27:51,633 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job xxx (f08fc4b1edceb3705e2cb134a8ece73d) switched from state RUNNING to FAILING.
> java.lang.RuntimeException: Error while confirming checkpoint
>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1219)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure
>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:295)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:842)
>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1214)
>     ... 5 more
> ```
> We did some end-to-end tests and noticed that whenever this happens,
> there is data loss.
>
> Referring to several related questions, I understand that I need to
> increase `transaction.timeout.ms` because:
> ```
> Semantic.EXACTLY_ONCE mode relies on the ability to commit transactions
> that were started before taking a checkpoint, after recovering from the
> said checkpoint. If the time between Flink application crash and completed
> restart is larger than Kafka’s transaction timeout there will be data loss
> (Kafka will automatically abort transactions that exceeded timeout time).
> ```
>
> But I want to confirm with the community:
> Does an exception like this always lead to data loss?
>
> I ask because we sometimes get this exception even when the checkpoint
> succeeds.
>
> Setup:
> Flink 1.9.1
>
> Best
> Lu
>
