Hi user@,

We're developing a Flink application that uses FlinkKafkaProducer with the
Semantic.EXACTLY_ONCE producer semantic to write records to a Kafka topic
exactly once. We run the application on Kubernetes.
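
For reference, the sink is constructed roughly as follows (the class name,
topic, broker address, and serialization are simplified placeholders, not
our exact code):

```
import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ExactlyOnceSink {

    // Topic name and bootstrap address are placeholders.
    private static final String TOPIC = "output-topic";

    static void attachSink(DataStream<String> stream) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");

        // Serialize each record as UTF-8 bytes into the target topic.
        KafkaSerializationSchema<String> schema =
                (element, timestamp) -> new ProducerRecord<>(
                        TOPIC, element.getBytes(StandardCharsets.UTF_8));

        // EXACTLY_ONCE makes the sink write through Kafka transactions
        // that are committed when a checkpoint completes.
        stream.addSink(new FlinkKafkaProducer<>(
                TOPIC, schema, props,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE));
    }
}
```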

I've observed that if a task manager fails (simulated by killing a
task-manager pod), the job does not recover from the most recent checkpoint
and instead enters a crash loop with errors like the one below [0]. If I run
the same experiment (start the job, kill a task-manager pod) with the
AT_LEAST_ONCE semantic, the job recovers from the most recent checkpoint as
expected.
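
For context, checkpointing is enabled along these lines (the interval shown
is illustrative, not our exact value):

```
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Sink transactions are committed on checkpoint completion, so
        // checkpoints must complete well within transaction.timeout.ms.
        env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

        // ... build the pipeline, then env.execute(...)
    }
}
```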

We've set transaction.timeout.ms to 1 hour on the producer, and raised the
broker's transaction.max.timeout.ms to 1 hour to match.
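
Concretely, the settings look like this (the broker change lives in
server.properties and is shown only as a comment; the snippet itself is a
sketch):

```
import java.util.Properties;

public class TxnTimeoutConfig {
    static Properties producerProps() {
        Properties props = new Properties();
        // Producer side: how long the broker keeps one of our open
        // transactions alive before aborting it.
        props.setProperty("transaction.timeout.ms", "3600000"); // 1 hour

        // Broker side (server.properties, not settable from Java): the
        // broker rejects producer timeouts above its own cap, so the cap
        // was raised to match:
        //   transaction.max.timeout.ms=3600000
        return props;
    }
}
```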

Any insights into what we could be doing wrong or what's going on are
appreciated.

Thanks in advance!

[0]: ```
2021-07-28 16:55:32
org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse; Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
    at org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1352)
    at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1260)
    at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
    at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:572)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
    at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:414)
    at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
    at java.base/java.lang.Thread.run(Unknown Source)
```
