Hi Nitty,

I'm a little confused about what you mean by this part:
> transaction is not getting completed because it is not committing the
> transaction offset.

The only conditions required for a transaction to be completed when a
connector is defining its own transaction boundaries are:

1. The task requests a transaction commit/abort from the TransactionContext
2. The task returns a batch of records from SourceTask::poll (and, if using
the per-record API provided by the TransactionContext class, includes at
least one record that should trigger a transaction commit/abort in that
batch)

The Connect runtime should automatically commit source offsets to Kafka
whenever a transaction is completed, either by commit or abort. This is
because transactions should only be aborted for data that should never be
re-read by the connector; if there is a validation error that should be
handled by reconfiguring the connector, then the task should throw an
exception instead of aborting the transaction.

If possible, do you think you could provide a brief code snippet
illustrating what your task is doing that's causing issues?

Cheers,

Chris (not Chrise 🙂)

On Tue, Mar 7, 2023 at 10:17 AM NITTY BENNY <nittybe...@gmail.com> wrote:

> Hi Chrise,
>
> Thanks for sharing the details.
>
> Regarding the use case: for the ASN.1 source connector, we have a use case
> to validate the number of records in the file against the number of
> records in the header. Currently, if validation fails we do not send the
> last record to the topic. But after introducing exactly-once with the
> connector transaction boundary, I can see that if I call an abort when the
> validation fails, the transaction is not completed because it is not
> committing the transaction offset. I saw that the transaction state
> changed to CompleteAbort, so for my next transaction I get an
> InvalidProducerEpochException, and the task stops after that. If I call
> the abort after sending the last record to the topic, the transaction
> completes.
>
> I don't know if I am doing anything wrong here.
>
> Please advise.
>
> Thanks,
> Nitty
>
> On Tue 7 Mar 2023 at 2:21 p.m., Chris Egerton <chr...@aiven.io.invalid>
> wrote:
>
> > Hi Nitty,
> >
> > We've recently added some documentation on implementing exactly-once
> > source connectors here:
> > https://kafka.apache.org/documentation/#connect_exactlyoncesourceconnectors
> >
> > To quote a relevant passage from those docs:
> >
> > > In order for a source connector to take advantage of this support, it
> > > must be able to provide meaningful source offsets for each record
> > > that it emits, and resume consumption from the external system at the
> > > exact position corresponding to any of those offsets without dropping
> > > or duplicating messages.
> >
> > So, as long as your source connector is able to use the Kafka Connect
> > framework's offsets API correctly, it shouldn't be necessary to make
> > any other code changes to the connector.
> >
> > To enable exactly-once support for source connectors on your Connect
> > cluster, see the docs section here:
> > https://kafka.apache.org/documentation/#connect_exactlyoncesource
> >
> > With regard to transactions, a transactional producer is always created
> > automatically for your connector by the Connect runtime when
> > exactly-once support is enabled on the worker. The only reason to set
> > "transaction.boundary" to "connector" is if your connector would like
> > to explicitly define its own transaction boundaries. In this case, it
> > sounds like that may be what you want; I just want to make sure to call
> > out that in either case, you should not be directly instantiating a
> > producer in your connector code, but let the Kafka Connect runtime do
> > that for you, and just worry about returning the right records from
> > SourceTask::poll (and possibly defining custom transactions using the
> > TransactionContext API).
> >
> > With respect to your question about committing or aborting in certain
> > circumstances, it'd be useful to know more about your use case, since
> > it may not be necessary to define custom transaction boundaries in your
> > connector at all.
> >
> > Cheers,
> >
> > Chris
> >
> > On Tue, Mar 7, 2023 at 7:21 AM NITTY BENNY <nittybe...@gmail.com>
> > wrote:
> >
> > > Hi Team,
> > >
> > > Adding on top of this, I tried creating a TransactionContext object
> > > and calling the commitTransaction and abortTransaction methods in
> > > source connectors. But the main problem I saw is that if there is any
> > > error while parsing the record, Connect calls an abort, but we have a
> > > use case to call commit in some cases. Is that a valid use case in
> > > terms of Kafka Connect?
> > >
> > > Another question: should I use a transactional producer instead of
> > > creating an object of TransactionContext? Below is the connector
> > > configuration I am using.
> > >
> > > exactly.once.support: "required"
> > > transaction.boundary: "connector"
> > >
> > > Could you please help me here?
> > >
> > > Thanks,
> > > Nitty
> > >
> > > On Tue, Mar 7, 2023 at 12:29 AM NITTY BENNY <nittybe...@gmail.com>
> > > wrote:
> > >
> > > > Hi Team,
> > > >
> > > > I am trying to implement exactly once behavior in our source
> > > > connector. Is there any sample source connector implementation
> > > > available to have a look at?
> > > >
> > > > Regards,
> > > > Nitty
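[Editor's note: for readers following the thread, here is a rough sketch of
the pattern Chris describes — requesting a commit/abort through the
TransactionContext, which only completes once the batch is returned from
SourceTask::poll, and throwing an exception (rather than aborting) for
validation failures that should be surfaced to the operator. This is an
illustration only; it assumes the Kafka Connect API (connect-api, 3.3+) is on
the classpath, and the class name and file-reading/validation helpers
(readNextBatch, isLastBatchOfFile, recordCountMatchesHeader) are hypothetical
stubs, not part of any real connector.]

```java
import java.util.List;
import java.util.Map;

import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.TransactionContext;

public class FileValidatingSourceTask extends SourceTask {

    // Non-null only when the worker has exactly-once support enabled and
    // the connector is configured with transaction.boundary=connector.
    private TransactionContext transactionContext;

    @Override
    public void start(Map<String, String> props) {
        transactionContext = context.transactionContext();
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        List<SourceRecord> batch = readNextBatch(); // hypothetical helper
        if (batch == null || batch.isEmpty()) {
            return null;
        }

        if (isLastBatchOfFile() && !recordCountMatchesHeader(batch)) {
            // A validation failure that should be visible (and possibly fixed
            // by reconfiguring the connector) should throw, not abort:
            // aborted data is never re-read by the connector.
            throw new ConnectException("Record count does not match file header");
        }

        if (transactionContext != null && isLastBatchOfFile()) {
            // Per-record API: mark the last record of the file so the
            // transaction commits once this batch is returned from poll().
            transactionContext.commitTransaction(batch.get(batch.size() - 1));
        }

        return batch;
    }

    @Override
    public void stop() {
        // Release file handles, etc.
    }

    @Override
    public String version() {
        return "0.1.0";
    }

    // --- hypothetical helpers, stubbed for illustration ---

    private List<SourceRecord> readNextBatch() {
        return null; // read and parse the next chunk of the input file
    }

    private boolean isLastBatchOfFile() {
        return false;
    }

    private boolean recordCountMatchesHeader(List<SourceRecord> batch) {
        return true;
    }
}
```

[For the TransactionContext calls to take effect, the worker must be started
with exactly.once.source.support=enabled and the connector configured with
transaction.boundary=connector; the connector class should also override
SourceConnector::canDefineTransactionBoundaries to return
ConnectorTransactionBoundaries.SUPPORTED.]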