Hi,

Please increase transaction.timeout.ms to a greater value or decrease Flink’s 
checkpoint interval. I’m pretty sure the issue here is that those two values 
overlap. I think that’s even visible in the screenshots: the first completed 
checkpoint started at 14:28:48 and ended at 14:30:43, while the second one 
started at 14:45:53 and ended at 14:49:16. That gives you a minimal 
transaction duration of 15 minutes and 10 seconds and a maximal transaction 
duration of about 20 and a half minutes (20 minutes 28 seconds), both longer 
than your 15-minute timeout.
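
To make the arithmetic explicit, here is a small sketch that recomputes both 
durations from the four timestamps in the screenshots. The class and method 
names are made up for illustration; only java.time is used.

```java
import java.time.Duration;
import java.time.LocalTime;

/** Recomputes the transaction duration bounds from the checkpoint timestamps. */
final class TransactionWindow {

    /** Lower bound: end of the first checkpoint to start of the second. */
    static Duration shortest(LocalTime firstEnd, LocalTime secondStart) {
        return Duration.between(firstEnd, secondStart);
    }

    /** Upper bound: start of the first checkpoint to end of the second. */
    static Duration longest(LocalTime firstStart, LocalTime secondEnd) {
        return Duration.between(firstStart, secondEnd);
    }

    public static void main(String[] args) {
        // Timestamps as read from the checkpoint history screenshots.
        LocalTime firstStart = LocalTime.parse("14:28:48");
        LocalTime firstEnd = LocalTime.parse("14:30:43");
        LocalTime secondStart = LocalTime.parse("14:45:53");
        LocalTime secondEnd = LocalTime.parse("14:49:16");

        System.out.println("shortest: " + shortest(firstEnd, secondStart)); // PT15M10S
        System.out.println("longest:  " + longest(firstStart, secondEnd));  // PT20M28S
    }
}
```

Both values are above a 15-minute transaction timeout, which matches the 
ProducerFencedException you are seeing.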

In the happy scenario (without any failures or restarts), you should assume 
that your timeout must cover, with some safety margin, the period between the 
start of a checkpoint and the end of the NEXT checkpoint, since this is the 
upper bound on how long a transaction might stay open. In your case that is at 
least ~25 minutes.
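
One way to arrive at a figure like this, as a rough sketch rather than 
anything taken from Flink itself: assume consecutive checkpoints start one 
checkpoint interval apart (15 minutes) and that a checkpoint can run for at 
most the checkpoint timeout (10 minutes), then add a margin. The helper name 
and the 5-minute margin below are hypothetical.

```java
import java.time.Duration;

/** Estimates a lower bound for transaction.timeout.ms in the no-failure case. */
final class TransactionTimeoutEstimate {

    /**
     * Start of one checkpoint to the end of the next one, plus a safety margin.
     * Assumes checkpoints start one interval apart and never run longer than
     * maxCheckpointDuration.
     */
    static Duration minimumTimeout(Duration checkpointInterval,
                                   Duration maxCheckpointDuration,
                                   Duration safetyMargin) {
        return checkpointInterval.plus(maxCheckpointDuration).plus(safetyMargin);
    }

    public static void main(String[] args) {
        Duration interval = Duration.ofMinutes(15);          // checkpoint interval from this thread
        Duration checkpointTimeout = Duration.ofMinutes(10); // checkpoint timeout from this thread
        Duration margin = Duration.ofMinutes(5);             // hypothetical safety margin

        Duration timeout = minimumTimeout(interval, checkpointTimeout, margin);
        // 15 + 10 + 5 minutes = 1800000 ms
        System.out.println("transaction.timeout.ms >= " + timeout.toMillis());
    }
}
```

Without the margin this gives 25 minutes; the margin itself is a judgment call.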

On top of that, as described in the docs 
(https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-producers-and-fault-tolerance), 
in case of failure your timeout must also be able to cover the additional 
downtime required for the successful job restart. Thus you should increase your 
timeout accordingly. 
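
For illustration, a minimal sketch of building the producer properties with a 
larger timeout. The broker address and the one-hour value are placeholders, 
not recommendations. The resulting Properties object is what you would hand to 
the Kafka producer you construct for your sink. Note that, per the docs linked 
above, the broker caps this value with transaction.max.timeout.ms (15 minutes 
by default), so that broker setting has to be raised as well.

```java
import java.time.Duration;
import java.util.Properties;

/** Builds Kafka producer properties with an explicit transaction timeout. */
final class KafkaProducerTimeoutConfig {

    static Properties producerProperties(String bootstrapServers, Duration transactionTimeout) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Must not exceed the broker's transaction.max.timeout.ms.
        props.setProperty("transaction.timeout.ms",
                Long.toString(transactionTimeout.toMillis()));
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical broker address and timeout, chosen only for the example.
        Properties props = producerProperties("kafka-broker:9092", Duration.ofHours(1));
        System.out.println(props.getProperty("transaction.timeout.ms")); // 3600000
    }
}
```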

Piotrek


> On 20 Mar 2018, at 11:58, Dongwon Kim <eastcirc...@gmail.com> wrote:
> 
> Hi Piotr,
> 
> We have set the producer's transaction.timeout.ms to 15 minutes and have 
> used the default setting for the broker (15 mins).
> As Flink's checkpoint interval is 15 minutes, it is not a situation where 
> Kafka's timeout is smaller than Flink's checkpoint interval.
> As our first checkpoint takes just 2 minutes, it seems like the transaction 
> is not committed properly.
> 
> Best,
> 
> - Dongwon
> 
> On Tue, Mar 20, 2018 at 6:32 PM, Piotr Nowojski <pi...@data-artisans.com> 
> wrote:
> Hi,
> 
> What’s your Kafka transaction timeout setting? Please check both the Kafka 
> producer configuration (the transaction.timeout.ms property) and the Kafka 
> broker configuration. The most likely cause of such an error message is that 
> Kafka's timeout is smaller than Flink’s checkpoint interval, so transactions 
> are not committed before the timeout occurs.
> 
> Piotrek
> 
>> On 17 Mar 2018, at 07:24, Dongwon Kim <eastcirc...@gmail.com> wrote:
>> 
>> 
>> Hi,
>> 
>> I'm faced with the following ProducerFencedException after 1st, 3rd, 5th, 
>> 7th, ... checkpoints:
>> --
>> java.lang.RuntimeException: Error while confirming checkpoint
>>      at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1260)
>>      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>      at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer 
>> attempted an operation with an old epoch. Either there is a newer producer 
>> with the same transactionalId, or the producer's transaction has been 
>> expired by the broker.
>> --
>> 
>> FYI, I'm using Flink 1.4.0 and testing end-to-end exactly once processing 
>> using Kafka sink.
>> We use FsStateBackend to store snapshot data on HDFS.
>> 
>> As shown in configuration.png, my checkpoint configuration is:
>> - Checkpointing Mode : Exactly Once
>> - Interval : 15m 0s
>> - Timeout : 10m 0s
>> - Minimum Pause Between Checkpoints : 5m 0s
>> - Maximum Concurrent Checkpoints : 1
>> - Persist Checkpoints Externally : Disabled
>> 
>> After the first checkpoint completed [see history after 1st ckpt.png], the 
>> job is restarted due to the ProducerFencedException [see exception after 1st 
>> ckpt.png].
>> The first checkpoint takes less than 2 minutes while my checkpoint interval 
>> is 15m and minimum pause between checkpoints is 5m.
>> After the job is restarted, the second checkpoint is triggered after a while 
>> [see history after 2nd ckpt.png] and this time I've got no exception.
>> The third checkpoint results in the same exception as after the first 
>> checkpoint.
>> 
>> Can anybody let me know what's going wrong behind the scenes?
>> 
>> Best,
>> 
>> Dongwon
>> <history after 3rd ckpt.png><exception after 3rd ckpt.png><history after 2nd 
>> ckpt.png><configuration.png><summary.png><exception after 1st 
>> ckpt.png><history after 1st ckpt.png>
> 
> 
