I don’t think that this risk is there; I guess Fabian wrote a quick response … see also the later comments in the ticket.
Thias

From: Kevin Lam <kevin....@shopify.com>
Sent: Tuesday, 3 August 2021 15:56
To: Schwalbe Matthias <matthias.schwa...@viseca.ch>
Cc: user <user@flink.apache.org>; fabianp...@ververica.com
Subject: Re: FW: Error using KafkaProducer EXACTLY_ONCE semantic + TaskManager Failure

Thank you for the replies!

Thias, I will look into that issue and the workaround. Fabian Paul had replied on https://issues.apache.org/jira/browse/FLINK-23509 mentioning that our workaround could result in data loss. Is that a risk with the workaround?

Fabian Paul, yes, here is some more information:

Checkpoint config (see [0] for more):
- Frequency: 2 minutes
- No concurrent checkpoints
- Checkpoint duration roughly
- Overall parallelism: 13-70, depending on the operator

The full exception, available in the JobManager logs and in the exception tab of the UI:

```
2021-08-03 09:53:42
org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse; Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
    at org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1352)
    at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1260)
    at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
    at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:572)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
    at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:414)
    at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
    at java.base/java.lang.Thread.run(Unknown Source)
```

I saw that you recently opened https://github.com/apache/flink/pull/16692; will this fix the issue?

[0] [image.png]

On Tue, Aug 3, 2021 at 8:27 AM Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote:

…

I just realized my former answer went only to Kevin directly. (Still getting used to the mailing list 😊 )

This was my original answer; there is a workaround until the respective ticket is solved.

Hope this helps

Thias

From: Schwalbe Matthias
Sent: Friday, 30 July 2021 08:13
To: Kevin Lam <kevin....@shopify.com>
Subject: RE: Error using KafkaProducer EXACTLY_ONCE semantic + TaskManager Failure

Hi Kevin,

I’ve had exactly the same problem. It is caused by a regression bug in FlinkKafkaInternalProducer#resumeTransaction (https://github.com/apache/flink/blob/f06faf13930f2e8acccf1e04e2c250b85bdbf48e/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/FlinkKafkaInternalProducer.java#L229) that surfaces with newer versions of the Kafka client.
I’ve created a bug ticket, https://issues.apache.org/jira/browse/FLINK-23509, and proposed a workaround there until the problem is fixed.

These references are also related to the same problem: https://issues.apache.org/jira/browse/FLINK-16419 and https://lists.apache.org/list.html?user@flink.apache.org:lte=2M:recover%20from%20svaepoint

Feel free to get back to me for clarifications 😊

Thias

From: Kevin Lam <kevin....@shopify.com>
Sent: Thursday, 29 July 2021 19:32
To: user <user@flink.apache.org>
Subject: Error using KafkaProducer EXACTLY_ONCE semantic + TaskManager Failure

Hi user@,

We're developing a Flink application and use the FlinkKafkaProducer.Semantic.EXACTLY_ONCE producer semantic to write records to a Kafka topic exactly once. We run our Flink application on Kubernetes.

I've observed that if a TaskManager fails (I've simulated this by killing a TaskManager pod), the job does not recover or restore from the most recent checkpoint and instead enters a crash loop with the following type of error [0]. If I run the same experiment (run the job, kill a TaskManager pod) with the AT_LEAST_ONCE semantic, the job recovers from the most recent checkpoint as expected.

We've set transaction.timeout.ms to 1 hour on both the broker and the producer side.

Any insights into what we could be doing wrong or what's going on are appreciated. Thanks in advance!

[0]:
```
2021-07-28 16:55:32
org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse; Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
    at org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1352)
    at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1260)
    at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
    at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:572)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
    at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:414)
    at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
    at java.base/java.lang.Thread.run(Unknown Source)
```

This message is intended only for the named recipient and may contain confidential or privileged information. As the confidentiality of email communication cannot be guaranteed, we do not accept any responsibility for the confidentiality and the intactness of this message. If you have received it in error, please advise the sender by return e-mail and delete this message and any attachments. Any unauthorised use or dissemination of this information is strictly prohibited.
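Both stack traces in this thread fail in InitProducerId with an "old epoch" error. As background: Kafka fences stale transactional producers with a per-transactional.id epoch. Each new initialization bumps the epoch, and requests carrying an older epoch are rejected. The sketch below is a hypothetical, heavily simplified model of that rule in plain Java, not Kafka's or Flink's actual code. The class names, the `knownEpoch` parameter and the transactional id `my-sink-0` are illustrative assumptions. It shows how a producer that resumes with an epoch captured before a failure can be rejected once a newer instance has already re-initialized the same transactional id.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical, heavily simplified model of transactional-producer fencing.
 * It is NOT Kafka's implementation; it only mimics the epoch rule behind
 * "Producer attempted an operation with an old epoch".
 */
public class EpochFencingDemo {

    /** Thrown when a request carries an epoch older than the coordinator's current one. */
    static final class OldEpochException extends RuntimeException {
        OldEpochException(String message) {
            super(message);
        }
    }

    /** Stand-in for the broker-side transaction coordinator (assumption, not a real Kafka class). */
    static final class Coordinator {
        private final Map<String, Integer> currentEpoch = new HashMap<>();

        /**
         * Hands out a new epoch for the transactional id. If the caller claims to
         * already own an epoch (knownEpoch >= 0) that is older than the current one,
         * the request is rejected instead of bumping the epoch.
         */
        int initProducerId(String transactionalId, int knownEpoch) {
            int current = currentEpoch.getOrDefault(transactionalId, -1);
            if (knownEpoch >= 0 && knownEpoch < current) {
                throw new OldEpochException("epoch " + knownEpoch + " is older than " + current);
            }
            int next = current + 1;
            currentEpoch.put(transactionalId, next);
            return next;
        }
    }

    public static void main(String[] args) {
        Coordinator coordinator = new Coordinator();
        String txnId = "my-sink-0"; // hypothetical transactional.id

        int first = coordinator.initProducerId(txnId, -1);  // original producer gets epoch 0
        int second = coordinator.initProducerId(txnId, -1); // recovered producer gets epoch 1, fencing epoch 0
        System.out.println("epochs: " + first + ", " + second);

        try {
            // A producer that restores the stale epoch 0 and re-initializes is rejected.
            coordinator.initProducerId(txnId, first);
            System.out.println("accepted");
        } catch (OldEpochException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Running `main` prints `epochs: 0, 1` followed by `rejected: epoch 0 is older than 1`. The real broker distinguishes more cases than this model, for example transactions that the broker has expired, as the error message itself points out.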