This is an automated email from the ASF dual-hosted git repository.
apupier pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 77554dfd5d0f Avoid potential NPE when the kafka consumer failed to
start
77554dfd5d0f is described below
commit 77554dfd5d0f992b0c5528f3efd7903964e6b086
Author: Aurélien Pupier <[email protected]>
AuthorDate: Thu Mar 5 14:17:51 2026 +0100
Avoid potential NPE when the kafka consumer failed to start
to avoid this kind of NPE:
```
java.lang.NullPointerException: Cannot invoke
"org.apache.kafka.clients.consumer.Consumer.wakeup()" because
"this.consumer" is null
at
org.apache.camel.processor.resume.kafka.SingleNodeKafkaResumeStrategy.stop(SingleNodeKafkaResumeStrategy.java:412)
at
org.apache.camel.processor.resume.ResumableProcessor.doStop(ResumableProcessor.java:75)
at
org.apache.camel.support.service.BaseService.stop(BaseService.java:167)
at
org.apache.camel.support.service.ServiceHelper.stopService(ServiceHelper.java:201)
at
org.apache.camel.support.service.ServiceHelper.stopService(ServiceHelper.java:185)
at
org.apache.camel.support.service.ServiceHelper.stopService(ServiceHelper.java:170)
at
org.apache.camel.impl.engine.DefaultChannel.doStop(DefaultChannel.java:138)
at
org.apache.camel.support.service.BaseService.stop(BaseService.java:167)
at
org.apache.camel.support.service.ServiceHelper.stopService(ServiceHelper.java:201)
at
org.apache.camel.support.service.ServiceHelper.stopService(ServiceHelper.java:185)
at
org.apache.camel.support.service.ServiceHelper.stopService(ServiceHelper.java:216)
at
org.apache.camel.support.service.ServiceHelper.stopService(ServiceHelper.java:187)
at
org.apache.camel.support.service.ServiceHelper.stopService(ServiceHelper.java:170)
at org.apache.camel.processor.Pipeline.doStop(Pipeline.java:209)
at
org.apache.camel.support.service.BaseService.stop(BaseService.java:167)
at
org.apache.camel.support.service.ServiceHelper.stopService(ServiceHelper.java:201)
at
org.apache.camel.support.service.ServiceHelper.stopService(ServiceHelper.java:185)
at
org.apache.camel.support.processor.DelegateAsyncProcessor.doStop(DelegateAsyncProcessor.java:107)
at
org.apache.camel.support.service.BaseService.stop(BaseService.java:167)
at
org.apache.camel.support.service.ServiceHelper.stopService(ServiceHelper.java:201)
at
org.apache.camel.support.service.ServiceHelper.stopService(ServiceHelper.java:185)
at
org.apache.camel.support.service.ServiceHelper.stopService(ServiceHelper.java:170)
at
org.apache.camel.support.DefaultConsumer.doStop(DefaultConsumer.java:236)
at
org.apache.camel.component.kafka.KafkaConsumer.doStop(KafkaConsumer.java:225)
at
org.apache.camel.support.service.BaseService.stop(BaseService.java:167)
at
org.apache.camel.support.service.ServiceHelper.stopService(ServiceHelper.java:201)
at
org.apache.camel.impl.engine.DefaultShutdownStrategy.shutdownNow(DefaultShutdownStrategy.java:434)
at
org.apache.camel.impl.engine.DefaultShutdownStrategy$ShutdownTask.run(DefaultShutdownStrategy.java:743)
at
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:545)
at
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:328)
at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1090)
at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:614)
at java.base/java.lang.Thread.run(Thread.java:1474)
```
Signed-off-by: Aurélien Pupier <[email protected]>
---
.../processor/resume/kafka/SingleNodeKafkaResumeStrategy.java | 7 ++++++-
1 file changed, 6 insertions(+), 1 deletion(-)
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
index 2290a8c1b820..7d689761a028 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
@@ -409,7 +409,12 @@ public class SingleNodeKafkaResumeStrategy implements
KafkaResumeStrategy, Camel
try {
LOG.info("Closing the Kafka consumer");
- consumer.wakeup();
+ if (consumer != null) {
+ consumer.wakeup();
+ } else {
+ // This may happen if the start up has failed in some other
part
+ LOG.trace("There's no Kafka consumer available to stop");
+ }
if (executorService != null) {
executorService.shutdown();