Hi Chris,

Is there any possibility of having a call with you? This is currently
blocking our delivery, and I'd really like to get it sorted out.

Thanks,
Nitty

On Mon, Mar 13, 2023 at 8:18 PM NITTY BENNY <nittybe...@gmail.com> wrote:

> Hi Chris,
>
> I really don't understand why a graceful shutdown would happen during a
> commit operation. Am I misunderstanding something here? I see this happen
> when I have a batch of 2 valid records and the record in the second batch
> is invalid. In that case I want to commit the valid records, so I called
> commit, returned an empty list for the current batch from poll(), and
> then, when the next file comes in and poll() sees new records, I see
> InvalidProducerEpochException.
> Please advise me.
>
> Thanks,
> Nitty
>
> On Mon, Mar 13, 2023 at 5:33 PM NITTY BENNY <nittybe...@gmail.com> wrote:
>
>> Hi Chris,
>>
>> The difference is in the Task classes; there is no difference in the
>> key/value converters.
>>
>> I don’t see log messages for graceful shutdown. I am not clear on what
>> you mean by shutting down the task.
>>
>> I called the commit operation for the successful records. Should I
>> perform any other steps if I have an invalid record?
>> Please advise.
>>
>> Thanks,
>> Nitty
>>
>> On Mon, Mar 13, 2023 at 3:42 PM Chris Egerton <chr...@aiven.io.invalid>
>> wrote:
>>
>>> Hi Nitty,
>>>
>>> Thanks again for all the details here, especially the log messages.
>>>
>>> > The below-mentioned issue is happening for the JSON connector only. Is
>>> > there any difference between the asn1, binary, csv, and json connectors?
>>>
>>> Can you clarify whether the difference here is in the Connector/Task
>>> classes, or in the key/value converters that are configured for the
>>> connector? The key/value converters are configured using the
>>> "key.converter" and "value.converter" properties and, if problems arise
>>> with them, the task will fail and, if it has a non-empty ongoing
>>> transaction, that transaction will be automatically aborted, since we
>>> close the task's Kafka producer when it fails (or shuts down gracefully).
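>>>
>>> For example (illustrative values only; your actual converter classes may
>>> differ), a connector config might include:
>>>
>>>   key.converter: "org.apache.kafka.connect.storage.StringConverter"
>>>   value.converter: "org.apache.kafka.connect.json.JsonConverter"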
>>>
>>> With regards to these log messages:
>>>
>>> > org.apache.kafka.common.errors.ProducerFencedException: There is a newer
>>> > producer with the same transactionalId which fences the current one.
>>>
>>> It looks like your tasks aren't shutting down gracefully in time, which
>>> causes them to be fenced out by the Connect framework later on. Do you
>>> see messages like "Graceful stop of task <TASK ID HERE> failed" in the
>>> logs for your Connect worker?
>>>
>>> Cheers,
>>>
>>> Chris
>>>
>>> On Mon, Mar 13, 2023 at 10:58 AM NITTY BENNY <nittybe...@gmail.com>
>>> wrote:
>>>
>>> > Hi Chris,
>>> >
>>> > As you said, the message below appears when I call an abort when there
>>> > is an invalid record; then for the next transaction I can see the same
>>> > message, and after that the connector is stopped.
>>> >
>>> > 2023-03-13 14:28:26,043 INFO [json-sftp-source-connector|task-0]
>>> > Aborting transaction for batch as requested by connector
>>> > (org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask)
>>> > [task-thread-json-sftp-source-connector-0]
>>> > 2023-03-13 14:28:26,043 INFO [json-sftp-source-connector|task-0]
>>> > [Producer clientId=connector-producer-json-sftp-source-connector-0,
>>> > transactionalId=connect-cluster-json-sftp-source-connector-0] Aborting
>>> > incomplete transaction (org.apache.kafka.clients.producer.KafkaProducer)
>>> > [task-thread-json-sftp-source-connector-0]
>>> >
>>> > The InvalidProducerEpoch issue happens when I call commit when there is
>>> > an invalid record: in the next transaction I get
>>> > InvalidProducerEpochException, and the messages are copied in the
>>> > previous email. I don't know if this will also be fixed by your bug fix.
>>> > I am using Kafka 3.3.1 as of now.
>>> >
>>> > Thanks,
>>> > Nitty
>>> >
>>> >
>>> > On Mon, Mar 13, 2023 at 10:47 AM NITTY BENNY <nittybe...@gmail.com>
>>> wrote:
>>> >
>>> > > Hi Chris,
>>> > >
>>> > > The below-mentioned issue is happening for the JSON connector only. Is
>>> > > there any difference between the asn1, binary, csv, and json connectors?
>>> > >
>>> > > Thanks,
>>> > > Nitty
>>> > >
>>> > > On Mon, Mar 13, 2023 at 9:16 AM NITTY BENNY <nittybe...@gmail.com>
>>> > wrote:
>>> > >
>>> > >> Hi Chris,
>>> > >>
>>> > >> Sorry Chris, I am not able to reproduce the above issue.
>>> > >>
>>> > >> I want to share with you one more use case I found.
>>> > >> The use case: the first batch returns 2 valid records, and then the
>>> > >> next batch has an invalid record. Below is the __transaction_state
>>> > >> topic when I call a commit while processing the invalid record.
>>> > >>
>>> > >> connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0,
>>> > >> producerId=11, producerEpoch=2, txnTimeoutMs=60000, state=*Ongoing*,
>>> > >> pendingState=None, topicPartitions=HashSet(streams-input-2),
>>> > >> txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620463834)
>>> > >>
>>> > >> Then after some time I saw the below states as well:
>>> > >>
>>> > >> connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0,
>>> > >> producerId=11, producerEpoch=3, txnTimeoutMs=60000, state=*PrepareAbort*,
>>> > >> pendingState=None, topicPartitions=HashSet(streams-input-2),
>>> > >> txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620526119)
>>> > >>
>>> > >> connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0,
>>> > >> producerId=11, producerEpoch=3, txnTimeoutMs=60000, state=*CompleteAbort*,
>>> > >> pendingState=None, topicPartitions=HashSet(),
>>> > >> txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620526121)
>>> > >>
>>> > >> Later, for the next transaction, when it returns the first batch,
>>> > >> below are the logs I can see:
>>> > >>
>>> > >> Transiting to abortable error state due to
>>> > >> org.apache.kafka.common.errors.InvalidProducerEpochException: Producer
>>> > >> attempted to produce with an old epoch.
>>> > >> (org.apache.kafka.clients.producer.internals.TransactionManager)
>>> > >> [kafka-producer-network-thread | connector-producer-json-sftp-source-connector-0]
>>> > >> 2023-03-12 11:32:45,220 ERROR [json-sftp-source-connector|task-0]
>>> > >> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} failed to
>>> > >> send record to streams-input:
>>> > >> (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask)
>>> > >> [kafka-producer-network-thread | connector-producer-json-sftp-source-connector-0]
>>> > >> org.apache.kafka.common.errors.InvalidProducerEpochException: Producer
>>> > >> attempted to produce with an old epoch.
>>> > >> 2023-03-12 11:32:45,222 INFO [json-sftp-source-connector|task-0]
>>> > >> [Producer clientId=connector-producer-json-sftp-source-connector-0,
>>> > >> transactionalId=connect-cluster-json-sftp-source-connector-0]
>>> > >> Transiting to fatal error state due to
>>> > >> org.apache.kafka.common.errors.ProducerFencedException: There is a newer
>>> > >> producer with the same transactionalId which fences the current one.
>>> > >> (org.apache.kafka.clients.producer.internals.TransactionManager)
>>> > >> [kafka-producer-network-thread | connector-producer-json-sftp-source-connector-0]
>>> > >> 2023-03-12 11:32:45,222 ERROR [json-sftp-source-connector|task-0]
>>> > >> [Producer clientId=connector-producer-json-sftp-source-connector-0,
>>> > >> transactionalId=connect-cluster-json-sftp-source-connector-0] Aborting
>>> > >> producer batches due to fatal error
>>> > >> (org.apache.kafka.clients.producer.internals.Sender)
>>> > >> [kafka-producer-network-thread | connector-producer-json-sftp-source-connector-0]
>>> > >> org.apache.kafka.common.errors.ProducerFencedException: There is a newer
>>> > >> producer with the same transactionalId which fences the current one.
>>> > >> 2023-03-12 11:32:45,222 ERROR [json-sftp-source-connector|task-0]
>>> > >> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Failed to
>>> > >> flush offsets to storage:
>>> > >> (org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask)
>>> > >> [kafka-producer-network-thread | connector-producer-json-sftp-source-connector-0]
>>> > >> org.apache.kafka.common.errors.ProducerFencedException: There is a newer
>>> > >> producer with the same transactionalId which fences the current one.
>>> > >> 2023-03-12 11:32:45,224 ERROR [json-sftp-source-connector|task-0]
>>> > >> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} failed to
>>> > >> send record to streams-input:
>>> > >> (org.apache.kafka.connect.runtime.AbstractWorkerSourceTask)
>>> > >> [kafka-producer-network-thread | connector-producer-json-sftp-source-connector-0]
>>> > >> org.apache.kafka.common.errors.ProducerFencedException: There is a newer
>>> > >> producer with the same transactionalId which fences the current one.
>>> > >> 2023-03-12 11:32:45,222 ERROR [json-sftp-source-connector|task-0|offsets]
>>> > >> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Failed to
>>> > >> commit producer transaction
>>> > >> (org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask)
>>> > >> [task-thread-json-sftp-source-connector-0]
>>> > >> org.apache.kafka.common.errors.ProducerFencedException: There is a newer
>>> > >> producer with the same transactionalId which fences the current one.
>>> > >> 2023-03-12 11:32:45,225 ERROR [json-sftp-source-connector|task-0]
>>> > >> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Task threw
>>> > >> an uncaught and unrecoverable exception. Task is being killed and will
>>> > >> not recover until manually restarted
>>> > >> (org.apache.kafka.connect.runtime.WorkerTask)
>>> > >> [task-thread-json-sftp-source-connector-0]
>>> > >>
>>> > >> Do you know why it shows an abort state even though I called commit?
>>> > >>
>>> > >> I tested one more scenario. When I called commit I saw the below:
>>> > >>
>>> > >> connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0,
>>> > >> producerId=11, producerEpoch=2, txnTimeoutMs=60000, state=*Ongoing*,
>>> > >> pendingState=None, topicPartitions=HashSet(streams-input-2),
>>> > >> txnStartTimestamp=1678620463834, txnLastUpdateTimestamp=1678620463834)
>>> > >>
>>> > >> Then, before the states changed to abort, I dropped the next file, and
>>> > >> in that case I don't see any issues: the previous transaction as well
>>> > >> as the current transaction are committed.
>>> > >>
>>> > >> Thank you for your support.
>>> > >>
>>> > >> Thanks,
>>> > >> Nitty
>>> > >>
>>> > >> On Fri, Mar 10, 2023 at 8:04 PM Chris Egerton
>>> <chr...@aiven.io.invalid>
>>> > >> wrote:
>>> > >>
>>> > >>> Hi Nitty,
>>> > >>>
>>> > >>> > I called commitTransaction when I reach the first error record,
>>> > >>> > but commit is not happening for me. Kafka connect tries to abort
>>> > >>> > the transaction automatically
>>> > >>>
>>> > >>> This is really interesting--are you certain that your task never
>>> > >>> invoked TransactionContext::abortTransaction in this case? I'm
>>> > >>> looking over the code base and it seems fairly clear that the only
>>> > >>> thing that could trigger a call to KafkaProducer::abortTransaction
>>> > >>> is a request by the task to abort a transaction (either for a next
>>> > >>> batch, or for a specific record). It may help to run the connector
>>> > >>> in a debugger and/or look for "Aborting transaction for batch as
>>> > >>> requested by connector" or "Aborting transaction for record on topic
>>> > >>> <TOPIC NAME HERE> as requested by connector" log messages (which
>>> > >>> will be emitted at INFO level by the
>>> > >>> org.apache.kafka.connect.runtime.ExactlyOnceWorkerSourceTask class
>>> > >>> if the task is requesting an abort).
>>> > >>>
>>> > >>> Regardless, I'll work on a fix for the bug with aborting empty
>>> > >>> transactions. Thanks for helping uncover that one!
>>> > >>>
>>> > >>> Cheers,
>>> > >>>
>>> > >>> Chris
>>> > >>>
>>> > >>> On Thu, Mar 9, 2023 at 6:36 PM NITTY BENNY <nittybe...@gmail.com>
>>> > wrote:
>>> > >>>
>>> > >>> > Hi Chris,
>>> > >>> >
>>> > >>> > We have a use case to commit the previous successful records,
>>> > >>> > stop processing the current file, and move on to the next file. To
>>> > >>> > achieve that I called commitTransaction when I reached the first
>>> > >>> > error record, but the commit is not happening for me. Kafka
>>> > >>> > Connect tries to abort the transaction automatically; I checked
>>> > >>> > the __transaction_state topic and the states are marked as
>>> > >>> > PrepareAbort and CompleteAbort. Do you know why Kafka Connect
>>> > >>> > automatically invokes abort instead of the commit I called?
>>> > >>> > Then as a result, when I try to parse the next file - say ABC - I
>>> > >>> > see the log "Aborting incomplete transaction" and the ERROR
>>> > >>> > "Failed to send record to topic", and we lose the first batch of
>>> > >>> > records from the current transaction in the file ABC.
>>> > >>> >
>>> > >>> > Is it possible that there's a case where an abort is being
>>> > >>> > requested while the current transaction is empty (i.e., the task
>>> > >>> > hasn't returned any records from SourceTask::poll since the last
>>> > >>> > transaction was committed/aborted)? --- Yes, that case is possible
>>> > >>> > for us. There is a case where the very first record is itself an
>>> > >>> > error record.
>>> > >>> >
>>> > >>> > Thanks,
>>> > >>> > Nitty
>>> > >>> >
>>> > >>> > On Thu, Mar 9, 2023 at 3:48 PM Chris Egerton
>>> <chr...@aiven.io.invalid
>>> > >
>>> > >>> > wrote:
>>> > >>> >
>>> > >>> > > Hi Nitty,
>>> > >>> > >
>>> > >>> > > Thanks for the code examples and the detailed explanations, this
>>> > >>> > > is really helpful!
>>> > >>> > >
>>> > >>> > > > Say if I have a file with 5 records and batch size is 2, and
>>> > >>> > > > in my 3rd batch I have one error record then in that batch, I
>>> > >>> > > > don't have a valid record to call commit or abort. But I want
>>> > >>> > > > to commit all the previous batches that were successfully
>>> > >>> > > > parsed. How do I do that?
>>> > >>> > >
>>> > >>> > > An important thing to keep in mind with the TransactionContext
>>> > >>> > > API is that all records that a task returns from
>>> > >>> > > SourceTask::poll are implicitly included in a transaction.
>>> > >>> > > Invoking SourceTaskContext::transactionContext doesn't alter
>>> > >>> > > this or cause transactions to start being used; everything is
>>> > >>> > > already in a transaction, and the Connect runtime automatically
>>> > >>> > > begins transactions for any records it sees from the task if it
>>> > >>> > > hasn't already begun one. It's also valid to return a null or
>>> > >>> > > empty list of records from SourceTask::poll. So in this case,
>>> > >>> > > you can invoke transactionContext.commitTransaction() (the
>>> > >>> > > no-args variant) and return an empty batch from
>>> > >>> > > SourceTask::poll, which will cause the transaction containing
>>> > >>> > > the 4 valid records that were returned in the last 2 batches to
>>> > >>> > > be committed.
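>>> > >>> > >
>>> > >>> > > A minimal sketch of that approach (readNextBatch() and
>>> > >>> > > batchHasInvalidRecord() are placeholders for your own parsing
>>> > >>> > > logic; transactionContext would have been obtained from
>>> > >>> > > context.transactionContext() in SourceTask::start):
>>> > >>> > >
>>> > >>> > >   @Override
>>> > >>> > >   public List<SourceRecord> poll() {
>>> > >>> > >       List<SourceRecord> batch = readNextBatch();
>>> > >>> > >       if (batchHasInvalidRecord(batch)) {
>>> > >>> > >           // Commits the transaction that already contains the
>>> > >>> > >           // valid records returned from earlier poll() calls.
>>> > >>> > >           transactionContext.commitTransaction();
>>> > >>> > >           // An empty batch (or null) is a valid return value.
>>> > >>> > >           return Collections.emptyList();
>>> > >>> > >       }
>>> > >>> > >       return batch;
>>> > >>> > >   }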
>>> > >>> > >
>>> > >>> > > FWIW, I would be a little cautious about this approach. Many
>>> > >>> > > times it's better to fail fast on invalid data; it might be
>>> > >>> > > worth it to at least allow users to configure whether the
>>> > >>> > > connector fails on invalid data, or silently skips over it
>>> > >>> > > (which is what happens when transactions are aborted).
>>> > >>> > >
>>> > >>> > > > Why is abort not working without adding the last record to
>>> > >>> > > > the list?
>>> > >>> > >
>>> > >>> > > Is it possible that there's a case where an abort is being
>>> > >>> > > requested while the current transaction is empty (i.e., the task
>>> > >>> > > hasn't returned any records from SourceTask::poll since the last
>>> > >>> > > transaction was committed/aborted)? I think this may be a bug in
>>> > >>> > > the Connect framework where we don't check to see if a
>>> > >>> > > transaction is already open when a task requests that a
>>> > >>> > > transaction be aborted, which can cause tasks to fail (see
>>> > >>> > > https://issues.apache.org/jira/browse/KAFKA-14799 for more
>>> > >>> > > details).
>>> > >>> > >
>>> > >>> > > Cheers,
>>> > >>> > >
>>> > >>> > > Chris
>>> > >>> > >
>>> > >>> > >
>>> > >>> > > On Wed, Mar 8, 2023 at 6:44 PM NITTY BENNY <
>>> nittybe...@gmail.com>
>>> > >>> wrote:
>>> > >>> > >
>>> > >>> > > > Hi Chris,
>>> > >>> > > >
>>> > >>> > > > I am not sure if you are able to see the images I shared with
>>> > >>> > > > you. Copying the code snippet below:
>>> > >>> > > >
>>> > >>> > > > if (expectedRecordCount >= 0) {
>>> > >>> > > >   int missingCount = expectedRecordCount - (int) this.recordOffset() - 1;
>>> > >>> > > >   if (missingCount > 0) {
>>> > >>> > > >     if (transactionContext != null) {
>>> > >>> > > >       // Defer to the transaction abort path below instead of
>>> > >>> > > >       // failing the task outright.
>>> > >>> > > >       isMissedRecords = true;
>>> > >>> > > >     } else {
>>> > >>> > > >       throw new DataException(String.format(
>>> > >>> > > >           "Missing %d records (expecting %d, actual %d)",
>>> > >>> > > >           missingCount, expectedRecordCount, this.recordOffset()));
>>> > >>> > > >     }
>>> > >>> > > >   } else if (missingCount < 0) {
>>> > >>> > > >     if (transactionContext != null) {
>>> > >>> > > >       isMissedRecords = true;
>>> > >>> > > >     } else {
>>> > >>> > > >       throw new DataException(String.format(
>>> > >>> > > >           "Too many records (expecting %d, actual %d)",
>>> > >>> > > >           expectedRecordCount, this.recordOffset()));
>>> > >>> > > >     }
>>> > >>> > > >   }
>>> > >>> > > > }
>>> > >>> > > > addLastRecord(records, null, value);
>>> > >>> > > >
>>> > >>> > > > // asn1 or binary abort
>>> > >>> > > > if ((config.parseErrorThreshold != null
>>> > >>> > > >     && parseErrorCount >= config.parseErrorThreshold
>>> > >>> > > >     && lastbatch && transactionContext != null)
>>> > >>> > > >     || (isMissedRecords && transactionContext != null && lastbatch)) {
>>> > >>> > > >   log.info("Transaction is aborting");
>>> > >>> > > >   log.info("records = {}", records);
>>> > >>> > > >   if (!records.isEmpty()) {
>>> > >>> > > >     // Abort once the last record of the batch is dispatched.
>>> > >>> > > >     log.info("with record");
>>> > >>> > > >     transactionContext.abortTransaction(records.get(records.size() - 1));
>>> > >>> > > >   } else {
>>> > >>> > > >     // No records in this batch; abort the open transaction directly.
>>> > >>> > > >     log.info("without record");
>>> > >>> > > >     transactionContext.abortTransaction();
>>> > >>> > > >   }
>>> > >>> > > > }
>>> > >>> > > >
>>> > >>> > > > Thanks,
>>> > >>> > > > Nitty
>>> > >>> > > >
>>> > >>> > > > On Wed, Mar 8, 2023 at 11:38 PM NITTY BENNY <
>>> > nittybe...@gmail.com>
>>> > >>> > > wrote:
>>> > >>> > > >
>>> > >>> > > >> Hi Chris,
>>> > >>> > > >> Sorry for the typo in my previous email.
>>> > >>> > > >>
>>> > >>> > > >> Regarding point 2, "the task returns a batch of records from
>>> > >>> > > >> SourceTask::poll (and, if using the per-record API provided
>>> > >>> > > >> by the TransactionContext class, includes at least one record
>>> > >>> > > >> that should trigger a transaction commit/abort in that batch)":
>>> > >>> > > >> What if I am using the API without passing a record? We have
>>> > >>> > > >> 2 types of use cases. In the first, on encountering an error
>>> > >>> > > >> record, we want to commit the previous successful batches and
>>> > >>> > > >> disregard the failed record and the upcoming batches. In this
>>> > >>> > > >> case we created the transactionContext just before reading
>>> > >>> > > >> the file (the file is our transaction boundary). Say I have a
>>> > >>> > > >> file with 5 records and the batch size is 2, and in my 3rd
>>> > >>> > > >> batch I have one error record; then in that batch I don't
>>> > >>> > > >> have a valid record with which to call commit or abort. But I
>>> > >>> > > >> want to commit all the previous batches that were
>>> > >>> > > >> successfully parsed. How do I do that?
>>> > >>> > > >>
>>> > >>> > > >> The second use case is where I want to abort a transaction
>>> > >>> > > >> if the record count doesn't match.
>>> > >>> > > >> Code snippet:
>>> > >>> > > >> [image: image.png]
>>> > >>> > > >> There are no error records in this case. As you can see, I
>>> > >>> > > >> added the transactionContext check to implement exactly-once;
>>> > >>> > > >> without transactions it just threw the exception without
>>> > >>> > > >> calling the addLastRecord() method, and in the catch block it
>>> > >>> > > >> just logged the message and returned the list of records,
>>> > >>> > > >> without the last record, to poll(). To make it work, I called
>>> > >>> > > >> addLastRecord() in this case, so the exception is not thrown
>>> > >>> > > >> and the list has the last record as well. Then I called the
>>> > >>> > > >> abort, and everything got aborted. Why is abort not working
>>> > >>> > > >> without adding the last record to the list?
>>> > >>> > > >> [image: image.png]
>>> > >>> > > >>
>>> > >>> > > >> Code to call abort.
>>> > >>> > > >>
>>> > >>> > > >> Thanks,
>>> > >>> > > >> Nitty
>>> > >>> > > >>
>>> > >>> > > >> On Wed, Mar 8, 2023 at 4:26 PM Chris Egerton
>>> > >>> <chr...@aiven.io.invalid
>>> > >>> > >
>>> > >>> > > >> wrote:
>>> > >>> > > >>
>>> > >>> > > >>> Hi Nitty,
>>> > >>> > > >>>
>>> > >>> > > >>> I'm a little confused about what you mean by this part:
>>> > >>> > > >>>
>>> > >>> > > >>> > transaction is not getting completed because it is not
>>> > >>> > > >>> > committing the transaction offset.
>>> > >>> > > >>>
>>> > >>> > > >>> The only conditions required for a transaction to be
>>> > >>> > > >>> completed when a connector is defining its own transaction
>>> > >>> > > >>> boundaries are:
>>> > >>> > > >>>
>>> > >>> > > >>> 1. The task requests a transaction commit/abort from the
>>> > >>> > > >>> TransactionContext
>>> > >>> > > >>> 2. The task returns a batch of records from SourceTask::poll
>>> > >>> > > >>> (and, if using the per-record API provided by the
>>> > >>> > > >>> TransactionContext class, includes at least one record that
>>> > >>> > > >>> should trigger a transaction commit/abort in that batch; see
>>> > >>> > > >>> the sketch below)
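>>> > >>> > > >>>
>>> > >>> > > >>> For the per-record variant, a rough sketch (here "record" and
>>> > >>> > > >>> "batch" are whatever your task has built for the current
>>> > >>> > > >>> poll):
>>> > >>> > > >>>
>>> > >>> > > >>>   // Ask the runtime to commit (or abort) the transaction as
>>> > >>> > > >>>   // soon as this record is dispatched:
>>> > >>> > > >>>   transactionContext.commitTransaction(record);
>>> > >>> > > >>>   // or transactionContext.abortTransaction(record) for aborts
>>> > >>> > > >>>   batch.add(record);
>>> > >>> > > >>>   return batch; // the record must be in the returned batch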
>>> > >>> > > >>>
>>> > >>> > > >>> The Connect runtime should automatically commit source
>>> > >>> > > >>> offsets to Kafka whenever a transaction is completed, either
>>> > >>> > > >>> by commit or abort. This is because transactions should only
>>> > >>> > > >>> be aborted for data that should never be re-read by the
>>> > >>> > > >>> connector; if there is a validation error that should be
>>> > >>> > > >>> handled by reconfiguring the connector, then the task should
>>> > >>> > > >>> throw an exception instead of aborting the transaction.
>>> > >>> > > >>>
>>> > >>> > > >>> If possible, do you think you could provide a brief code
>>> > >>> > > >>> snippet illustrating what your task is doing that's causing
>>> > >>> > > >>> issues?
>>> > >>> > > >>>
>>> > >>> > > >>> Cheers,
>>> > >>> > > >>>
>>> > >>> > > >>> Chris (not Chrise 🙂)
>>> > >>> > > >>>
>>> > >>> > > >>> On Tue, Mar 7, 2023 at 10:17 AM NITTY BENNY <
>>> > >>> nittybe...@gmail.com>
>>> > >>> > > >>> wrote:
>>> > >>> > > >>>
>>> > >>> > > >>> > Hi Chrise,
>>> > >>> > > >>> >
>>> > >>> > > >>> > Thanks for sharing the details.
>>> > >>> > > >>> >
>>> > >>> > > >>> > Regarding the use case: for the Asn1 source connector we
>>> > >>> > > >>> > have a use case to validate the number of records in the
>>> > >>> > > >>> > file against the number of records in the header. So
>>> > >>> > > >>> > currently, if validation fails we do not send the last
>>> > >>> > > >>> > record to the topic. But after introducing exactly-once
>>> > >>> > > >>> > with the connector transaction boundary, I can see that if
>>> > >>> > > >>> > I call an abort when the validation fails, the transaction
>>> > >>> > > >>> > is not getting completed because it is not committing the
>>> > >>> > > >>> > transaction offset. I saw that the transaction state
>>> > >>> > > >>> > changed to CompleteAbort. So for my next transaction I am
>>> > >>> > > >>> > getting InvalidProducerEpochException, and the task stops
>>> > >>> > > >>> > after that. I tried calling the abort after sending the
>>> > >>> > > >>> > last record to the topic, and then the transaction gets
>>> > >>> > > >>> > completed.
>>> > >>> > > >>> >
>>> > >>> > > >>> > I don't know if I am doing anything wrong here.
>>> > >>> > > >>> >
>>> > >>> > > >>> > Please advise.
>>> > >>> > > >>> > Thanks,
>>> > >>> > > >>> > Nitty
>>> > >>> > > >>> >
>>> > >>> > > >>> > On Tue 7 Mar 2023 at 2:21 p.m., Chris Egerton
>>> > >>> > > <chr...@aiven.io.invalid
>>> > >>> > > >>> >
>>> > >>> > > >>> > wrote:
>>> > >>> > > >>> >
>>> > >>> > > >>> > > Hi Nitty,
>>> > >>> > > >>> > >
>>> > >>> > > >>> > > We've recently added some documentation on implementing
>>> > >>> > > >>> > > exactly-once source connectors here:
>>> > >>> > > >>> > > https://kafka.apache.org/documentation/#connect_exactlyoncesourceconnectors
>>> > >>> > > >>> > > To quote a relevant passage from those docs:
>>> > >>> > > >>> > >
>>> > >>> > > >>> > > > In order for a source connector to take advantage of
>>> > >>> > > >>> > > > this support, it must be able to provide meaningful
>>> > >>> > > >>> > > > source offsets for each record that it emits, and
>>> > >>> > > >>> > > > resume consumption from the external system at the
>>> > >>> > > >>> > > > exact position corresponding to any of those offsets
>>> > >>> > > >>> > > > without dropping or duplicating messages.
>>> > >>> > > >>> > >
>>> > >>> > > >>> > > So, as long as your source connector is able to use the
>>> > >>> > > >>> > > Kafka Connect framework's offsets API correctly, it
>>> > >>> > > >>> > > shouldn't be necessary to make any other code changes to
>>> > >>> > > >>> > > the connector.
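>>> > >>> > > >>> > >
>>> > >>> > > >>> > > For a file-based source, that usually just means
>>> > >>> > > >>> > > attaching a meaningful source partition/offset to each
>>> > >>> > > >>> > > record, e.g. (sketch; the "filename"/"position" keys are
>>> > >>> > > >>> > > only an assumption about your connector):
>>> > >>> > > >>> > >
>>> > >>> > > >>> > >   Map<String, Object> sourcePartition =
>>> > >>> > > >>> > >       Collections.singletonMap("filename", currentFileName);
>>> > >>> > > >>> > >   Map<String, Object> sourceOffset =
>>> > >>> > > >>> > >       Collections.singletonMap("position", recordsReadSoFar);
>>> > >>> > > >>> > >   // The runtime stores these offsets transactionally
>>> > >>> > > >>> > >   // alongside the records themselves.
>>> > >>> > > >>> > >   return new SourceRecord(sourcePartition, sourceOffset,
>>> > >>> > > >>> > >       topic, valueSchema, value);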
>>> > >>> > > >>> > >
>>> > >>> > > >>> > > To enable exactly-once support for source connectors on
>>> > >>> > > >>> > > your Connect cluster, see the docs section here:
>>> > >>> > > >>> > > https://kafka.apache.org/documentation/#connect_exactlyoncesource
>>> > >>> > > >>> > >
>>> > >>> > > >>> > > With regard to transactions, a transactional producer is
>>> > >>> > > >>> > > always created automatically for your connector by the
>>> > >>> > > >>> > > Connect runtime when exactly-once support is enabled on
>>> > >>> > > >>> > > the worker. The only reason to set "transaction.boundary"
>>> > >>> > > >>> > > to "connector" is if your connector would like to
>>> > >>> > > >>> > > explicitly define its own transaction boundaries. In this
>>> > >>> > > >>> > > case, it sounds like that may be what you want; I just
>>> > >>> > > >>> > > want to make sure to call out that in either case, you
>>> > >>> > > >>> > > should not be directly instantiating a producer in your
>>> > >>> > > >>> > > connector code, but let the Kafka Connect runtime do that
>>> > >>> > > >>> > > for you, and just worry about returning the right records
>>> > >>> > > >>> > > from SourceTask::poll (and possibly defining custom
>>> > >>> > > >>> > > transactions using the TransactionContext API).
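>>> > >>> > > >>> > >
>>> > >>> > > >>> > > To be explicit, the TransactionContext is not something
>>> > >>> > > >>> > > you construct yourself; the runtime hands it to the task,
>>> > >>> > > >>> > > roughly:
>>> > >>> > > >>> > >
>>> > >>> > > >>> > >   // e.g. in SourceTask::start; may be null when
>>> > >>> > > >>> > >   // connector-defined transaction boundaries aren't in use
>>> > >>> > > >>> > >   TransactionContext transactionContext =
>>> > >>> > > >>> > >       context.transactionContext();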
>>> > >>> > > >>> > >
>>> > >>> > > >>> > > With respect to your question about committing or
>>> > >>> > > >>> > > aborting in certain circumstances, it'd be useful to know
>>> > >>> > > >>> > > more about your use case, since it may not be necessary
>>> > >>> > > >>> > > to define custom transaction boundaries in your connector
>>> > >>> > > >>> > > at all.
>>> > >>> > > >>> > >
>>> > >>> > > >>> > > Cheers,
>>> > >>> > > >>> > >
>>> > >>> > > >>> > > Chris
>>> > >>> > > >>> > >
>>> > >>> > > >>> > >
>>> > >>> > > >>> > >
>>> > >>> > > >>> > > On Tue, Mar 7, 2023 at 7:21 AM NITTY BENNY <
>>> > >>> nittybe...@gmail.com
>>> > >>> > >
>>> > >>> > > >>> wrote:
>>> > >>> > > >>> > >
>>> > >>> > > >>> > > > Hi Team,
>>> > >>> > > >>> > > >
>>> > >>> > > >>> > > > Adding on top of this, I tried creating a
>>> > >>> > > >>> > > > TransactionContext object and calling the
>>> > >>> > > >>> > > > commitTransaction and abortTransaction methods in
>>> > >>> > > >>> > > > source connectors.
>>> > >>> > > >>> > > > But the main problem I saw is that if there is any
>>> > >>> > > >>> > > > error while parsing the record, Connect calls an abort,
>>> > >>> > > >>> > > > but we have a use case to call commit in some cases. Is
>>> > >>> > > >>> > > > that a valid use case in terms of Kafka Connect?
>>> > >>> > > >>> > > >
>>> > >>> > > >>> > > > Another question - should I use a transactional
>>> > >>> > > >>> > > > producer instead of creating an object of
>>> > >>> > > >>> > > > TransactionContext? Below is the connector
>>> > >>> > > >>> > > > configuration I am using.
>>> > >>> > > >>> > > >
>>> > >>> > > >>> > > >   exactly.once.support: "required"
>>> > >>> > > >>> > > >   transaction.boundary: "connector"
>>> > >>> > > >>> > > >
>>> > >>> > > >>> > > > Could you please help me here?
>>> > >>> > > >>> > > >
>>> > >>> > > >>> > > > Thanks,
>>> > >>> > > >>> > > > Nitty
>>> > >>> > > >>> > > >
>>> > >>> > > >>> > > > On Tue, Mar 7, 2023 at 12:29 AM NITTY BENNY <
>>> > >>> > > nittybe...@gmail.com>
>>> > >>> > > >>> > > wrote:
>>> > >>> > > >>> > > >
>>> > >>> > > >>> > > > > Hi Team,
>>> > >>> > > >>> > > > > I am trying to implement exactly-once behavior in our
>>> > >>> > > >>> > > > > source connector. Is there any sample source
>>> > >>> > > >>> > > > > connector implementation available to have a look at?
>>> > >>> > > >>> > > > > Regards,
>>> > >>> > > >>> > > > > Nitty
>>> > >>> > > >>> > > > >
>>> > >>> > > >>> > > >
>>> > >>> > > >>> > >
>>> > >>> > > >>> >
>>> > >>> > > >>>
>>> > >>> > > >>
>>> > >>> > >
>>> > >>> >
>>> > >>>
>>> > >>
>>> >
>>>
>>
