Hello

Thanks for the advice. I have looked into the examples and spotted yet
another place that is not clear to me: resetToLastCommittedPositions (called
in case of an abort). I am wondering what happens if the process crashes in
this method, before it has reset the consumer to the last committed offsets.
Let's say the process is killed at that instant. In my case this will
schedule a new process that creates a new consumer (assume the rebalance
assigns it the same partitions and nobody else has touched them). Will the
offsets be shifted in that case or not?

I am also wondering at which point in the transaction flow the consumer
offsets are committed to Kafka.

Thank you

Peter
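
A minimal sketch of the distinction the question above turns on, written as
a toy in-memory model of a single partition and not against the real Kafka
client. It assumes that the consumer's fetch position is client-side state
that poll advances, that the committed offset lives on the broker and only
moves when a transaction carrying the offsets commits, and that a new
consumer in the same group resumes from that committed offset. All names
here (OffsetModel, Broker, Consumer, resetToLastCommitted and the three
scenario methods) are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy in-memory model of one partition. NOT the Kafka client API. */
final class OffsetModel {

    /** Broker-side state: survives a client crash. */
    static final class Broker {
        final List<String> log = new ArrayList<>(Arrays.asList("a", "b", "c", "d"));
        // Advanced only when a transaction that carries offsets commits.
        int committedOffset = 0;
    }

    /** Client-side state: lost when the process dies. */
    static final class Consumer {
        private final Broker broker;
        private int position; // next offset poll() will return

        Consumer(Broker broker) {
            this.broker = broker;
            // Assumption: a new consumer in the same group resumes from the
            // committed offset (0 here, standing in for "earliest").
            this.position = broker.committedOffset;
        }

        /** Returns up to max records and advances the in-memory position. */
        List<String> poll(int max) {
            int to = Math.min(position + max, broker.log.size());
            List<String> records = new ArrayList<>(broker.log.subList(position, to));
            position = to;
            return records;
        }

        /** Stand-in for resetToLastCommittedPositions: seek back to the committed offset. */
        void resetToLastCommitted() {
            position = broker.committedOffset;
        }
    }

    /** Poll, abort, optionally reset, then poll again in the same process. */
    static List<String> pollAfterAbort(boolean reset) {
        Broker broker = new Broker();
        Consumer consumer = new Consumer(broker);
        consumer.poll(2); // reads the first two records; processing then fails
        // Abort: nothing was committed, so broker.committedOffset is unchanged.
        if (reset) {
            consumer.resetToLastCommitted();
        }
        return consumer.poll(2);
    }

    /** Poll, then the process dies before the reset; a new consumer takes over. */
    static List<String> pollAfterCrash() {
        Broker broker = new Broker();
        new Consumer(broker).poll(2); // old process polled, then was killed
        return new Consumer(broker).poll(2);
    }

    /** Poll, commit the offsets with the transaction, crash, then a new consumer polls. */
    static List<String> pollAfterCommitAndCrash() {
        Broker broker = new Broker();
        Consumer consumer = new Consumer(broker);
        consumer.poll(2);
        // Stand-in for sendOffsetsToTransaction followed by a successful commitTransaction.
        broker.committedOffset = consumer.position;
        return new Consumer(broker).poll(2);
    }

    public static void main(String[] args) {
        System.out.println("abort, no reset:    " + pollAfterAbort(false));
        System.out.println("abort, with reset:  " + pollAfterAbort(true));
        System.out.println("crash before reset: " + pollAfterCrash());
        System.out.println("commit, then crash: " + pollAfterCommitAndCrash());
    }
}
```

In this model, skipping the reset after an abort makes the second poll move
past the failed records, which matches the lost messages reported at the
start of the thread. A crash before the reset is harmless, because the
replacement consumer starts from the committed offset, which the aborted
transaction never advanced.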

On Fri, Feb 19, 2021 at 9:23 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Peter,
>
> Note that when you upgrade from 2.4 to later versions in Kafka, your error
> handling could be modified and simplified a bit as well. You can read the
> example code in KIP-691 as a reference:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-691%3A+Enhance+Transactional+Producer+Exception+Handling
>
>
> Guozhang
>
> On Fri, Feb 19, 2021 at 1:23 AM Peter Cipov <pci...@twilio.com.invalid>
> wrote:
>
> > This was really helpful.
> >
> > Thank you
> >
> > On Thu, Feb 18, 2021 at 8:08 PM Boyang Chen <reluctanthero...@gmail.com>
> > wrote:
> >
> > > Thanks for the question. I think Gary provided an excellent answer.
> > > Additionally, you could check out the code example
> > > <https://github.com/apache/kafka/blob/trunk/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java#L132>
> > > for EOS, which shows you how to reset the state while aborting ongoing
> > > transactions.
> > >
> > > On Thu, Feb 18, 2021 at 11:01 AM Gary Russell <gruss...@vmware.com>
> > > wrote:
> > >
> > > > You have to perform seeks (using the consumer) to the lowest
> > > > unprocessed offset for each partition returned by the poll, before
> > > > the next poll.
> > > > ________________________________
> > > > From: Peter Cipov <pci...@twilio.com.INVALID>
> > > > Sent: Thursday, February 18, 2021 1:20 PM
> > > > To: users@kafka.apache.org <users@kafka.apache.org>
> > > > Subject: Abort transaction semantics
> > > >
> > > > Hello
> > > > I have a question regarding aborting transactions in Kafka client
> > > > 2.4.1.
> > > >
> > > > Consider the following code:
> > > >
> > > > ... proper transactional producer and consumer creation, consumer
> > > > autocommit = false
> > > >
> > > > producer.initTransactions();
> > > >
> > > > while (true) {
> > > >   records = consumer.poll();
> > > >   logRecordOffsets(records);
> > > >   producer.beginTransaction();
> > > >   try {
> > > >     doMagic();
> > > >   } catch (Exception e) {
> > > >     producer.abortTransaction();
> > > >     continue;
> > > >   }
> > > >   producer.sendOffsetsToTransaction(..);
> > > >   producer.commitTransaction();
> > > > }
> > > >
> > > > When doMagic crashes for some reason, abort is called and the code
> > > > starts again from the beginning with a poll.
> > > >
> > > > Our assumption was that the next poll would start from the same
> > > > offsets, but as we saw from the logs this is not the case. What we
> > > > observed is that the offsets are shifted and messages are lost; they
> > > > are not retried.
> > > >
> > > > What are the semantics of abort? We could not figure them out from
> > > > the documentation.
> > > > What is the recommended approach for retrying?
> > > >
> > > > Thank you
> > > >
> > >
> >
>
>
> --
> -- Guozhang
>
