Hi Nitty,

> I called commitTransaction when I reached the first error record, but
> commit is not happening for me. Kafka Connect tries to abort the
> transaction automatically

This is really interesting. Are you certain that your task never invoked
TransactionContext::abortTransaction in this case? I'm looking over the
code base, and it seems fairly clear that the only thing that could trigger
a call to KafkaProducer::abortTransaction is a request by the task to abort
a transaction (either for the next batch or for a specific record). It may
help to run the connector in a debugger and/or look for "Aborting
transaction for batch as requested by connector" or "Aborting transaction
for record on topic <TOPIC NAME HERE> as requested by connector" log
messages, which will be emitted at INFO level by the
org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask class if the
task is requesting an abort.

Regardless, I'll work on a fix for the bug with aborting empty
transactions. Thanks for helping uncover that one!

Cheers,

Chris

On Thu, Mar 9, 2023 at 6:36 PM NITTY BENNY <nittybe...@gmail.com> wrote:

> Hi Chris,
>
> We have a use case to commit the previous successful records, stop
> processing the current file, and move on to the next file. To achieve
> that, I called commitTransaction when I reached the first error record, but
> commit is not happening for me. Kafka Connect tries to abort the
> transaction automatically: I checked the __transaction_state topic, and the
> states were marked as PrepareAbort and CompleteAbort. Do you know why Kafka
> Connect automatically invokes abort instead of the explicit commit I
> called?
> As a result, when I tried to parse the next file (say, ABC), I saw the
> logs "Aborting incomplete transaction" and ERROR: "Failed to sent record to
> topic", and we lost the first batch of records from the current transaction
> in the file ABC.
>
> > Is it possible that there's a case where an abort is being requested while
> > the current transaction is empty (i.e., the task hasn't returned any
> > records from SourceTask::poll since the last transaction was
> > committed/aborted)?
>
> Yes, that case is possible for us. There is a case where the very first
> record is itself an error record.
>
> Thanks,
> Nitty
>
> On Thu, Mar 9, 2023 at 3:48 PM Chris Egerton <chr...@aiven.io.invalid> wrote:
>
> > Hi Nitty,
> >
> > Thanks for the code examples and the detailed explanations; this is really
> > helpful!
> >
> > > Say I have a file with 5 records and the batch size is 2, and my 3rd
> > > batch has one error record. In that batch, I don't have a valid
> > > record to call commit or abort on, but I want to commit all the previous
> > > batches that were successfully parsed. How do I do that?
> >
> > An important thing to keep in mind with the TransactionContext API is that
> > all records that a task returns from SourceTask::poll are implicitly
> > included in a transaction. Invoking SourceTaskContext::transactionContext
> > doesn't alter this or cause transactions to start being used; everything is
> > already in a transaction, and the Connect runtime automatically begins
> > transactions for any records it sees from the task if it hasn't already
> > begun one. It's also valid to return a null or empty list of records from
> > SourceTask::poll. So in this case, you can invoke
> > transactionContext.commitTransaction() (the no-args variant) and return an
> > empty batch from SourceTask::poll, which will cause the transaction
> > containing the 4 valid records that were returned in the last 2 batches to
> > be committed.
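A minimal sketch of the approach described above, using the example of 5 records with a batch size of 2. It assumes a hypothetical CommitContext stand-in for Kafka's TransactionContext (so it compiles without the Kafka jars), plain strings in place of SourceRecord, and a made-up rule that a record starting with "BAD" is invalid; CommitOnErrorPoller and its fields are illustrative names, not taken from the connector.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for org.apache.kafka.connect.source.TransactionContext,
// reduced to the no-args commit so the sketch compiles without Kafka.
interface CommitContext {
    void commitTransaction();
}

// Hypothetical task logic for one file whose records are returned in batches.
// Strings stand in for SourceRecord; a record starting with "BAD" is treated as invalid.
class CommitOnErrorPoller {
    private final List<String> fileRecords;
    private final int batchSize;
    private final CommitContext transactionContext; // context.transactionContext() in a real task
    private int position = 0;
    private boolean fileDone = false;

    CommitOnErrorPoller(List<String> fileRecords, int batchSize, CommitContext transactionContext) {
        this.fileRecords = fileRecords;
        this.batchSize = batchSize;
        this.transactionContext = transactionContext;
    }

    // Plays the role of SourceTask::poll: returns the next batch, possibly empty.
    List<String> poll() {
        List<String> batch = new ArrayList<>();
        if (fileDone) {
            return batch; // nothing more to emit for this file
        }
        while (batch.size() < batchSize && position < fileRecords.size()) {
            String line = fileRecords.get(position++);
            if (line.startsWith("BAD")) {
                fileDone = true; // drop this record and everything after it in the file
                break;
            }
            batch.add(line);
        }
        if (position >= fileRecords.size()) {
            fileDone = true; // reached the end of the file
        }
        if (fileDone) {
            // No-args variant: the commit applies after the batch returned below is
            // processed, so it covers every record returned for this file so far,
            // even when this batch is empty.
            transactionContext.commitTransaction();
        }
        return batch;
    }
}
```

With the records r1, r2, r3, r4 and BAD5, the first two polls return [r1, r2] and [r3, r4], and the third returns an empty batch while requesting the commit.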
> >
> > FWIW, I would be a little cautious about this approach. It's often better
> > to fail fast on invalid data; it might be worth at least allowing users to
> > configure whether the connector fails on invalid data or silently skips
> > over it (which is what happens when transactions are aborted).
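One way to make that behavior configurable, sketched under the assumption of a hypothetical connector property whose values are "fail" or "skip" (Kafka Connect defines no such option; OnInvalidData and InvalidRecordHandler are made-up names). InvalidDataException stands in for org.apache.kafka.connect.errors.DataException so the sketch has no Kafka dependency.

```java
import java.util.Locale;

// Hypothetical connector setting; "fail" and "skip" are illustrative values,
// not options defined by Kafka Connect.
enum OnInvalidData {
    FAIL, // throw, so the task stops instead of silently dropping data
    SKIP; // drop the data, e.g. by having the task request a transaction abort

    static OnInvalidData parse(String value) {
        if (value == null || value.isBlank()) {
            return FAIL; // fail fast unless the user opts in to skipping
        }
        // Throws IllegalArgumentException for unknown values.
        return valueOf(value.trim().toUpperCase(Locale.ROOT));
    }
}

// Stand-in for org.apache.kafka.connect.errors.DataException (no Kafka dependency).
class InvalidDataException extends RuntimeException {
    InvalidDataException(String message) {
        super(message);
    }
}

class InvalidRecordHandler {
    private final OnInvalidData policy;
    private long skipped = 0;

    InvalidRecordHandler(OnInvalidData policy) {
        this.policy = policy;
    }

    // Throws under FAIL; under SKIP, counts the record and returns so the
    // caller can go on to skip it.
    void onInvalid(String description) {
        if (policy == OnInvalidData.FAIL) {
            throw new InvalidDataException("Invalid data: " + description);
        }
        skipped++;
    }

    long skippedCount() {
        return skipped;
    }
}
```

Defaulting to FAIL keeps the fail-fast behavior unless a user explicitly opts into skipping; under SKIP, the task itself would still be responsible for requesting the abort.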
> >
> > > Why is abort not working without adding the last record to the list?
> >
> > Is it possible that there's a case where an abort is being requested while
> > the current transaction is empty (i.e., the task hasn't returned any
> > records from SourceTask::poll since the last transaction was
> > committed/aborted)? I think this may be a bug in the Connect framework
> > where we don't check to see if a transaction is already open when a task
> > requests that a transaction be aborted, which can cause tasks to fail (see
> > https://issues.apache.org/jira/browse/KAFKA-14799 for more details).
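If this is the bug being hit, a task can avoid requesting an abort for an empty transaction by tracking how many records it has returned since its last commit/abort request. A minimal sketch, assuming a hypothetical TxContext stand-in for the no-args methods of TransactionContext and a made-up BoundaryTracker helper. It also assumes, per the no-args semantics, that a request made during poll() applies after the batch returned from that same poll() is processed, so that batch counts toward the open transaction.

```java
import java.util.List;

// Hypothetical stand-in for the no-args methods of Kafka's TransactionContext.
interface TxContext {
    void commitTransaction();

    void abortTransaction();
}

// What the task wants to happen after the batch it is about to return.
enum BoundaryRequest { NONE, COMMIT, ABORT }

// Hypothetical helper: counts records returned since the last commit/abort request
// and drops abort requests that would target an empty transaction (KAFKA-14799).
class BoundaryTracker {
    private final TxContext transactionContext;
    private int recordsSinceBoundary = 0;

    BoundaryTracker(TxContext transactionContext) {
        this.transactionContext = transactionContext;
    }

    // Call at the end of poll() with the batch about to be returned (null is allowed).
    <T> List<T> finish(List<T> batch, BoundaryRequest request) {
        int pending = recordsSinceBoundary + (batch == null ? 0 : batch.size());
        switch (request) {
            case COMMIT:
                transactionContext.commitTransaction(); // commit requests pass through as-is
                recordsSinceBoundary = 0;
                break;
            case ABORT:
                if (pending > 0) {
                    transactionContext.abortTransaction();
                } // otherwise no transaction is open, so there is nothing to abort
                recordsSinceBoundary = 0;
                break;
            default:
                recordsSinceBoundary = pending; // transaction stays open
                break;
        }
        return batch;
    }
}
```

Typical use would be ending poll() with something like `return tracker.finish(records, abortNeeded ? BoundaryRequest.ABORT : BoundaryRequest.NONE);`, so the empty-list branch in the abort code never reaches the runtime when nothing is open.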
> >
> > Cheers,
> >
> > Chris
> >
> >
> > On Wed, Mar 8, 2023 at 6:44 PM NITTY BENNY <nittybe...@gmail.com> wrote:
> >
> > > Hi Chris,
> > >
> > > I am not sure whether you were able to see the images I shared with you,
> > > so I am copying the code snippets below:
> > >
> > >   if (expectedRecordCount >= 0) {
> > >     int missingCount = expectedRecordCount - (int) this.recordOffset() - 1;
> > >     if (missingCount > 0) {
> > >       if (transactionContext != null) {
> > >         isMissedRecords = true;
> > >       } else {
> > >         throw new DataException(String.format(
> > >             "Missing %d records (expecting %d, actual %d)",
> > >             missingCount, expectedRecordCount, this.recordOffset()));
> > >       }
> > >     } else if (missingCount < 0) {
> > >       if (transactionContext != null) {
> > >         isMissedRecords = true;
> > >       } else {
> > >         throw new DataException(String.format(
> > >             "Too many records (expecting %d, actual %d)",
> > >             expectedRecordCount, this.recordOffset()));
> > >       }
> > >     }
> > >   }
> > >   addLastRecord(records, null, value);
> > > }
> > >
> > >
> > >
> > > // asn1 or binary abort
> > > if ((config.parseErrorThreshold != null
> > >         && parseErrorCount >= config.parseErrorThreshold
> > >         && lastbatch && transactionContext != null)
> > >     || (isMissedRecords && transactionContext != null && lastbatch)) {
> > >   log.info("Transaction is aborting");
> > >   log.info("records = {}", records);
> > >   if (!records.isEmpty()) {
> > >     log.info("with record");
> > >     transactionContext.abortTransaction(records.get(records.size() - 1));
> > >   } else {
> > >     log.info("without record");
> > >     transactionContext.abortTransaction();
> > >   }
> > > }
> > >
> > > Thanks,
> > > Nitty
> > >
> > > On Wed, Mar 8, 2023 at 11:38 PM NITTY BENNY <nittybe...@gmail.com> wrote:
> > >
> > >> Hi Chris,
> > >> Sorry for the typo in my previous email.
> > >>
> > >> Regarding point 2, "the task returns a batch of records from
> > >> SourceTask::poll (and, if using the per-record API provided by the
> > >> TransactionContext class, includes at least one record that should
> > >> trigger a transaction commit/abort in that batch)":
> > >> What if I am using the API without passing a record? We have 2 types of
> > >> use cases. In the first, on encountering an error record, we want to
> > >> commit the previous successful batches and disregard the failed record and
> > >> any upcoming batches. In this case, we created the transactionContext just
> > >> before reading the file (the file is our transaction boundary). Say I have
> > >> a file with 5 records and the batch size is 2, and my 3rd batch has one
> > >> error record. In that batch, I don't have a valid record to call commit or
> > >> abort on, but I want to commit all the previous batches that were
> > >> successfully parsed. How do I do that?
> > >>
> > >> The second use case is where I want to abort a transaction if the record
> > >> count doesn't match.
> > >> Code snippet:
> > >> [image: image.png]
> > >> There are no error records in this case. As you can see, I added the
> > >> transactionContext check to implement exactly-once. Without transactions,
> > >> the code just threw the exception without calling the addLastRecord()
> > >> method, and the catch block logged the message and returned the list of
> > >> records, without the last record, to poll(). To make it work, I call
> > >> addLastRecord() in this case, so the exception is not thrown and the list
> > >> includes the last record as well. Then, when I called abort, everything
> > >> got aborted. Why is abort not working without adding the last record to
> > >> the list?
> > >> [image: image.png]
> > >>
> > >> Code to call abort.
> > >>
> > >>
> > >>
> > >>
> > >> Thanks,
> > >> Nitty
> > >>
> > >> On Wed, Mar 8, 2023 at 4:26 PM Chris Egerton <chr...@aiven.io.invalid> wrote:
> > >>
> > >>> Hi Nitty,
> > >>>
> > >>> I'm a little confused about what you mean by this part:
> > >>>
> > >>> > transaction is not getting completed because it is not committing the
> > >>> > transaction offset.
> > >>>
> > >>> The only conditions required for a transaction to be completed when a
> > >>> connector is defining its own transaction boundaries are:
> > >>>
> > >>> 1. The task requests a transaction commit/abort from the
> > >>>    TransactionContext
> > >>> 2. The task returns a batch of records from SourceTask::poll (and, if
> > >>>    using the per-record API provided by the TransactionContext class,
> > >>>    includes at least one record that should trigger a transaction
> > >>>    commit/abort in that batch)
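A simplified model of point 2 for the per-record variants. This is a hypothetical illustration, not the runtime's code: records are plain strings keyed by value (the runtime tracks the actual SourceRecord objects), and it assumes the documented behavior that the given record becomes the last record of the committed or aborted transaction. In the model, a request tied to a record that is never returned from poll() has no effect.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Per-record request kinds, mirroring commitTransaction(record) / abortTransaction(record).
enum RecordRequest { COMMIT, ABORT }

// Hypothetical, simplified model of how per-record boundary requests split the
// records a task returns into transactions. Not the Connect runtime's code.
class PerRecordBoundaryModel {
    private final List<String> committed = new ArrayList<>();
    private final List<String> aborted = new ArrayList<>();
    private final List<String> open = new ArrayList<>();
    // Keyed by value for simplicity; the runtime tracks the SourceRecord objects themselves.
    private final Map<String, RecordRequest> requests = new HashMap<>();

    // Models transactionContext.commitTransaction(rec) or abortTransaction(rec).
    void request(String rec, RecordRequest kind) {
        requests.put(rec, kind);
    }

    // Models the runtime handling one batch returned from SourceTask::poll.
    void process(List<String> batch) {
        for (String rec : batch) {
            open.add(rec);
            RecordRequest kind = requests.remove(rec);
            if (kind == RecordRequest.COMMIT) {
                committed.addAll(open); // rec is the last record of the committed transaction
                open.clear();
            } else if (kind == RecordRequest.ABORT) {
                aborted.addAll(open); // everything since the last boundary is discarded
                open.clear();
            }
        }
    }

    List<String> committed() { return List.copyOf(committed); }
    List<String> aborted() { return List.copyOf(aborted); }
    List<String> open() { return List.copyOf(open); }
}
```

With a commit requested for r2 and a batch of [r1, r2, r3], r1 and r2 are committed and r3 starts the next transaction; an abort requested for a record that never shows up in a batch leaves the transaction open.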
> > >>>
> > >>> The Connect runtime should automatically commit source offsets to Kafka
> > >>> whenever a transaction is completed, either by commit or abort. This is
> > >>> because transactions should only be aborted for data that should never be
> > >>> re-read by the connector; if there is a validation error that should be
> > >>> handled by reconfiguring the connector, then the task should throw an
> > >>> exception instead of aborting the transaction.
> > >>>
> > >>> If possible, do you think you could provide a brief code snippet
> > >>> illustrating what your task is doing that's causing issues?
> > >>>
> > >>> Cheers,
> > >>>
> > >>> Chris (not Chrise 🙂)
> > >>>
> > >>> On Tue, Mar 7, 2023 at 10:17 AM NITTY BENNY <nittybe...@gmail.com> wrote:
> > >>>
> > >>> > Hi Chrise,
> > >>> >
> > >>> > Thanks for sharing the details.
> > >>> >
> > >>> > Regarding the use case: for our ASN.1 source connector, we have a use
> > >>> > case to validate the number of records in the file against the number
> > >>> > of records in the header. Currently, if validation fails, we do not send
> > >>> > the last record to the topic. But after introducing exactly-once with
> > >>> > connector-defined transaction boundaries, I can see that if I call abort
> > >>> > when the validation fails, the transaction is not getting completed
> > >>> > because it is not committing the transaction offset. I saw that the
> > >>> > transaction state changed to CompleteAbort. So for my next transaction I
> > >>> > get an InvalidProducerEpochException, and the task stops after that.
> > >>> > When I tried calling abort after sending the last record to the topic,
> > >>> > the transaction completed.
> > >>> >
> > >>> > I don't know if I am doing anything wrong here.
> > >>> >
> > >>> > Please advise.
> > >>> > Thanks,
> > >>> > Nitty
> > >>> >
> > >>> > On Tue 7 Mar 2023 at 2:21 p.m., Chris Egerton <chr...@aiven.io.invalid> wrote:
> > >>> >
> > >>> > > Hi Nitty,
> > >>> > >
> > >>> > > We've recently added some documentation on implementing exactly-once
> > >>> > > source connectors here:
> > >>> > > https://kafka.apache.org/documentation/#connect_exactlyoncesourceconnectors
> > >>> > > To quote a relevant passage from those docs:
> > >>> > >
> > >>> > > > In order for a source connector to take advantage of this support,
> > >>> > > > it must be able to provide meaningful source offsets for each
> > >>> > > > record that it emits, and resume consumption from the external
> > >>> > > > system at the exact position corresponding to any of those offsets
> > >>> > > > without dropping or duplicating messages.
> > >>> > >
> > >>> > > So, as long as your source connector is able to use the Kafka Connect
> > >>> > > framework's offsets API correctly, it shouldn't be necessary to make
> > >>> > > any other code changes to the connector.
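For a file-based connector like the ASN.1 one in this thread, "meaningful source offsets" could be a source partition that names the file plus an offset holding the record's position in it. A minimal sketch, assuming the hypothetical keys "filename" and "position"; in a real task these maps would be passed to the SourceRecord constructor, and on startup the last committed offset would be read with context.offsetStorageReader().offset(partition).

```java
import java.util.Map;

// Hypothetical offset layout for a file-based source task. The key names
// "filename" and "position" are illustrative, not required by Kafka Connect.
final class FileOffsets {
    private FileOffsets() {
    }

    // Source partition: identifies which file a record came from.
    static Map<String, Object> partition(String filename) {
        return Map.of("filename", filename);
    }

    // Source offset: the zero-based index of the record within that file.
    static Map<String, Object> offset(long recordIndex) {
        return Map.of("position", recordIndex);
    }

    // Index of the next record to read, given the last committed offset for the
    // file (null if nothing has been committed yet). The value is read through
    // Number so that both Integer and Long representations are accepted.
    static long resumeFrom(Map<String, Object> committedOffset) {
        if (committedOffset == null) {
            return 0L;
        }
        Object position = committedOffset.get("position");
        if (!(position instanceof Number)) {
            return 0L;
        }
        return ((Number) position).longValue() + 1;
    }
}
```

For example, if the last committed offset for a file is position 4, resumeFrom returns 5, so the task would skip the records at indexes 0 to 4 and continue with the sixth record.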
> > >>> > >
> > >>> > > To enable exactly-once support for source connectors on your Connect
> > >>> > > cluster, see the docs section here:
> > >>> > > https://kafka.apache.org/documentation/#connect_exactlyoncesource
> > >>> > >
> > >>> > > With regard to transactions, a transactional producer is always
> > >>> > > created automatically for your connector by the Connect runtime when
> > >>> > > exactly-once support is enabled on the worker. The only reason to set
> > >>> > > "transaction.boundary" to "connector" is if your connector would like
> > >>> > > to explicitly define its own transaction boundaries. In this case, it
> > >>> > > sounds like that may be what you want; I just want to make sure to call
> > >>> > > out that in either case, you should not be directly instantiating a
> > >>> > > producer in your connector code, but let the Kafka Connect runtime do
> > >>> > > that for you, and just worry about returning the right records from
> > >>> > > SourceTask::poll (and possibly defining custom transactions using the
> > >>> > > TransactionContext API).
> > >>> > >
> > >>> > > With respect to your question about committing or aborting in certain
> > >>> > > circumstances, it'd be useful to know more about your use case, since
> > >>> > > it may not be necessary to define custom transaction boundaries in
> > >>> > > your connector at all.
> > >>> > >
> > >>> > > Cheers,
> > >>> > >
> > >>> > > Chris
> > >>> > >
> > >>> > >
> > >>> > >
> > >>> > > On Tue, Mar 7, 2023 at 7:21 AM NITTY BENNY <nittybe...@gmail.com> wrote:
> > >>> > >
> > >>> > > > Hi Team,
> > >>> > > >
> > >>> > > > Adding on top of this, I tried creating a TransactionContext object
> > >>> > > > and calling the commitTransaction and abortTransaction methods in
> > >>> > > > source connectors.
> > >>> > > > But the main problem I saw is that if there is any error while
> > >>> > > > parsing a record, Connect calls an abort, but we have a use case to
> > >>> > > > call commit in some cases. Is that a valid use case in terms of Kafka
> > >>> > > > Connect?
> > >>> > > >
> > >>> > > > Another question: should I use a transactional producer instead of
> > >>> > > > creating a TransactionContext object? Below is the connector
> > >>> > > > configuration I am using:
> > >>> > > >
> > >>> > > >   exactly.once.support: "required"
> > >>> > > >   transaction.boundary: "connector"
> > >>> > > >
> > >>> > > > Could you please help me here?
> > >>> > > >
> > >>> > > > Thanks,
> > >>> > > > Nitty
> > >>> > > >
> > >>> > > > On Tue, Mar 7, 2023 at 12:29 AM NITTY BENNY <nittybe...@gmail.com> wrote:
> > >>> > > >
> > >>> > > > > Hi Team,
> > >>> > > > > I am trying to implement exactly-once behavior in our source
> > >>> > > > > connector. Is there any sample source connector implementation
> > >>> > > > > available to have a look at?
> > >>> > > > > Regards,
> > >>> > > > > Nitty
> > >>> > > > >
> > >>> > > >
> > >>> > >
> > >>> >
> > >>>
> > >>
> >
>
