Hi Nitty,

I'm a little confused about what you mean by this part:

> transaction is not getting completed because it is not commiting the
transaction offest.

The only conditions required for a transaction to be completed when a
connector is defining its own transaction boundaries are:

1. The task requests a transaction commit/abort from the TransactionContext
2. The task returns a batch of records from SourceTask::poll (and, if using
the per-record API provided by the TransactionContext class, includes at
least one record that should trigger a transaction commit/abort in that
batch)

The Connect runtime should automatically commit source offsets to Kafka
whenever a transaction is completed, either by commit or abort. This is
because transactions should only be aborted for data that should never be
re-read by the connector; if there is a validation error that should be
handled by reconfiguring the connector, then the task should throw an
exception instead of aborting the transaction.
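To sketch that distinction concretely (the record-count validation and helper names here are made up for illustration; in a real task you'd throw org.apache.kafka.connect.errors.ConnectException rather than a bare RuntimeException):

```java
import java.util.List;

public class ValidationSketch {
    // Hypothetical check: does the number of records in the file match the
    // count declared in the file's header?
    static boolean headerCountMatches(List<String> records, int expected) {
        return records.size() == expected;
    }

    // A validation failure that an operator needs to investigate should
    // surface as a thrown exception (the task fails and offsets are NOT
    // committed, so the file can be re-read after the problem is fixed),
    // rather than as an aborted transaction (whose offsets ARE committed,
    // so the data would never be re-read).
    static List<String> handleFile(List<String> records, int headerCount) {
        if (!headerCountMatches(records, headerCount)) {
            // In a real task: throw new ConnectException(...)
            throw new RuntimeException(
                "record count " + records.size() + " does not match header count " + headerCount);
        }
        return records;
    }

    public static void main(String[] args) {
        System.out.println(handleFile(List.of("a", "b"), 2).size());
        try {
            handleFile(List.of("a"), 2);
        } catch (RuntimeException e) {
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```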

If possible, do you think you could provide a brief code snippet
illustrating what your task is doing that's causing issues?

Cheers,

Chris (not Chrise 🙂)

On Tue, Mar 7, 2023 at 10:17 AM NITTY BENNY <nittybe...@gmail.com> wrote:

> Hi Chrise,
>
> Thanks for sharing the details.
>
> Regarding the use case: for the Asn1 source connector, we have a use case
> to validate the number of records in the file against the number of records
> in the header. Currently, if validation fails, we do not send the last
> record to the topic. But after introducing exactly-once with the connector
> transaction boundary, I can see that if I call an abort when the validation
> fails, transaction is not getting completed because it is not commiting the
> transaction offest. I saw that transaction state changed to CompleteAbort.
> So for my next transaction I am getting an InvalidProducerEpochException,
> and the task stops after that. If I instead call abort after sending the
> last record to the topic, the transaction completes.
>
> I don't know if I am doing anything wrong here.
>
> Please advise.
> Thanks,
> Nitty
>
> On Tue 7 Mar 2023 at 2:21 p.m., Chris Egerton <chr...@aiven.io.invalid>
> wrote:
>
> > Hi Nitty,
> >
> > We've recently added some documentation on implementing exactly-once
> > source connectors here:
> > https://kafka.apache.org/documentation/#connect_exactlyoncesourceconnectors
> > To quote a relevant passage from those docs:
> >
> > > In order for a source connector to take advantage of this support, it
> > must be able to provide meaningful source offsets for each record that it
> > emits, and resume consumption from the external system at the exact
> > position corresponding to any of those offsets without dropping or
> > duplicating messages.
> >
> > So, as long as your source connector is able to use the Kafka Connect
> > framework's offsets API correctly, it shouldn't be necessary to make any
> > other code changes to the connector.
> >
> > To enable exactly-once support for source connectors on your Connect
> > cluster, see the docs section here:
> > https://kafka.apache.org/documentation/#connect_exactlyoncesource
> >
> > With regard to transactions, a transactional producer is always created
> > automatically for your connector by the Connect runtime when exactly-once
> > support is enabled on the worker. The only reason to set
> > "transaction.boundary" to "connector" is if your connector would like to
> > explicitly define its own transaction boundaries. In this case, it sounds
> > like that may be what you want; I just want to make sure to call out that in
> > either case, you should not be directly instantiating a producer in your
> > connector code, but let the Kafka Connect runtime do that for you, and
> > just worry about returning the right records from SourceTask::poll (and
> > possibly defining custom transactions using the TransactionContext API).
> >
> > With respect to your question about committing or aborting in certain
> > circumstances, it'd be useful to know more about your use case, since it
> > may not be necessary to define custom transaction boundaries in your
> > connector at all.
> >
> > Cheers,
> >
> > Chris
> >
> >
> >
> > On Tue, Mar 7, 2023 at 7:21 AM NITTY BENNY <nittybe...@gmail.com> wrote:
> >
> > > Hi Team,
> > >
> > > Adding on top of this, I tried creating a TransactionContext object and
> > > calling the commitTransaction and abortTransaction methods in source
> > > connectors.
> > > But the main problem I saw is that if there is any error while parsing
> > > the record, Connect is calling an abort, but we have a use case to call
> > > commit in some cases. Is that a valid use case in terms of Kafka Connect?
> > >
> > > Another question - should I use a transactional producer instead of
> > > creating an object of TransactionContext? Below is the connector
> > > configuration I am using.
> > >
> > >   exactly.once.support: "required"
> > >   transaction.boundary: "connector"
> > >
> > > Could you please help me here?
> > >
> > > Thanks,
> > > Nitty
> > >
> > > On Tue, Mar 7, 2023 at 12:29 AM NITTY BENNY <nittybe...@gmail.com>
> > > wrote:
> > >
> > > > Hi Team,
> > > > I am trying to implement exactly-once behavior in our source
> > > > connector. Is there any sample source connector implementation
> > > > available to have a look at?
> > > > Regards,
> > > > Nitty
> > > >
> > >
> >
>
