Hi Piotr,

We have set the producer's [transaction.timeout.ms] to 15 minutes and kept
the broker's default setting (15 minutes).
Since Flink's checkpoint interval is also 15 minutes, this is not a case where
Kafka's timeout is smaller than Flink's checkpoint interval.
And since our first checkpoint takes only 2 minutes, it seems that the
transaction is not being committed properly.
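The comparison above can be written out as a small, dependency-free Java check. This is a minimal sketch, not Flink or Kafka code: the class `TransactionTimeoutCheck` and its methods are hypothetical, and only the property names `transaction.timeout.ms` (producer) and `transaction.max.timeout.ms` (broker, default 15 minutes) are real Kafka settings. The last check rests on an assumption worth verifying: that a transaction opened at one checkpoint is only committed after the next checkpoint completes, so it stays open for roughly the checkpoint interval plus the checkpoint duration.

```java
import java.time.Duration;
import java.util.Properties;

/** Hypothetical helper comparing the timeout values discussed in this thread. */
public class TransactionTimeoutCheck {

    /** Builds producer properties; transaction.timeout.ms is given in milliseconds. */
    public static Properties producerProps(Duration transactionTimeout) {
        Properties props = new Properties();
        props.setProperty("transaction.timeout.ms",
                Long.toString(transactionTimeout.toMillis()));
        return props;
    }

    /** The producer timeout must not exceed the broker's transaction.max.timeout.ms. */
    public static boolean withinBrokerLimit(Duration producerTimeout, Duration brokerMaxTimeout) {
        return producerTimeout.compareTo(brokerMaxTimeout) <= 0;
    }

    /** Rule of thumb from the thread: the timeout must not be smaller than the checkpoint interval. */
    public static boolean coversInterval(Duration producerTimeout, Duration checkpointInterval) {
        return producerTimeout.compareTo(checkpointInterval) >= 0;
    }

    /**
     * ASSUMPTION: a transaction stays open from one checkpoint until the next one
     * is confirmed, i.e. for about interval + checkpoint duration.
     */
    public static boolean coversTransactionLifetime(Duration producerTimeout,
                                                    Duration checkpointInterval,
                                                    Duration checkpointDuration) {
        Duration lifetime = checkpointInterval.plus(checkpointDuration);
        return producerTimeout.compareTo(lifetime) >= 0;
    }

    public static void main(String[] args) {
        // Values taken from this thread.
        Duration producerTimeout = Duration.ofMinutes(15);
        Duration brokerMax = Duration.ofMinutes(15); // broker default for transaction.max.timeout.ms
        Duration interval = Duration.ofMinutes(15);
        Duration checkpointDuration = Duration.ofMinutes(2);

        Properties props = producerProps(producerTimeout);
        System.out.println("transaction.timeout.ms = " + props.getProperty("transaction.timeout.ms"));
        System.out.println("within broker limit: " + withinBrokerLimit(producerTimeout, brokerMax));
        System.out.println("covers interval:     " + coversInterval(producerTimeout, interval));
        System.out.println("covers lifetime:     "
                + coversTransactionLifetime(producerTimeout, interval, checkpointDuration));
    }
}
```

With the values from this thread (15-minute producer timeout, 15-minute broker limit, 15-minute interval, 2-minute checkpoint), the first two checks pass and the lifetime check fails (17 minutes versus 15). If the assumption above holds, that would be consistent with the broker expiring a transaction shortly before Flink tries to commit it.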

Best,

- Dongwon





On Tue, Mar 20, 2018 at 6:32 PM, Piotr Nowojski <pi...@data-artisans.com>
wrote:

> Hi,
>
> What’s your Kafka transaction timeout setting? Please check both the Kafka
> producer configuration (transaction.timeout.ms property) and the Kafka broker
> configuration. The most likely cause of such an error message is that Kafka's
> timeout is smaller than Flink’s checkpoint interval, so transactions are
> not committed before the timeout occurs.
>
> Piotrek
>
> On 17 Mar 2018, at 07:24, Dongwon Kim <eastcirc...@gmail.com> wrote:
>
>
> Hi,
>
> I'm facing the following ProducerFencedException after the 1st, 3rd, 5th,
> 7th, ... checkpoints:
>
> --
>
> java.lang.RuntimeException: Error while confirming checkpoint
>       at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1260)
>       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
>
> --
>
>
> FYI, I'm using Flink 1.4.0 and testing end-to-end exactly-once processing
> with a Kafka sink.
> We use FsStateBackend to store snapshot data on HDFS.
>
> As shown in configuration.png, my checkpoint configuration is:
> - Checkpointing Mode : Exactly Once
> - Interval : 15m 0s
> - Timeout : 10m 0s
> - Minimum Pause Between Checkpoints : 5m 0s
> - Maximum Concurrent Checkpoints : 1
> - Persist Checkpoints Externally : Disabled
>
> After the first checkpoint completed [see history after 1st ckpt.png], the
> job was restarted due to the ProducerFencedException [see exception after
> 1st ckpt.png].
> The first checkpoint took less than 2 minutes, while my checkpoint
> interval is 15m and the minimum pause between checkpoints is 5m.
> After the job restarted, the second checkpoint was triggered a while later
> [see history after 2nd ckpt.png], and this time I got no exception.
> The third checkpoint resulted in the same exception as the first
> checkpoint.
>
> Can anybody tell me what's going wrong behind the scenes?
>
> Best,
>
> Dongwon
> <history after 3rd ckpt.png><exception after 3rd ckpt.png>
> <history after 2nd ckpt.png><configuration.png><summary.png>
> <exception after 1st ckpt.png><history after 1st ckpt.png>
>
>
>
