Hi,

But that's exactly the case: the producer's transaction timeout starts when the external transaction starts - but FlinkKafkaProducer011 keeps an active Kafka transaction for the whole period between checkpoints.
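
So whatever timeout you configure on the producer has to cover that whole period. For example, a rough (untested) sketch of an exactly-once Kafka 0.11 sink with a much more generous timeout - class and package names are from memory for Flink 1.4, and the topic name, broker address, source and serialization schema are only placeholders:

--
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

public class ExactlyOnceKafkaSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10 * 60 * 1000); // checkpoint every 10 minutes

        DataStream<String> stream = env.fromElements("a", "b", "c"); // placeholder source

        Properties producerConfig = new Properties();
        producerConfig.setProperty("bootstrap.servers", "kafka-broker:9092"); // placeholder
        // Has to cover: checkpoint interval + checkpoint duration + the downtime of a
        // job restart. 1 hour here is just an example of a generous value.
        producerConfig.setProperty("transaction.timeout.ms", Integer.toString(60 * 60 * 1000));

        stream.addSink(new FlinkKafkaProducer011<String>(
                "output-topic", // placeholder topic
                new KeyedSerializationSchemaWrapper<String>(new SimpleStringSchema()),
                producerConfig,
                FlinkKafkaProducer011.Semantic.EXACTLY_ONCE));

        env.execute("exactly-once Kafka sink with a generous transaction timeout");
    }
}
--

Keep in mind that the broker's transaction.max.timeout.ms must also be raised to allow such a value, as described in the docs linked below.
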
As I wrote in the previous message:

> in case of failure, your timeout must also be able to cover the additional
> downtime required for a successful job restart. Thus you should increase
> your timeout accordingly.

I think that a 15 minute timeout is way too small a value. If your job fails because of some intermittent failure (for example a worker crash/restart), you will only have a couple of minutes for a successful Flink job restart. Otherwise you will lose some data (because of the transaction timeouts).

Piotrek

> On 21 Mar 2018, at 10:30, Dongwon Kim <eastcirc...@gmail.com> wrote:
> 
> Hi Piotr,
> 
> Now my streaming pipeline is working without retries.
> I decreased Flink's checkpoint interval from 15min to 10min as you suggested [see screenshot_10min_ckpt.png].
> 
> I thought that the producer's transaction timeout starts when the external transaction starts.
> The truth is that the producer's transaction timeout starts after the last external checkpoint is committed.
> Now that I have 15min for the producer's transaction timeout and 10min for Flink's checkpoint interval, and every checkpoint takes less than 5 minutes, everything is working fine.
> Am I right?
> 
> Anyway, thank you very much for the detailed explanation!
> 
> Best,
> 
> Dongwon
> 
> On Tue, Mar 20, 2018 at 8:10 PM, Piotr Nowojski <pi...@data-artisans.com> wrote:
> Hi,
> 
> Please increase transaction.timeout.ms to a greater value or decrease Flink's checkpoint interval; I'm pretty sure the issue here is that those two values are overlapping. I think that's even visible on the screenshots: the first completed checkpoint started at 14:28:48 and ended at 14:30:43, while the second one started at 14:45:53 and ended at 14:49:16. That gives you a minimal transaction duration of 15 minutes and 10 seconds, and a maximal transaction duration of 21 minutes.
> 
> In the HAPPY SCENARIO (without any failures and restarts), you should assume that your timeout covers, with some safety margin, the period between the start of a checkpoint and the end of the NEXT checkpoint, since this is the upper bound on how long the transaction might be used. In your case that is at least ~25 minutes.
> 
> On top of that, as described in the docs
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-producers-and-fault-tolerance
> in case of failure, your timeout must also be able to cover the additional downtime required for a successful job restart. Thus you should increase your timeout accordingly.
> 
> Piotrek
> 
>> On 20 Mar 2018, at 11:58, Dongwon Kim <eastcirc...@gmail.com> wrote:
>> 
>> Hi Piotr,
>> 
>> We have set the producer's transaction.timeout.ms to 15 minutes and have used the default setting for the broker (15 mins).
>> As Flink's checkpoint interval is 15 minutes, it is not a situation where Kafka's timeout is smaller than Flink's checkpoint interval.
>> As our first checkpoint takes just 2 minutes, it seems like the transaction is not committed properly.
>> 
>> Best,
>> 
>> - Dongwon
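
(Interjecting here, below the quoted exchange: just to spell out the arithmetic from my 8:10 PM message as a small sketch - the checkpoint duration and restart margin below are example values, not measurements from your job:)

--
// Back-of-the-envelope lower bound for transaction.timeout.ms.
public class TransactionTimeoutMath {
    public static void main(String[] args) {
        long checkpointIntervalMs    = 15 * 60 * 1000; // Flink checkpoint interval
        long maxCheckpointDurationMs =  5 * 60 * 1000; // assumed worst-case checkpoint duration
        long restartDowntimeMs       = 10 * 60 * 1000; // assumed headroom for a job restart

        // A transaction opened at the start of checkpoint N is only committed once
        // checkpoint N+1 completes, so in the happy case it can stay open for roughly:
        long happyCaseMs = checkpointIntervalMs + maxCheckpointDurationMs;

        // On failure it additionally has to survive until the job has restarted:
        long requiredTimeoutMs = happyCaseMs + restartDowntimeMs;

        System.out.println("transaction.timeout.ms should be well above " + requiredTimeoutMs + " ms");
        // With these example numbers that is 30 minutes - a 15 minute timeout cannot work.
    }
}
--
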
>> On Tue, Mar 20, 2018 at 6:32 PM, Piotr Nowojski <pi...@data-artisans.com> wrote:
>> Hi,
>> 
>> What's your Kafka transaction timeout setting? Please check both the Kafka producer configuration (the transaction.timeout.ms property) and the Kafka broker configuration. The most likely cause of such an error message is that Kafka's timeout is smaller than Flink's checkpoint interval and transactions are not committed quickly enough before the timeout occurs.
>> 
>> Piotrek
>> 
>>> On 17 Mar 2018, at 07:24, Dongwon Kim <eastcirc...@gmail.com> wrote:
>>> 
>>> Hi,
>>> 
>>> I'm faced with the following ProducerFencedException after the 1st, 3rd, 5th, 7th, ... checkpoints:
>>> --
>>> java.lang.RuntimeException: Error while confirming checkpoint
>>>     at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1260)
>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
>>> --
>>> 
>>> FYI, I'm using Flink 1.4.0 and testing end-to-end exactly-once processing using the Kafka sink.
>>> We use FsStateBackend to store snapshot data on HDFS.
>>> 
>>> As shown in configuration.png, my checkpoint configuration is:
>>> - Checkpointing Mode : Exactly Once
>>> - Interval : 15m 0s
>>> - Timeout : 10m 0s
>>> - Minimum Pause Between Checkpoints : 5m 0s
>>> - Maximum Concurrent Checkpoints : 1
>>> - Persist Checkpoints Externally : Disabled
>>> 
>>> After the first checkpoint completed [see history after 1st ckpt.png], the job is restarted due to the ProducerFencedException [see exception after 1st ckpt.png].
>>> The first checkpoint takes less than 2 minutes, while my checkpoint interval is 15m and the minimum pause between checkpoints is 5m.
>>> After the job is restarted, the second checkpoint is triggered after a while [see history after 2nd ckpt.png] and this time I got no exception.
>>> The third checkpoint results in the same exception as after the first checkpoint.
>>> 
>>> Can anybody let me know what's going wrong behind the scenes?
>>> 
>>> Best,
>>> 
>>> Dongwon
>>> <history after 3rd ckpt.png><exception after 3rd ckpt.png><history after 2nd ckpt.png><configuration.png><summary.png><exception after 1st ckpt.png><history after 1st ckpt.png>
>> 
> 
> <screenshot_10min_ckpt.png>
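
PS: to double check the broker side of the configuration, something along these lines should print a broker's transaction.max.timeout.ms (a rough, untested sketch against the Kafka 0.11+ AdminClient; the bootstrap address and broker id are placeholders):

--
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class BrokerTransactionTimeoutCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka-broker:9092"); // placeholder address

        AdminClient admin = AdminClient.create(props);
        try {
            // Broker configs are looked up per broker id; "0" is a placeholder.
            ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "0");
            Map<ConfigResource, Config> configs =
                    admin.describeConfigs(Collections.singleton(broker)).all().get();

            ConfigEntry entry = configs.get(broker).get("transaction.max.timeout.ms");
            // This broker-side cap must be >= the producer's transaction.timeout.ms,
            // otherwise the broker rejects the producer's requested transaction timeout.
            System.out.println("transaction.max.timeout.ms = "
                    + (entry == null ? "unset (broker default)" : entry.value()));
        } finally {
            admin.close();
        }
    }
}
--

The same applies the other way around: raising transaction.timeout.ms on the producer only helps if the broker's transaction.max.timeout.ms allows it.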