Hi,

We are using end to end exact-once flink + kafka and encountered belowing
exception which usually came after checkpoint failures:
```














*Caused by: org.apache.kafka.common.errors.ProducerFencedException:
Producer attempted an operation with an old epoch. Either there is a newer
producer with the same transactionalId, or the producer's transaction has
been expired by the broker.2020-07-28 16:27:51,633 INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job xxx
(f08fc4b1edceb3705e2cb134a8ece73d) switched from state RUNNING to
FAILING.java.lang.RuntimeException: Error while confirming checkpoint at
org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1219) at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at
java.util.concurrent.FutureTask.run(FutureTask.java:266) at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)Caused by:
org.apache.flink.util.FlinkRuntimeException: Committing one of transactions
failed, logging first encountered failure at
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:295)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:842)
at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1214) ... 5
more*
```
We did some end to end tests and noticed whenever such a thing happens,
there will be a data loss.

Referring to several related questions, I understand I need to increase `
transaction.timeout.ms`  because:
```
*Semantic.EXACTLY_ONCE mode relies on the ability to commit transactions
that were started before taking a checkpoint, after recovering from the
said checkpoint. If the time between Flink application crash and completed
restart is larger than Kafka’s transaction timeout there will be data loss
(Kafka will automatically abort transactions that exceeded timeout time).*
```

But I want to confirm with the community that:
*Does an exception like this will always lead to data loss? *

I asked because we get this exception sometimes even when the checkpoint
succeeds.

Setup:
Flink 1.9.1

Best
Lu

Reply via email to