Hi Nitty,

Thanks again for all the details here, especially the log messages.

> The below mentioned issue is happening for Json connector only. Is there
> any difference with asn1,binary,csv and json connector?

Can you clarify if the difference here is in the Connector/Task classes, or
if it's in the key/value converters that are configured for the connector?
The key/value converters are configured using the "key.converter" and
"value.converter" properties and, if problems arise with them, the task will
fail and, if it has a non-empty ongoing transaction, that transaction will be
automatically aborted since we close the task's Kafka producer when it fails
(or shuts down gracefully).

With regards to these log messages:

> org.apache.kafka.common.errors.ProducerFencedException: There is a newer
> producer with the same transactionalId which fences the current one.

It looks like your tasks aren't shutting down gracefully in time, which
causes them to be fenced out by the Connect framework later on. Do you see
messages like "Graceful stop of task <TASK ID HERE> failed" in the logs for
your Connect worker?

Cheers,

Chris

On Mon, Mar 13, 2023 at 10:58 AM NITTY BENNY <nittybe...@gmail.com> wrote:

> Hi Chris,
>
> As you said, the below message is coming when I call an abort if there is
> an invalid record, then for the next transaction I can see the below
> message and then the connector will be stopped.
> 2023-03-13 14:28:26,043 INFO [json-sftp-source-connector|task-0] Aborting
> transaction for batch as requested by connector
> (org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask)
> [task-thread-json-sftp-source-connector-0]
> 2023-03-13 14:28:26,043 INFO [json-sftp-source-connector|task-0] [Producer
> clientId=connector-producer-json-sftp-source-connector-0,
> transactionalId=connect-cluster-json-sftp-source-connector-0] Aborting
> incomplete transaction (org.apache.kafka.clients.producer.KafkaProducer)
> [task-thread-json-sftp-source-connector-0]
>
> The issue with InvalidProducerEpoch is happening when I call the commit if
> there is an invalid record, and in the next transaction I am getting
> InvalidProducerEpoch Exception and the messages are copied in the previous
> email. I don't know if this will also be fixed by your bug fix. I am using
> kafka 3.3.1 version as of now.
>
> Thanks,
> Nitty
>
>
> On Mon, Mar 13, 2023 at 10:47 AM NITTY BENNY <nittybe...@gmail.com> wrote:
>
> > Hi Chris,
> >
> > The below mentioned issue is happening for Json connector only. Is there
> > any difference with asn1,binary,csv and json connector?
> >
> > Thanks,
> > Nitty
> >
> > On Mon, Mar 13, 2023 at 9:16 AM NITTY BENNY <nittybe...@gmail.com>
> > wrote:
> >
> >> Hi Chris,
> >>
> >> Sorry Chris, I am not able to reproduce the above issue.
> >>
> >> I want to share with you one more use case I found.
> >> The use case is in the first batch it returns 2 valid records and then in
> >> the next batch it is an invalid record. Below is the transaction_state
> >> topic, when I call a commit while processing an invalid record.
> >>
> >> connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0,
> >> producerId=11, producerEpoch=2, txnTimeoutMs=60000, state=*Ongoing*,
> >> pendingState=None, topicPartitions=HashSet(streams-input-2),
> >> txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620463834)
> >>
> >> then after some time I saw the below states as well,
> >>
> >> connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0,
> >> producerId=11, producerEpoch=3, txnTimeoutMs=60000, state=*PrepareAbort*,
> >> pendingState=None, topicPartitions=HashSet(streams-input-2),
> >> txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620526119)
> >> connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0,
> >> producerId=11, producerEpoch=3, txnTimeoutMs=60000, state=*CompleteAbort*,
> >> pendingState=None, topicPartitions=HashSet(),
> >> txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620526121)
> >>
> >> Later for the next transaction, when it returns the first batch below is
> >> the logs I can see.
> >>
> >> Transiting to abortable error state due to
> >> org.apache.kafka.common.errors.InvalidProducerEpochException: Producer
> >> attempted to produce with an old epoch.
> >> (org.apache.kafka.clients.producer.internals.TransactionManager)
> >> [kafka-producer-network-thread |
> >> connector-producer-json-sftp-source-connector-0]
> >> 2023-03-12 11:32:45,220 ERROR [json-sftp-source-connector|task-0]
> >> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} failed to send
> >> record to streams-input:
> >> (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask)
> >> [kafka-producer-network-thread |
> >> connector-producer-json-sftp-source-connector-0]
> >> org.apache.kafka.common.errors.InvalidProducerEpochException: Producer
> >> attempted to produce with an old epoch.
> >> 2023-03-12 11:32:45,222 INFO [json-sftp-source-connector|task-0]
> >> [Producer clientId=connector-producer-json-sftp-source-connector-0,
> >> transactionalId=connect-cluster-json-sftp-source-connector-0] Transiting to
> >> fatal error state due to
> >> org.apache.kafka.common.errors.ProducerFencedException: There is a newer
> >> producer with the same transactionalId which fences the current one.
> >> (org.apache.kafka.clients.producer.internals.TransactionManager)
> >> [kafka-producer-network-thread |
> >> connector-producer-json-sftp-source-connector-0]
> >> 2023-03-12 11:32:45,222 ERROR [json-sftp-source-connector|task-0]
> >> [Producer clientId=connector-producer-json-sftp-source-connector-0,
> >> transactionalId=connect-cluster-json-sftp-source-connector-0] Aborting
> >> producer batches due to fatal error
> >> (org.apache.kafka.clients.producer.internals.Sender)
> >> [kafka-producer-network-thread |
> >> connector-producer-json-sftp-source-connector-0]
> >> org.apache.kafka.common.errors.ProducerFencedException: There is a newer
> >> producer with the same transactionalId which fences the current one.
> >> 2023-03-12 11:32:45,222 ERROR [json-sftp-source-connector|task-0]
> >> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Failed to
> >> flush offsets to storage:
> >> (org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask)
> >> [kafka-producer-network-thread |
> >> connector-producer-json-sftp-source-connector-0]
> >> org.apache.kafka.common.errors.ProducerFencedException: There is a newer
> >> producer with the same transactionalId which fences the current one.
> >> 2023-03-12 11:32:45,224 ERROR [json-sftp-source-connector|task-0]
> >> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} failed to send
> >> record to streams-input:
> >> (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask)
> >> [kafka-producer-network-thread |
> >> connector-producer-json-sftp-source-connector-0]
> >> org.apache.kafka.common.errors.ProducerFencedException: There is a newer
> >> producer with the same transactionalId which fences the current one.
> >> 2023-03-12 11:32:45,222 ERROR [json-sftp-source-connector|task-0|offsets]
> >> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Failed to
> >> commit producer transaction
> >> (org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask)
> >> [task-thread-json-sftp-source-connector-0]
> >> org.apache.kafka.common.errors.ProducerFencedException: There is a newer
> >> producer with the same transactionalId which fences the current one.
> >> 2023-03-12 11:32:45,225 ERROR [json-sftp-source-connector|task-0]
> >> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Task threw an
> >> uncaught and unrecoverable exception. Task is being killed and will not
> >> recover until manually restarted
> >> (org.apache.kafka.connect.runtime.WorkerTask)
> >> [task-thread-json-sftp-source-connector-0]
> >>
> >> Do you know why it is showing an abort state even if I call commit?
> >>
> >> I tested one more scenario, When I call the commit I saw the below
> >> connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0,
> >> producerId=11, producerEpoch=2, txnTimeoutMs=60000, state=*Ongoing*,
> >> pendingState=None, topicPartitions=HashSet(streams-input-2),
> >> txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620463834)
> >> Then, before changing the states to Abort, I dropped the next file then I
> >> dont see any issues. Previous transaction
> >> as well as the current transaction are committed.
> >>
> >> Thank you for your support.
> >>
> >> Thanks,
> >> Nitty
> >>
> >> On Fri, Mar 10, 2023 at 8:04 PM Chris Egerton <chr...@aiven.io.invalid>
> >> wrote:
> >>
> >>> Hi Nitty,
> >>>
> >>> > I called commitTransaction when I reach the first error record, but
> >>> > commit is not happening for me. Kafka connect tries to abort the
> >>> > transaction automatically
> >>>
> >>> This is really interesting--are you certain that your task never invoked
> >>> TransactionContext::abortTransaction in this case? I'm looking over the
> >>> code base and it seems fairly clear that the only thing that could trigger
> >>> a call to KafkaProducer::abortTransaction is a request by the task to abort
> >>> a transaction (either for a next batch, or for a specific record). It may
> >>> help to run the connector in a debugger and/or look for "Aborting
> >>> transaction for batch as requested by connector" or "Aborting transaction
> >>> for record on topic <TOPIC NAME HERE> as requested by connector" log
> >>> messages (which will be emitted at INFO level by
> >>> the org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask class if
> >>> the task is requesting an abort).
> >>>
> >>> Regardless, I'll work on a fix for the bug with aborting empty
> >>> transactions. Thanks for helping uncover that one!
> >>>
> >>> Cheers,
> >>>
> >>> Chris
> >>>
> >>> On Thu, Mar 9, 2023 at 6:36 PM NITTY BENNY <nittybe...@gmail.com> wrote:
> >>>
> >>> > Hi Chris,
> >>> >
> >>> > We have a use case to commit previous successful records and stop the
> >>> > processing of the current file and move on with the next file. To achieve
> >>> > that I called commitTransaction when I reach the first error record, but
> >>> > commit is not happening for me. Kafka connect tries to abort the
> >>> > transaction automatically, I checked the _transaction_state topic and
> >>> > states marked as PrepareAbort and CompleteAbort. Do you know why kafka
> >>> > connect automatically invokes abort instead of the implicit commit I
> >>> > called?
> >>> > Then as a result, when I tries to parse the next file - say ABC, I saw the
> >>> > logs "Aborting incomplete transaction" and ERROR: "Failed to sent record to
> >>> > topic", and we lost the first batch of records from the current transaction
> >>> > in the file ABC.
> >>> >
> >>> > Is it possible that there's a case where an abort is being requested while
> >>> > the current transaction is empty (i.e., the task hasn't returned any
> >>> > records from SourceTask::poll since the last transaction was
> >>> > committed/aborted)? --- Yes, that case is possible for us. There is a case
> >>> > where the first record itself an error record.
> >>> >
> >>> > Thanks,
> >>> > Nitty
> >>> >
> >>> > On Thu, Mar 9, 2023 at 3:48 PM Chris Egerton <chr...@aiven.io.invalid>
> >>> > wrote:
> >>> >
> >>> > > Hi Nitty,
> >>> > >
> >>> > > Thanks for the code examples and the detailed explanations, this is really
> >>> > > helpful!
> >>> > >
> >>> > > > Say if I have a file with 5 records and batch size is 2, and in my 3rd
> >>> > > > batch I have one error record then in that batch, I dont have a valid
> >>> > > > record to call commit or abort. But I want to commit all the previous
> >>> > > > batches that were successfully parsed. How do I do that?
> >>> > >
> >>> > > An important thing to keep in mind with the TransactionContext API is that
> >>> > > all records that a task returns from SourceTask::poll are implicitly
> >>> > > included in a transaction. Invoking SourceTaskContext::transactionContext
> >>> > > doesn't alter this or cause transactions to start being used; everything is
> >>> > > already in a transaction, and the Connect runtime automatically begins
> >>> > > transactions for any records it sees from the task if it hasn't already
> >>> > > begun one. It's also valid to return a null or empty list of records from
> >>> > > SourceTask::poll. So in this case, you can invoke
> >>> > > transactionContext.commitTransaction() (the no-args variant) and return an
> >>> > > empty batch from SourceTask::poll, which will cause the transaction
> >>> > > containing the 4 valid records that were returned in the last 2 batches to
> >>> > > be committed.
> >>> > >
> >>> > > FWIW, I would be a little cautious about this approach. Many times it's
> >>> > > better to fail fast on invalid data; it might be worth it to at least allow
> >>> > > users to configure whether the connector fails on invalid data, or silently
> >>> > > skips over it (which is what happens when transactions are aborted).
> >>> > >
> >>> > > > Why is abort not working without adding the last record to the list?
> >>> > >
> >>> > > Is it possible that there's a case where an abort is being requested while
> >>> > > the current transaction is empty (i.e., the task hasn't returned any
> >>> > > records from SourceTask::poll since the last transaction was
> >>> > > committed/aborted)?
> >>> > > I think this may be a bug in the Connect framework
> >>> > > where we don't check to see if a transaction is already open when a task
> >>> > > requests that a transaction be aborted, which can cause tasks to fail (see
> >>> > > https://issues.apache.org/jira/browse/KAFKA-14799 for more details).
> >>> > >
> >>> > > Cheers,
> >>> > >
> >>> > > Chris
> >>> > >
> >>> > >
> >>> > > On Wed, Mar 8, 2023 at 6:44 PM NITTY BENNY <nittybe...@gmail.com> wrote:
> >>> > >
> >>> > > > Hi Chris,
> >>> > > >
> >>> > > > I am not sure if you are able to see the images I shared with you .
> >>> > > > Copying the code snippet below,
> >>> > > >
> >>> > > >     if (expectedRecordCount >= 0) {
> >>> > > >         int missingCount = expectedRecordCount - (int) this.recordOffset() - 1;
> >>> > > >         if (missingCount > 0) {
> >>> > > >             if (transactionContext != null) {
> >>> > > >                 isMissedRecords = true;
> >>> > > >             } else {
> >>> > > >                 throw new DataException(String.format("Missing %d records (expecting %d, actual %d)",
> >>> > > >                         missingCount, expectedRecordCount, this.recordOffset()));
> >>> > > >             }
> >>> > > >         } else if (missingCount < 0) {
> >>> > > >             if (transactionContext != null) {
> >>> > > >                 isMissedRecords = true;
> >>> > > >             } else {
> >>> > > >                 throw new DataException(String.format("Too many records (expecting %d, actual %d)",
> >>> > > >                         expectedRecordCount, this.recordOffset()));
> >>> > > >             }
> >>> > > >         }
> >>> > > >     }
> >>> > > >     addLastRecord(records, null, value);
> >>> > > > }
> >>> > > >
> >>> > > >
> >>> > > >
> >>> > > > //asn1 or binary abort
> >>> > > > if((config.parseErrorThreshold != null && parseErrorCount >= config.parseErrorThreshold
> >>> > > >         && lastbatch && transactionContext != null) || (isMissedRecords
> >>> > > >         && transactionContext != null && lastbatch)) {
> >>> > > >     log.info("Transaction is aborting");
> >>> > > >     log.info("records = {}", records);
> >>> > > >     if (!records.isEmpty()) {
> >>> > > >         log.info("with record");
> >>> > > >         transactionContext.abortTransaction(records.get(records.size()-1));
> >>> > > >     } else {
> >>> > > >         log.info("without record");
> >>> > > >         transactionContext.abortTransaction();
> >>> > > >     }
> >>> > > >
> >>> > > > Thanks,
> >>> > > > Nitty
> >>> > > >
> >>> > > > On Wed, Mar 8, 2023 at 11:38 PM NITTY BENNY <nittybe...@gmail.com>
> >>> > > > wrote:
> >>> > > >
> >>> > > >> Hi Chris,
> >>> > > >> Sorry for the typo in my previous email.
> >>> > > >>
> >>> > > >> Regarding the point 2, *the task returns a batch of records from
> >>> > > >> SourceTask::poll (and, if using the per-record API provided by the
> >>> > > >> TransactionContext class, includes at least one record that should
> >>> > > >> trigger a transaction commit/abort in that batch)*
> >>> > > >> What if I am using the API without passing a record?
> >>> > > >> We have 2 types of
> >>> > > >> use cases, one where on encountering an error record, we want to commit
> >>> > > >> previous successful batches and disregard the failed record and upcoming
> >>> > > >> batches. In this case we created the transactionContext just before reading
> >>> > > >> the file (file is our transaction boundary). Say if I have a file with 5
> >>> > > >> records and batch size is 2, and in my 3rd batch I have one error record
> >>> > > >> then in that batch, I dont have a valid record to call commit or abort. But
> >>> > > >> I want to commit all the previous batches that were successfully parsed.
> >>> > > >> How do I do that?
> >>> > > >>
> >>> > > >> Second use case is where I want to abort a transaction if the record
> >>> > > >> count doesn't match.
> >>> > > >> Code Snippet :
> >>> > > >> [image: image.png]
> >>> > > >> There are no error records in this case. If you see I added the condition
> >>> > > >> of transactionContext check to implement exactly once, without
> >>> > > >> transaction it was just throwing the exception without calling the
> >>> > > >> addLastRecord() method and in the catch block it just logs the message and
> >>> > > >> return the list of records without the last record to poll(). To make it
> >>> > > >> work, I called the method addLastRecord() in this case, so it is not
> >>> > > >> throwing the exception and list has last record as well. Then I called the
> >>> > > >> abort, everything got aborted. Why is abort not working without adding the
> >>> > > >> last record to the list?
> >>> > > >> [image: image.png]
> >>> > > >>
> >>> > > >> Code to call abort.
> >>> > > >>
> >>> > > >>
> >>> > > >> Thanks,
> >>> > > >> Nitty
> >>> > > >>
> >>> > > >> On Wed, Mar 8, 2023 at 4:26 PM Chris Egerton <chr...@aiven.io.invalid>
> >>> > > >> wrote:
> >>> > > >>
> >>> > > >>> Hi Nitty,
> >>> > > >>>
> >>> > > >>> I'm a little confused about what you mean by this part:
> >>> > > >>>
> >>> > > >>> > transaction is not getting completed because it is not commiting the
> >>> > > >>> > transaction offest.
> >>> > > >>>
> >>> > > >>> The only conditions required for a transaction to be completed when a
> >>> > > >>> connector is defining its own transaction boundaries are:
> >>> > > >>>
> >>> > > >>> 1. The task requests a transaction commit/abort from the
> >>> > > >>> TransactionContext
> >>> > > >>> 2. The task returns a batch of records from SourceTask::poll (and, if using
> >>> > > >>> the per-record API provided by the TransactionContext class, includes at
> >>> > > >>> least one record that should trigger a transaction commit/abort in that
> >>> > > >>> batch)
> >>> > > >>>
> >>> > > >>> The Connect runtime should automatically commit source offsets to Kafka
> >>> > > >>> whenever a transaction is completed, either by commit or abort. This is
> >>> > > >>> because transactions should only be aborted for data that should never be
> >>> > > >>> re-read by the connector; if there is a validation error that should be
> >>> > > >>> handled by reconfiguring the connector, then the task should throw an
> >>> > > >>> exception instead of aborting the transaction.
> >>> > > >>>
> >>> > > >>> If possible, do you think you could provide a brief code snippet
> >>> > > >>> illustrating what your task is doing that's causing issues?
> >>> > > >>>
> >>> > > >>> Cheers,
> >>> > > >>>
> >>> > > >>> Chris (not Chrise 🙂)
> >>> > > >>>
> >>> > > >>> On Tue, Mar 7, 2023 at 10:17 AM NITTY BENNY <nittybe...@gmail.com>
> >>> > > >>> wrote:
> >>> > > >>>
> >>> > > >>> > Hi Chrise,
> >>> > > >>> >
> >>> > > >>> > Thanks for sharing the details.
> >>> > > >>> >
> >>> > > >>> > Regarding the use case, For Asn1 source connector we have a use case to
> >>> > > >>> > validate number of records in the file with the number of records in the
> >>> > > >>> > header. So currently, if validation fails we are not sending the last
> >>> > > >>> > record to the topic. But after introducing exactly once with connector
> >>> > > >>> > transaction boundary, I can see that if I call an abort when the validation
> >>> > > >>> > fails, transaction is not getting completed because it is not commiting the
> >>> > > >>> > transaction offest. I saw that transaction state changed to CompleteAbort.
> >>> > > >>> > So for my next transaction I am getting InvalidProducerEpochException and
> >>> > > >>> > then task stopped after that. I tried calling the abort after sending last
> >>> > > >>> > record to the topic then transaction getting completed.
> >>> > > >>> >
> >>> > > >>> > I dont know if I am doing anything wrong here.
> >>> > > >>> >
> >>> > > >>> > Please advise.
> >>> > > >>> > Thanks,
> >>> > > >>> > Nitty
> >>> > > >>> >
> >>> > > >>> > On Tue 7 Mar 2023 at 2:21 p.m., Chris Egerton <chr...@aiven.io.invalid>
> >>> > > >>> > wrote:
> >>> > > >>> >
> >>> > > >>> > > Hi Nitty,
> >>> > > >>> > >
> >>> > > >>> > > We've recently added some documentation on implementing exactly-once
> >>> > > >>> > > source connectors here:
> >>> > > >>> > > https://kafka.apache.org/documentation/#connect_exactlyoncesourceconnectors
> >>> > > >>> > > To quote a relevant passage from those docs:
> >>> > > >>> > >
> >>> > > >>> > > > In order for a source connector to take advantage of this support, it
> >>> > > >>> > > > must be able to provide meaningful source offsets for each record that it
> >>> > > >>> > > > emits, and resume consumption from the external system at the exact
> >>> > > >>> > > > position corresponding to any of those offsets without dropping or
> >>> > > >>> > > > duplicating messages.
> >>> > > >>> > >
> >>> > > >>> > > So, as long as your source connector is able to use the Kafka Connect
> >>> > > >>> > > framework's offsets API correctly, it shouldn't be necessary to make any
> >>> > > >>> > > other code changes to the connector.
> >>> > > >>> > >
> >>> > > >>> > > To enable exactly-once support for source connectors on your Connect
> >>> > > >>> > > cluster, see the docs section here:
> >>> > > >>> > > https://kafka.apache.org/documentation/#connect_exactlyoncesource
> >>> > > >>> > >
> >>> > > >>> > > With regard to transactions, a transactional producer is always created
> >>> > > >>> > > automatically for your connector by the Connect runtime when exactly-once
> >>> > > >>> > > support is enabled on the worker.
> >>> > > >>> > > The only reason to set
> >>> > > >>> > > "transaction.boundary" to "connector" is if your connector would like to
> >>> > > >>> > > explicitly define its own transaction boundaries. In this case, it sounds
> >>> > > >>> > > like may be what you want; I just want to make sure to call out that in
> >>> > > >>> > > either case, you should not be directly instantiating a producer in your
> >>> > > >>> > > connector code, but let the Kafka Connect runtime do that for you, and just
> >>> > > >>> > > worry about returning the right records from SourceTask::poll (and possibly
> >>> > > >>> > > defining custom transactions using the TransactionContext API).
> >>> > > >>> > >
> >>> > > >>> > > With respect to your question about committing or aborting in certain
> >>> > > >>> > > circumstances, it'd be useful to know more about your use case, since it
> >>> > > >>> > > may not be necessary to define custom transaction boundaries in your
> >>> > > >>> > > connector at all.
> >>> > > >>> > >
> >>> > > >>> > > Cheers,
> >>> > > >>> > >
> >>> > > >>> > > Chris
> >>> > > >>> > >
> >>> > > >>> > >
> >>> > > >>> > > On Tue, Mar 7, 2023 at 7:21 AM NITTY BENNY <nittybe...@gmail.com> wrote:
> >>> > > >>> > >
> >>> > > >>> > > > Hi Team,
> >>> > > >>> > > >
> >>> > > >>> > > > Adding on top of this, I tried creating a TransactionContext object and
> >>> > > >>> > > > calling the commitTransaction and abortTranaction methods in source
> >>> > > >>> > > > connectors.
> >>> > > >>> > > > But the main problem I saw is that if there is any error while parsing the
> >>> > > >>> > > > record, connect is calling an abort but we have a use case to call commit
> >>> > > >>> > > > in some cases. Is it a valid use case in terms of kafka connect?
> >>> > > >>> > > >
> >>> > > >>> > > > Another Question - Should I use a transactional producer instead
> >>> > > >>> > > > creating an object of TransactionContext? Below is the connector
> >>> > > >>> > > > configuration I am using.
> >>> > > >>> > > >
> >>> > > >>> > > > exactly.once.support: "required"
> >>> > > >>> > > > transaction.boundary: "connector"
> >>> > > >>> > > >
> >>> > > >>> > > > Could you please help me here?
> >>> > > >>> > > >
> >>> > > >>> > > > Thanks,
> >>> > > >>> > > > Nitty
> >>> > > >>> > > >
> >>> > > >>> > > > On Tue, Mar 7, 2023 at 12:29 AM NITTY BENNY <nittybe...@gmail.com> wrote:
> >>> > > >>> > > >
> >>> > > >>> > > > > Hi Team,
> >>> > > >>> > > > > I am trying to implement exactly once behavior in our source connector. Is
> >>> > > >>> > > > > there any sample source connector implementation available to have a look
> >>> > > >>> > > > > at?
> >>> > > >>> > > > > Regards,
> >>> > > >>> > > > > Nitty
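
For anyone following the thread, here is a rough, self-contained Java sketch of the pattern described in the quoted Mar 9 reply: when a file hits its first invalid record, request a commit with the no-args commitTransaction() and return whatever valid records the current batch holds (possibly none) from poll, so the records already returned get committed. This is only an illustration under stated assumptions, not the connector's actual code. The TransactionContext interface below is a hypothetical stand-in that mirrors only the no-args method names of Kafka Connect's org.apache.kafka.connect.source.TransactionContext. FileBatchPoller, RecordingContext, and the "INVALID" marker are made-up names, and records are plain strings rather than SourceRecord objects.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in that mirrors the no-args methods of Kafka Connect's
// TransactionContext; the real interface also offers per-record variants.
interface TransactionContext {
    void commitTransaction();
    void abortTransaction();
}

// Test double (assumption for this sketch): only counts the requests it gets.
class RecordingContext implements TransactionContext {
    int commits = 0;
    int aborts = 0;
    @Override public void commitTransaction() { commits++; }
    @Override public void abortTransaction() { aborts++; }
}

// Illustrative poller: one file is one transaction, read in fixed-size batches.
class FileBatchPoller {
    private final Iterator<String> lines;
    private final int batchSize;
    private final TransactionContext transactionContext;
    private boolean fileDone = false;

    FileBatchPoller(List<String> fileLines, int batchSize, TransactionContext ctx) {
        this.lines = fileLines.iterator();
        this.batchSize = batchSize;
        this.transactionContext = ctx;
    }

    // Shaped like SourceTask::poll: returns the next batch, which may be empty.
    List<String> poll() {
        if (fileDone) {
            return Collections.emptyList();
        }
        List<String> batch = new ArrayList<>();
        while (batch.size() < batchSize && lines.hasNext()) {
            String line = lines.next();
            if (line.equals("INVALID")) {
                // First error record: stop reading this file and ask for a
                // commit once this (possibly empty) batch has been handed back.
                fileDone = true;
                transactionContext.commitTransaction();
                return batch;
            }
            batch.add(line);
        }
        if (!lines.hasNext()) {
            // Clean end of file: the file is the transaction boundary.
            fileDone = true;
            transactionContext.commitTransaction();
        }
        return batch;
    }
}
```

With a five-line file whose last line is the invalid marker and a batch size of 2, the first two polls return two records each without requesting a commit, and the third poll requests one commit and returns an empty batch. Whether to commit or fail fast on invalid data is still the design question raised earlier in the thread; this sketch only shows the commit path.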