Hi,

I ran into the same exception recently. I want to confirm: if it is due to a
transaction timeout, then I will lose that data. Am I right? And can I make the
sink fall back to at-least-once semantics in this situation?
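To make the question concrete, here is the fallback I have in mind - a minimal
sketch assuming Flink 1.4's FlinkKafkaProducer011, where the topic name, broker
address, and the upstream stream variable are placeholders:

--
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder

// AT_LEAST_ONCE flushes pending records on every checkpoint but does not
// use Kafka transactions, so an expired transaction can no longer drop
// data; the trade-off is possible duplicates after a restart.
// 'stream' stands for the job's existing DataStream<String>.
stream.addSink(new FlinkKafkaProducer011<>(
        "output-topic",                                   // placeholder
        new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
        props,
        FlinkKafkaProducer011.Semantic.AT_LEAST_ONCE));
--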
Best,
Tony Wei

On Wed, Mar 21, 2018 at 10:28 PM, Piotr Nowojski <pi...@data-artisans.com> wrote:

> Hi,
>
> But that's exactly the case: the producer's transaction timeout starts when
> the external transaction starts - but FlinkKafkaProducer011 keeps an active
> Kafka transaction open for the whole period between checkpoints.
>
> As I wrote in the previous message:
>
> > in case of failure, your timeout must also be able to cover the
> > additional downtime required for the successful job restart. Thus you
> > should increase your timeout accordingly.
>
> I think that a 15-minute timeout is way too small a value. If your job
> fails because of some intermittent failure (for example, a worker
> crash/restart), you will only have a couple of minutes for a successful
> Flink job restart. Otherwise you will lose some data (because of the
> transaction timeouts).
>
> Piotrek
>
> On 21 Mar 2018, at 10:30, Dongwon Kim <eastcirc...@gmail.com> wrote:
>
> Hi Piotr,
>
> Now my streaming pipeline is working without retries.
> I decreased Flink's checkpoint interval from 15 min to 10 min as you
> suggested [see screenshot_10min_ckpt.png].
>
> I thought that the producer's transaction timeout starts when the external
> transaction starts.
> The truth is that the producer's transaction timeout starts after the last
> external checkpoint is committed.
> Now that I have 15 min for the producer's transaction timeout and 10 min
> for Flink's checkpoint interval, and every checkpoint takes less than 5
> minutes, everything is working fine.
> Am I right?
>
> Anyway, thank you very much for the detailed explanation!
>
> Best,
>
> Dongwon
>
> On Tue, Mar 20, 2018 at 8:10 PM, Piotr Nowojski <pi...@data-artisans.com>
> wrote:
>
>> Hi,
>>
>> Please increase transaction.timeout.ms to a greater value or decrease
>> Flink's checkpoint interval; I'm pretty sure the issue here is that those
>> two values are overlapping. I think that's even visible in the
>> screenshots. The first checkpoint started at 14:28:48 and ended at
>> 14:30:43, while the second one started at 14:45:53 and ended at 14:49:16.
>> That gives you a minimal transaction duration of 15 minutes and 10
>> seconds, and a maximal transaction duration of 21 minutes.
>>
>> In the HAPPY SCENARIO (without any failures or restarts), your timeout
>> should cover, with some safety margin, the period between the start of a
>> checkpoint and the end of the NEXT checkpoint, since this is the upper
>> bound on how long the transaction might be used. In your case, at least
>> ~25 minutes.
>>
>> On top of that, as described in the docs,
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-producers-and-fault-tolerance
>> , in case of failure, your timeout must also be able to cover the
>> additional downtime required for a successful job restart. Thus you
>> should increase your timeout accordingly.
>>
>> Piotrek
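To spell out that suggestion in code, a sketch assuming Flink 1.4's
FlinkKafkaProducer011, where the one-hour value, topic name, broker address,
and the upstream stream variable are illustrative. Note that the broker-side
transaction.max.timeout.ms caps what a producer may request, so it must be
raised to at least the same value:

--
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder

// The timeout has to cover: checkpoint interval + duration of the next
// checkpoint + downtime of any job restart, with a safety margin.
props.setProperty("transaction.timeout.ms", String.valueOf(60 * 60 * 1000)); // 1h

// The broker must allow this, e.g. in server.properties:
//   transaction.max.timeout.ms=3600000

stream.addSink(new FlinkKafkaProducer011<>(
        "output-topic",                                   // placeholder
        new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
        props,
        FlinkKafkaProducer011.Semantic.EXACTLY_ONCE));
--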
>> On 20 Mar 2018, at 11:58, Dongwon Kim <eastcirc...@gmail.com> wrote:
>>
>> Hi Piotr,
>>
>> We have set the producer's transaction.timeout.ms to 15 minutes and have
>> used the default setting for the broker (15 min).
>> As Flink's checkpoint interval is 15 minutes, it is not a situation where
>> Kafka's timeout is smaller than Flink's checkpoint interval.
>> As our first checkpoint takes just 2 minutes, it seems like the
>> transaction is not being committed properly.
>>
>> Best,
>>
>> - Dongwon
>>
>> On Tue, Mar 20, 2018 at 6:32 PM, Piotr Nowojski <pi...@data-artisans.com>
>> wrote:
>>
>>> Hi,
>>>
>>> What is your Kafka transaction timeout setting? Please check both the
>>> Kafka producer configuration (the transaction.timeout.ms property) and
>>> the Kafka broker configuration. The most likely cause of such an error
>>> message is that Kafka's timeout is smaller than Flink's checkpoint
>>> interval and transactions are not committed quickly enough before the
>>> timeout occurs.
>>>
>>> Piotrek
>>>
>>> On 17 Mar 2018, at 07:24, Dongwon Kim <eastcirc...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> I'm faced with the following ProducerFencedException after the 1st, 3rd,
>>> 5th, 7th, ... checkpoints:
>>>
>>> --
>>> java.lang.RuntimeException: Error while confirming checkpoint
>>>     at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1260)
>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer
>>> attempted an operation with an old epoch. Either there is a newer producer
>>> with the same transactionalId, or the producer's transaction has been
>>> expired by the broker.
>>> --
>>>
>>> FYI, I'm using Flink 1.4.0 and testing end-to-end exactly-once
>>> processing using the Kafka sink.
>>> We use FsStateBackend to store snapshot data on HDFS.
>>>
>>> As shown in configuration.png, my checkpoint configuration is:
>>> - Checkpointing Mode: Exactly Once
>>> - Interval: 15m 0s
>>> - Timeout: 10m 0s
>>> - Minimum Pause Between Checkpoints: 5m 0s
>>> - Maximum Concurrent Checkpoints: 1
>>> - Persist Checkpoints Externally: Disabled
>>>
>>> After the first checkpoint completed [see history after 1st ckpt.png],
>>> the job was restarted due to the ProducerFencedException [see exception
>>> after 1st ckpt.png].
>>> The first checkpoint took less than 2 minutes, while my checkpoint
>>> interval is 15m and the minimum pause between checkpoints is 5m.
>>> After the job restarted, the second checkpoint was triggered after a
>>> while [see history after 2nd ckpt.png], and this time I got no exception.
>>> The third checkpoint resulted in the same exception as the first.
>>>
>>> Can anybody let me know what's going wrong behind the scenes?
>>>
>>> Best,
>>>
>>> Dongwon
>>>
>>> <history after 3rd ckpt.png> <exception after 3rd ckpt.png> <history
>>> after 2nd ckpt.png> <configuration.png> <summary.png> <exception after
>>> 1st ckpt.png> <history after 1st ckpt.png>

> <screenshot_10min_ckpt.png>
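For reference, the checkpoint configuration Dongwon listed above corresponds
roughly to the following DataStream API calls - a sketch assuming Flink 1.4,
with a placeholder HDFS path:

--
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints")); // placeholder

env.enableCheckpointing(15 * 60 * 1000);                                // Interval: 15m
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000);         // Timeout: 10m
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5 * 60 * 1000); // Min pause: 5m
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
--

Per Piotr's rule of thumb, with a 15-minute interval the producer's
transaction.timeout.ms has to cover the start of one checkpoint through the
end of the next (well over 25 minutes here), plus any restart downtime.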