Hi user@,

We're developing a Flink application that uses FlinkKafkaProducer with Semantic.EXACTLY_ONCE to output records to a Kafka topic exactly once. We run our Flink application on Kubernetes.
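For reference, our producer setup looks roughly like this (simplified; the bootstrap address, topic name, and serialization schema here are placeholders, not our real values):

```java
import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "kafka:9092");  // placeholder address
// Producer-side transaction timeout, raised to 1 hour (see below);
// the broker must allow this via transaction.max.timeout.ms.
props.setProperty("transaction.timeout.ms", "3600000");

FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
        "output-topic",  // placeholder default topic
        (KafkaSerializationSchema<String>) (element, timestamp) ->
                new ProducerRecord<>("output-topic",
                        element.getBytes(StandardCharsets.UTF_8)),
        props,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
```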
I've observed that if a task manager fails (simulated by killing a task-manager pod), the job does not recover or restore from the most recent checkpoint; instead it enters a crash loop with errors like the one below [0]. If I run the same experiment (run the job, kill a task-manager pod) with the AT_LEAST_ONCE semantic, the job recovers from the most recent checkpoint as expected. We've set transaction.timeout.ms to 1 hour on the producer side and allowed it on the broker side as well.

Any insights into what we could be doing wrong or what's going on are appreciated. Thanks in advance!

[0]:
```
2021-07-28 16:55:32
org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse; Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
	at org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1352)
	at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1260)
	at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
	at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:572)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
	at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:414)
	at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
	at java.base/java.lang.Thread.run(Unknown Source)
```