Hi Chris,

We have a use case to commit previous successful records and stop the
processing of the current file and move on with the next file. To achieve
that I called commitTransaction when I reach the first error record, but
commit is not happening for me. Kafka connect tries to abort the
transaction automatically, I checked the _transaction_state topic and
states marked as PrepareAbort and CompleteAbort. Do you know why kafka
connect automatically invokes abort instead of the implicit commit I called?
Then as a result, when I tries to parse the next file - say ABC, I saw the
logs "Aborting incomplete transaction" and ERROR: "Failed to sent record to
topic", and we lost the first batch of records from the current transaction
in the file ABC.

Is it possible that there's a case where an abort is being requested while
the current transaction is empty (i.e., the task hasn't returned any
records from SourceTask::poll since the last transaction was
committed/aborted)? --- Yes, that case is possible for us. There is a case
where the first record itself an error record.

Thanks,
Nitty

On Thu, Mar 9, 2023 at 3:48 PM Chris Egerton <chr...@aiven.io.invalid>
wrote:

> Hi Nitty,
>
> Thanks for the code examples and the detailed explanations, this is really
> helpful!
>
> > Say if I have a file with 5 records and batch size is 2, and in my 3rd
> batch I have one error record then in that batch, I dont have a valid
> record to call commit or abort. But I want to commit all the previous
> batches that were successfully parsed. How do I do that?
>
> An important thing to keep in mind with the TransactionContext API is that
> all records that a task returns from SourceTask::poll are implicitly
> included in a transaction. Invoking SourceTaskContext::transactionContext
> doesn't alter this or cause transactions to start being used; everything is
> already in a transaction, and the Connect runtime automatically begins
> transactions for any records it sees from the task if it hasn't already
> begun one. It's also valid to return a null or empty list of records from
> SourceTask::poll. So in this case, you can invoke
> transactionContext.commitTransaction() (the no-args variant) and return an
> empty batch from SourceTask::poll, which will cause the transaction
> containing the 4 valid records that were returned in the last 2 batches to
> be committed.
>
> FWIW, I would be a little cautious about this approach. Many times it's
> better to fail fast on invalid data; it might be worth it to at least allow
> users to configure whether the connector fails on invalid data, or silently
> skips over it (which is what happens when transactions are aborted).
>
> > Why is abort not working without adding the last record to the list?
>
> Is it possible that there's a case where an abort is being requested while
> the current transaction is empty (i.e., the task hasn't returned any
> records from SourceTask::poll since the last transaction was
> committed/aborted)? I think this may be a bug in the Connect framework
> where we don't check to see if a transaction is already open when a task
> requests that a transaction be aborted, which can cause tasks to fail (see
> https://issues.apache.org/jira/browse/KAFKA-14799 for more details).
>
> Cheers,
>
> Chris
>
>
> On Wed, Mar 8, 2023 at 6:44 PM NITTY BENNY <nittybe...@gmail.com> wrote:
>
> > Hi Chris,
> >
> > I am not sure if you are able to see the images I shared with you .
> > Copying the code snippet below,
> >
> >  if (expectedRecordCount >= 0) {
> >             int missingCount = expectedRecordCount - (int) this.
> > recordOffset() - 1;
> >             if (missingCount > 0) {
> >               if (transactionContext != null) {
> >                 isMissedRecords = true;
> >               } else {
> >                 throw new DataException(String.format("Missing %d records
> > (expecting %d, actual %d)", missingCount, expectedRecordCount, this.
> > recordOffset()));
> >               }
> >             } else if (missingCount < 0) {
> >               if (transactionContext != null) {
> >                 isMissedRecords = true;
> >               } else {
> >                 throw new DataException(String.format("Too many records
> > (expecting %d, actual %d)", expectedRecordCount, this.recordOffset()));
> >               }
> >             }
> >           }
> >           addLastRecord(records, null, value);
> >         }
> >
> >
> >
> >  //asn1 or binary abort
> >         if((config.parseErrorThreshold != null && parseErrorCount >=
> > config.parseErrorThreshold
> >         && lastbatch && transactionContext != null) || (isMissedRecords
> > && transactionContext != null && lastbatch)) {
> >           log.info("Transaction is aborting");
> >             log.info("records = {}", records);
> >             if (!records.isEmpty()) {
> >               log.info("with record");
> >
>  transactionContext.abortTransaction(records.get(records.size
> > ()-1));
> >             } else {
> >               log.info("without record");
> >               transactionContext.abortTransaction();
> >             }
> >
> > Thanks,
> > Nitty
> >
> > On Wed, Mar 8, 2023 at 11:38 PM NITTY BENNY <nittybe...@gmail.com>
> wrote:
> >
> >> Hi Chris,
> >> Sorry for the typo in my previous email.
> >>
> >> Regarding the point 2,* the task returns a batch of records from
> >> SourceTask::poll (and, if using*
> >>
> >>
> >> *the per-record API provided by the TransactionContext class, includes
> >> atleast one record that should trigger a transaction commit/abort in
> >> thatbatch)*
> >> What if I am using the API without passing a record? We have 2 types of
> >> use cases, one where on encountering an error record, we want to commit
> >> previous successful batches and disregard the failed record and upcoming
> >> batches. In this case we created the transactionContext just before
> reading
> >> the file (file is our transaction boundary).Say if I have a file with 5
> >> records and batch size is 2, and in my 3rd batch I have one error record
> >> then in that batch, I dont have a valid record to call commit or abort.
> But
> >> I want to commit all the previous batches that were successfully parsed.
> >> How do I do that?
> >>
> >> Second use case is where I want to abort a transaction if the record
> >> count doesn't match.
> >> Code Snippet :
> >> [image: image.png]
> >> There are no error records in this case. If you see I added the
> condition
> >> of transactionContext check to implement exactly once, without
> >> transaction it was just throwing the exception without calling the
> >> addLastRecord() method and in the catch block it just logs the message
> and
> >> return the list of records without the last record to poll().To make it
> >> work, I called the method addLastRecord() in this case, so it is not
> >> throwing the exception and list has last record as well. Then I called
> the
> >> abort, everything got aborted. Why is abort not working without adding
> the
> >> last record to the list?
> >> [image: image.png]
> >>
> >> Code to call abort.
> >>
> >>
> >>
> >>
> >> Thanks,
> >> Nitty
> >>
> >> On Wed, Mar 8, 2023 at 4:26 PM Chris Egerton <chr...@aiven.io.invalid>
> >> wrote:
> >>
> >>> Hi Nitty,
> >>>
> >>> I'm a little confused about what you mean by this part:
> >>>
> >>> > transaction is not getting completed because it is not commiting the
> >>> transaction offest.
> >>>
> >>> The only conditions required for a transaction to be completed when a
> >>> connector is defining its own transaction boundaries are:
> >>>
> >>> 1. The task requests a transaction commit/abort from the
> >>> TransactionContext
> >>> 2. The task returns a batch of records from SourceTask::poll (and, if
> >>> using
> >>> the per-record API provided by the TransactionContext class, includes
> at
> >>> least one record that should trigger a transaction commit/abort in that
> >>> batch)
> >>>
> >>> The Connect runtime should automatically commit source offsets to Kafka
> >>> whenever a transaction is completed, either by commit or abort. This is
> >>> because transactions should only be aborted for data that should never
> be
> >>> re-read by the connector; if there is a validation error that should be
> >>> handled by reconfiguring the connector, then the task should throw an
> >>> exception instead of aborting the transaction.
> >>>
> >>> If possible, do you think you could provide a brief code snippet
> >>> illustrating what your task is doing that's causing issues?
> >>>
> >>> Cheers,
> >>>
> >>> Chris (not Chrise 🙂)
> >>>
> >>> On Tue, Mar 7, 2023 at 10:17 AM NITTY BENNY <nittybe...@gmail.com>
> >>> wrote:
> >>>
> >>> > Hi Chrise,
> >>> >
> >>> > Thanks for sharing the details.
> >>> >
> >>> > Regarding the use case, For Asn1 source connector we have a use case
> to
> >>> > validate number of records in the file with the number of records in
> >>> the
> >>> > header. So currently, if validation fails we are not sending the last
> >>> > record to the topic. But after introducing exactly once with
> connector
> >>> > transaction boundary, I can see that if I call an abort when the
> >>> validation
> >>> > fails, transaction is not getting completed because it is not
> >>> commiting the
> >>> > transaction offest. I saw that transaction state changed to
> >>> CompleteAbort.
> >>> > So for my next transaction I am getting InvalidProducerEpochException
> >>> and
> >>> > then task stopped after that. I tried calling the abort after sending
> >>> last
> >>> > record to the topic then transaction getting completed.
> >>> >
> >>> > I dont know if I am doing anything wrong here.
> >>> >
> >>> > Please advise.
> >>> > Thanks,
> >>> > Nitty
> >>> >
> >>> > On Tue 7 Mar 2023 at 2:21 p.m., Chris Egerton
> <chr...@aiven.io.invalid
> >>> >
> >>> > wrote:
> >>> >
> >>> > > Hi Nitty,
> >>> > >
> >>> > > We've recently added some documentation on implementing
> exactly-once
> >>> > source
> >>> > > connectors here:
> >>> > >
> >>> >
> >>>
> https://kafka.apache.org/documentation/#connect_exactlyoncesourceconnectors
> >>> > > .
> >>> > > To quote a relevant passage from those docs:
> >>> > >
> >>> > > > In order for a source connector to take advantage of this
> support,
> >>> it
> >>> > > must be able to provide meaningful source offsets for each record
> >>> that it
> >>> > > emits, and resume consumption from the external system at the exact
> >>> > > position corresponding to any of those offsets without dropping or
> >>> > > duplicating messages.
> >>> > >
> >>> > > So, as long as your source connector is able to use the Kafka
> Connect
> >>> > > framework's offsets API correctly, it shouldn't be necessary to
> make
> >>> any
> >>> > > other code changes to the connector.
> >>> > >
> >>> > > To enable exactly-once support for source connectors on your
> Connect
> >>> > > cluster, see the docs section here:
> >>> > > https://kafka.apache.org/documentation/#connect_exactlyoncesource
> >>> > >
> >>> > > With regard to transactions, a transactional producer is always
> >>> created
> >>> > > automatically for your connector by the Connect runtime when
> >>> exactly-once
> >>> > > support is enabled on the worker. The only reason to set
> >>> > > "transaction.boundary" to "connector" is if your connector would
> >>> like to
> >>> > > explicitly define its own transaction boundaries. In this case, it
> >>> sounds
> >>> > > like may be what you want; I just want to make sure to call out
> that
> >>> in
> >>> > > either case, you should not be directly instantiating a producer in
> >>> your
> >>> > > connector code, but let the Kafka Connect runtime do that for you,
> >>> and
> >>> > just
> >>> > > worry about returning the right records from SourceTask::poll (and
> >>> > possibly
> >>> > > defining custom transactions using the TransactionContext API).
> >>> > >
> >>> > > With respect to your question about committing or aborting in
> certain
> >>> > > circumstances, it'd be useful to know more about your use case,
> >>> since it
> >>> > > may not be necessary to define custom transaction boundaries in
> your
> >>> > > connector at all.
> >>> > >
> >>> > > Cheers,
> >>> > >
> >>> > > Chris
> >>> > >
> >>> > >
> >>> > >
> >>> > > On Tue, Mar 7, 2023 at 7:21 AM NITTY BENNY <nittybe...@gmail.com>
> >>> wrote:
> >>> > >
> >>> > > > Hi Team,
> >>> > > >
> >>> > > > Adding on top of this, I tried creating a TransactionContext
> >>> object and
> >>> > > > calling the commitTransaction and abortTranaction methods in
> source
> >>> > > > connectors.
> >>> > > > But the main problem I saw is that if there is any error while
> >>> parsing
> >>> > > the
> >>> > > > record, connect is calling an abort but we have a use case to
> call
> >>> > commit
> >>> > > > in some cases. Is it a valid use case in terms of kafka connect?
> >>> > > >
> >>> > > > Another Question - Should I use a transactional producer instead
> >>> > > > creating an object of TransactionContext? Below is the connector
> >>> > > > configuration I am using.
> >>> > > >
> >>> > > >   exactly.once.support: "required"
> >>> > > >   transaction.boundary: "connector"
> >>> > > >
> >>> > > > Could you please help me here?
> >>> > > >
> >>> > > > Thanks,
> >>> > > > Nitty
> >>> > > >
> >>> > > > On Tue, Mar 7, 2023 at 12:29 AM NITTY BENNY <
> nittybe...@gmail.com>
> >>> > > wrote:
> >>> > > >
> >>> > > > > Hi Team,
> >>> > > > > I am trying to implement exactly once behavior in our source
> >>> > connector.
> >>> > > > Is
> >>> > > > > there any sample source connector implementation available to
> >>> have a
> >>> > > look
> >>> > > > > at?
> >>> > > > > Regards,
> >>> > > > > Nitty
> >>> > > > >
> >>> > > >
> >>> > >
> >>> >
> >>>
> >>
>

Reply via email to