Hi,
Has anyone thought about using Kafka Queues (KIP-932) with Kafka
Connect in the future, e.g. to autoscale Connect tasks?
There could be some Connect use cases where ordering isn't as important as
keeping up with load spikes.
Regards, Paul Brebner
NetApp Instaclustr Technology
Hi all,
I'm currently running a Kafka Connect cluster within a Kubernetes environment
(OpenShift).
Since I couldn't find much documentation on how to integrate the two, I used
the OpenTelemetry Java Agent [1], which seems to work.
That said, I still lack visibility into
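In case it helps others, this is roughly how the agent gets attached to a
Connect worker (a sketch; the agent path, service name, and collector
endpoint below are assumptions, not from my setup):

  export KAFKA_OPTS="-javaagent:/opt/otel/opentelemetry-javaagent.jar"
  export OTEL_SERVICE_NAME=kafka-connect
  export OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4317
  bin/connect-distributed.sh config/connect-distributed.properties

KAFKA_OPTS is picked up by kafka-run-class.sh, so the agent wraps the worker JVM.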
> connector for receiving SNMP traps.
> https://github.com/jcustenborder/kafka-connect-snmp
>
> Since this is a source connector that listens for incoming SNMP messages
> (not polling), I'm trying to understand how best to scale it?
>
> Has anyone used this connector in
Hi All,
I am planning to use the SNMP trap receiver connector, which acts as a server
connector for receiving SNMP traps.
https://github.com/jcustenborder/kafka-connect-snmp
Since this is a source connector that listens for incoming SNMP messages (not
polling), I'm trying to understand how best to scale it?
Hi Prateek, I'm not sure how you are testing this. A Kafka Connect
cluster in distributed mode uses the group management protocol to
coordinate (distribute tasks across workers). This is set to
"sessioned" by default, which aims to minimize task movements during
rebalancing.
On Mo
Thanks a lot @Vignesh & @Raphael Mazelier for your detailed replies.
I thought the same, but then I read this and now I'm a bit confused:
"In a Kafka Connect cluster, each worker node is identified by its advertised
address. This identity is crucial because connectors and tasks are as
Kafka Connect is a stateless component by design. It relies on external
Kafka topics to persist its state, including connector configurations,
offsets, and status updates. In a distributed Kafka Connect cluster, this
state is managed through the following configurable topics:
config.storage.topic, offset.storage.topic, and status.storage.topic.
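A typical distributed worker config for these (topic names are illustrative):

  group.id=connect-cluster
  config.storage.topic=connect-configs
  offset.storage.topic=connect-offsets
  status.storage.topic=connect-status

Any worker that starts with the same group.id and state topics rejoins the
cluster with the same state, which is why Pod identity matters less than it
might seem.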
I created the Docker + Kubernetes setup for our Kafka Connect at my current
job. I use a standard Deployment; Kafka Connect doesn't care about hostname
or IP.
The sole trick is to inject the connector configuration at runtime (if you
want).
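For example (a sketch; the connector name, config file, and endpoint are
illustrative), a startup job can PUT the config to the worker's REST API,
which creates the connector or updates it if it already exists:

  curl -s -X PUT -H "Content-Type: application/json" \
       --data @my-connector.json \
       http://kafka-connect:8083/connectors/my-connector/config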
--
Raph
On 14/06/2025 2:12 pm, Prateek Kohli wrote:
>
Hi All,
I'm building a custom Docker image for Kafka Connect and planning to run it
on Kubernetes. I'm a bit stuck on whether I should use a Deployment or a
StatefulSet.
From what I understand, the main difference that could affect Kafka Connect
is the hostname/IP behaviour. With
something wrong?
From: Federico Valeri
Sent: Saturday, June 14, 2025 7:33:29 PM
To: users@kafka.apache.org
Subject: Re: Kafka Connect on Kubernetes: Statefulset vs Deployment
Hi Prateek.
In a Kafka Connect cluster, the advertised address represents the
identity of the worker node. Connectors and tasks are scheduled to the
individual worker nodes based on their identity.
If you use a Kubernetes Deployment, when you roll the cluster, new
Pods with new IPs will be
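One common workaround (a sketch, assuming a Confluent-style image that maps
CONNECT_* environment variables onto worker configs) is to advertise the Pod
IP explicitly via the Kubernetes downward API:

  env:
    - name: CONNECT_REST_ADVERTISED_HOST_NAME
      valueFrom:
        fieldRef:
          fieldPath: status.podIP

This keeps the advertised address in sync with whatever IP the Pod gets.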
Hi All,
I'm building a custom Docker image for Kafka Connect and planning to run it on
Kubernetes. I'm a bit stuck on whether I should use a Deployment or a
StatefulSet.
From what I understand, the main difference that could affect Kafka Connect is
the hostname/IP b
/ workers?
Many Thanks,
Jamie
On Wednesday 16 April 2025 at 10:57:01 BST, Jamie
<jamied...@aol.co.uk> wrote:
Hi All,
I'm trying to implement a custom REST extension for Kafka Connect that allows a
token to be used for authentication.
When I have a single inst
Hi All,
Has anyone managed to get a REST extension working with multiple connect
instances / workers?
Many Thanks,
Jamie
On Wednesday 16 April 2025 at 10:57:01 BST, Jamie
wrote:
Hi All,
I'm trying to implement a custom REST extension for Kafka Connect that allows a
token
Hi All,
I'm trying to implement a custom REST extension for Kafka Connect that allows a
token to be used for authentication.
When I have a single instance of Kafka Connect running in distributed mode
(i.e. 1 worker) this works as expected. However, when I add another instance
(another w
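For reference, the extension point involved here is ConnectRestExtension; a
minimal skeleton (TokenAuthRestExtension and TokenAuthFilter are hypothetical
names, and the JAX-RS filter itself is not shown):

  import java.util.Map;
  import org.apache.kafka.connect.rest.ConnectRestExtension;
  import org.apache.kafka.connect.rest.ConnectRestExtensionContext;

  public class TokenAuthRestExtension implements ConnectRestExtension {
      @Override
      public void register(ConnectRestExtensionContext restPluginContext) {
          // Register a JAX-RS filter that validates the token header.
          restPluginContext.configurable().register(new TokenAuthFilter());
      }
      @Override
      public void configure(Map<String, ?> configs) {}
      @Override
      public String version() { return "1.0"; }
      @Override
      public void close() {}
  }

Each worker loads the extension independently, so any token state presumably
has to be available on (or re-derivable by) every worker.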
*" in order to affect the mirrored records.
I don't understand your test cases exactly; are you observing some version
dependence here? I don't recall any recent changes which would invalidate
the above information.
Hope this helps,
Greg
On Thu, Feb 13, 2025 at 9:35 AM Mehrtens, Mazrim
I’ve found that Kafka Connect never respects the
“target.cluster.bootstrap.servers” configuration in the MirrorMaker2 task
config. It always uses the Kafka Connect broker information instead. Running
Kafka Connect on the source cluster causes an infinite loop of messages read
from the source
Hi,
I am planning to create Kafka Connect Docker images and deploy them in a
Kubernetes cluster.
My Kafka admin client, consumer, and Connect REST server are all using mTLS. Is
there a way to reload the certificates they use at runtime (hot reload) without
restarting the connect cluster
> Hi,
>
> I am planning to create Kafka Connect Docker images and deploy them in a
> Kubernetes cluster.
> I want to fetch sensitive configurations at both the worker and connector
> levels from HashiCorp Vault. The authentication method I can use to log in to
> Vault is Kubern
Hi,
I am planning to create Kafka Connect Docker images and deploy them in a
Kubernetes cluster.
I want to fetch sensitive configurations at both the worker and connector
levels from HashiCorp Vault. The authentication method I can use to log in to
Vault is Kubernetes authentication.
Worker
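For reference, secrets providers plug in through the ConfigProvider
mechanism; a worker-level sketch (the Vault provider class is a placeholder
for whichever implementation you use, since Kafka does not ship one):

  config.providers=vault
  config.providers.vault.class=<your Vault ConfigProvider implementation>

Connector (or worker) configs can then reference secrets indirectly, e.g.
"connection.password": "${vault:secret/data/kafka:password}".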
This was fixed in https://github.com/apache/kafka/pull/18146
On Tue, Nov 12, 2024 at 4:41 PM Thomas Thornton
wrote:
> Hi we notice data loss i.e. dropped records when running Debezium on Kafka
> Connect with Apicurio Schema Registry. Specifically, multiple times we have
> observed that
Hi,
I think SMTs (KIP-66) could work for your case.
https://kafka.apache.org/documentation.html#connect_transforms
Regards,
OSB
On Fri, Nov 15, 2024, 03:03 Surbhi Mungre wrote:
> Can Kafka Connect be used to read messages from one Kafka Cluster, apply
> some basic transformation and
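One option (a sketch; aliases, bootstrap servers, topic, and the masked field
are all illustrative) is to run MirrorMaker2's MirrorSourceConnector on a
Connect cluster and attach an SMT to it:

  {
    "name": "cluster-to-cluster",
    "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "source.cluster.alias": "source",
    "target.cluster.alias": "target",
    "source.cluster.bootstrap.servers": "source-kafka:9092",
    "topics": "orders",
    "transforms": "mask",
    "transforms.mask.type": "org.apache.kafka.connect.transforms.MaskField$Value",
    "transforms.mask.fields": "ssn"
  }

Note that records land on the cluster the Connect workers themselves talk to,
so the Connect cluster should run against the target side.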
Neeraj
>
>
>> On 15 Nov 2024, at 1:04 PM, Surbhi Mungre wrote:
>>
>> Can Kafka Connect be used to read messages from one Kafka Cluster, apply
>> some basic transformation and write messages to another Kafka Cluster? I
>> did not find a Kafka Connect Connecto
your final topic.
Regards,
Neeraj
> On 15 Nov 2024, at 1:04 PM, Surbhi Mungre wrote:
>
> Can Kafka Connect be used to read messages from one Kafka Cluster, apply
> some basic transformation and write messages to another Kafka Cluster? I
> did not find a Kafka Connect Connector
Can Kafka Connect be used to read messages from one Kafka Cluster, apply
some basic transformation and write messages to another Kafka Cluster? I
did not find a Kafka Connect Connector in the list of connectors provided
by Confluent[1]. I only found a Replicator[2] but for my use-case I want
to
Hey Kafka fam,
What’s the correct way to set task-level overrides to producer settings in a
Kafka Connect task? For example, with MirrorMaker2, I’d expect the following
“producer.override.*” based configs to work based on the documentation, but
in reality this does not change any of the
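For context, connector-level client overrides only take effect when the
worker's override policy allows them (a sketch; values are illustrative):

  # worker configuration
  connector.client.config.override.policy=All

  # then, in the connector config:
  "producer.override.linger.ms": "100",
  "producer.override.batch.size": "65536"

With the default policy (None), such overrides are rejected at config
validation time rather than applied.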
Hi we notice data loss i.e. dropped records when running Debezium on Kafka
Connect with Apicurio Schema Registry. Specifically, multiple times we have
observed that a single record is dropped when we get this exception (full
stack trace
<https://gist.github.com/twth
I have 100+ sink connectors running with 100+ topics, each with roughly 3
partitions per topic. How would you configure resources (mem and cpu) to
optimally handle this level of load? What would be your considerations?
Also, when considering this load, should I be thinking about it as an
aggregated
> at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> at java.base/java.lang.Thread.run(Thread.java:829)
>
> My questions are:
>
> 1. Are these connectors overloaded? Can Kafka Connect handle this
> level of load?
> 2. Say it can, which I've seen it do: could it be that this is caused by
> underlying rebalancing? If so, what would you recommend I do to mitigate?
>
> Thanks
>
> -BW
For sink connectors, I believe you can scale up the tasks to match the
partitions on the topic. But I don't believe this is the case for source
connectors; the number of partitions on the topic you're producing to has
nothing to do with the number of connector tasks. It really depends on the
indi
Hello
Confirmed. Partition is the minimal granularity level, so having more
consumers than the number of partitions of a topic for the same consumer
group is useless; having P partitions means maximum parallelism is reached
using P consumers.
Regards,
Sébastien.
On Thu, May 30, 2024 at 14:43, Yeike
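A concrete illustration for the sink side (a sketch; connector name, topic,
and output file are made up):

  name=example-sink
  connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
  file=/tmp/orders.txt
  # "orders" here stands for a topic with 3 partitions
  topics=orders
  # a 4th task would have no partitions left to read
  tasks.max=3

For source connectors, tasks.max is only an upper bound; the connector itself
decides how many tasks it can usefully split into.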
To: users@kafka.apache.org
Subject: [EXTERNAL] Regarding Kafka connect task to partition relationship for
both source and sink connectors
Hi everyone,
From my understanding, if a topic has n partitions, we can create up to n
tasks for both the source and sink connectors to achieve the maximum
Hi everyone,
From my understanding, if a topic has n partitions, we can create up to n
tasks for both the source and sink connectors to achieve the maximum
parallelism. Adding more tasks would not be beneficial, as they would remain
idle and be limited to the number of partitions of the t
Hi,
I'm using Kafka Connect, passing data with an Avro schema.
By default I get a schema with millisecond time precision for datetime2
columns.
Is microsecond time precision supported as well?
Thanks
Hey Greg,
Thinking more, I do like the idea of a source-side equivalent of the
ErrantRecordReporter interface!
However, I also suspect we may have to reason more carefully about what
users could do with this kind of information in a DLQ topic. Yes, it's an
option to reset the connector (or a copy
Hey Chris,
That's a cool idea! That can certainly be applied for failures other
than poll(), and could be useful when combined with the Offsets
modification API.
Perhaps failures inside of poll() can be handled by an extra
mechanism, similar to the ErrantRecordReporter, which allows reporting
aff
Hi Greg,
This was my understanding as well--if we can't turn a record into a byte
array on the source side, it's difficult to know exactly what to write to a
DLQ topic.
One idea I've toyed with recently is that we could write the source
partition and offset for the failed record (assuming, hopefu
Hi Yeikel,
Thanks for your question. It certainly isn't clear from the original
KIP-298, the attached discussion, or the follow-up KIP-610 as to why
the situation is asymmetric.
The reason as I understand it is: Source connectors are responsible
for importing data to Kafka. If an error occurs dur
Hi all,
Sink connectors support Dead Letter Queues[1], but Source connectors don't
seem to.
What is the reason for that decision?
In my data pipeline, I'd like to apply some transformations to the messages
before they reach the sink, but that leaves me vulnerable to failures as I
need to ei
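For context, the sink-side DLQ being referred to is enabled with settings
like these (the topic name is illustrative):

  errors.tolerance=all
  errors.deadletterqueue.topic.name=my-connector-dlq
  errors.deadletterqueue.topic.replication.factor=3
  errors.deadletterqueue.context.headers.enable=true

Failed records, including SMT failures, are routed to that topic instead of
killing the task.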
accidentally introduced a breaking change for Kafka
Connect in https://github.com/apache/kafka/pull/9669. Before that change,
the SourceTask::stop method [1] would be invoked on a separate thread from
the one that did the actual data processing for the task (polling the task
for records, transforming
Hello everyone,
Is there any mechanism to force Kafka Connect to ingest at a given rate per
second, as opposed to controlling throughput via the number of tasks?
I am operating in a shared environment where the ingestion rate needs to be as
low as possible (for example, 5 requests/second as an upper limit), and as far
as I can
can arrange
discussion with Partner Manager.
-Original Message-
From: Boyee [mailto:zhenchua...@163.com]
Sent: 14 October 2023 12:38
To: users@kafka.apache.org
Subject: The Plan To Introduce Virtual Threads To Kafka Connect
Kafka Connect, as a thread-intensive program, can benefit a lo
Manager.
-Original Message-
From: Greg Harris [mailto:greg.har...@aiven.io.INVALID]
Sent: 16 October 2023 20:38
To: users@kafka.apache.org
Subject: Re: The Plan To Introduce Virtual Threads To Kafka Connect
Hi Boyee,
Thanks for the suggestion, Virtual threads look like they may be
https://issues.apache.org/jira/browse/KAFKA-14606 .
Thanks!
Greg Harris
On Mon, Oct 16, 2023 at 6:20 AM Boyee wrote:
>
> Kafka Connect, as a thread-intensive program, can benefit a lot from the
> usage of virtual threads.
> As of JDK 21, released last month, virtual threads are a formal featur
Kafka Connect, as a thread-intensive program, can benefit a lot from the
usage of virtual threads.
As of JDK 21, released last month, virtual threads are a formal feature of the JDK.
I would like to ask if any plans exist to bring virtual threads into Kafka
Connect.
Thank you.
        String credentialAuthorization =
                headers.getHeaderString(HttpHeaders.AUTHORIZATION);
        if (credentialAuthorization != null) {
            req.header(HttpHeaders.AUTHORIZATION, credentialAuthorization);
        }
    }
}
This is of course risky and it would be significantly more convenient if this
functi
Hi Yeikel,
Neat question! And thanks for the link to the RestClient code; very helpful.
I don't believe there's a way to configure Kafka Connect to add these
headers to forwarded requests right now. You may be able to do some kind of
out-of-band proxy magic to intercept forwarded re
Hello everyone,
I'm currently running Kafka Connect behind a firewall that mandates the
inclusion of a specific header. This situation becomes particularly challenging
when forwarding requests among multiple workers, as it appears that only the
Authorization header is included in the re
> tasks to workers with whom it
> cannot communicate?
This happens via the group rebalance process where each Kafka Connect
worker communicates with the Kafka broker that has been chosen as the group
co-ordinator for the Kafka Connect cluster. The assignment is indeed
computed by the leader Connect worker but it is disseminated to the other
Connect workers via the group coordinator
that would render it useless as you mentioned
Thank you for taking the time
On Mon, 25 Sep 2023 11:41:18 -0400 Yash Mayya wrote
Hi Yeikel,
Heartbeats and group coordination in Kafka Connect do occur through Kafka,
but a Kafka Connect cluster where all workers cannot communicate with
each other won't work very well. You'll be able to create / update / delete
connectors by making requests to any workers that can c
-Nikhil
On Sun, 24 Sept 2023 at 06:36, Yeikel Santana wrote:
> Hello everyone, I'm currently designing a new Kafka Connect cluster, and
> I'm trying to understand how connectivity functions among workers. In my
> setup, I have a single Kafka Connect cluster connected to the sam
r, those REST requests will fail. I'm referring to REST requests like
> CREATE / UPDATE / DELETE.
>
> Hope this helps a little.
>
> Thanks,
> -Nikhil
>
> On Sun, 24 Sept 2023 at 06:36, Yeikel Santana wrote:
>
> > Hello everyone, I'm currently designing a
I'm currently designing a new Kafka Connect cluster, and
> I'm trying to understand how connectivity functions among workers. In my
> setup, I have a single Kafka Connect cluster connected to the same Kafka
> topics and Kafka cluster. However, the workers are deployed in
> ge
Hello everyone, I'm currently designing a new Kafka Connect cluster, and I'm
trying to understand how connectivity functions among workers. In my setup, I
have a single Kafka Connect cluster connected to the same Kafka topics and
Kafka cluster. However, the workers are deployed in geog
every 100 ms
<https://github.com/confluentinc/kafka-connect-jdbc/blob/master/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTask.java#L427>.
Then I call a DEL on this connector, and the stop is not processed until
the next loop in the `poll()`.
Your initial diagnosis is 100% correct
kafka-connect-jdbc received a patch to improve this
behavior when no data is being emitted:
https://github.com/confluentinc/kafka-connect-jdbc/pull/947 but I'm
not sure if that is relevant to your situation.
Thanks!
Greg
On Mon, Aug 21, 2023 at 6:53 AM Robson Hermes wrote:
>
> No, it stops them
You have to remove connectors first using delete api
and then stop the connector
On Thu, 17 Aug 2023 at 2:51 AM, Robson Hermes
wrote:
> Hello
>
> I'm using Kafka Connect 7.4.0 to read data from Postgres views and write to
> other Postgres tables. So using JDBC source an
Hello Greg (sorry about the duplicate e-mail, forgot to cc users mailing
list)
Thanks a lot for your detailed reply. I'm using JDBC Source connectors from
kafka-connect-jdbc <https://github.com/confluentinc/kafka-connect-jdbc>.
Indeed the `poll()` implementation is blocked, so it only
Hi Robson,
Thank you for the detailed bug report.
I believe the behavior that you're describing is caused by this flaw:
https://issues.apache.org/jira/browse/KAFKA-15090 which is still under
discussion. Since the above flaw was introduced in 3.0, source
connectors need to return from poll() befor
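A minimal sketch of the poll() pattern Greg describes (fetchAvailableRecords
is a hypothetical helper, not part of the Connect API; this lives in a
SourceTask subclass):

  @Override
  public List<SourceRecord> poll() throws InterruptedException {
      // Return control to the framework when there is no data, instead of
      // blocking indefinitely; stop() can then take effect between polls.
      List<SourceRecord> batch = fetchAvailableRecords();
      if (batch.isEmpty()) {
          Thread.sleep(100); // brief pause to avoid a busy loop
          return null;       // returning null (or an empty list) is allowed
      }
      return batch;
  }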
Hello
I'm using Kafka Connect 7.4.0 to read data from Postgres views and write to
other Postgres tables. So using JDBC source and sink connectors.
All works well, but whenever I stop the source connectors via the rest api:
DEL http://kafka-connect:8083/connectors/connector_name_here
-Original Message-
From: mil...@votecgroup.com [mailto:mil...@votecgroup.com]
Sent: 01 August 2023 11:56
To: users@kafka.apache.org
Subject: RE: Kafka Connect Rest Extension Question
Hi Team,
Greetings,
We actually reached out to you for Oracle/ IT / SAP / Infor / Microsoft "VOTEC
IT SERVICE PARTNERSH
-Original Message-
From: 양형욱 [mailto:hyungwooky...@navercorp.com]
Sent: 31 July 2023 14:42
To: users@kafka.apache.org
Subject: Kafka Connect Rest Extension Question
https://stackoverflow.com/questions/76797743/how-can-i-solve-connectrestextension-error
There is an issue with this link where the ConnectR
-Original Message-
From: Greg Harris [mailto:greg.har...@aiven.io.INVALID]
Sent: 31 July 2023 23:42
To: users@kafka.apache.org
Subject: Re: Kafka Connect Rest Extension Question
Hello Yang Hyung Wook,
In your post I do not see anything obviously wrong, so you may need to do some
more debugging.
1. Are
https://docs.oracle.com/javase/tutorial/deployment/jar/view.html
2. Do you see either of these errors
https://github.com/apache/kafka/blob/3.5.0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java#L267-L269
for filesystem-specific problems?
3. This log line
https
https://stackoverflow.com/questions/76797743/how-can-i-solve-connectrestextension-error
There is an issue with this link where the ConnectRestExtension implementation
is not registered. I've done everything the KIP's official documentation says,
but can you tell me why it doesn't work?
양형욱 Yang
what someone more familiar with client and broker
internals has to say! Going to be following this thread.
[1] -
https://github.com/apache/kafka/blob/513e1c641d63c5e15144f9fcdafa1b56c5e5ba09/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java#L357
Cheers
Hi,
I'm investigating the possibility of exactly-once semantics for Debezium [1]
Kafka Connect source connectors, which implement change data capture for
various databases. Debezium has two phases, initial snapshot phase and
streaming phase. Initial snapshot phase loads existing data fro
Hi all,
I'm testing Apache Kafka Connect for a project and I found that the main
process listens on two different ports: the one providing the REST API, 8083
by default, and a different unprivileged port that changes its number on each
restart. For instance, this is a fragment of the output from netstat
Hey Jorge,
I looked into it, and can reproduce the second LISTEN port in a
vanilla Kafka Connect cluster without any connectors running.
Using jstack, I see that there are two threads that appear to be
waiting in the corresponding accept methods:
"RMI TCP Accept-0" #15 daemon prio=5
Hello,
can someone please give me a hint how to execute two lines of code upon Kafka
Connect Startup, like:
final JaegerTracer tracer = Configuration.fromEnv().getTracer();
GlobalTracer.register(tracer);
I implemented using a custom (Fake-)Connector, but there is much overhead,
because you
public class TracingConfigProvider implements ConfigProvider {
    // Connect instantiates and configures each ConfigProvider at startup,
    // which makes this a convenient hook for the two tracing lines.
    @Override
    public void configure(Map<String, ?> configs) {
        GlobalTracer.register(Configuration.fromEnv().getTracer());
    }
    @Override
    public ConfigData get(String path) {
        return null;
    }
    @Override
    public ConfigData get(String path, Set<String> keys) {
        return null;
    }
    @Override
    public void close() {}
}
And setting these Environment Variables in Kafka Connect
- CONNECT_CONFIG_PROVIDERS=tracing
- CONNECT_CONFIG_PROVIDERS_TRACING_CLASS=org.example.TracingConfigProvider
Best regards
Best regards,
Jan
From: Jakub Scholz
Date: Monday, 20
> can someone please give me a hint how to execute two lines of code upon
> Kafka Connect Startup, like:
>
> final JaegerTracer tracer = Configuration.fromEnv().getTracer();
> GlobalTracer.register(tracer);
>
> I implemented using a custom (Fake-)Connector, but there is much ov
> Looking forward to your reply.
> >
> >
> > Thank you,
> > Xiaoxia
> >
> >
> >
> >
> > -- Original Message --
> > From: "users";
> > *发送时间:* 2023年3月15日(星期三) 晚上6:
o forget the signature.
> Looking forward to your reply.
>
>
> Thank you,
> Xiaoxia
>
>
>
>
> -- Original Message --
> From: "users";
> Sent: Wednesday, March 15, 2023, 6:38 PM
> To: "users";
Hi Nitty,
Sorry, I should have clarified. The reason I'm thinking about shutdown here
is that, when exactly-once support is enabled on a Kafka Connect cluster
and a new set of task configurations is generated for a connector, the
Connect framework makes an effort to shut down all the old
> 2023-03-12 11:32:45,222 ERROR [json-sftp-source-connector|task-0]
> [connector-producer-json-sftp-source-connector-0]
> org.apache.kafka.common.errors.ProducerFencedException: There is a newer
> producer with the same transactionalId which fences the current one.
> 2023-03-12 11:32:45,224 ERROR [json-sftp-source-connector|task-0]
> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} failed to se
> 2023-03-12 11:32:45,225 ERROR [json-sftp-source-connector|task-0]
> [task-thread-json-sftp-source-connector-0]
> ExactlyOnceWorkerSourceTask{id=json-sftp-source-connector-0} Task threw an
> uncaught and unrecoverable exception. Task is being killed and will not
> recover until manually restarted
> (org.apache.kafka.connect.runtime.WorkerTask)
One more scenario: when I call the commit I saw the below
> connect-cluster-json-sftp-source-connector-0::TransactionMetadata(transactionalId=connect-cluster-json-sftp-source-connector-0,
> producerId=11, producerEpoch=2, txnTimeoutMs=6, state=Ongoing,
> pendingState
txnLastUpdateTimestamp=1678620463834)
> Then, before the state changed to Abort, I dropped the next file and then I
> don't see any issues. Previous transaction
> as well as the current transaction are committed.
>
> Thank you for your support.
>
> Thanks,
> Nitty
>
> O
error record, but
> commit is not happening for me. Kafka Connect tries to abort the
> transaction automatically
>
> This is really interesting--are you certain that your task never invoked
> TransactionContext::abortTransaction in this case? I'm looking over the
> code base and
Hi Nitty,
> I called commitTransaction when I reach the first error record, but
commit is not happening for me. Kafka Connect tries to abort the
transaction automatically
This is really interesting--are you certain that your task never invoked
TransactionContext::abortTransaction in this c
Hi Chris,
We have a use case to commit previous successful records and stop the
processing of the current file and move on with the next file. To achieve
that I called commitTransaction when I reach the first error record, but
commit is not happening for me. Kafka Connect tries to abort the
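For reference, a minimal sketch of the connector-defined transaction calls
under discussion (this requires "transaction.boundary": "connector" in the
connector config; lastGoodRecord is a hypothetical variable, and context is
the task's SourceTaskContext):

  TransactionContext txn = context.transactionContext();
  if (txn != null) {
      // Commit the open transaction once this record is dispatched...
      txn.commitTransaction(lastGoodRecord);
      // ...or abort everything produced since the last commit:
      // txn.abortTransaction();
  }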
saw that transaction state changed to CompleteAbort.
> So for my next transaction I am getting InvalidProducerEpochException and
> then the task stopped after that. I tried calling the abort after sending the
> last reco
oing anything wrong here.
>
> Please advise.
> Thanks,
> Nitty
>
> On Tue 7 Mar 2023 at 2:21 p.m., Chris Egerton wrote:
>
> > Hi Nitty,
> >
> > We've re