Hi,

But that’s exactly the case: the producer’s transaction timeout starts when the 
external transaction starts - but FlinkKafkaProducer011 keeps a Kafka 
transaction open for the whole period between checkpoints.
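
Concretely, the transaction lifecycle looks roughly like this (a simplified 
sketch of the two-phase commit in FlinkKafkaProducer011):

    checkpoint N starts      -> transaction T is opened (records go into T)
    checkpoint N+1 starts    -> T is pre-committed, the next transaction opens
    checkpoint N+1 completes -> T is committed

So T stays open from the start of checkpoint N until checkpoint N+1 completes, 
and transaction.timeout.ms has to cover that whole span.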

As I wrote in the previous message:

> in case of failure, your timeout must also be able to cover the additional 
> downtime required for the successful job restart. Thus you should increase 
> your timeout accordingly.

I think that a 15 minute timeout is way too small a value. If your job fails 
because of some intermittent failure (for example, a worker crash/restart), you 
will only have a couple of minutes for a successful Flink job restart. 
Otherwise you will lose some data (because of the transaction timeouts).
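
For reference, the timeout can be raised on the producer when the sink is 
constructed. A minimal sketch against the Flink 1.4 / Kafka 0.11 connector API 
(the topic name, bootstrap servers, and the 1 hour value are placeholders to 
adapt):

    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
    import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

    Properties producerConfig = new Properties();
    producerConfig.setProperty("bootstrap.servers", "kafka:9092");
    // Cover checkpoint interval + checkpoint duration + restart downtime.
    // Note: the broker rejects values above its transaction.max.timeout.ms
    // (15 minutes by default), so that broker setting must be raised too.
    producerConfig.setProperty("transaction.timeout.ms", "3600000"); // 1 hour

    FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(
            "my-topic",
            new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
            producerConfig,
            FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);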

Piotrek

> On 21 Mar 2018, at 10:30, Dongwon Kim <eastcirc...@gmail.com> wrote:
> 
> Hi Piotr,
> 
> Now my streaming pipeline is working without retries. 
> I decreased Flink's checkpoint interval from 15min to 10min as you suggested 
> [see screenshot_10min_ckpt.png].
> 
> I thought that the producer's transaction timeout starts when the external 
> transaction starts.
> The truth is that the producer's transaction timeout starts after the last 
> external checkpoint is committed.
> Now that I have a 15min producer transaction timeout and a 10min Flink 
> checkpoint interval, and every checkpoint takes less than 5 minutes, 
> everything is working fine.
> Am I right?
> 
> Anyway thank you very much for the detailed explanation!
> 
> Best,
> 
> Dongwon
> 
> On Tue, Mar 20, 2018 at 8:10 PM, Piotr Nowojski <pi...@data-artisans.com> wrote:
> Hi,
> 
> Please increase transaction.timeout.ms to a greater value or decrease Flink’s 
> checkpoint interval; I’m pretty sure the issue here is that those two values 
> are overlapping. I think that’s even visible on the screenshots: the first 
> completed checkpoint started at 14:28:48 and ended at 14:30:43, while the 
> second one started at 14:45:53 and ended at 14:49:16. That gives you a 
> minimal transaction duration of 15 minutes and 10 seconds (14:45:53 minus 
> 14:30:43), and a maximal transaction duration of about 21 minutes (14:49:16 
> minus 14:28:48).
> 
> In the HAPPY SCENARIO (without any failure and restarting), your timeout 
> should cover, with some safety margin, the period between the start of one 
> checkpoint and the end of the NEXT checkpoint, since this is the upper bound 
> on how long a transaction might be used. In your case that is at least ~25 
> minutes.
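> 
> As a rule of thumb (a rough sketch, not an exact formula):
> 
>     transaction.timeout.ms >= checkpoint interval
>                               + max checkpoint duration
>                               + safety margin
> 
> With a 15 minute interval and checkpoints taking up to a few minutes, that is 
> where the ~25 minutes above comes from, before accounting for any restart 
> downtime.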
> 
> On top of that, as described in the docs 
> (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-producers-and-fault-tolerance), 
> in case of failure, your timeout must also be able to cover the additional 
> downtime required for the successful job restart. Thus you should increase 
> your timeout accordingly.
> 
> Piotrek
> 
> 
>> On 20 Mar 2018, at 11:58, Dongwon Kim <eastcirc...@gmail.com> wrote:
>> 
>> Hi Piotr,
>> 
>> We have set the producer's transaction.timeout.ms to 15 minutes and have 
>> used the default setting for the broker (15 mins).
>> As Flink's checkpoint interval is 15 minutes, it is not a situation where 
>> Kafka's timeout is smaller than Flink's checkpoint interval.
>> As our first checkpoint takes just 2 minutes, it seems like the transaction 
>> is not committed properly.
>> 
>> Best,
>> 
>> - Dongwon
>> 
>> On Tue, Mar 20, 2018 at 6:32 PM, Piotr Nowojski <pi...@data-artisans.com> wrote:
>> Hi,
>> 
>> What’s your Kafka transaction timeout setting? Please check both the Kafka 
>> producer configuration (the transaction.timeout.ms property) and the Kafka 
>> broker configuration. The most likely cause of such an error message is that 
>> Kafka’s timeout is smaller than Flink’s checkpoint interval, so transactions 
>> are not committed quickly enough before the timeout occurs.
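>> 
>> Both sides have a knob for this (a minimal sketch; the 1 hour values are 
>> illustrative):
>> 
>>     # Kafka broker, server.properties - the broker-side cap,
>>     # 15 minutes by default:
>>     transaction.max.timeout.ms=3600000
>> 
>>     // Kafka producer properties passed to the Flink sink - must not
>>     // exceed the broker-side cap:
>>     producerConfig.setProperty("transaction.timeout.ms", "3600000");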
>> 
>> Piotrek
>> 
>>> On 17 Mar 2018, at 07:24, Dongwon Kim <eastcirc...@gmail.com> wrote:
>>> 
>>> Hi,
>>> 
>>> I'm faced with the following ProducerFencedException after 1st, 3rd, 5th, 
>>> 7th, ... checkpoints:
>>> --
>>> java.lang.RuntimeException: Error while confirming checkpoint
>>>     at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1260)
>>>     at 
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>     at 
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>     at 
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer 
>>> attempted an operation with an old epoch. Either there is a newer producer 
>>> with the same transactionalId, or the producer's transaction has been 
>>> expired by the broker.
>>> --
>>> 
>>> FYI, I'm using Flink 1.4.0 and testing end-to-end exactly-once processing 
>>> using the Kafka sink.
>>> We use FsStateBackend to store snapshot data on HDFS.
>>> 
>>> As shown in configuration.png, my checkpoint configuration is as follows 
>>> (a rough code equivalent appears after the list):
>>> - Checkpointing Mode : Exactly Once
>>> - Interval : 15m 0s
>>> - Timeout : 10m 0s
>>> - Minimum Pause Between Checkpoints : 5m 0s
>>> - Maximum Concurrent Checkpoints : 1
>>> - Persist Checkpoints Externally : Disabled
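>>> 
>>> (A minimal sketch of that setup against the Flink 1.4 API, with the HDFS 
>>> path as a placeholder:)
>>> 
>>>     import org.apache.flink.runtime.state.filesystem.FsStateBackend;
>>>     import org.apache.flink.streaming.api.CheckpointingMode;
>>>     import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> 
>>>     StreamExecutionEnvironment env =
>>>             StreamExecutionEnvironment.getExecutionEnvironment();
>>>     env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));
>>>     env.enableCheckpointing(15 * 60 * 1000L, CheckpointingMode.EXACTLY_ONCE);
>>>     env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000L);
>>>     env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5 * 60 * 1000L);
>>>     env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);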
>>> 
>>> After the first checkpoint completed [see history after 1st ckpt.png], the 
>>> job is restarted due to the ProducerFencedException [see exception after 
>>> 1st ckpt.png].
>>> The first checkpoint takes less than 2 minutes while my checkpoint interval 
>>> is 15m and minimum pause between checkpoints is 5m.
>>> After the job is restarted, the second checkpoint is triggered after a 
>>> while [see history after 2nd ckpt.png] and this time I've got no exception.
>>> The third checkpoint results in the same exception as after the first 
>>> checkpoint.
>>> 
>>> Can anybody let me know what's going wrong behind the scene?
>>> 
>>> Best,
>>> 
>>> Dongwon
>>> <history after 3rd ckpt.png><exception after 3rd ckpt.png><history after 
>>> 2nd ckpt.png><configuration.png><summary.png><exception after 1st 
>>> ckpt.png><history after 1st ckpt.png>
>> 
>> 
> 
> 
> <screenshot_10min_ckpt.png>
