Hi, Khachatryan

Thank you for the reply. Is this a problem that can be fixed? If so, is the
fix on the roadmap? Thanks!

Best
Lu

On Tue, Aug 4, 2020 at 1:24 PM Khachatryan Roman <
khachatryan.ro...@gmail.com> wrote:

> Hi Lu,
>
> Yes, this error indicates data loss (unless there were no records in the
> transactions).
>
> Regards,
> Roman
>
>
> On Mon, Aug 3, 2020 at 9:14 PM Lu Niu <qqib...@gmail.com> wrote:
>
>> Hi,
>>
>> We are using end-to-end exactly-once Flink + Kafka and encountered the
>> exception below, which usually comes after checkpoint failures:
>> ```
>> Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
>> 2020-07-28 16:27:51,633 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job xxx (f08fc4b1edceb3705e2cb134a8ece73d) switched from state RUNNING to FAILING.
>> java.lang.RuntimeException: Error while confirming checkpoint
>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1219)
>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure
>>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:295)
>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:842)
>>     at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1214)
>>     ... 5 more
>> ```
>> We ran some end-to-end tests and noticed that whenever this happens,
>> there is data loss.
>>
>> Referring to several related questions, I understand I need to increase
>> `transaction.timeout.ms` because:
>> ```
>> Semantic.EXACTLY_ONCE mode relies on the ability to commit transactions
>> that were started before taking a checkpoint, after recovering from the
>> said checkpoint. If the time between Flink application crash and completed
>> restart is larger than Kafka’s transaction timeout there will be data loss
>> (Kafka will automatically abort transactions that exceeded timeout time).
>> ```
>>
>> But I want to confirm with the community:
>> Does an exception like this always lead to data loss?
>>
>> I ask because we sometimes get this exception even when the checkpoint
>> succeeds.
>>
>> Setup:
>> Flink 1.9.1
>>
>> Best
>> Lu
>>
>
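
For reference, the `transaction.timeout.ms` setting discussed in the quoted
message is an ordinary Kafka producer property. Below is a minimal sketch of
how such a producer config might be assembled and sanity-checked. The class
`KafkaTxnConfig`, its method names, and the broker address are hypothetical
and exist only for illustration; the 15-minute constant assumes the broker
still uses Kafka's default `transaction.max.timeout.ms`, which may not match
your cluster. The resulting `Properties` would typically be passed as the
producer config when constructing the Flink Kafka sink with
Semantic.EXACTLY_ONCE.

```java
import java.util.Properties;

// Hypothetical helper for illustration; not part of Flink or Kafka.
final class KafkaTxnConfig {

    // Assumption: the broker uses Kafka's default transaction.max.timeout.ms (15 minutes).
    static final long ASSUMED_BROKER_MAX_TIMEOUT_MS = 15L * 60 * 1000;

    // Builds producer properties with an explicit transaction timeout.
    static Properties producerProperties(String bootstrapServers, long transactionTimeoutMs) {
        if (transactionTimeoutMs <= 0) {
            throw new IllegalArgumentException("transaction timeout must be positive");
        }
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("transaction.timeout.ms", Long.toString(transactionTimeoutMs));
        return props;
    }

    // Returns true if the configured producer timeout does not exceed the given broker limit.
    static boolean fitsBrokerLimit(Properties props, long brokerMaxTimeoutMs) {
        long producerTimeoutMs = Long.parseLong(props.getProperty("transaction.timeout.ms"));
        return producerTimeoutMs <= brokerMaxTimeoutMs;
    }

    public static void main(String[] args) {
        // "localhost:9092" is a placeholder address.
        Properties props = producerProperties("localhost:9092", 10L * 60 * 1000);
        System.out.println("transaction.timeout.ms = " + props.getProperty("transaction.timeout.ms"));
        System.out.println("fits assumed broker limit: "
                + fitsBrokerLimit(props, ASSUMED_BROKER_MAX_TIMEOUT_MS));
    }
}
```

The timeout should comfortably exceed the longest expected gap between a
crash and a completed restart, per the documentation quoted above. If a
larger value than the broker limit is needed, the broker-side
`transaction.max.timeout.ms` has to be raised as well.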
