Hi Chris,

We have a use case where we need to commit the previously successful records, stop processing the current file, and move on to the next file. To achieve that I called commitTransaction when I reached the first error record, but the commit is not happening for me. Kafka Connect tries to abort the transaction automatically; I checked the __transaction_state topic and the states were marked as PrepareAbort and CompleteAbort. Do you know why Kafka Connect automatically invokes abort instead of the explicit commit I called? As a result, when I then try to parse the next file - say ABC - I see the log "Aborting incomplete transaction" and the ERROR "Failed to send record to topic", and we lose the first batch of records from the current transaction in file ABC.

> Is it possible that there's a case where an abort is being requested while
> the current transaction is empty (i.e., the task hasn't returned any
> records from SourceTask::poll since the last transaction was
> committed/aborted)?

Yes, that case is possible for us. There is a case where the first record itself is an error record.
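To make the flow concrete, here is a simplified sketch of what our poll() is trying to do (not our real code: readNextRecord(), isError(), toSourceRecord(), advanceToNextFile(), currentFileHasNext() and the recordsInTransaction counter are placeholders, and the usual Connect imports and task fields are assumed):

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        List<SourceRecord> batch = new ArrayList<>();
        while (batch.size() < batchSize && currentFileHasNext()) {
            ParsedRecord parsed = readNextRecord();
            if (parsed.isError()) {
                advanceToNextFile();
                if (recordsInTransaction + batch.size() > 0) {
                    // The batch returned below is part of the same transaction,
                    // so this commits it together with the earlier batches.
                    transactionContext.commitTransaction();
                    recordsInTransaction = 0;
                }
                // If the very first record of the transaction is the error
                // record, the transaction is empty and we request nothing.
                return batch; // returning an empty (or null) list is allowed
            }
            batch.add(toSourceRecord(parsed));
        }
        recordsInTransaction += batch.size();
        return batch;
    }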
Thanks,
Nitty

On Thu, Mar 9, 2023 at 3:48 PM Chris Egerton <chr...@aiven.io.invalid> wrote:

> Hi Nitty,
>
> Thanks for the code examples and the detailed explanations, this is really
> helpful!
>
> > Say if I have a file with 5 records and batch size is 2, and in my 3rd
> > batch I have one error record, then in that batch I don't have a valid
> > record to call commit or abort. But I want to commit all the previous
> > batches that were successfully parsed. How do I do that?
>
> An important thing to keep in mind with the TransactionContext API is that
> all records a task returns from SourceTask::poll are implicitly included
> in a transaction. Invoking SourceTaskContext::transactionContext doesn't
> alter this or cause transactions to start being used; everything is
> already in a transaction, and the Connect runtime automatically begins a
> transaction for any records it sees from the task if it hasn't already
> begun one. It's also valid to return a null or empty list of records from
> SourceTask::poll. So in this case, you can invoke
> transactionContext.commitTransaction() (the no-args variant) and return an
> empty batch from SourceTask::poll, which will cause the transaction
> containing the 4 valid records returned in the last 2 batches to be
> committed.
>
> FWIW, I would be a little cautious about this approach. Many times it's
> better to fail fast on invalid data; it might be worth at least allowing
> users to configure whether the connector fails on invalid data or
> silently skips over it (which is what happens when transactions are
> aborted).
>
> > Why is abort not working without adding the last record to the list?
>
> Is it possible that there's a case where an abort is being requested while
> the current transaction is empty (i.e., the task hasn't returned any
> records from SourceTask::poll since the last transaction was
> committed/aborted)? I think this may be a bug in the Connect framework
> where we don't check whether a transaction is already open when a task
> requests that a transaction be aborted, which can cause tasks to fail (see
> https://issues.apache.org/jira/browse/KAFKA-14799 for more details).
>
> Cheers,
>
> Chris
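The configurability Chris suggests could look something like this in the connector's ConfigDef (a sketch only - the "on.invalid.data" key and its values are invented for illustration):

    public static final String ON_INVALID_DATA_CONFIG = "on.invalid.data";

    static final ConfigDef CONFIG_DEF = new ConfigDef()
        .define(ON_INVALID_DATA_CONFIG,
                ConfigDef.Type.STRING,
                "fail", // fail fast by default
                ConfigDef.ValidString.in("fail", "skip"),
                ConfigDef.Importance.MEDIUM,
                "What to do with invalid records: 'fail' throws and stops "
                    + "the task; 'skip' aborts the transaction, silently "
                    + "dropping the records.");

The task would then either throw an exception or call transactionContext.abortTransaction(), depending on the configured value.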
> On Wed, Mar 8, 2023 at 6:44 PM NITTY BENNY <nittybe...@gmail.com> wrote:
>
> > Hi Chris,
> >
> > I am not sure if you are able to see the images I shared with you.
> > Copying the code snippet below:
> >
> > if (expectedRecordCount >= 0) {
> >     int missingCount = expectedRecordCount - (int) this.recordOffset() - 1;
> >     if (missingCount > 0) {
> >         if (transactionContext != null) {
> >             isMissedRecords = true;
> >         } else {
> >             throw new DataException(String.format(
> >                 "Missing %d records (expecting %d, actual %d)",
> >                 missingCount, expectedRecordCount, this.recordOffset()));
> >         }
> >     } else if (missingCount < 0) {
> >         if (transactionContext != null) {
> >             isMissedRecords = true;
> >         } else {
> >             throw new DataException(String.format(
> >                 "Too many records (expecting %d, actual %d)",
> >                 expectedRecordCount, this.recordOffset()));
> >         }
> >     }
> > }
> > addLastRecord(records, null, value);
> >
> > // asn1 or binary abort
> > if ((config.parseErrorThreshold != null
> >         && parseErrorCount >= config.parseErrorThreshold
> >         && lastbatch && transactionContext != null)
> >     || (isMissedRecords && transactionContext != null && lastbatch)) {
> >     log.info("Transaction is aborting");
> >     log.info("records = {}", records);
> >     if (!records.isEmpty()) {
> >         log.info("with record");
> >         transactionContext.abortTransaction(records.get(records.size() - 1));
> >     } else {
> >         log.info("without record");
> >         transactionContext.abortTransaction();
> >     }
> > }
> >
> > Thanks,
> > Nitty
> >
> > On Wed, Mar 8, 2023 at 11:38 PM NITTY BENNY <nittybe...@gmail.com> wrote:
> >
> > > Hi Chris,
> > >
> > > Sorry for the typo in my previous email.
> > >
> > > Regarding point 2, "the task returns a batch of records from
> > > SourceTask::poll (and, if using the per-record API provided by the
> > > TransactionContext class, includes at least one record that should
> > > trigger a transaction commit/abort in that batch)":
> > > What if I am using the API without passing a record? We have 2 types
> > > of use cases. One is where, on encountering an error record, we want
> > > to commit the previous successful batches and disregard the failed
> > > record and the upcoming batches. In this case we created the
> > > transactionContext just before reading the file (the file is our
> > > transaction boundary). Say I have a file with 5 records and the batch
> > > size is 2, and in my 3rd batch I have one error record; then in that
> > > batch I don't have a valid record to call commit or abort with. But I
> > > want to commit all the previous batches that were successfully parsed.
> > > How do I do that?
> > >
> > > The second use case is where I want to abort a transaction if the
> > > record count doesn't match.
> > > Code snippet:
> > > [image: image.png]
> > > There are no error records in this case. As you can see, I added the
> > > transactionContext check to implement exactly-once. Without
> > > transactions, the code just threw the exception without calling the
> > > addLastRecord() method, and in the catch block it logged the message
> > > and returned the list of records, without the last record, to poll().
> > > To make it work, I called addLastRecord() in this case, so the
> > > exception is not thrown and the list contains the last record as well.
> > > Then I called the abort, and everything got aborted. Why is abort not
> > > working without adding the last record to the list?
> > > [image: image.png]
> > >
> > > Code to call abort.
> > >
> > > Thanks,
> > > Nitty
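For anyone following along, this matches what Chris explains below: with the per-record API, the record passed to abortTransaction(record) must also be included in the batch returned from poll(), because the runtime only acts on transaction boundaries for records it actually receives from the task. Roughly (toSourceRecord() is a placeholder):

    SourceRecord last = toSourceRecord(value); // build the final record anyway
    records.add(last);                         // it must be in the returned batch
    transactionContext.abortTransaction(last); // abort the transaction ending with it
    return records;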
> > > On Wed, Mar 8, 2023 at 4:26 PM Chris Egerton <chr...@aiven.io.invalid> wrote:
> > >
> > > > Hi Nitty,
> > > >
> > > > I'm a little confused about what you mean by this part:
> > > >
> > > > > transaction is not getting completed because it is not committing
> > > > > the transaction offset.
> > > >
> > > > The only conditions required for a transaction to be completed when
> > > > a connector is defining its own transaction boundaries are:
> > > >
> > > > 1. The task requests a transaction commit/abort from the
> > > > TransactionContext
> > > > 2. The task returns a batch of records from SourceTask::poll (and,
> > > > if using the per-record API provided by the TransactionContext
> > > > class, includes at least one record that should trigger a
> > > > transaction commit/abort in that batch)
> > > >
> > > > The Connect runtime should automatically commit source offsets to
> > > > Kafka whenever a transaction is completed, either by commit or
> > > > abort. This is because transactions should only be aborted for data
> > > > that should never be re-read by the connector; if there is a
> > > > validation error that should be handled by reconfiguring the
> > > > connector, then the task should throw an exception instead of
> > > > aborting the transaction.
> > > >
> > > > If possible, do you think you could provide a brief code snippet
> > > > illustrating what your task is doing that's causing issues?
> > > >
> > > > Cheers,
> > > >
> > > > Chris (not Chrise 🙂)
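The abort-vs-throw rule Chris describes above can be summed up in a few lines (the condition name is illustrative, and ConnectException is just one way to fail the task):

    if (dataIsBadAndShouldNeverBeReRead) {
        // Source offsets are committed even on abort, so these records
        // are skipped for good and will not be re-read after a restart.
        transactionContext.abortTransaction(lastRecord);
    } else {
        // Failing the task leaves the offsets uncommitted, so the same
        // data is re-read once the connector is fixed and restarted.
        throw new ConnectException("Validation failed; fix the connector and restart");
    }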
> > > > On Tue, Mar 7, 2023 at 10:17 AM NITTY BENNY <nittybe...@gmail.com> wrote:
> > > >
> > > > > Hi Chrise,
> > > > >
> > > > > Thanks for sharing the details.
> > > > >
> > > > > Regarding the use case: for the Asn1 source connector we have a
> > > > > use case to validate the number of records in the file against the
> > > > > number of records in the header. Currently, if validation fails we
> > > > > do not send the last record to the topic. But after introducing
> > > > > exactly-once with the connector transaction boundary, I can see
> > > > > that if I call an abort when the validation fails, the transaction
> > > > > is not getting completed because it is not committing the
> > > > > transaction offset. I saw that the transaction state changed to
> > > > > CompleteAbort. So for my next transaction I am getting
> > > > > InvalidProducerEpochException, and the task stopped after that. I
> > > > > tried calling the abort after sending the last record to the
> > > > > topic, and then the transaction got completed.
> > > > >
> > > > > I don't know if I am doing anything wrong here.
> > > > >
> > > > > Please advise.
> > > > > Thanks,
> > > > > Nitty
> > > > >
> > > > > On Tue 7 Mar 2023 at 2:21 p.m., Chris Egerton <chr...@aiven.io.invalid> wrote:
> > > > >
> > > > > > Hi Nitty,
> > > > > >
> > > > > > We've recently added some documentation on implementing
> > > > > > exactly-once source connectors here:
> > > > > > https://kafka.apache.org/documentation/#connect_exactlyoncesourceconnectors
> > > > > > To quote a relevant passage from those docs:
> > > > > >
> > > > > > > In order for a source connector to take advantage of this
> > > > > > > support, it must be able to provide meaningful source offsets
> > > > > > > for each record that it emits, and resume consumption from the
> > > > > > > external system at the exact position corresponding to any of
> > > > > > > those offsets without dropping or duplicating messages.
> > > > > >
> > > > > > So, as long as your source connector is able to use the Kafka
> > > > > > Connect framework's offsets API correctly, it shouldn't be
> > > > > > necessary to make any other code changes to the connector.
> > > > > >
> > > > > > To enable exactly-once support for source connectors on your
> > > > > > Connect cluster, see the docs section here:
> > > > > > https://kafka.apache.org/documentation/#connect_exactlyoncesource
> > > > > >
> > > > > > With regard to transactions, a transactional producer is always
> > > > > > created automatically for your connector by the Connect runtime
> > > > > > when exactly-once support is enabled on the worker. The only
> > > > > > reason to set "transaction.boundary" to "connector" is if your
> > > > > > connector would like to explicitly define its own transaction
> > > > > > boundaries. In this case, it sounds like that may be what you
> > > > > > want; I just want to make sure to call out that in either case,
> > > > > > you should not be directly instantiating a producer in your
> > > > > > connector code. Let the Kafka Connect runtime do that for you,
> > > > > > and just worry about returning the right records from
> > > > > > SourceTask::poll (and possibly defining custom transactions
> > > > > > using the TransactionContext API).
> > > > > >
> > > > > > With respect to your question about committing or aborting in
> > > > > > certain circumstances, it'd be useful to know more about your
> > > > > > use case, since it may not be necessary to define custom
> > > > > > transaction boundaries in your connector at all.
> > > > > >
> > > > > > Cheers,
> > > > > >
> > > > > > Chris
> > > > > >
> > > > > > On Tue, Mar 7, 2023 at 7:21 AM NITTY BENNY <nittybe...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi Team,
> > > > > > >
> > > > > > > Adding on top of this, I tried creating a TransactionContext
> > > > > > > object and calling the commitTransaction and abortTransaction
> > > > > > > methods in source connectors. But the main problem I saw is
> > > > > > > that if there is any error while parsing the record, Connect
> > > > > > > calls an abort, but we have a use case to call commit in some
> > > > > > > cases. Is that a valid use case in terms of Kafka Connect?
> > > > > > >
> > > > > > > Another question - should I use a transactional producer
> > > > > > > instead of creating an object of TransactionContext? Below is
> > > > > > > the connector configuration I am using.
> > > > > > >
> > > > > > > exactly.once.support: "required"
> > > > > > > transaction.boundary: "connector"
> > > > > > >
> > > > > > > Could you please help me here?
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Nitty
> > > > > > >
> > > > > > > On Tue, Mar 7, 2023 at 12:29 AM NITTY BENNY <nittybe...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi Team,
> > > > > > > > I am trying to implement exactly-once behavior in our source
> > > > > > > > connector. Is there any sample source connector
> > > > > > > > implementation available to have a look at?
> > > > > > > > Regards,
> > > > > > > > Nitty
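For reference, the settings that tie this thread together - the worker setting comes from the exactly-once docs linked above, and the connector settings are the ones quoted in this thread:

    # Worker configuration: enables exactly-once source support on the cluster
    exactly.once.source.support=enabled

    # Connector configuration: "connector" means the connector defines its own
    # transaction boundaries via the TransactionContext API
    exactly.once.support=required
    transaction.boundary=connector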