Hi Piotr,

Thanks a lot. I need exactly once in my use case, but instead of having the
risk of losing data, at least once is more acceptable when error occurred.

Best,
Tony Wei

Piotr Nowojski <pi...@data-artisans.com> 於 2019年8月12日 週一 下午3:27寫道:

> Hi,
>
> Yes, if it’s due to transaction timeout you will lose the data.
>
> Whether can you fallback to at least once, that depends on Kafka, not on
> Flink, since it’s the Kafka that timeouts those transactions and I don’t
> see in the documentation anything that could override this [1]. You might
> try disabling the mechanism via setting `
> transaction.abort.timed.out.transaction.cleanup.interval.ms` or `
> transaction.remove.expired.transaction.cleanup.interval.ms`, but that’s
> question more to Kafka guys. Maybe Becket could help with this.
>
> Also it MIGHT be that Kafka doesn’t remove records from the topics when
> aborting the transaction and MAYBE you can still access them via
> “READ_UNCOMMITTED” mode. But that’s again, question to Kafka.
>
> Sorry that I can not help more.
>
> If you do not care about exactly once, why don’t you just set the
> connector to at least once mode?
>
> Piotrek
>
> On 12 Aug 2019, at 06:29, Tony Wei <tony19920...@gmail.com> wrote:
>
> Hi,
>
> I had the same exception recently. I want to confirm that if it is due to
> transaction timeout,
> then I will lose those data. Am I right? Can I make it fall back to at
> least once semantic in
> this situation?
>
> Best,
> Tony Wei
>
> Piotr Nowojski <pi...@data-artisans.com> 於 2018年3月21日 週三 下午10:28寫道:
>
>> Hi,
>>
>> But that’s exactly the case: producer’s transaction timeout starts when
>> the external transaction starts - but FlinkKafkaProducer011 keeps an active
>> Kafka transaction for the whole period between checkpoints.
>>
>> As I wrote in the previous message:
>>
>> > in case of failure, your timeout must also be able to cover the
>> additional downtime required for the successful job restart. Thus you
>> should increase your timeout accordingly.
>>
>> I think that 15 minutes timeout is a way too small value. If your job
>> fails because of some intermittent failure (for example worker
>> crash/restart), you will only have a couple of minutes for a successful
>> Flink job restart. Otherwise you will lose some data (because of the
>> transaction timeouts).
>>
>> Piotrek
>>
>> On 21 Mar 2018, at 10:30, Dongwon Kim <eastcirc...@gmail.com> wrote:
>>
>> Hi Piotr,
>>
>> Now my streaming pipeline is working without retries.
>> I decreased Flink's checkpoint interval from 15min to 10min as you
>> suggested [see screenshot_10min_ckpt.png].
>>
>> I though that producer's transaction timeout starts when the external
>> transaction starts.
>> The truth is that Producer's transaction timeout starts after the last
>> external checkpoint is committed.
>> Now that I have 15min for Producer's transaction timeout and 10min for
>> Flink's checkpoint interval, and every checkpoint takes less than 5
>> minutes, everything is working fine.
>> Am I right?
>>
>> Anyway thank you very much for the detailed explanation!
>>
>> Best,
>>
>> Dongwon
>>
>>
>>
>> On Tue, Mar 20, 2018 at 8:10 PM, Piotr Nowojski <pi...@data-artisans.com>
>> wrote:
>>
>>> Hi,
>>>
>>> Please increase transaction.timeout.ms to a greater value or decrease
>>> Flink’s checkpoint interval, I’m pretty sure the issue here is that those
>>> two values are overlapping. I think that’s even visible on the screenshots.
>>> First checkpoint completed started at 14:28:48 and ended at 14:30:43, while
>>> the second one started at 14:45:53 and ended at 14:49:16. That gives you
>>> minimal transaction duration of 15 minutes and 10 seconds, with maximal
>>> transaction duration of 21 minutes.
>>>
>>> In HAPPY SCENARIO (without any failure and restarting), you should
>>> assume that your timeout interval should cover with some safety margin the
>>> period between start of a checkpoint and end of the NEXT checkpoint, since
>>> this is the upper bound how long the transaction might be used. In your
>>> case at least ~25 minutes.
>>>
>>> On top of that, as described in the docs,
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-producers-and-fault-tolerance
>>>  ,
>>> in case of failure, your timeout must also be able to cover the additional
>>> downtime required for the successful job restart. Thus you should increase
>>> your timeout accordingly.
>>>
>>> Piotrek
>>>
>>>
>>> On 20 Mar 2018, at 11:58, Dongwon Kim <eastcirc...@gmail.com> wrote:
>>>
>>> Hi Piotr,
>>>
>>> We have set producer's [transaction.timeout.ms] to 15 minutes and have
>>> used the default setting for broker (15 mins).
>>> As Flink's checkpoint interval is 15 minutes, it is not a situation
>>> where Kafka's timeout is smaller than Flink's checkpoint interval.
>>> As our first checkpoint just takes 2 minutes, it seems like transaction
>>> is not committed properly.
>>>
>>> Best,
>>>
>>> - Dongwon
>>>
>>>
>>>
>>>
>>>
>>> On Tue, Mar 20, 2018 at 6:32 PM, Piotr Nowojski <pi...@data-artisans.com
>>> > wrote:
>>>
>>>> Hi,
>>>>
>>>> What’s your Kafka’s transaction timeout setting? Please both check
>>>> Kafka producer configuration (transaction.timeout.ms property) and
>>>> Kafka broker configuration. The most likely cause of such error message is
>>>> when Kafka's timeout is smaller then Flink’s checkpoint interval and
>>>> transactions are not committed quickly enough before timeout occurring.
>>>>
>>>> Piotrek
>>>>
>>>> On 17 Mar 2018, at 07:24, Dongwon Kim <eastcirc...@gmail.com> wrote:
>>>>
>>>>
>>>> Hi,
>>>>
>>>> I'm faced with the following ProducerFencedException after 1st, 3rd,
>>>> 5th, 7th, ... checkpoints:
>>>>
>>>> --
>>>>
>>>> java.lang.RuntimeException: Error while confirming checkpoint
>>>>    at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1260)
>>>>    at 
>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>    at 
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>    at 
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>    at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: org.apache.kafka.common.errors.ProducerFencedException: 
>>>> Producer attempted an operation with an old epoch. Either there is a newer 
>>>> producer with the same transactionalId, or the producer's transaction has 
>>>> been expired by the broker.
>>>>
>>>> --
>>>>
>>>>
>>>> FYI, I'm using Flink 1.4.0 and testing end-to-end exactly once
>>>> processing using Kafka sink.
>>>> We use FsStateBackend to store snapshot data on HDFS.
>>>>
>>>> As shown in configuration.png, my checkpoint configuration is:
>>>> - Checkpointing Mode : Exactly Once
>>>> - Interval : 15m 0s
>>>> - Timeout : 10m 0s
>>>> - Minimum Pause Between Checkpoints : 5m 0s
>>>> - Maximum Concurrent Checkpoints : 1
>>>> - Persist Checkpoints Externally : Disabled
>>>>
>>>> After the first checkpoint completed [see history after 1st ckpt.png],
>>>> the job is restarted due to the ProducerFencedException [see exception
>>>> after 1st ckpt.png].
>>>> The first checkpoint takes less than 2 minutes while my checkpoint
>>>> interval is 15m and minimum pause between checkpoints is 5m.
>>>> After the job is restarted, the second checkpoint is triggered after a
>>>> while [see history after 2nd ckpt.png] and this time I've got no exception.
>>>> The third checkpoint results in the same exception as after the first
>>>> checkpoint.
>>>>
>>>> Can anybody let me know what's going wrong behind the scene?
>>>>
>>>> Best,
>>>>
>>>> Dongwon
>>>> <history after 3rd ckpt.png><exception after 3rd ckpt.png><history
>>>> after 2nd ckpt.png><configuration.png><summary.png><exception after
>>>> 1st ckpt.png><history after 1st ckpt.png>
>>>>
>>>>
>>>>
>>>
>>>
>> <screenshot_10min_ckpt.png>
>>
>>
>>
>

Reply via email to