Hi Piotr,

We have set the producer's transaction.timeout.ms to 15 minutes and kept the broker's default setting (transaction.max.timeout.ms, 15 minutes). Since Flink's checkpoint interval is also 15 minutes, Kafka's timeout is not smaller than Flink's checkpoint interval. And since our first checkpoint takes only about 2 minutes, it seems that the transaction is not being committed properly.
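For reference, the producer-side setting described above can be sketched as a plain `java.util.Properties` object, such as the one handed to the Flink Kafka producer. The broker address is a hypothetical placeholder, and the helper names (`producerProps`, `acceptedByBroker`) are made up for illustration. The 15-minute constant mirrors Kafka's broker default for `transaction.max.timeout.ms`, and the check only models the broker's rule that a producer may not request a longer transaction timeout than that maximum.

```java
import java.util.Properties;

public class ProducerTxnConfig {

    // Kafka broker default for transaction.max.timeout.ms: 15 minutes.
    static final long BROKER_DEFAULT_MAX_TXN_TIMEOUT_MS = 15L * 60 * 1000;

    // Builds producer properties with the given transaction timeout.
    static Properties producerProps(long txnTimeoutMs) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker-1:9092"); // hypothetical address
        // How long the transaction coordinator waits for a commit/abort before
        // expiring the transaction itself.
        props.setProperty("transaction.timeout.ms", Long.toString(txnTimeoutMs));
        return props;
    }

    // The broker rejects a producer whose transaction.timeout.ms is larger than
    // the broker's transaction.max.timeout.ms, so the producer value must not exceed it.
    static boolean acceptedByBroker(Properties props, long brokerMaxTxnTimeoutMs) {
        long requested = Long.parseLong(props.getProperty("transaction.timeout.ms"));
        return requested <= brokerMaxTxnTimeoutMs;
    }

    public static void main(String[] args) {
        Properties props = producerProps(15L * 60 * 1000); // 15 minutes, as in this thread
        System.out.println("transaction.timeout.ms = " + props.getProperty("transaction.timeout.ms"));
        System.out.println("accepted by broker default: "
                + acceptedByBroker(props, BROKER_DEFAULT_MAX_TXN_TIMEOUT_MS));
    }
}
```

Setting the producer value equal to the broker maximum, as here, is the largest timeout the default broker configuration allows.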
Best,
- Dongwon

On Tue, Mar 20, 2018 at 6:32 PM, Piotr Nowojski <pi...@data-artisans.com> wrote:
> Hi,
>
> What’s your Kafka transaction timeout setting? Please check both the Kafka
> producer configuration (transaction.timeout.ms property) and the Kafka broker
> configuration. The most likely cause of such an error message is that Kafka's
> timeout is smaller than Flink’s checkpoint interval, so transactions are
> not committed before the timeout occurs.
>
> Piotrek
>
> On 17 Mar 2018, at 07:24, Dongwon Kim <eastcirc...@gmail.com> wrote:
>
> Hi,
>
> I'm faced with the following ProducerFencedException after the 1st, 3rd, 5th,
> 7th, ... checkpoints:
>
> --
>
> java.lang.RuntimeException: Error while confirming checkpoint
>     at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1260)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer
> attempted an operation with an old epoch. Either there is a newer producer
> with the same transactionalId, or the producer's transaction has been expired
> by the broker.
>
> --
>
> FYI, I'm using Flink 1.4.0 and testing end-to-end exactly-once processing
> with a Kafka sink.
> We use FsStateBackend to store snapshot data on HDFS.
>
> As shown in configuration.png, my checkpoint configuration is:
> - Checkpointing Mode : Exactly Once
> - Interval : 15m 0s
> - Timeout : 10m 0s
> - Minimum Pause Between Checkpoints : 5m 0s
> - Maximum Concurrent Checkpoints : 1
> - Persist Checkpoints Externally : Disabled
>
> After the first checkpoint completed [see history after 1st ckpt.png], the
> job was restarted due to the ProducerFencedException [see exception after
> 1st ckpt.png].
> The first checkpoint takes less than 2 minutes, while my checkpoint
> interval is 15m and the minimum pause between checkpoints is 5m.
> After the job restarted, the second checkpoint was triggered after a
> while [see history after 2nd ckpt.png], and this time I got no exception.
> The third checkpoint results in the same exception as the first
> checkpoint.
>
> Can anybody tell me what's going wrong behind the scenes?
>
> Best,
>
> Dongwon
> <history after 3rd ckpt.png>
> <exception after 3rd ckpt.png>
> <history after 2nd ckpt.png>
> <configuration.png>
> <summary.png>
> <exception after 1st ckpt.png>
> <history after 1st ckpt.png>
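To compare the numbers in this thread against Piotr's explanation, here is a back-of-the-envelope sketch. It assumes, as in Flink's two-phase-commit sink design, that a Kafka transaction opened at one checkpoint is committed only after the following checkpoint completes, and it further assumes that checkpoints fire roughly one interval apart. It ignores the minimum pause, barrier alignment, and the exact moment the broker starts the transaction timer. The class and method names are hypothetical.

```java
import java.time.Duration;

public class TxnLifetimeEstimate {

    // Rough estimate: a transaction stays open for about one checkpoint interval
    // (until the next checkpoint is triggered) plus that checkpoint's duration
    // (until it completes and the commit is confirmed).
    static Duration estimatedTxnLifetime(Duration checkpointInterval, Duration checkpointDuration) {
        return checkpointInterval.plus(checkpointDuration);
    }

    // True if, under this rough model, the broker could expire the transaction
    // before Flink commits it.
    static boolean mayExpire(Duration checkpointInterval, Duration checkpointDuration,
                             Duration transactionTimeout) {
        return estimatedTxnLifetime(checkpointInterval, checkpointDuration)
                .compareTo(transactionTimeout) > 0;
    }

    public static void main(String[] args) {
        Duration interval = Duration.ofMinutes(15); // "Interval : 15m 0s"
        Duration duration = Duration.ofMinutes(2);  // first checkpoint: under 2 minutes
        Duration timeout = Duration.ofMinutes(15);  // transaction.timeout.ms = 15 minutes

        System.out.println("estimated lifetime: " + estimatedTxnLifetime(interval, duration));
        System.out.println("may expire: " + mayExpire(interval, duration, timeout));
    }
}
```

With the thread's settings, the estimate comes out at about 17 minutes, which is longer than the 15-minute timeout. This suggests that an interval equal to the timeout may leave no headroom. It is only a rough model, though, and on its own it does not explain why only every other checkpoint fails.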