Hi Piotr,

Thanks a lot. I need exactly-once in my use case, but rather than risk losing data, falling back to at-least-once is acceptable when an error occurs.
Best,
Tony Wei

Piotr Nowojski <pi...@data-artisans.com> wrote on Mon, Aug 12, 2019 at 3:27 PM:

> Hi,
>
> Yes, if it’s due to a transaction timeout you will lose the data.
>
> Whether you can fall back to at-least-once depends on Kafka, not on Flink, since it’s Kafka that times out those transactions, and I don’t see anything in the documentation that could override this [1]. You might try disabling the mechanism via `transaction.abort.timed.out.transaction.cleanup.interval.ms` or `transaction.remove.expired.transaction.cleanup.interval.ms`, but that’s more a question for the Kafka folks. Maybe Becket could help with this.
>
> Also, it MIGHT be that Kafka doesn’t remove records from the topics when aborting the transaction, and MAYBE you can still access them via “READ_UNCOMMITTED” mode. But that’s, again, a question for Kafka.
>
> Sorry that I cannot help more.
>
> If you do not care about exactly-once, why don’t you just set the connector to at-least-once mode?
>
> Piotrek
>
> On 12 Aug 2019, at 06:29, Tony Wei <tony19920...@gmail.com> wrote:
>
> Hi,
>
> I had the same exception recently. I want to confirm that if it is due to a transaction timeout, then I will lose that data. Am I right? Can I make it fall back to at-least-once semantics in this situation?
>
> Best,
> Tony Wei
>
> Piotr Nowojski <pi...@data-artisans.com> wrote on Wed, Mar 21, 2018 at 10:28 PM:
>
>> Hi,
>>
>> But that’s exactly the case: the producer’s transaction timeout starts when the external transaction starts - but FlinkKafkaProducer011 keeps an active Kafka transaction for the whole period between checkpoints.
>>
>> As I wrote in the previous message:
>>
>> > in case of failure, your timeout must also be able to cover the additional downtime required for the successful job restart. Thus you should increase your timeout accordingly.
>>
>> I think that a 15-minute timeout is way too small a value.
>> If your job fails because of some intermittent failure (for example a worker crash/restart), you will only have a couple of minutes for a successful Flink job restart. Otherwise you will lose some data (because of the transaction timeouts).
>>
>> Piotrek
>>
>> On 21 Mar 2018, at 10:30, Dongwon Kim <eastcirc...@gmail.com> wrote:
>>
>> Hi Piotr,
>>
>> Now my streaming pipeline is working without retries.
>> I decreased Flink's checkpoint interval from 15 min to 10 min as you suggested [see screenshot_10min_ckpt.png].
>>
>> I thought that the producer's transaction timeout starts when the external transaction starts.
>> The truth is that the producer's transaction timeout starts after the last external checkpoint is committed.
>> Now that I have 15 min for the producer's transaction timeout and 10 min for Flink's checkpoint interval, and every checkpoint takes less than 5 minutes, everything is working fine.
>> Am I right?
>>
>> Anyway, thank you very much for the detailed explanation!
>>
>> Best,
>>
>> Dongwon
>>
>> On Tue, Mar 20, 2018 at 8:10 PM, Piotr Nowojski <pi...@data-artisans.com> wrote:
>>
>>> Hi,
>>>
>>> Please increase transaction.timeout.ms to a greater value or decrease Flink’s checkpoint interval; I’m pretty sure the issue here is that those two values are overlapping. I think that’s even visible on the screenshots: the first checkpoint started at 14:28:48 and ended at 14:30:43, while the second one started at 14:45:53 and ended at 14:49:16. That gives you a minimal transaction duration of 15 minutes and 10 seconds, and a maximal transaction duration of about 21 minutes.
>>>
>>> In the HAPPY SCENARIO (without any failures and restarting), you should assume that your timeout interval should cover, with some safety margin, the period between the start of a checkpoint and the end of the NEXT checkpoint, since this is the upper bound on how long the transaction might be used. In your case, at least ~25 minutes.
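The overlap Piotr describes can be checked with a short sketch (plain Python; the timestamps are the ones quoted above from the screenshots):

```python
from datetime import datetime

def duration_min(start: str, end: str) -> float:
    """Minutes elapsed between two HH:MM:SS timestamps on the same day."""
    fmt = "%H:%M:%S"
    return (datetime.strptime(end, fmt) - datetime.strptime(start, fmt)).total_seconds() / 60

# A Kafka transaction opened around one checkpoint must stay open until
# the NEXT checkpoint commits, so its lifetime spans roughly:
shortest = duration_min("14:30:43", "14:45:53")  # end of 1st ckpt -> start of 2nd
longest = duration_min("14:28:48", "14:49:16")   # start of 1st ckpt -> end of 2nd

print(f"shortest ~{shortest:.1f} min, longest ~{longest:.1f} min")
# -> shortest ~15.2 min, longest ~20.5 min: both at or beyond a 15-minute
#    transaction.timeout.ms, matching Piotr's 15m10s / ~21m figures.
```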
>>>
>>> On top of that, as described in the docs (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-producers-and-fault-tolerance), in case of failure your timeout must also be able to cover the additional downtime required for the successful job restart. Thus you should increase your timeout accordingly.
>>>
>>> Piotrek
>>>
>>> On 20 Mar 2018, at 11:58, Dongwon Kim <eastcirc...@gmail.com> wrote:
>>>
>>> Hi Piotr,
>>>
>>> We have set the producer's transaction.timeout.ms to 15 minutes and have used the default setting for the broker (15 min).
>>> As Flink's checkpoint interval is 15 minutes, it is not a situation where Kafka's timeout is smaller than Flink's checkpoint interval.
>>> As our first checkpoint takes just 2 minutes, it seems like the transaction is not committed properly.
>>>
>>> Best,
>>>
>>> - Dongwon
>>>
>>> On Tue, Mar 20, 2018 at 6:32 PM, Piotr Nowojski <pi...@data-artisans.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> What’s your Kafka transaction timeout setting? Please check both the Kafka producer configuration (the transaction.timeout.ms property) and the Kafka broker configuration. The most likely cause of such an error message is that Kafka's timeout is smaller than Flink’s checkpoint interval and transactions are not committed quickly enough before the timeout occurs.
>>>>
>>>> Piotrek
>>>>
>>>> On 17 Mar 2018, at 07:24, Dongwon Kim <eastcirc...@gmail.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>> I'm faced with the following ProducerFencedException after the 1st, 3rd, 5th, 7th, ... checkpoints:
>>>>
>>>> --
>>>> java.lang.RuntimeException: Error while confirming checkpoint
>>>>     at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1260)
>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
>>>> --
>>>>
>>>> FYI, I'm using Flink 1.4.0 and testing end-to-end exactly-once processing using the Kafka sink.
>>>> We use FsStateBackend to store snapshot data on HDFS.
>>>>
>>>> As shown in configuration.png, my checkpoint configuration is:
>>>> - Checkpointing Mode: Exactly Once
>>>> - Interval: 15m 0s
>>>> - Timeout: 10m 0s
>>>> - Minimum Pause Between Checkpoints: 5m 0s
>>>> - Maximum Concurrent Checkpoints: 1
>>>> - Persist Checkpoints Externally: Disabled
>>>>
>>>> After the first checkpoint completed [see history after 1st ckpt.png], the job is restarted due to the ProducerFencedException [see exception after 1st ckpt.png].
>>>> The first checkpoint takes less than 2 minutes, while my checkpoint interval is 15m and the minimum pause between checkpoints is 5m.
>>>> After the job is restarted, the second checkpoint is triggered after a while [see history after 2nd ckpt.png], and this time I got no exception.
>>>> The third checkpoint results in the same exception as after the first checkpoint.
>>>>
>>>> Can anybody let me know what's going wrong behind the scenes?
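For reference, a sketch of where the timeout discussed in this thread is configured on each side. The property names are the real Kafka ones; the 1-hour values are illustrative assumptions, not the settings used by anyone in this thread:

```properties
# Producer side - properties handed to the Flink Kafka producer:
# must cover checkpoint interval + checkpoint duration + restart downtime
transaction.timeout.ms=3600000

# Broker side (server.properties) - the cap the broker enforces;
# its default is 900000 (15 min), so a longer producer timeout
# must be matched by raising this as well
transaction.max.timeout.ms=3600000
```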
>>>>
>>>> Best,
>>>>
>>>> Dongwon
>>>> <history after 3rd ckpt.png><exception after 3rd ckpt.png><history after 2nd ckpt.png><configuration.png><summary.png><exception after 1st ckpt.png><history after 1st ckpt.png>
>>
>> <screenshot_10min_ckpt.png>
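The sizing rule that runs through the whole thread can be summed up in a minimal sketch (plain Python; the 5-minute safety margin is an assumption, not a value anyone stated):

```python
def required_timeout_min(interval_min: float, checkpoint_timeout_min: float,
                         restart_downtime_min: float = 0.0,
                         margin_min: float = 5.0) -> float:
    """Rough lower bound, in minutes, for Kafka's transaction.timeout.ms:
    a transaction can stay open from the start of one checkpoint until the
    end of the next, plus any downtime spent restarting the job."""
    return interval_min + checkpoint_timeout_min + restart_downtime_min + margin_min

# Dongwon's original settings: 15 min interval, 10 min checkpoint timeout,
# but transaction.timeout.ms left at 15 min -> transactions expired.
print(required_timeout_min(15, 10))  # -> 30.0, so 15 min was far too small
```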