Hi Nitty,

Thanks for the code examples and the detailed explanations, this is really
helpful!

> Say if I have a file with 5 records and batch size is 2, and in my 3rd
> batch I have one error record then in that batch, I don't have a valid
> record to call commit or abort. But I want to commit all the previous
> batches that were successfully parsed. How do I do that?

An important thing to keep in mind with the TransactionContext API is that
all records that a task returns from SourceTask::poll are implicitly
included in a transaction. Invoking SourceTaskContext::transactionContext
doesn't alter this or cause transactions to start being used; everything is
already in a transaction, and the Connect runtime automatically begins
transactions for any records it sees from the task if it hasn't already
begun one. It's also valid to return a null or empty list of records from
SourceTask::poll. So in this case, you can invoke
transactionContext.commitTransaction() (the no-args variant) and return an
empty batch from SourceTask::poll, which will cause the transaction
containing the 4 valid records returned in the first 2 batches to be
committed.
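
Here's a rough, untested sketch of that flow for the 5-record example. To
keep it compilable without Kafka on the classpath, TxnContext is a stand-in
for org.apache.kafka.connect.source.TransactionContext (only the no-args
methods are mirrored), String stands in for SourceRecord, and
BatchingFileTask is a hypothetical name; a null entry marks a record that
failed to parse.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Stand-in for org.apache.kafka.connect.source.TransactionContext (assumption:
// only the no-args commit/abort methods are needed for this sketch).
interface TxnContext {
    void commitTransaction();
    void abortTransaction();
}

// Hypothetical task: hands out pre-parsed records in batches.
class BatchingFileTask {
    private final List<String> parsed; // null entry = record that failed to parse
    private final int batchSize;
    private final TxnContext txn;
    private int position = 0;
    private boolean stopped = false;

    BatchingFileTask(List<String> parsed, int batchSize, TxnContext txn) {
        this.parsed = parsed;
        this.batchSize = batchSize;
        this.txn = txn;
    }

    // Same shape as SourceTask::poll, with String in place of SourceRecord.
    List<String> poll() {
        if (stopped || position >= parsed.size()) {
            return Collections.emptyList();
        }
        List<String> batch = new ArrayList<>();
        int end = Math.min(position + batchSize, parsed.size());
        while (position < end) {
            String record = parsed.get(position++);
            if (record == null) {
                // Error record: request a commit of everything returned by
                // earlier polls, then return an empty batch. The current batch
                // and the rest of the file are dropped.
                txn.commitTransaction();
                stopped = true;
                return Collections.emptyList();
            }
            batch.add(record);
        }
        return batch;
    }
}
```

With records r1..r4 followed by one bad record and a batch size of 2, the
first two polls return two records each, and the third poll requests the
commit and returns an empty list. A real task would also need to request a
commit when it reaches the end of a clean file.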

FWIW, I would be a little cautious about this approach. It's often better
to fail fast on invalid data; it might be worth at least allowing users to
configure whether the connector fails on invalid data or silently skips
over it (which is what happens when transactions are aborted).
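
One way that option could look, as a sketch only: the property name
"on.invalid.data", the InvalidDataPolicy enum, and the InvalidDataHandler
class are all made up for illustration, and IllegalStateException stands in
for whatever exception the connector would really throw (DataException, for
example). The Runnable would be something like
transactionContext::abortTransaction.

```java
import java.util.Locale;
import java.util.Map;

// Hypothetical policy for invalid input; defaults to failing fast.
enum InvalidDataPolicy {
    FAIL, SKIP;

    // "on.invalid.data" is an invented property name, not a Connect setting.
    static InvalidDataPolicy fromConfig(Map<String, String> props) {
        String raw = props.getOrDefault("on.invalid.data", "fail");
        return valueOf(raw.trim().toUpperCase(Locale.ROOT));
    }
}

class InvalidDataHandler {
    private final InvalidDataPolicy policy;

    InvalidDataHandler(InvalidDataPolicy policy) {
        this.policy = policy;
    }

    // FAIL: throw so the task fails and the problem is visible to the user.
    // SKIP: run the supplied abort action so the bad data is dropped.
    void onInvalidData(String description, Runnable abortTransaction) {
        if (policy == InvalidDataPolicy.FAIL) {
            throw new IllegalStateException("Invalid data: " + description);
        }
        abortTransaction.run();
    }
}
```

Defaulting to FAIL means users have to opt in to dropping data.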

> Why is abort not working without adding the last record to the list?

Is it possible that there's a case where an abort is being requested while
the current transaction is empty (i.e., the task hasn't returned any
records from SourceTask::poll since the last transaction was
committed/aborted)? I think this may be a bug in the Connect framework
where we don't check to see if a transaction is already open when a task
requests that a transaction be aborted, which can cause tasks to fail (see
https://issues.apache.org/jira/browse/KAFKA-14799 for more details).
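
If that does turn out to be the cause, one possible connector-side
workaround (a sketch I haven't tested, and not something the framework
requires) is to track whether the current transaction contains any records
and only request an abort when it does. TransactionGuard and its methods are
hypothetical names; the Runnable would be something like
transactionContext::abortTransaction.

```java
// Hypothetical helper: remembers how many records have been returned from
// poll since the last commit/abort, so aborts of empty transactions are skipped.
class TransactionGuard {
    private final Runnable abort;
    private int openRecords = 0;

    TransactionGuard(Runnable abort) {
        this.abort = abort;
    }

    // Call for each batch returned from poll without a commit/abort request.
    void track(int batchSize) {
        openRecords += batchSize;
    }

    // Call after the task has requested a commit.
    void committed() {
        openRecords = 0;
    }

    // Requests an abort only if earlier batches, or the batch about to be
    // returned, put at least one record in the transaction.
    boolean abortIfNonEmpty(int currentBatchSize) {
        boolean nonEmpty = openRecords + currentBatchSize > 0;
        if (nonEmpty) {
            abort.run();
        }
        openRecords = 0;
        return nonEmpty;
    }
}
```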

Cheers,

Chris


On Wed, Mar 8, 2023 at 6:44 PM NITTY BENNY <nittybe...@gmail.com> wrote:

> Hi Chris,
>
> I am not sure if you are able to see the images I shared with you.
> Copying the code snippet below:
>
>           if (expectedRecordCount >= 0) {
>             int missingCount = expectedRecordCount - (int) this.recordOffset() - 1;
>             if (missingCount > 0) {
>               if (transactionContext != null) {
>                 isMissedRecords = true;
>               } else {
>                 throw new DataException(String.format("Missing %d records (expecting %d, actual %d)", missingCount, expectedRecordCount, this.recordOffset()));
>               }
>             } else if (missingCount < 0) {
>               if (transactionContext != null) {
>                 isMissedRecords = true;
>               } else {
>                 throw new DataException(String.format("Too many records (expecting %d, actual %d)", expectedRecordCount, this.recordOffset()));
>               }
>             }
>           }
>           addLastRecord(records, null, value);
>         }
>
>
>
>         // asn1 or binary abort
>         if ((config.parseErrorThreshold != null && parseErrorCount >= config.parseErrorThreshold
>             && lastbatch && transactionContext != null)
>             || (isMissedRecords && transactionContext != null && lastbatch)) {
>           log.info("Transaction is aborting");
>           log.info("records = {}", records);
>           if (!records.isEmpty()) {
>             log.info("with record");
>             transactionContext.abortTransaction(records.get(records.size() - 1));
>           } else {
>             log.info("without record");
>             transactionContext.abortTransaction();
>           }
>
> Thanks,
> Nitty
>
> On Wed, Mar 8, 2023 at 11:38 PM NITTY BENNY <nittybe...@gmail.com> wrote:
>
>> Hi Chris,
>> Sorry for the typo in my previous email.
>>
>> Regarding point 2, *the task returns a batch of records from
>> SourceTask::poll (and, if using the per-record API provided by the
>> TransactionContext class, includes at least one record that should
>> trigger a transaction commit/abort in that batch)*:
>> What if I am using the API without passing a record? We have 2 types of
>> use cases, one where on encountering an error record, we want to commit
>> previous successful batches and disregard the failed record and upcoming
>> batches. In this case we created the transactionContext just before reading
>> the file (file is our transaction boundary). Say if I have a file with 5
>> records and batch size is 2, and in my 3rd batch I have one error record
>> then in that batch, I don't have a valid record to call commit or abort. But
>> I want to commit all the previous batches that were successfully parsed.
>> How do I do that?
>>
>> Second use case is where I want to abort a transaction if the record
>> count doesn't match.
>> Code Snippet :
>> [image: image.png]
>> There are no error records in this case. As you can see, I added the
>> transactionContext check to implement exactly-once. Without a
>> transaction, it just threw the exception without calling the
>> addLastRecord() method, and the catch block just logged the message and
>> returned the list of records, without the last record, to poll(). To make
>> it work, I called addLastRecord() in this case, so it does not throw the
>> exception and the list includes the last record as well. Then I called
>> abort, and everything got aborted. Why is abort not working without adding
>> the last record to the list?
>> [image: image.png]
>>
>> Code to call abort.
>>
>>
>>
>>
>> Thanks,
>> Nitty
>>
>> On Wed, Mar 8, 2023 at 4:26 PM Chris Egerton <chr...@aiven.io.invalid>
>> wrote:
>>
>>> Hi Nitty,
>>>
>>> I'm a little confused about what you mean by this part:
>>>
>>> > transaction is not getting completed because it is not committing the
>>> > transaction offset.
>>>
>>> The only conditions required for a transaction to be completed when a
>>> connector is defining its own transaction boundaries are:
>>>
>>> 1. The task requests a transaction commit/abort from the
>>> TransactionContext
>>> 2. The task returns a batch of records from SourceTask::poll (and, if
>>> using the per-record API provided by the TransactionContext class,
>>> includes at least one record that should trigger a transaction
>>> commit/abort in that batch)
>>>
>>> The Connect runtime should automatically commit source offsets to Kafka
>>> whenever a transaction is completed, either by commit or abort. This is
>>> because transactions should only be aborted for data that should never be
>>> re-read by the connector; if there is a validation error that should be
>>> handled by reconfiguring the connector, then the task should throw an
>>> exception instead of aborting the transaction.
>>>
>>> If possible, do you think you could provide a brief code snippet
>>> illustrating what your task is doing that's causing issues?
>>>
>>> Cheers,
>>>
>>> Chris (not Chrise 🙂)
>>>
>>> On Tue, Mar 7, 2023 at 10:17 AM NITTY BENNY <nittybe...@gmail.com>
>>> wrote:
>>>
>>> > Hi Chrise,
>>> >
>>> > Thanks for sharing the details.
>>> >
>>> > Regarding the use case, for the Asn1 source connector we have a use
>>> > case to validate the number of records in the file against the number
>>> > of records in the header. So currently, if validation fails, we are not
>>> > sending the last record to the topic. But after introducing exactly-once
>>> > with connector transaction boundaries, I can see that if I call an abort
>>> > when the validation fails, the transaction is not getting completed
>>> > because it is not committing the transaction offset. I saw that the
>>> > transaction state changed to CompleteAbort. So for my next transaction I
>>> > am getting InvalidProducerEpochException, and the task stopped after
>>> > that. I tried calling the abort after sending the last record to the
>>> > topic, and then the transaction completes.
>>> >
>>> > I don't know if I am doing anything wrong here.
>>> >
>>> > Please advise.
>>> > Thanks,
>>> > Nitty
>>> >
>>> > On Tue 7 Mar 2023 at 2:21 p.m., Chris Egerton <chr...@aiven.io.invalid>
>>> > wrote:
>>> >
>>> > > Hi Nitty,
>>> > >
>>> > > We've recently added some documentation on implementing exactly-once
>>> > > source connectors here:
>>> > > https://kafka.apache.org/documentation/#connect_exactlyoncesourceconnectors
>>> > > To quote a relevant passage from those docs:
>>> > >
>>> > > > In order for a source connector to take advantage of this support,
>>> > > > it must be able to provide meaningful source offsets for each record
>>> > > > that it emits, and resume consumption from the external system at the
>>> > > > exact position corresponding to any of those offsets without dropping
>>> > > > or duplicating messages.
>>> > >
>>> > > So, as long as your source connector is able to use the Kafka Connect
>>> > > framework's offsets API correctly, it shouldn't be necessary to make
>>> > > any other code changes to the connector.
>>> > >
>>> > > To enable exactly-once support for source connectors on your Connect
>>> > > cluster, see the docs section here:
>>> > > https://kafka.apache.org/documentation/#connect_exactlyoncesource
>>> > >
>>> > > With regard to transactions, a transactional producer is always
>>> > > created automatically for your connector by the Connect runtime when
>>> > > exactly-once support is enabled on the worker. The only reason to set
>>> > > "transaction.boundary" to "connector" is if your connector would like
>>> > > to explicitly define its own transaction boundaries. In this case, it
>>> > > sounds like that may be what you want; I just want to make sure to call
>>> > > out that in either case, you should not be directly instantiating a
>>> > > producer in your connector code, but let the Kafka Connect runtime do
>>> > > that for you, and just worry about returning the right records from
>>> > > SourceTask::poll (and possibly defining custom transactions using the
>>> > > TransactionContext API).
>>> > >
>>> > > With respect to your question about committing or aborting in certain
>>> > > circumstances, it'd be useful to know more about your use case, since
>>> > > it may not be necessary to define custom transaction boundaries in your
>>> > > connector at all.
>>> > >
>>> > > Cheers,
>>> > >
>>> > > Chris
>>> > >
>>> > >
>>> > >
>>> > > On Tue, Mar 7, 2023 at 7:21 AM NITTY BENNY <nittybe...@gmail.com>
>>> > > wrote:
>>> > >
>>> > > > Hi Team,
>>> > > >
>>> > > > Adding on top of this, I tried creating a TransactionContext object
>>> > > > and calling the commitTransaction and abortTransaction methods in
>>> > > > source connectors.
>>> > > > But the main problem I saw is that if there is any error while
>>> > > > parsing the record, Connect is calling an abort, but we have a use
>>> > > > case to call commit in some cases. Is it a valid use case in terms of
>>> > > > Kafka Connect?
>>> > > >
>>> > > > Another question: should I use a transactional producer instead of
>>> > > > creating an object of TransactionContext? Below is the connector
>>> > > > configuration I am using.
>>> > > >
>>> > > >   exactly.once.support: "required"
>>> > > >   transaction.boundary: "connector"
>>> > > >
>>> > > > Could you please help me here?
>>> > > >
>>> > > > Thanks,
>>> > > > Nitty
>>> > > >
>>> > > > On Tue, Mar 7, 2023 at 12:29 AM NITTY BENNY <nittybe...@gmail.com>
>>> > > > wrote:
>>> > > >
>>> > > > > Hi Team,
>>> > > > > I am trying to implement exactly-once behavior in our source
>>> > > > > connector. Is there any sample source connector implementation
>>> > > > > available to have a look at?
>>> > > > > Regards,
>>> > > > > Nitty
>>> > > > >
>>> > > >
>>> > >
>>> >
>>>
>>
