Hi Lu,

In this case, judging from the rather fragmented log/error message that
you posted, the job has failed, so Flink indeed detected some issue, and
that probably means data loss in Kafka (in that case you could probably
recover some of the lost records by reading from Kafka in
`read_uncommitted` mode, but that can lead to data duplication).
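If you need to attempt such a salvage, a minimal sketch of a
`read_uncommitted` consumer could look like this (broker address, topic,
and group id are placeholders, and any deduplication is left to you):

```
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SalvageReader {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // placeholder
        props.setProperty("group.id", "salvage-reader");       // placeholder
        props.setProperty("key.deserializer", StringDeserializer.class.getName());
        props.setProperty("value.deserializer", StringDeserializer.class.getName());
        // A read_committed consumer (typical for exactly-once pipelines) skips
        // records from aborted/expired transactions; read_uncommitted returns
        // them too, which is why duplicates become possible.
        props.setProperty("isolation.level", "read_uncommitted");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("output-topic")); // placeholder
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("%d@%d: %s%n",
                    record.partition(), record.offset(), record.value());
            }
        }
    }
}
```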

However, a very similar error can be logged by Flink as a WARN during
recovery. In that case it can mean either:
- data loss because of timeouts (keep in mind that Kafka's transactional
timeout must cover: the checkpoint interval + the downtime during the
failure + the time to restart and recover the Flink job; see the sketch
below), or
- the transaction had already been committed just before the failure
happened,

and unfortunately there is no way, using the Kafka API, to distinguish
those two cases.
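As a rough illustration of that lower bound (every value below is an
assumption for the sake of the example; substitute your own checkpoint
interval and worst-case recovery times):

```
// Illustrative lower bound for transaction.timeout.ms; all values
// here are made-up assumptions, not recommendations.
long checkpointIntervalMs = 60_000L;       // checkpoint once a minute
long worstCaseDowntimeMs  = 15 * 60_000L;  // outage before the job is restarted
long restartAndRecoverMs  = 5 * 60_000L;   // redeploy + state restore

// If a transaction outlives transaction.timeout.ms, the broker aborts it
// and its records become invisible to read_committed consumers (data loss).
long minTxTimeoutMs = checkpointIntervalMs + worstCaseDowntimeMs + restartAndRecoverMs;
// => 1,260,000 ms (21 minutes); configure headroom above this.
```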

Piotrek


Wed, 5 Aug 2020 at 10:17 Khachatryan Roman <khachatryan.ro...@gmail.com>
wrote:

> Hi Lu,
>
> AFAIK, it's not going to be fixed. As you mentioned in the first email,
> Kafka should be configured so that its transaction timeout is larger than
> your max checkpoint duration.
>
> However, you should not only change transaction.timeout.ms in the producer
> but also transaction.max.timeout.ms on your brokers.
> Please refer to
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html#caveats
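>
> A minimal sketch of the two settings side by side (the concrete values
> here are examples, not recommendations):
>
> ```
> import java.util.Properties;
>
> // Producer side (passed to the Kafka producer used by the Flink sink).
> Properties producerProps = new Properties();
> producerProps.setProperty("bootstrap.servers", "broker:9092"); // placeholder
> // Must not exceed the broker's transaction.max.timeout.ms, otherwise
> // the producer fails when initializing transactions.
> producerProps.setProperty("transaction.timeout.ms", "3600000"); // 1 hour
>
> // Broker side, in server.properties on every broker (the default is
> // 15 minutes, which is why raising only the producer setting fails):
> // transaction.max.timeout.ms=3600000
> ```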
>
> Regards,
> Roman
>
>
> On Wed, Aug 5, 2020 at 12:24 AM Lu Niu <qqib...@gmail.com> wrote:
>
>> Hi, Khachatryan
>>
>> Thank you for the reply. Is that a problem that can be fixed? If so, is
>> the fix on the roadmap? Thanks!
>>
>> Best
>> Lu
>>
>> On Tue, Aug 4, 2020 at 1:24 PM Khachatryan Roman <
>> khachatryan.ro...@gmail.com> wrote:
>>
>>> Hi Lu,
>>>
>>> Yes, this error indicates data loss (unless there were no records in the
>>> transactions).
>>>
>>> Regards,
>>> Roman
>>>
>>>
>>> On Mon, Aug 3, 2020 at 9:14 PM Lu Niu <qqib...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> We are using end-to-end exactly-once Flink + Kafka and encountered the
>>>> following exception, which usually comes after checkpoint failures:
>>>> ```
>>>> Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
>>>> 2020-07-28 16:27:51,633 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Job xxx (f08fc4b1edceb3705e2cb134a8ece73d) switched from state RUNNING to FAILING.
>>>> java.lang.RuntimeException: Error while confirming checkpoint
>>>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1219)
>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure
>>>>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:295)
>>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:842)
>>>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1214)
>>>>     ... 5 more
>>>> ```
>>>> We did some end-to-end tests and noticed that whenever this happens,
>>>> there is data loss.
>>>>
>>>> Referring to several related questions, I understand I need to increase
>>>> `transaction.timeout.ms` because:
>>>> ```
>>>> Semantic.EXACTLY_ONCE mode relies on the ability to commit transactions
>>>> that were started before taking a checkpoint, after recovering from the
>>>> said checkpoint. If the time between Flink application crash and
>>>> completed restart is larger than Kafka’s transaction timeout there will
>>>> be data loss (Kafka will automatically abort transactions that exceeded
>>>> timeout time).
>>>> ```
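>>>>
>>>> For reference, a simplified sketch of how the higher timeout would be
>>>> wired into the producer (topic, schema, and bootstrap servers are
>>>> placeholders, not our actual settings):
>>>>
>>>> ```
>>>> import java.util.Properties;
>>>> import org.apache.flink.api.common.serialization.SimpleStringSchema;
>>>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
>>>> import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
>>>>
>>>> Properties props = new Properties();
>>>> props.setProperty("bootstrap.servers", "broker:9092"); // placeholder
>>>> // Flink's Kafka producer defaults this to 1 hour, which already exceeds
>>>> // the broker-side default cap of 15 min (transaction.max.timeout.ms),
>>>> // so the broker cap has to be raised as well.
>>>> props.setProperty("transaction.timeout.ms", "3600000");
>>>>
>>>> FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
>>>>     "output-topic",                                          // placeholder
>>>>     new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
>>>>     props,
>>>>     FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
>>>> ```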
>>>>
>>>> But I want to confirm with the community:
>>>> Does an exception like this always lead to data loss?
>>>>
>>>> I asked because we get this exception sometimes even when the
>>>> checkpoint succeeds.
>>>>
>>>> Setup:
>>>> Flink 1.9.1
>>>>
>>>> Best
>>>> Lu
>>>>
>>>
