Hi,

What is your Kafka transaction timeout setting? Please check both the Kafka 
producer configuration (the transaction.timeout.ms property) and the Kafka broker 
configuration. The most likely cause of this error message is that Kafka's 
transaction timeout is smaller than Flink’s checkpoint interval, so transactions 
are not committed before the timeout expires.
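
As a rough sketch of the arithmetic, assuming a transaction stays open for about 
one checkpoint interval plus the time the checkpoint takes to complete: with the 
15m interval and 10m checkpoint timeout from your configuration, the producer's 
transaction.timeout.ms would need to be at least about 25 minutes. The class and 
helper names below are hypothetical and only illustrate the calculation. As far as 
I know, the broker also caps this value through transaction.max.timeout.ms, so 
that setting may need to be raised as well.

```java
import java.time.Duration;
import java.util.Properties;

public class TransactionTimeoutSketch {

    // Hypothetical helper: rough lower bound for the producer transaction timeout.
    // Assumption: a transaction is opened around one checkpoint and committed only
    // once the next checkpoint is confirmed, so it has to outlive
    // checkpoint interval + checkpoint timeout.
    static Duration minimumTransactionTimeout(Duration checkpointInterval,
                                              Duration checkpointTimeout) {
        return checkpointInterval.plus(checkpointTimeout);
    }

    // Builds producer properties carrying transaction.timeout.ms in milliseconds.
    // These properties would then be passed to the Kafka sink's producer config.
    static Properties producerProperties(Duration transactionTimeout) {
        Properties props = new Properties();
        props.setProperty("transaction.timeout.ms",
                Long.toString(transactionTimeout.toMillis()));
        return props;
    }

    public static void main(String[] args) {
        Duration interval = Duration.ofMinutes(15);          // checkpoint interval
        Duration checkpointTimeout = Duration.ofMinutes(10); // checkpoint timeout

        Duration required = minimumTransactionTimeout(interval, checkpointTimeout);
        Properties props = producerProperties(required);

        System.out.println("transaction.timeout.ms = "
                + props.getProperty("transaction.timeout.ms"));
    }
}
```

If the value computed this way is larger than what the producer or the broker 
currently allows, that would be consistent with transactions expiring before the 
checkpoint is confirmed.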

Piotrek

> On 17 Mar 2018, at 07:24, Dongwon Kim <eastcirc...@gmail.com> wrote:
> 
> 
> Hi,
> 
> I'm faced with the following ProducerFencedException after 1st, 3rd, 5th, 
> 7th, ... checkpoints:
> --
> java.lang.RuntimeException: Error while confirming checkpoint
>       at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1260)
>       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
> --
> 
> FYI, I'm using Flink 1.4.0 and testing end-to-end exactly once processing 
> using Kafka sink.
> We use FsStateBackend to store snapshot data on HDFS.
> 
> As shown in configuration.png, my checkpoint configuration is:
> - Checkpointing Mode : Exactly Once
> - Interval : 15m 0s
> - Timeout : 10m 0s
> - Minimum Pause Between Checkpoints : 5m 0s
> - Maximum Concurrent Checkpoints : 1
> - Persist Checkpoints Externally : Disabled
> 
> After the first checkpoint completed [see history after 1st ckpt.png], the 
> job is restarted due to the ProducerFencedException [see exception after 1st 
> ckpt.png].
> The first checkpoint takes less than 2 minutes while my checkpoint interval 
> is 15m and minimum pause between checkpoints is 5m.
> After the job is restarted, the second checkpoint is triggered after a while 
> [see history after 2nd ckpt.png] and this time I've got no exception.
> The third checkpoint results in the same exception as after the first 
> checkpoint.
> 
> Can anybody let me know what's going wrong behind the scene?
> 
> Best,
> 
> Dongwon
> <history after 3rd ckpt.png><exception after 3rd ckpt.png><history after 2nd 
> ckpt.png><configuration.png><summary.png><exception after 1st 
> ckpt.png><history after 1st ckpt.png>
