… I just realized my earlier answer went only to Kevin directly.
(Still getting used to the mailing list 😊)

Below is my original answer; there is a workaround until the corresponding ticket
is resolved.

Hope this helps

Thias

From: Schwalbe Matthias
Sent: Friday, 30 July 2021 08:13
To: Kevin Lam <kevin....@shopify.com>
Subject: RE: Error using KafkaProducer EXACTLY_ONCE semantic + TaskManager Failure

Hi Kevin,

I ran into exactly the same problem. It is caused by a regression bug in
FlinkKafkaInternalProducer#resumeTransaction<https://github.com/apache/flink/blob/f06faf13930f2e8acccf1e04e2c250b85bdbf48e/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/FlinkKafkaInternalProducer.java#L229>
that surfaces with newer versions of the Kafka client.

I’ve created a bug ticket, https://issues.apache.org/jira/browse/FLINK-23509,
and proposed a workaround there until the problem is fixed.
These references relate to the same problem as well:
https://issues.apache.org/jira/browse/FLINK-16419, and
https://lists.apache.org/list.html?user@flink.apache.org:lte=2M:recover%20from%20svaepoint

Feel free to get back to me if anything needs clarifying 😊

Thias

From: Kevin Lam <kevin....@shopify.com>
Sent: Thursday, 29 July 2021 19:32
To: user <user@flink.apache.org>
Subject: Error using KafkaProducer EXACTLY_ONCE semantic + TaskManager Failure

Hi user@,

We're developing a Flink application that uses the
FlinkKafkaProducer.Semantic.EXACTLY_ONCE producer semantic to write records to
a Kafka topic exactly once. We run the application on Kubernetes.

I've observed that if a task manager fails (I simulated this by killing a
task-manager pod), the job does not recover from the most recent checkpoint and
instead enters a crash loop with errors like the one in [0]. If I repeat the
same experiment (run the job, kill a task-manager pod) with the AT_LEAST_ONCE
semantic, the job recovers from the most recent checkpoint as expected.

We've set transaction.timeout.ms to 1 hour on both the broker and the
producer side.
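To make the producer-side part of this setup concrete, here is a minimal sketch of how the producer `Properties` might be assembled, assuming only the standard Kafka client keys `bootstrap.servers` and `transaction.timeout.ms`. In a real job these properties would typically be passed to a `FlinkKafkaProducer` constructor together with `FlinkKafkaProducer.Semantic.EXACTLY_ONCE`. The class and method names (`ExactlyOnceProducerConfig`, `producerProperties`, `fitsBrokerLimit`) and the `kafka:9092` address are hypothetical. On the broker, the upper bound for this value is `transaction.max.timeout.ms` (15 minutes by default). A producer timeout above that limit is rejected when the producer initializes, so the sketch also checks it.

```java
import java.util.Properties;

// Hypothetical helper; only java.util.Properties is used, so it compiles
// without Flink or Kafka on the classpath.
class ExactlyOnceProducerConfig {

    // One hour, the value quoted in the question, in milliseconds.
    static final long ONE_HOUR_MS = 60L * 60L * 1000L;

    // Builds producer properties for a transactional (exactly-once) Kafka sink.
    static Properties producerProperties(String bootstrapServers, long transactionTimeoutMs) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Producer-side transaction timeout. It must not exceed the broker's
        // transaction.max.timeout.ms, otherwise InitProducerId is rejected.
        props.setProperty("transaction.timeout.ms", Long.toString(transactionTimeoutMs));
        return props;
    }

    // True if the producer's transaction timeout fits within the broker maximum.
    static boolean fitsBrokerLimit(Properties props, long brokerTransactionMaxTimeoutMs) {
        long producerTimeoutMs = Long.parseLong(props.getProperty("transaction.timeout.ms"));
        return producerTimeoutMs <= brokerTransactionMaxTimeoutMs;
    }

    public static void main(String[] args) {
        Properties props = producerProperties("kafka:9092", ONE_HOUR_MS);
        System.out.println(props.getProperty("transaction.timeout.ms")); // prints 3600000
        // Broker raised to 1 hour: fits.
        System.out.println(fitsBrokerLimit(props, ONE_HOUR_MS));         // prints true
        // Broker left at the 15-minute default: does not fit.
        System.out.println(fitsBrokerLimit(props, 15L * 60L * 1000L));   // prints false
    }
}
```

The check is deliberately kept separate from the builder so the same broker limit can be validated against properties that come from elsewhere, e.g. a config file.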

Any insight into what we might be doing wrong, or what's going on, would be
appreciated.

Thanks in advance!

[0]:
```
2021-07-28 16:55:32
org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse; Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
    at org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1352)
    at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1260)
    at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
    at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:572)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
    at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:414)
    at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
    at java.base/java.lang.Thread.run(Unknown Source)
```
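The error in [0] is about producer epochs. The toy model below is a rough conceptual aid for that message. It is a hypothetical class, `EpochFencingModel`, and not Kafka's actual transaction coordinator code. It shows the general idea: each time a producer with a given `transactional.id` initializes, the epoch is bumped, and requests carrying an older epoch are rejected. It does not reproduce the specific failure path in `resumeTransaction` described in FLINK-23509.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: NOT Kafka's transaction coordinator, just an illustration of
// epoch-based fencing for producers that share one transactional.id.
class EpochFencingModel {

    // Latest epoch handed out per transactional.id (hypothetical bookkeeping).
    private final Map<String, Short> latestEpoch = new HashMap<>();

    // A producer (re)initializing with this transactional.id gets the next epoch,
    // which implicitly fences any older producer instance using the same id.
    short initProducerId(String transactionalId) {
        short next = (short) (latestEpoch.getOrDefault(transactionalId, (short) -1) + 1);
        latestEpoch.put(transactionalId, next);
        return next;
    }

    // Only requests carrying the latest epoch are accepted; an older epoch
    // corresponds to "attempted an operation with an old epoch".
    boolean accepts(String transactionalId, short epoch) {
        Short latest = latestEpoch.get(transactionalId);
        return latest != null && latest == epoch;
    }

    public static void main(String[] args) {
        EpochFencingModel coordinator = new EpochFencingModel();
        short oldEpoch = coordinator.initProducerId("sink-0"); // e.g. before the pod was killed
        short newEpoch = coordinator.initProducerId("sink-0"); // e.g. after the job restarted
        System.out.println(coordinator.accepts("sink-0", oldEpoch)); // prints false
        System.out.println(coordinator.accepts("sink-0", newEpoch)); // prints true
    }
}
```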
This message is intended only for the named recipient and may contain 
confidential or privileged information. As the confidentiality of email 
communication cannot be guaranteed, we do not accept any responsibility for the 
confidentiality and the intactness of this message. If you have received it in 
error, please advise the sender by return e-mail and delete this message and 
any attachments. Any unauthorised use or dissemination of this information is 
strictly prohibited.