Hello Peter, Note that when you upgrade from 2.4 to later versions in Kafka, your error handling could be modified and simplified a bit as well. You can read the example code in KIP-691 as a reference: https://cwiki.apache.org/confluence/display/KAFKA/KIP-691%3A+Enhance+Transactional+Producer+Exception+Handling
Guozhang On Fri, Feb 19, 2021 at 1:23 AM Peter Cipov <pci...@twilio.com.invalid> wrote: > This was really helpful. > > Thank you > > On Thu, Feb 18, 2021 at 8:08 PM Boyang Chen <reluctanthero...@gmail.com> > wrote: > > > Thanks for the question. I think Gary provided an excellent answer. > > Additionally, you could check out the code example > > < > > > https://urldefense.com/v3/__https://github.com/apache/kafka/blob/trunk/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java*L132__;Iw!!NCc8flgU!LgslseBAQdNQtCsX-YGUPoAM1BQ4jZYO6p5X9Tjnzow_pVJrKZA9dJkdRzqerg$ > > > > > for EOS, which shows you how to reset the state while aborting ongoing > > transactions. > > > > On Thu, Feb 18, 2021 at 11:01 AM Gary Russell <gruss...@vmware.com> > wrote: > > > > > You have to perform seeks (using the consumer) to the lowest > unprocessed > > > offset for each partition returned by the poll, before the next poll. > > > ________________________________ > > > From: Peter Cipov <pci...@twilio.com.INVALID> > > > Sent: Thursday, February 18, 2021 1:20 PM > > > To: users@kafka.apache.org <users@kafka.apache.org> > > > Subject: Abort transaction semantics > > > > > > Hello > > > I have a question regarding aborting transactions in kafka client > 2.4.1. > > > > > > lets have following code : > > > > > > ... propper transaction producer consumer creation, consumer > autocommit = > > > false > > > > > > producer.transactionInit(); > > > > > > while(true) { > > > records = consumer.poll(); > > > logRecordOffsets(records) > > > producer.beginTransaction() > > > try { > > > doMagic() > > > } catch{ > > > producer.AbortTransaction(); > > > continue; > > > } > > > producer.sendOffsets(..); > > > producer.commitTransaction() > > > } > > > > > > When doMagic crashes for some reason, abort is called and code will > start > > > from beginning with doing poll. > > > > > > Our assumption was that the next poll will start from the same offsets, > > but > > > as we saw from logs this is not the case. What we observed that offsets > > are > > > shifted and messages are lost, they will not be retried again. > > > > > > What is the semantics for abort, we could not figure out from > > > documentation. > > > What is the recommended approach for retrying ? > > > > > > Thank you > > > > > > -- -- Guozhang