Hi Chris,

Would it be possible to have a call with you? This is blocking our delivery, and I would like to get it sorted out.
Thanks,
Nitty

On Mon, Mar 13, 2023 at 8:18 PM NITTY BENNY <nittybe...@gmail.com> wrote:

> Hi Chris,
>
> I really don't understand why a graceful shutdown would happen during a commit operation. Am I misunderstanding something here? I see this happen when I have a batch of 2 valid records and the record in the second batch is invalid. In that case I want to commit the valid records, so I called commit and returned an empty list for the current batch from poll(); then, when the next file comes in and poll() sees new records, I see InvalidProducerEpochException.
> Please advise me.
>
> Thanks,
> Nitty
>
> On Mon, Mar 13, 2023 at 5:33 PM NITTY BENNY <nittybe...@gmail.com> wrote:
>
>> Hi Chris,
>>
>> The difference is in the Task classes; there is no difference in the key/value converters.
>>
>> I don't see log messages for a graceful shutdown. I am not clear on what you mean by shutting down the task.
>>
>> I called the commit operation for the successful records. Should I perform any other steps if I have an invalid record?
>> Please advise.
>>
>> Thanks,
>> Nitty
>>
>> On Mon, Mar 13, 2023 at 3:42 PM Chris Egerton <chr...@aiven.io.invalid> wrote:
>>
>>> Hi Nitty,
>>>
>>> Thanks again for all the details here, especially the log messages.
>>>
>>> > The below mentioned issue is happening for the Json connector only. Is there any difference with the asn1, binary, csv, and json connectors?
>>>
>>> Can you clarify whether the difference here is in the Connector/Task classes, or in the key/value converters that are configured for the connector? The key/value converters are configured using the "key.converter" and "value.converter" properties and, if problems arise with them, the task will fail and, if it has a non-empty ongoing transaction, that transaction will be automatically aborted, since we close the task's Kafka producer when it fails (or shuts down gracefully).
>>>
>>> With regard to these log messages:
>>>
>>> > org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one.
>>>
>>> It looks like your tasks aren't shutting down gracefully in time, which causes them to be fenced out by the Connect framework later on. Do you see messages like "Graceful stop of task <TASK ID HERE> failed" in the logs for your Connect worker?
>>>
>>> Cheers,
>>>
>>> Chris
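For reference, key/value converters are set in the connector (or worker) configuration rather than in the Task code. A minimal sketch of the relevant properties, using stock Connect converter classes as illustrative examples rather than this connector's actual settings:

    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    # JsonConverter-specific option; false for schemaless JSON payloads
    value.converter.schemas.enable=false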
>>> On Mon, Mar 13, 2023 at 10:58 AM NITTY BENNY <nittybe...@gmail.com> wrote:
>>>
>>> > Hi Chris,
>>> >
>>> > As you said, the below message is coming when I call an abort if there is an invalid record; then for the next transaction I can see the below message, and then the connector will be stopped.
>>> >
>>> > 2023-03-13 14:28:26,043 INFO [json-sftp-source-connector|task-0] Aborting transaction for batch as requested by connector (org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask) [task-thread-json-sftp-source-connector-0]
>>> > 2023-03-13 14:28:26,043 INFO [json-sftp-source-connector|task-0] [Producer clientId=connector-producer-json-sftp-source-connector-0, transactionalId=connect-cluster-json-sftp-source-connector-0] Aborting incomplete transaction (org.apache.kafka.clients.producer.KafkaProducer) [task-thread-json-sftp-source-connector-0]
>>> >
>>> > The issue with InvalidProducerEpoch is happening when I call the commit if there is an invalid record; in the next transaction I am getting InvalidProducerEpochException, and the messages are copied in the previous email. I don't know if this will also be fixed by your bug fix. I am using Kafka version 3.3.1 as of now.
>>> >
>>> > Thanks,
>>> > Nitty
>>> >
>>> > On Mon, Mar 13, 2023 at 10:47 AM NITTY BENNY <nittybe...@gmail.com> wrote:
>>> >
>>> > > Hi Chris,
>>> > >
>>> > > The below mentioned issue is happening for the Json connector only. Is there any difference with the asn1, binary, csv, and json connectors?
>>> > >
>>> > > Thanks,
>>> > > Nitty
>>> > >
>>> > > On Mon, Mar 13, 2023 at 9:16 AM NITTY BENNY <nittybe...@gmail.com> wrote:
>>> > >
>>> > >> Hi Chris,
>>> > >>
>>> > >> Sorry Chris, I am not able to reproduce the above issue.
>>> > >>
>>> > >> I want to share with you one more use case I found. The use case is: the first batch returns 2 valid records, and then the next batch contains an invalid record. Below is the _transaction_state topic when I call a commit while processing an invalid record.
>>> > >>
>>> > >> connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0, producerId=11, producerEpoch=2, txnTimeoutMs=60000, state=*Ongoing*, pendingState=None, topicPartitions=HashSet(streams-input-2), txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620463834)
>>> > >>
>>> > >> then after some time I saw the below states as well,
>>> > >>
>>> > >> connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0, producerId=11, producerEpoch=3, txnTimeoutMs=60000, state=*PrepareAbort*, pendingState=None, topicPartitions=HashSet(streams-input-2), txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620526119)
>>> > >> connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0, producerId=11, producerEpoch=3, txnTimeoutMs=60000, state=*CompleteAbort*, pendingState=None, topicPartitions=HashSet(), txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620526121)
>>> > >>
>>> > >> Later, for the next transaction, below are the logs I can see when it returns the first batch.
>>> > >>
>>> > >> Transiting to abortable error state due to org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch. (org.apache.kafka.clients.producer.internals.TransactionManager) [kafka-producer-network-thread | connector-producer-json-sftp-source-connector-0]
>>> > >> 2023-03-12 11:32:45,220 ERROR [json-sftp-source-connector|task-0] ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} failed to send record to streams-input: (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask) [kafka-producer-network-thread | connector-producer-json-sftp-source-connector-0]
>>> > >> org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch.
>>> > >> 2023-03-12 11:32:45,222 INFO [json-sftp-source-connector|task-0] [Producer clientId=connector-producer-json-sftp-source-connector-0, transactionalId=connect-cluster-json-sftp-source-connector-0] Transiting to fatal error state due to org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one. (org.apache.kafka.clients.producer.internals.TransactionManager) [kafka-producer-network-thread | connector-producer-json-sftp-source-connector-0]
>>> > >> 2023-03-12 11:32:45,222 ERROR [json-sftp-source-connector|task-0] [Producer clientId=connector-producer-json-sftp-source-connector-0, transactionalId=connect-cluster-json-sftp-source-connector-0] Aborting producer batches due to fatal error (org.apache.kafka.clients.producer.internals.Sender) [kafka-producer-network-thread | connector-producer-json-sftp-source-connector-0]
>>> > >> org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one.
>>> > >> 2023-03-12 11:32:45,222 ERROR [json-sftp-source-connector|task-0] ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Failed to flush offsets to storage: (org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask) [kafka-producer-network-thread | connector-producer-json-sftp-source-connector-0]
>>> > >> org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one.
>>> > >> 2023-03-12 11:32:45,224 ERROR [json-sftp-source-connector|task-0] ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} failed to send record to streams-input: (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask) [kafka-producer-network-thread | connector-producer-json-sftp-source-connector-0]
>>> > >> org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one.
>>> > >> 2023-03-12 11:32:45,222 ERROR [json-sftp-source-connector|task-0|offsets] ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Failed to commit producer transaction (org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask) [task-thread-json-sftp-source-connector-0]
>>> > >> org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one.
>>> > >> 2023-03-12 11:32:45,225 ERROR [json-sftp-source-connector|task-0] ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-json-sftp-source-connector-0]
>>> > >>
>>> > >> Do you know why it is showing an abort state even if I call commit?
>>> > >>
>>> > >> I tested one more scenario: when I call the commit I saw the below state,
>>> > >>
>>> > >> connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0, producerId=11, producerEpoch=2, txnTimeoutMs=60000, state=*Ongoing*, pendingState=None, topicPartitions=HashSet(streams-input-2), txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620463834)
>>> > >>
>>> > >> Then, before the state changed to abort, I dropped the next file, and I don't see any issues; the previous transaction as well as the current transaction are committed.
>>> > >>
>>> > >> Thank you for your support.
>>> > >>
>>> > >> Thanks,
>>> > >> Nitty
>>> > >>
>>> > >> On Fri, Mar 10, 2023 at 8:04 PM Chris Egerton <chr...@aiven.io.invalid> wrote:
>>> > >>
>>> > >>> Hi Nitty,
>>> > >>>
>>> > >>> > I called commitTransaction when I reach the first error record, but commit is not happening for me. Kafka Connect tries to abort the transaction automatically
>>> > >>>
>>> > >>> This is really interesting--are you certain that your task never invoked TransactionContext::abortTransaction in this case? I'm looking over the code base and it seems fairly clear that the only thing that could trigger a call to KafkaProducer::abortTransaction is a request by the task to abort a transaction (either for a next batch, or for a specific record). It may help to run the connector in a debugger and/or look for "Aborting transaction for batch as requested by connector" or "Aborting transaction for record on topic <TOPIC NAME HERE> as requested by connector" log messages (which will be emitted at INFO level by the org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask class if the task is requesting an abort).
>>> > >>>
>>> > >>> Regardless, I'll work on a fix for the bug with aborting empty transactions. Thanks for helping uncover that one!
>>> > >>>
>>> > >>> Cheers,
>>> > >>>
>>> > >>> Chris
>>> > >>>
>>> > >>> On Thu, Mar 9, 2023 at 6:36 PM NITTY BENNY <nittybe...@gmail.com> wrote:
>>> > >>>
>>> > >>> > Hi Chris,
>>> > >>> >
>>> > >>> > We have a use case to commit previous successful records, stop the processing of the current file, and move on with the next file. To achieve that I called commitTransaction when I reach the first error record, but the commit is not happening for me. Kafka Connect tries to abort the transaction automatically; I checked the _transaction_state topic and the states are marked as PrepareAbort and CompleteAbort. Do you know why Kafka Connect automatically invokes abort instead of the commit I called?
>>> > >>> > Then, as a result, when I try to parse the next file - say ABC - I saw the logs "Aborting incomplete transaction" and the ERROR "Failed to send record to topic", and we lost the first batch of records from the current transaction in the file ABC.
>>> > >>> >
>>> > >>> > Is it possible that there's a case where an abort is being requested while the current transaction is empty (i.e., the task hasn't returned any records from SourceTask::poll since the last transaction was committed/aborted)? --- Yes, that case is possible for us. There is a case where the first record itself is an error record.
>>> > >>> >
>>> > >>> > Thanks,
>>> > >>> > Nitty
>>> > >>> >
>>> > >>> > On Thu, Mar 9, 2023 at 3:48 PM Chris Egerton <chr...@aiven.io.invalid> wrote:
>>> > >>> >
>>> > >>> > > Hi Nitty,
>>> > >>> > >
>>> > >>> > > Thanks for the code examples and the detailed explanations, this is really helpful!
>>> > >>> > >
>>> > >>> > > > Say if I have a file with 5 records and batch size is 2, and in my 3rd batch I have one error record then in that batch, I don't have a valid record to call commit or abort. But I want to commit all the previous batches that were successfully parsed. How do I do that?
>>> > >>> > >
>>> > >>> > > An important thing to keep in mind with the TransactionContext API is that all records that a task returns from SourceTask::poll are implicitly included in a transaction. Invoking SourceTaskContext::transactionContext doesn't alter this or cause transactions to start being used; everything is already in a transaction, and the Connect runtime automatically begins transactions for any records it sees from the task if it hasn't already begun one. It's also valid to return a null or empty list of records from SourceTask::poll. So in this case, you can invoke transactionContext.commitTransaction() (the no-args variant) and return an empty batch from SourceTask::poll, which will cause the transaction containing the 4 valid records that were returned in the last 2 batches to be committed.
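To make the pattern Chris describes concrete, here is a minimal sketch of a poll() implementation that requests a commit and returns an empty (or partial) batch when it hits an invalid record. The parser type and the helper methods (ParsedRecord, nextBatchFromFile, skipRemainderOfFile, toSourceRecord) are hypothetical placeholders, not the actual connector's code:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.kafka.connect.source.SourceRecord;
    import org.apache.kafka.connect.source.SourceTask;
    import org.apache.kafka.connect.source.TransactionContext;

    public abstract class FileSourceTaskSketch extends SourceTask {

        @Override
        public List<SourceRecord> poll() {
            // Null when exactly-once/connector-defined transactions are not enabled
            TransactionContext transactionContext = context.transactionContext();
            List<SourceRecord> batch = new ArrayList<>();
            for (ParsedRecord parsed : nextBatchFromFile()) {
                if (!parsed.isValid() && transactionContext != null) {
                    // Request a commit; it takes effect once the next batch is
                    // returned from poll(), and an empty batch is valid. This
                    // commits the transaction holding all valid records
                    // returned so far, including any already in `batch`.
                    transactionContext.commitTransaction();
                    skipRemainderOfFile();
                    return batch;
                }
                batch.add(toSourceRecord(parsed));
            }
            return batch;
        }

        // Hypothetical helpers standing in for the connector's parsing logic
        protected abstract List<ParsedRecord> nextBatchFromFile();
        protected abstract void skipRemainderOfFile();
        protected abstract SourceRecord toSourceRecord(ParsedRecord parsed);

        public interface ParsedRecord { boolean isValid(); }
    }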
>>> > >>> > > FWIW, I would be a little cautious about this approach. Many times it's better to fail fast on invalid data; it might be worth it to at least allow users to configure whether the connector fails on invalid data, or silently skips over it (which is what happens when transactions are aborted).
>>> > >>> > >
>>> > >>> > > > Why is abort not working without adding the last record to the list?
>>> > >>> > >
>>> > >>> > > Is it possible that there's a case where an abort is being requested while the current transaction is empty (i.e., the task hasn't returned any records from SourceTask::poll since the last transaction was committed/aborted)? I think this may be a bug in the Connect framework where we don't check to see if a transaction is already open when a task requests that a transaction be aborted, which can cause tasks to fail (see https://issues.apache.org/jira/browse/KAFKA-14799 for more details).
>>> > >>> > >
>>> > >>> > > Cheers,
>>> > >>> > >
>>> > >>> > > Chris
>>> > >>> > >
>>> > >>> > > On Wed, Mar 8, 2023 at 6:44 PM NITTY BENNY <nittybe...@gmail.com> wrote:
>>> > >>> > >
>>> > >>> > > > Hi Chris,
>>> > >>> > > >
>>> > >>> > > > I am not sure if you are able to see the images I shared with you. Copying the code snippet below,
>>> > >>> > > >
>>> > >>> > > > if (expectedRecordCount >= 0) {
>>> > >>> > > >     int missingCount = expectedRecordCount - (int) this.recordOffset() - 1;
>>> > >>> > > >     if (missingCount > 0) {
>>> > >>> > > >         if (transactionContext != null) {
>>> > >>> > > >             // defer the failure; the transaction is aborted later instead
>>> > >>> > > >             isMissedRecords = true;
>>> > >>> > > >         } else {
>>> > >>> > > >             throw new DataException(String.format("Missing %d records (expecting %d, actual %d)", missingCount, expectedRecordCount, this.recordOffset()));
>>> > >>> > > >         }
>>> > >>> > > >     } else if (missingCount < 0) {
>>> > >>> > > >         if (transactionContext != null) {
>>> > >>> > > >             isMissedRecords = true;
>>> > >>> > > >         } else {
>>> > >>> > > >             throw new DataException(String.format("Too many records (expecting %d, actual %d)", expectedRecordCount, this.recordOffset()));
>>> > >>> > > >         }
>>> > >>> > > >     }
>>> > >>> > > > }
>>> > >>> > > > addLastRecord(records, null, value);
>>> > >>> > > >
>>> > >>> > > > // asn1 or binary abort
>>> > >>> > > > if ((config.parseErrorThreshold != null && parseErrorCount >= config.parseErrorThreshold
>>> > >>> > > >         && lastbatch && transactionContext != null)
>>> > >>> > > >         || (isMissedRecords && transactionContext != null && lastbatch)) {
>>> > >>> > > >     log.info("Transaction is aborting");
>>> > >>> > > >     log.info("records = {}", records);
>>> > >>> > > >     if (!records.isEmpty()) {
>>> > >>> > > >         log.info("with record");
>>> > >>> > > >         transactionContext.abortTransaction(records.get(records.size() - 1));
>>> > >>> > > >     } else {
>>> > >>> > > >         log.info("without record");
>>> > >>> > > >         transactionContext.abortTransaction();
>>> > >>> > > >     }
>>> > >>> > > > }
>>> > >>> > > >
>>> > >>> > > > Thanks,
>>> > >>> > > > Nitty
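Given the KAFKA-14799 caveat Chris raises above, the no-args abortTransaction() call in the snippet just shown could be guarded until that fix lands: track whether poll() has returned any records since the last commit/abort, and only request an abort when something is actually in the transaction. A hedged sketch; the field and method names are illustrative, not framework API:

    // Set to true whenever poll() returns a non-empty batch; reset whenever a
    // commit or abort is requested.
    private boolean recordsInOpenTransaction = false;

    private void requestAbortIfSafe(TransactionContext transactionContext) {
        if (transactionContext != null && recordsInOpenTransaction) {
            transactionContext.abortTransaction();
            recordsInOpenTransaction = false;
        }
        // Otherwise there is nothing to abort; requesting an abort with no
        // open transaction can fail the task (see KAFKA-14799).
    }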
>>> > >>> > > > On Wed, Mar 8, 2023 at 11:38 PM NITTY BENNY <nittybe...@gmail.com> wrote:
>>> > >>> > > >
>>> > >>> > > >> Hi Chris,
>>> > >>> > > >>
>>> > >>> > > >> Sorry for the typo in my previous email.
>>> > >>> > > >>
>>> > >>> > > >> Regarding point 2, *the task returns a batch of records from SourceTask::poll (and, if using the per-record API provided by the TransactionContext class, includes at least one record that should trigger a transaction commit/abort in that batch)* - what if I am using the API without passing a record? We have 2 types of use cases. One is where, on encountering an error record, we want to commit the previous successful batches and disregard the failed record and upcoming batches. In this case we created the transactionContext just before reading the file (the file is our transaction boundary). Say I have a file with 5 records and the batch size is 2, and in my 3rd batch I have one error record; then in that batch I don't have a valid record to call commit or abort with. But I want to commit all the previous batches that were successfully parsed. How do I do that?
>>> > >>> > > >>
>>> > >>> > > >> The second use case is where I want to abort a transaction if the record count doesn't match.
>>> > >>> > > >> Code Snippet:
>>> > >>> > > >> [image: image.png]
>>> > >>> > > >>
>>> > >>> > > >> There are no error records in this case. If you see, I added the transactionContext check to implement exactly-once; without transactions it was just throwing the exception without calling the addLastRecord() method, and in the catch block it just logs the message and returns the list of records, without the last record, to poll(). To make it work, I called the method addLastRecord() in this case, so it is not throwing the exception and the list has the last record as well. Then I called the abort, and everything got aborted. Why is abort not working without adding the last record to the list?
>>> > >>> > > >> [image: image.png]
>>> > >>> > > >>
>>> > >>> > > >> Code to call abort.
>>> > >>> > > >>
>>> > >>> > > >> Thanks,
>>> > >>> > > >> Nitty
>>> > >>> > > >>
>>> > >>> > > >> On Wed, Mar 8, 2023 at 4:26 PM Chris Egerton <chr...@aiven.io.invalid> wrote:
>>> > >>> > > >>
>>> > >>> > > >>> Hi Nitty,
>>> > >>> > > >>>
>>> > >>> > > >>> I'm a little confused about what you mean by this part:
>>> > >>> > > >>>
>>> > >>> > > >>> > transaction is not getting completed because it is not committing the transaction offset.
>>> > >>> > > >>>
>>> > >>> > > >>> The only conditions required for a transaction to be completed when a connector is defining its own transaction boundaries are:
>>> > >>> > > >>>
>>> > >>> > > >>> 1. The task requests a transaction commit/abort from the TransactionContext
>>> > >>> > > >>> 2. The task returns a batch of records from SourceTask::poll (and, if using the per-record API provided by the TransactionContext class, includes at least one record that should trigger a transaction commit/abort in that batch)
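For illustration, the two flavors of the TransactionContext API that these conditions refer to; the record variables are placeholders:

    // Batch variants: the commit/abort takes effect once the next batch is
    // returned from poll() (an empty batch is enough).
    transactionContext.commitTransaction();
    transactionContext.abortTransaction();

    // Per-record variants: the commit/abort takes effect once the given record
    // (which must appear in a batch returned from poll()) is reached.
    transactionContext.commitTransaction(lastValidRecord);
    transactionContext.abortTransaction(invalidRecord);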
>>> > >>> > > >>>
>>> > >>> > > >>> The Connect runtime should automatically commit source offsets to Kafka whenever a transaction is completed, either by commit or abort. This is because transactions should only be aborted for data that should never be re-read by the connector; if there is a validation error that should be handled by reconfiguring the connector, then the task should throw an exception instead of aborting the transaction.
>>> > >>> > > >>>
>>> > >>> > > >>> If possible, do you think you could provide a brief code snippet illustrating what your task is doing that's causing issues?
>>> > >>> > > >>>
>>> > >>> > > >>> Cheers,
>>> > >>> > > >>>
>>> > >>> > > >>> Chris (not Chrise 🙂)
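A short sketch of the rule of thumb Chris describes above, assuming a hypothetical validation path: throw to fail the task when the data should be re-read after a fix, abort only when the data should be skipped for good:

    import org.apache.kafka.connect.errors.ConnectException;

    // Condition and message variables here are hypothetical.
    if (errorRequiresReconfiguration) {
        // Fail the task: nothing is aborted, so the data can be re-read once
        // the connector is reconfigured and restarted.
        throw new ConnectException("Validation failed: " + reason);
    } else if (transactionContext != null) {
        // Skip this data permanently: the runtime still commits source
        // offsets when the transaction completes, even by abort.
        transactionContext.abortTransaction();
    }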
>>> > >>> > > >>>
>>> > >>> > > >>> On Tue, Mar 7, 2023 at 10:17 AM NITTY BENNY <nittybe...@gmail.com> wrote:
>>> > >>> > > >>>
>>> > >>> > > >>> > Hi Chrise,
>>> > >>> > > >>> >
>>> > >>> > > >>> > Thanks for sharing the details.
>>> > >>> > > >>> >
>>> > >>> > > >>> > Regarding the use case: for the Asn1 source connector we have a use case to validate the number of records in the file against the number of records in the header. So currently, if validation fails we are not sending the last record to the topic. But after introducing exactly-once with the connector transaction boundary, I can see that if I call an abort when the validation fails, the transaction is not getting completed because it is not committing the transaction offset. I saw that the transaction state changed to CompleteAbort. So for my next transaction I am getting InvalidProducerEpochException, and the task stopped after that. I tried calling the abort after sending the last record to the topic, and then the transaction got completed.
>>> > >>> > > >>> >
>>> > >>> > > >>> > I don't know if I am doing anything wrong here.
>>> > >>> > > >>> >
>>> > >>> > > >>> > Please advise.
>>> > >>> > > >>> >
>>> > >>> > > >>> > Thanks,
>>> > >>> > > >>> > Nitty
>>> > >>> > > >>> >
>>> > >>> > > >>> > On Tue 7 Mar 2023 at 2:21 p.m., Chris Egerton <chr...@aiven.io.invalid> wrote:
>>> > >>> > > >>> >
>>> > >>> > > >>> > > Hi Nitty,
>>> > >>> > > >>> > >
>>> > >>> > > >>> > > We've recently added some documentation on implementing exactly-once source connectors here: https://kafka.apache.org/documentation/#connect_exactlyoncesourceconnectors . To quote a relevant passage from those docs:
>>> > >>> > > >>> > >
>>> > >>> > > >>> > > > In order for a source connector to take advantage of this support, it must be able to provide meaningful source offsets for each record that it emits, and resume consumption from the external system at the exact position corresponding to any of those offsets without dropping or duplicating messages.
>>> > >>> > > >>> > >
>>> > >>> > > >>> > > So, as long as your source connector is able to use the Kafka Connect framework's offsets API correctly, it shouldn't be necessary to make any other code changes to the connector.
>>> > >>> > > >>> > >
>>> > >>> > > >>> > > To enable exactly-once support for source connectors on your Connect cluster, see the docs section here: https://kafka.apache.org/documentation/#connect_exactlyoncesource
>>> > >>> > > >>> > >
>>> > >>> > > >>> > > With regard to transactions, a transactional producer is always created automatically for your connector by the Connect runtime when exactly-once support is enabled on the worker. The only reason to set "transaction.boundary" to "connector" is if your connector would like to explicitly define its own transaction boundaries. In this case, it sounds like that may be what you want; I just want to make sure to call out that in either case, you should not be directly instantiating a producer in your connector code, but let the Kafka Connect runtime do that for you, and just worry about returning the right records from SourceTask::poll (and possibly defining custom transactions using the TransactionContext API).
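To make the quoted docs passage concrete, a sketch of building a SourceRecord with meaningful source offsets for a file-based source like this one; the partition/offset keys and values are illustrative, not the connector's actual offset schema:

    import java.util.Map;
    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.source.SourceRecord;

    Map<String, ?> sourcePartition = Map.of("filename", "ABC.json");  // which file
    Map<String, ?> sourceOffset = Map.of("recordNumber", 42L);        // position to resume from
    SourceRecord record = new SourceRecord(
            sourcePartition, sourceOffset,
            "streams-input",          // topic, as in the logs above
            Schema.STRING_SCHEMA,     // value schema
            value);                   // parsed record value

On restart, the framework hands the last committed offset for each partition back to the task, which can then seek to that exact record in the file before emitting anything new.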
>>> > >>> > > >>> > >
>>> > >>> > > >>> > > With respect to your question about committing or aborting in certain circumstances, it'd be useful to know more about your use case, since it may not be necessary to define custom transaction boundaries in your connector at all.
>>> > >>> > > >>> > >
>>> > >>> > > >>> > > Cheers,
>>> > >>> > > >>> > >
>>> > >>> > > >>> > > Chris
>>> > >>> > > >>> > >
>>> > >>> > > >>> > > On Tue, Mar 7, 2023 at 7:21 AM NITTY BENNY <nittybe...@gmail.com> wrote:
>>> > >>> > > >>> > >
>>> > >>> > > >>> > > > Hi Team,
>>> > >>> > > >>> > > >
>>> > >>> > > >>> > > > Adding on top of this, I tried creating a TransactionContext object and calling the commitTransaction and abortTransaction methods in source connectors. But the main problem I saw is that if there is any error while parsing the record, Connect is calling an abort, but we have a use case to call commit in some cases. Is that a valid use case in terms of Kafka Connect?
>>> > >>> > > >>> > > >
>>> > >>> > > >>> > > > Another question - should I use a transactional producer instead of creating an object of TransactionContext? Below is the connector configuration I am using.
>>> > >>> > > >>> > > >
>>> > >>> > > >>> > > > exactly.once.support: "required"
>>> > >>> > > >>> > > > transaction.boundary: "connector"
>>> > >>> > > >>> > > >
>>> > >>> > > >>> > > > Could you please help me here?
>>> > >>> > > >>> > > >
>>> > >>> > > >>> > > > Thanks,
>>> > >>> > > >>> > > > Nitty
>>> > >>> > > >>> > > >
>>> > >>> > > >>> > > > On Tue, Mar 7, 2023 at 12:29 AM NITTY BENNY <nittybe...@gmail.com> wrote:
>>> > >>> > > >>> > > >
>>> > >>> > > >>> > > > > Hi Team,
>>> > >>> > > >>> > > > >
>>> > >>> > > >>> > > > > I am trying to implement exactly-once behavior in our source connector. Is there any sample source connector implementation available to have a look at?
>>> > >>> > > >>> > > > >
>>> > >>> > > >>> > > > > Regards,
>>> > >>> > > >>> > > > > Nitty