Hello Peter,

The resetToLastCommittedPositions method is for cases where the app is still up and running, as a convenient way to continue processing. If the app has crashed, then upon restarting it resets its position by reading the committed offsets, which correspond to the last successful transaction and hence do not include the failed transaction right before the crash.
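To make the distinction concrete, here is a toy model of a single partition. It is only a sketch and not the Kafka client API: ToyOffsetStore and ToyPartitionConsumer are hypothetical names, and the model assumes the only state that matters is the consumer's in-memory position and the committed offset stored on the broker side.

```java
/** Hypothetical stand-in for the broker-side committed offset of one partition. */
final class ToyOffsetStore {
    long committed;

    ToyOffsetStore(long committed) {
        this.committed = committed;
    }
}

/** Toy model of one partition's consumer state; NOT the real KafkaConsumer. */
final class ToyPartitionConsumer {
    private final ToyOffsetStore store;
    private long position; // in-memory fetch position, lost when the process dies

    /** A new (or restarted) consumer starts from the committed offset. */
    ToyPartitionConsumer(ToyOffsetStore store) {
        this.store = store;
        this.position = store.committed;
    }

    /** Returns the offset of the first record handed out and advances the position. */
    long poll(int maxRecords) {
        long first = position;
        position += maxRecords;
        return first;
    }

    /** Committing the transaction also commits the consumed offsets. */
    void commitTransaction() {
        store.committed = position;
    }

    /** Aborting does not touch the in-memory position. */
    void abortTransaction() {
        // intentionally empty: the position stays where the last poll left it
    }

    /** Rewinds the in-memory position to the last committed offset. */
    void resetToLastCommittedPosition() {
        position = store.committed;
    }
}
```

In this model, a poll after an abort continues past the aborted records unless the position is reset first, while a freshly constructed consumer (the restart case) always begins at the committed offset. With the real client, the corresponding calls would be consumer.committed(...) and consumer.seek(...).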
Or you can think about it another way: whenever a transaction needs to be aborted, instead of following the example tutorial we provide, one can also take a more extreme approach, which is "always System.exit(1) immediately, and then restart the app" (:P). That basically falls back to the crash scenario and is still correct; you just pay a larger performance hit for crashing and restarting the app.

Guozhang

On Tue, Mar 9, 2021 at 3:06 AM Peter Cipov <pci...@twilio.com.invalid> wrote:

> Hello
>
> Thanks for the advice. I have looked into the examples and spotted yet another place that is not clear to me. It is resetToLastCommittedPositions (in case of abort). I am wondering what will happen when this method crashes, so that it does not reset the consumer to the latest committed offsets. Let's say the process is killed in that instant. In my case this will schedule a new process that creates a new consumer (assume the rebalance assigns it the same partitions and nobody else has touched them). In such a case, will the offsets be shifted or not?
>
> I am wondering where in the transaction schema the consumer offsets are committed to Kafka.
>
> Thank you
>
> Peter
>
> On Fri, Feb 19, 2021 at 9:23 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Peter,
> >
> > Note that when you upgrade from 2.4 to later versions of Kafka, your error handling could be modified and simplified a bit as well. You can read the example code in KIP-691 as a reference:
> >
> > https://urldefense.com/v3/__https://cwiki.apache.org/confluence/display/KAFKA/KIP-691*3A*Enhance*Transactional*Producer*Exception*Handling__;JSsrKysr!!NCc8flgU!IcJMrFZbaTy6WKg7C9KUzh2MsRne_VKrStjpn4MNlzSaDi5-5VUV2NIntQ7pDQ$
> >
> > Guozhang
> >
> > On Fri, Feb 19, 2021 at 1:23 AM Peter Cipov <pci...@twilio.com.invalid> wrote:
> >
> > > This was really helpful.
> > >
> > > Thank you
> > >
> > > On Thu, Feb 18, 2021 at 8:08 PM Boyang Chen <reluctanthero...@gmail.com> wrote:
> > >
> > > > Thanks for the question. I think Gary provided an excellent answer. Additionally, you could check out the code example
> > > > <https://urldefense.com/v3/__https://github.com/apache/kafka/blob/trunk/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java*L132__;Iw!!NCc8flgU!LgslseBAQdNQtCsX-YGUPoAM1BQ4jZYO6p5X9Tjnzow_pVJrKZA9dJkdRzqerg$>
> > > > for EOS, which shows you how to reset the state while aborting ongoing transactions.
> > > >
> > > > On Thu, Feb 18, 2021 at 11:01 AM Gary Russell <gruss...@vmware.com> wrote:
> > > >
> > > > > You have to perform seeks (using the consumer) to the lowest unprocessed offset for each partition returned by the poll, before the next poll.
> > > > > ________________________________
> > > > > From: Peter Cipov <pci...@twilio.com.INVALID>
> > > > > Sent: Thursday, February 18, 2021 1:20 PM
> > > > > To: users@kafka.apache.org <users@kafka.apache.org>
> > > > > Subject: Abort transaction semantics
> > > > >
> > > > > Hello
> > > > > I have a question regarding aborting transactions in Kafka client 2.4.1.
> > > > >
> > > > > Let's have the following code:
> > > > >
> > > > > ... proper transactional producer and consumer creation, consumer autocommit = false
> > > > >
> > > > > producer.initTransactions();
> > > > >
> > > > > while (true) {
> > > > >     records = consumer.poll();
> > > > >     logRecordOffsets(records);
> > > > >     producer.beginTransaction();
> > > > >     try {
> > > > >         doMagic();
> > > > >     } catch (Exception e) {
> > > > >         producer.abortTransaction();
> > > > >         continue;
> > > > >     }
> > > > >     producer.sendOffsetsToTransaction(..);
> > > > >     producer.commitTransaction();
> > > > > }
> > > > >
> > > > > When doMagic crashes for some reason, abort is called and the code starts again from the beginning with a poll.
> > > > >
> > > > > Our assumption was that the next poll would start from the same offsets, but as we saw from the logs this is not the case. What we observed is that the offsets are shifted and messages are lost; they are not retried.
> > > > >
> > > > > What are the semantics of abort? We could not figure them out from the documentation.
> > > > > What is the recommended approach for retrying?
> > > > >
> > > > > Thank you
> >
> > --
> > -- Guozhang
>
--
-- Guozhang
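
Gary's advice in the thread (seek to the lowest unprocessed offset for each partition returned by the poll, before the next poll) can be sketched as follows. This is a minimal illustration under stated assumptions, not code from the thread or from the Kafka examples: PolledRecord and RewindOffsets are hypothetical stand-ins, and the sketch assumes the whole batch is unprocessed once the transaction has been aborted, so the lowest offset in the batch is the rewind target.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a consumed record: only the fields needed to rewind. */
final class PolledRecord {
    final String topic;
    final int partition;
    final long offset;

    PolledRecord(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }
}

/** Hypothetical helper that computes where to seek after an aborted batch. */
final class RewindOffsets {
    private RewindOffsets() {
    }

    /**
     * Returns, for every partition present in the batch, the lowest offset in the batch.
     * Keys are "topic-partition" strings (an assumption made to keep the sketch
     * free of Kafka classes).
     */
    static Map<String, Long> lowestOffsets(List<PolledRecord> batch) {
        Map<String, Long> lowest = new HashMap<>();
        for (PolledRecord r : batch) {
            // Keep the smaller of the stored offset and this record's offset.
            lowest.merge(r.topic + "-" + r.partition, r.offset, Long::min);
        }
        return lowest;
    }
}
```

With the real client, the same information is available from the polled batch itself: for each TopicPartition in records.partitions(), the first element of records.records(tp) carries the lowest offset, and the abort branch would call consumer.seek(tp, thatOffset) for each partition before the next poll.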