Hi Lu,

AFAIK, it's not going to be fixed. As you mentioned in the first email,
Kafka should be configured so that its transaction timeout is greater than
your maximum checkpoint duration (plus the expected restart time).

However, you should change not only transaction.timeout.ms in the producer
but also transaction.max.timeout.ms on your brokers.
Please refer to
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html#caveats
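
To make the relationship between the two settings concrete, here is a
minimal sketch in plain Java (no Flink or Kafka dependencies). The class
name, the method name and the "max checkpoint duration + max restart time"
sizing rule are my own illustration, not Flink API. The only facts it relies
on are the property name transaction.timeout.ms and the broker default of
15 minutes for transaction.max.timeout.ms.

```java
import java.time.Duration;
import java.util.Properties;

public class KafkaTxTimeoutSketch {

    // Kafka brokers default transaction.max.timeout.ms to 15 minutes.
    static final long BROKER_DEFAULT_MAX_TIMEOUT_MS = Duration.ofMinutes(15).toMillis();

    /**
     * Hypothetical helper: derives a producer transaction.timeout.ms that
     * covers the longest checkpoint plus the longest expected restart, and
     * rejects values the broker would refuse.
     */
    static Properties producerProperties(Duration maxCheckpointDuration,
                                         Duration maxRestartTime,
                                         long brokerMaxTimeoutMs) {
        long requiredMs = maxCheckpointDuration.plus(maxRestartTime).toMillis();
        if (requiredMs > brokerMaxTimeoutMs) {
            // The broker rejects producers whose timeout exceeds its own limit,
            // so transaction.max.timeout.ms has to be raised on the brokers first.
            throw new IllegalArgumentException(
                "transaction.timeout.ms=" + requiredMs
                    + " exceeds broker transaction.max.timeout.ms=" + brokerMaxTimeoutMs);
        }
        Properties props = new Properties();
        props.setProperty("transaction.timeout.ms", Long.toString(requiredMs));
        return props;
    }

    public static void main(String[] args) {
        // Assumed numbers: 10 min checkpoints, 20 min worst-case restart,
        // brokers reconfigured to allow transactions of up to 1 hour.
        Properties props = producerProperties(
            Duration.ofMinutes(10), Duration.ofMinutes(20), Duration.ofHours(1).toMillis());
        System.out.println(props);
    }
}
```

The resulting Properties object is what you would pass as the producer
config when constructing the Kafka sink. With the broker default of
15 minutes the same call throws, which is the case where the broker-side
setting has to be changed as well.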

Regards,
Roman


On Wed, Aug 5, 2020 at 12:24 AM Lu Niu <qqib...@gmail.com> wrote:

> Hi, Khachatryan
>
> Thank you for the reply. Is that a problem that can be fixed? If so, is
> the fix on the roadmap? Thanks!
>
> Best
> Lu
>
> On Tue, Aug 4, 2020 at 1:24 PM Khachatryan Roman <
> khachatryan.ro...@gmail.com> wrote:
>
>> Hi Lu,
>>
>> Yes, this error indicates data loss (unless there were no records in the
>> transactions).
>>
>> Regards,
>> Roman
>>
>>
>> On Mon, Aug 3, 2020 at 9:14 PM Lu Niu <qqib...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> We are using end-to-end exactly-once Flink + Kafka and encountered the
>>> exception below, which usually comes after checkpoint failures:
>>> ```
>>> Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
>>> 2020-07-28 16:27:51,633 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job xxx (f08fc4b1edceb3705e2cb134a8ece73d) switched from state RUNNING to FAILING.
>>> java.lang.RuntimeException: Error while confirming checkpoint
>>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1219)
>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure
>>>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:295)
>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:842)
>>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1214)
>>>     ... 5 more
>>> ```
>>> We did some end-to-end tests and noticed that whenever this happens,
>>> data is lost.
>>>
>>> Referring to several related questions, I understand I need to increase
>>> `transaction.timeout.ms` because:
>>> ```
>>> Semantic.EXACTLY_ONCE mode relies on the ability to commit transactions
>>> that were started before taking a checkpoint, after recovering from the
>>> said checkpoint. If the time between Flink application crash and completed
>>> restart is larger than Kafka’s transaction timeout there will be data loss
>>> (Kafka will automatically abort transactions that exceeded timeout time).
>>> ```
>>>
>>> But I want to confirm with the community:
>>> Does an exception like this always lead to data loss?
>>>
>>> I ask because we sometimes get this exception even when the checkpoint
>>> succeeds.
>>>
>>> Setup:
>>> Flink 1.9.1
>>>
>>> Best
>>> Lu
>>>
>>
