This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new d2f271407cf Kafka dev console and jbang command (#14768)
d2f271407cf is described below

commit d2f271407cf5f0506a5f971abc9b1be3fa1a1e8a
Author: Claus Ibsen <[email protected]>
AuthorDate: Wed Jul 10 10:21:33 2024 +0200

    Kafka dev console and jbang command (#14768)
    
    * CAMEL-20956: camel-kafka - Add dev console and jbang command
---
 .../apache/camel/catalog/dev-consoles.properties   |   1 +
 .../apache/camel/catalog/dev-consoles/kafka.json   |  15 +
 components/camel-kafka/pom.xml                     |   4 +
 .../org/apache/camel/dev-console/kafka.json        |  15 +
 .../services/org/apache/camel/dev-console/kafka    |   2 +
 .../org/apache/camel/dev-consoles.properties       |   7 +
 .../camel/component/kafka/KafkaConsumer.java       |   5 +
 .../camel/component/kafka/KafkaDevConsole.java     | 181 +++++++++++
 .../camel/component/kafka/KafkaFetchRecords.java   |  99 +++++-
 .../camel/component/kafka/TaskHealthState.java     |   5 +-
 .../component/kafka/consumer/CommitManager.java    |   1 -
 .../support/KafkaRecordProcessorFacade.java        |   2 +-
 .../kafka/consumer/support/ProcessingResult.java   |  33 +-
 .../kafka/consumer/support/TopicHelper.java        |   3 +-
 .../batching/KafkaRecordBatchingProcessor.java     |   1 -
 .../support/classic/ClassicRebalanceListener.java  |   2 +-
 .../classic/PartitionAssignmentAdapter.java        |   3 +
 .../support/resume/ResumeRebalanceListener.java    |   2 +-
 .../streaming/KafkaRecordStreamingProcessor.java   |   7 +-
 .../subcription/DefaultSubscribeAdapter.java       |   2 +-
 .../support/subcription/SubscribeAdapter.java      |   2 +-
 .../camel/cli/connector/LocalCliConnector.java     |  39 +++
 .../dsl/jbang/core/commands/CamelJBangMain.java    |   1 +
 .../jbang/core/commands/process/ListHealth.java    |  18 +-
 .../dsl/jbang/core/commands/process/ListKafka.java | 340 +++++++++++++++++++++
 .../core/commands/process/ProcessWatchCommand.java |   6 +-
 .../core/commands/version/VersionGetTest.java      |   5 +-
 27 files changed, 766 insertions(+), 35 deletions(-)

diff --git 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles.properties
 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles.properties
index ecd2d10d672..3c95c1b6ed1 100644
--- 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles.properties
+++ 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles.properties
@@ -17,6 +17,7 @@ health
 inflight
 java-security
 jvm
+kafka
 log
 memory
 micrometer
diff --git 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles/kafka.json
 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles/kafka.json
new file mode 100644
index 00000000000..55e93212a3c
--- /dev/null
+++ 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles/kafka.json
@@ -0,0 +1,15 @@
+{
+  "console": {
+    "kind": "console",
+    "group": "camel",
+    "name": "kafka",
+    "title": "Kafka",
+    "description": "Apache Kafka",
+    "deprecated": false,
+    "javaType": "org.apache.camel.component.kafka.KafkaDevConsole",
+    "groupId": "org.apache.camel",
+    "artifactId": "camel-kafka",
+    "version": "4.7.0-SNAPSHOT"
+  }
+}
+
diff --git a/components/camel-kafka/pom.xml b/components/camel-kafka/pom.xml
index 55b8986d86a..e64ee2bd4e6 100644
--- a/components/camel-kafka/pom.xml
+++ b/components/camel-kafka/pom.xml
@@ -42,6 +42,10 @@
             <groupId>org.apache.camel</groupId>
             <artifactId>camel-health</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-console</artifactId>
+        </dependency>
 
         <!-- kafka java client -->
         <dependency>
diff --git 
a/components/camel-kafka/src/generated/resources/META-INF/org/apache/camel/dev-console/kafka.json
 
b/components/camel-kafka/src/generated/resources/META-INF/org/apache/camel/dev-console/kafka.json
new file mode 100644
index 00000000000..55e93212a3c
--- /dev/null
+++ 
b/components/camel-kafka/src/generated/resources/META-INF/org/apache/camel/dev-console/kafka.json
@@ -0,0 +1,15 @@
+{
+  "console": {
+    "kind": "console",
+    "group": "camel",
+    "name": "kafka",
+    "title": "Kafka",
+    "description": "Apache Kafka",
+    "deprecated": false,
+    "javaType": "org.apache.camel.component.kafka.KafkaDevConsole",
+    "groupId": "org.apache.camel",
+    "artifactId": "camel-kafka",
+    "version": "4.7.0-SNAPSHOT"
+  }
+}
+
diff --git 
a/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/dev-console/kafka
 
b/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/dev-console/kafka
new file mode 100644
index 00000000000..975921645c6
--- /dev/null
+++ 
b/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/dev-console/kafka
@@ -0,0 +1,2 @@
+# Generated by camel build tools - do NOT edit this file!
+class=org.apache.camel.component.kafka.KafkaDevConsole
diff --git 
a/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/dev-consoles.properties
 
b/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/dev-consoles.properties
new file mode 100644
index 00000000000..3f7bbe40841
--- /dev/null
+++ 
b/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/dev-consoles.properties
@@ -0,0 +1,7 @@
+# Generated by camel build tools - do NOT edit this file!
+dev-consoles=kafka
+groupId=org.apache.camel
+artifactId=camel-kafka
+version=4.7.0-SNAPSHOT
+projectName=Camel :: Kafka
+projectDescription=Camel Kafka support
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index f0c75bd3eb6..9ea0e1502ca 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.kafka;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 import java.util.UUID;
@@ -256,6 +257,10 @@ public class KafkaConsumer extends DefaultConsumer
         return tasks.stream().allMatch(KafkaFetchRecords::isPaused);
     }
 
+    protected List<KafkaFetchRecords> tasks() {
+        return Collections.unmodifiableList(tasks);
+    }
+
     @Override
     public String adapterFactoryService() {
         return "kafka-adapter-factory";
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaDevConsole.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaDevConsole.java
new file mode 100644
index 00000000000..1f80df5a8b6
--- /dev/null
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaDevConsole.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Route;
+import org.apache.camel.spi.annotations.DevConsole;
+import org.apache.camel.support.console.AbstractDevConsole;
+import org.apache.camel.util.StopWatch;
+import org.apache.camel.util.TimeUtils;
+import org.apache.camel.util.json.JsonArray;
+import org.apache.camel.util.json.JsonObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@DevConsole(name = "kafka", displayName = "Kafka", description = "Apache 
Kafka")
+public class KafkaDevConsole extends AbstractDevConsole {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaDevConsole.class);
+
+    private static final long COMMITTED_TIMEOUT = 10000;
+
+    /**
+     * Whether to include committed offset (sync operation to Kafka broker)
+     */
+    public static final String COMMITTED = "committed";
+
+    public KafkaDevConsole() {
+        super("camel", "kafka", "Kafka", "Apache Kafka");
+    }
+
+    @Override
+    protected String doCallText(Map<String, Object> options) {
+        final boolean committed = 
"true".equals(options.getOrDefault(COMMITTED, "false"));
+
+        StringBuilder sb = new StringBuilder();
+        for (Route route : getCamelContext().getRoutes()) {
+            if (route.getConsumer() instanceof KafkaConsumer kc) {
+                sb.append(String.format("\n    Route Id: %s", 
route.getRouteId()));
+                sb.append(String.format("\n    From: %s", 
route.getEndpoint().getEndpointUri()));
+                for (KafkaFetchRecords t : kc.tasks()) {
+                    sb.append(String.format("\n        Worked Thread: %s", 
t.getThreadId()));
+                    sb.append(String.format("\n        Worker State: %s", 
t.getState()));
+                    TaskHealthState hs = t.healthState();
+                    if (!hs.isReady()) {
+                        sb.append(String.format("\n        Worker Last Error: 
%s", hs.buildStateMessage()));
+                    }
+                    KafkaFetchRecords.GroupMetadata meta = 
t.getGroupMetadata();
+                    if (meta != null) {
+                        sb.append(String.format("\n        Group Id: %s", 
meta.groupId()));
+                        sb.append(String.format("\n        Group Instance Id: 
%s", meta.groupInstanceId()));
+                        sb.append(String.format("\n        Member Id: %s", 
meta.memberId()));
+                        sb.append(String.format("\n        Generation Id: %d", 
meta.generationId()));
+                    }
+                    if (t.getLastRecord() != null) {
+                        sb.append(String.format("\n        Last Topic: %s", 
t.getLastRecord().topic()));
+                        sb.append(String.format("\n        Last Partition: 
%d", t.getLastRecord().partition()));
+                        sb.append(String.format("\n        Last Offset: %d", 
t.getLastRecord().offset()));
+                    }
+                    if (committed) {
+                        List<KafkaFetchRecords.KafkaTopicPosition> l = 
fetchCommitOffsets(kc, t);
+                        if (l != null) {
+                            for (KafkaFetchRecords.KafkaTopicPosition r : l) {
+                                sb.append(String.format("\n        Commit 
Topic: %s", r.topic()));
+                                sb.append(String.format("\n        Commit 
Partition: %s", r.partition()));
+                                sb.append(String.format("\n        Commit 
Offset: %s", r.offset()));
+                                if (r.epoch() > 0) {
+                                    long delta = System.currentTimeMillis() - 
r.epoch();
+                                    sb.append(String.format("\n        Commit 
Offset Since: %s",
+                                            TimeUtils.printDuration(delta, 
true)));
+                                }
+                            }
+                        }
+                    }
+                }
+                sb.append("\n");
+            }
+        }
+
+        return sb.toString();
+    }
+
+    private static List<KafkaFetchRecords.KafkaTopicPosition> 
fetchCommitOffsets(KafkaConsumer kc, KafkaFetchRecords task) {
+        StopWatch watch = new StopWatch();
+
+        CountDownLatch latch = task.fetchCommitRecords();
+        long timeout = 
Math.min(kc.getEndpoint().getConfiguration().getPollTimeoutMs(), 
COMMITTED_TIMEOUT);
+        try {
+            latch.await(timeout, TimeUnit.MILLISECONDS);
+            var answer = task.getCommitRecords();
+            LOG.debug("Fetching commit offsets took: {} ms", watch.taken());
+            return answer;
+        } catch (Exception e) {
+            // ignore
+        }
+        return null;
+    }
+
+    @Override
+    protected Map<String, Object> doCallJson(Map<String, Object> options) {
+        final boolean committed = 
"true".equals(options.getOrDefault(COMMITTED, "false"));
+
+        JsonObject root = new JsonObject();
+
+        List<JsonObject> list = new ArrayList<>();
+        root.put("kafkaConsumers", list);
+
+        for (Route route : getCamelContext().getRoutes()) {
+            if (route.getConsumer() instanceof KafkaConsumer kc) {
+                JsonObject jo = new JsonObject();
+                jo.put("routeId", route.getRouteId());
+                jo.put("uri", route.getEndpoint().getEndpointUri());
+
+                JsonArray arr = new JsonArray();
+                jo.put("workers", arr);
+
+                for (KafkaFetchRecords t : kc.tasks()) {
+                    JsonObject wo = new JsonObject();
+                    arr.add(wo);
+                    wo.put("threadId", t.getThreadId());
+                    wo.put("state", t.getState());
+                    TaskHealthState hs = t.healthState();
+                    if (!hs.isReady()) {
+                        wo.put("lastError", hs.buildStateMessage());
+                    }
+                    KafkaFetchRecords.GroupMetadata meta = 
t.getGroupMetadata();
+                    if (meta != null) {
+                        wo.put("groupId", meta.groupId());
+                        wo.put("groupInstanceId", meta.groupInstanceId());
+                        wo.put("memberId", meta.memberId());
+                        wo.put("generationId", meta.generationId());
+                    }
+                    if (t.getLastRecord() != null) {
+                        wo.put("lastTopic", t.getLastRecord().topic());
+                        wo.put("lastPartition", t.getLastRecord().partition());
+                        wo.put("lastOffset", t.getLastRecord().offset());
+                    }
+                    if (committed) {
+                        List<KafkaFetchRecords.KafkaTopicPosition> l = 
fetchCommitOffsets(kc, t);
+                        if (l != null) {
+                            JsonArray ca = new JsonArray();
+                            for (KafkaFetchRecords.KafkaTopicPosition r : l) {
+                                JsonObject cr = new JsonObject();
+                                cr.put("topic", r.topic());
+                                cr.put("partition", r.partition());
+                                cr.put("offset", r.offset());
+                                cr.put("epoch", r.epoch());
+                                ca.add(cr);
+                            }
+                            if (!ca.isEmpty()) {
+                                wo.put("committed", ca);
+                            }
+                        }
+                    }
+                }
+                list.add(jo);
+            }
+        }
+        return root;
+    }
+
+}
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index a40a29ebbd6..18423905894 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -17,8 +17,15 @@
 package org.apache.camel.component.kafka;
 
 import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Pattern;
 
@@ -45,6 +52,7 @@ import org.apache.camel.util.IOHelper;
 import org.apache.camel.util.ReflectionHelper;
 import org.apache.camel.util.TimeUtils;
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -58,6 +66,7 @@ import org.slf4j.LoggerFactory;
 import static java.rmi.registry.LocateRegistry.getRegistry;
 
 public class KafkaFetchRecords implements Runnable {
+
     /*
      This keeps track of the state the record fetcher is. Because the Kafka 
consumer is not thread safe, it may take
      some time between the pause or resume request is triggered and it is 
actually set.
@@ -92,12 +101,24 @@ public class KafkaFetchRecords implements Runnable {
 
     private volatile boolean terminated;
     private volatile long currentBackoffInterval;
-
     private volatile boolean reconnect; // The reconnect must be false at init 
(this is the policy whether to reconnect).
     private volatile boolean connected; // this is the state (connected or not)
-
     private volatile State state = State.RUNNING;
 
+    // dev-console records and state
+    record GroupMetadata(String groupId, String groupInstanceId, String 
memberId, int generationId) {
+    }
+
+    record KafkaTopicPosition(String topic, int partition, long offset, int 
epoch) {
+    }
+
+    private final boolean devConsoleEnabled;
+    private volatile GroupMetadata groupMetadata;
+    private volatile KafkaTopicPosition lastRecord;
+    private final List<KafkaTopicPosition> commitRecords = new ArrayList<>();
+    private final AtomicBoolean commitRecordsRequested = new AtomicBoolean();
+    private final AtomicReference<CountDownLatch> latch = new 
AtomicReference<>();
+
     KafkaFetchRecords(KafkaConsumer kafkaConsumer,
                       BridgeExceptionHandlerToErrorHandler bridge, String 
topicName, Pattern topicPattern, String id,
                       Properties kafkaProps, KafkaConsumerListener 
consumerListener) {
@@ -108,6 +129,7 @@ public class KafkaFetchRecords implements Runnable {
         this.consumerListener = consumerListener;
         this.threadId = topicName + "-" + "Thread " + id;
         this.kafkaProps = kafkaProps;
+        this.devConsoleEnabled = 
kafkaConsumer.getEndpoint().getCamelContext().isDevConsole();
     }
 
     @Override
@@ -169,6 +191,15 @@ public class KafkaFetchRecords implements Runnable {
                 setConnected(true);
             }
 
+            if (devConsoleEnabled && isConnected()) {
+                // store metadata
+                ConsumerGroupMetadata meta = consumer.groupMetadata();
+                if (meta != null) {
+                    groupMetadata = new GroupMetadata(
+                            meta.groupId(), meta.groupInstanceId().orElse(""), 
meta.memberId(), meta.generationId());
+                }
+            }
+
             setLastError(null);
             startPolling();
         } while ((pollExceptionStrategy.canContinue() || isReconnect()) && 
isKafkaConsumerRunnable());
@@ -343,6 +374,31 @@ public class KafkaFetchRecords implements Runnable {
             final KafkaRecordProcessorFacade recordProcessorFacade = 
createRecordProcessor();
 
             while (isKafkaConsumerRunnableAndNotStopped() && isConnected() && 
pollExceptionStrategy.canContinue()) {
+
+                // if dev-console is in use then a request to fetch the commit 
offsets can be requested on-demand
+                // which must happen using this polling thread, so we use the 
commitRecordsRequested to trigger this
+                if (devConsoleEnabled && 
commitRecordsRequested.compareAndSet(true, false)) {
+                    try {
+                        Map<TopicPartition, OffsetAndMetadata> commits = 
consumer.committed(consumer.assignment());
+                        commitRecords.clear();
+                        for (var e : commits.entrySet()) {
+                            KafkaTopicPosition p
+                                    = new KafkaTopicPosition(
+                                            e.getKey().topic(), 
e.getKey().partition(), e.getValue().offset(),
+                                            
e.getValue().leaderEpoch().orElse(0));
+                            commitRecords.add(p);
+                        }
+                        CountDownLatch count = latch.get();
+                        if (count != null) {
+                            count.countDown();
+                        }
+                    } catch (Exception e) {
+                        // ignore cannot get last commit details
+                        LOG.debug("Cannot get last offset committed from Kafka 
brokers due to: {}. This exception is ignored.",
+                                e.getMessage(), e);
+                    }
+                }
+
                 ConsumerRecords<Object, Object> allRecords = 
consumer.poll(pollDuration);
                 if (consumerListener != null) {
                     if (!consumerListener.afterConsume(consumer)) {
@@ -351,6 +407,10 @@ public class KafkaFetchRecords implements Runnable {
                 }
 
                 ProcessingResult result = 
recordProcessorFacade.processPolledRecords(allRecords);
+                if (devConsoleEnabled && result != null && result.getTopic() 
!= null) {
+                    // dev-console uses information from last processed record
+                    lastRecord = new KafkaTopicPosition(result.getTopic(), 
result.getPartition(), result.getOffset(), 0);
+                }
                 updateTaskState();
 
                 // when breakOnFirstError we want to unsubscribe from Kafka
@@ -494,7 +554,7 @@ public class KafkaFetchRecords implements Runnable {
         return kafkaConsumer.isRunAllowed() && 
!kafkaConsumer.isStoppingOrStopped();
     }
 
-    private boolean isReconnect() {
+    boolean isReconnect() {
         return reconnect;
     }
 
@@ -633,4 +693,37 @@ public class KafkaFetchRecords implements Runnable {
     private synchronized void setLastError(Exception lastError) {
         this.lastError = lastError;
     }
+
+    // dev console information
+    // ------------------------------------------------------------------------
+
+    GroupMetadata getGroupMetadata() {
+        return groupMetadata;
+    }
+
+    KafkaTopicPosition getLastRecord() {
+        return lastRecord;
+    }
+
+    String getThreadId() {
+        return threadId;
+    }
+
+    String getState() {
+        return state.name();
+    }
+
+    List<KafkaTopicPosition> getCommitRecords() {
+        return Collections.unmodifiableList(commitRecords);
+    }
+
+    CountDownLatch fetchCommitRecords() {
+        // use a latch to wait for commit records to be ready
+        // as the consumer thread must be calling Kafka brokers to get this 
information
+        // so this thread need to wait for that to be complete
+        CountDownLatch answer = new CountDownLatch(1);
+        latch.set(answer);
+        commitRecordsRequested.set(true);
+        return answer;
+    }
 }
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/TaskHealthState.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/TaskHealthState.java
index f8f0f74437a..6f445a6dcef 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/TaskHealthState.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/TaskHealthState.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.component.kafka;
 
 import java.util.Properties;
@@ -28,16 +27,14 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
  * should be marked as volatile).
  */
 public class TaskHealthState {
+
     private final boolean ready;
     private final boolean isTerminated;
     private final boolean isRecoverable;
     private final Exception lastError;
     private final String clientId;
-
     private final String bootstrapServers;
-
     private final long currentBackoffInterval;
-
     private final Properties clientProperties;
 
     public TaskHealthState(boolean ready, boolean isTerminated, boolean 
isRecoverable, Exception lastError, String clientId,
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitManager.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitManager.java
index a8b8a776aa3..6e94d33ac56 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitManager.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitManager.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.component.kafka.consumer;
 
 import org.apache.camel.Exchange;
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
index 41a747b0ef9..89416f86c49 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.component.kafka.consumer.support;
 
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -23,6 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
  * A processing facade that allows processing consumer records in different 
ways
  */
 public interface KafkaRecordProcessorFacade {
+
     /**
      * Sends a set of records polled from Kafka for processing
      *
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java
index 4be6b8ba7eb..3fe4f785503 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java
@@ -14,18 +14,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.component.kafka.consumer.support;
 
 /**
  * Holds the result of processing one or more consumer records
  */
 public final class ProcessingResult {
+
     private static final ProcessingResult UNPROCESSED_RESULT
             = new ProcessingResult(false, false);
 
     private final boolean breakOnErrorHit;
     private final boolean failed;
+    private final String topic;
+    private final int partition;
+    private final long offset;
 
     /**
      * Constructs a new processing result
@@ -34,8 +37,24 @@ public final class ProcessingResult {
      * @param failed          whether processing has failed
      */
     public ProcessingResult(boolean breakOnErrorHit, boolean failed) {
+        this(breakOnErrorHit, failed, null, 0, 0);
+    }
+
+    /**
+     * Constructs a new processing result
+     *
+     * @param breakOnErrorHit break on error hit setting
+     * @param failed          whether processing has failed
+     * @param topic           the topic
+     * @param partition       the partition of the topic
+     * @param offset          the consumer offset
+     */
+    public ProcessingResult(boolean breakOnErrorHit, boolean failed, String 
topic, int partition, long offset) {
         this.breakOnErrorHit = breakOnErrorHit;
         this.failed = failed;
+        this.topic = topic;
+        this.partition = partition;
+        this.offset = offset;
     }
 
     public boolean isBreakOnErrorHit() {
@@ -46,6 +65,18 @@ public final class ProcessingResult {
         return failed;
     }
 
+    public String getTopic() {
+        return topic;
+    }
+
+    public int getPartition() {
+        return partition;
+    }
+
+    public long getOffset() {
+        return offset;
+    }
+
     public static ProcessingResult newUnprocessed() {
         return UNPROCESSED_RESULT;
     }
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/TopicHelper.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/TopicHelper.java
index fc7c16d5d97..3e21afbf19a 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/TopicHelper.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/TopicHelper.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.component.kafka.consumer.support;
 
 import java.util.Collection;
@@ -23,8 +22,8 @@ import java.util.regex.Pattern;
 import org.apache.camel.component.kafka.consumer.support.subcription.TopicInfo;
 
 public final class TopicHelper {
-    private TopicHelper() {
 
+    private TopicHelper() {
     }
 
     public static String getPrintableTopic(Pattern topicPattern, 
Collection<String> topics) {
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java
index ed924dc78dd..3e10a65e0ea 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java
@@ -147,7 +147,6 @@ final class KafkaRecordBatchingProcessor extends 
KafkaRecordProcessor {
         // None of the states provided by the processing result are relevant 
for batch processing. We can simply return the
         // default state
         return ProcessingResult.newUnprocessed();
-
     }
 
     private boolean hasExpiredRecords(ConsumerRecords<Object, Object> 
consumerRecords) {
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/ClassicRebalanceListener.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/ClassicRebalanceListener.java
index ba11d1a7657..2a8b3ca49c8 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/ClassicRebalanceListener.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/ClassicRebalanceListener.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.component.kafka.consumer.support.classic;
 
 import java.util.Collection;
@@ -28,6 +27,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class ClassicRebalanceListener implements ConsumerRebalanceListener {
+
     private static final Logger LOG = 
LoggerFactory.getLogger(ClassicRebalanceListener.class);
 
     private final String threadId;
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/PartitionAssignmentAdapter.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/PartitionAssignmentAdapter.java
index 84f25bce7e7..634f59951ca 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/PartitionAssignmentAdapter.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/PartitionAssignmentAdapter.java
@@ -36,5 +36,8 @@ public interface PartitionAssignmentAdapter {
      */
     void setConsumer(Consumer<?, ?> consumer);
 
+    /**
+     * Callback for custom logic when partitions have been assigned.
+     */
     void handlePartitionAssignment();
 }
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/resume/ResumeRebalanceListener.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/resume/ResumeRebalanceListener.java
index 41aa7de74a8..e8847c83238 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/resume/ResumeRebalanceListener.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/resume/ResumeRebalanceListener.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.component.kafka.consumer.support.resume;
 
 import java.util.Collection;
@@ -29,6 +28,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class ResumeRebalanceListener implements ConsumerRebalanceListener {
+
     private static final Logger LOG = 
LoggerFactory.getLogger(ResumeRebalanceListener.class);
 
     private final String threadId;
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/streaming/KafkaRecordStreamingProcessor.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/streaming/KafkaRecordStreamingProcessor.java
index f9eabf13ce2..9b8be56a523 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/streaming/KafkaRecordStreamingProcessor.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/streaming/KafkaRecordStreamingProcessor.java
@@ -86,9 +86,12 @@ final class KafkaRecordStreamingProcessor extends 
KafkaRecordProcessor {
             final ExceptionHandler exceptionHandler = 
camelKafkaConsumer.getExceptionHandler();
 
             boolean breakOnErrorExit = processException(exchange, 
topicPartition, consumerRecord, exceptionHandler);
-            result = new ProcessingResult(breakOnErrorExit, true);
+            result = new ProcessingResult(
+                    breakOnErrorExit, true, consumerRecord.topic(), 
consumerRecord.partition(), consumerRecord.offset());
         } else {
-            result = new ProcessingResult(false, exchange.getException() != 
null);
+            result = new ProcessingResult(
+                    false, exchange.getException() != null, 
consumerRecord.topic(), consumerRecord.partition(),
+                    consumerRecord.offset());
         }
 
         if (!result.isBreakOnErrorHit()) {
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/subcription/DefaultSubscribeAdapter.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/subcription/DefaultSubscribeAdapter.java
index 07fb5f2ef30..ce4c78ed36d 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/subcription/DefaultSubscribeAdapter.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/subcription/DefaultSubscribeAdapter.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.component.kafka.consumer.support.subcription;
 
 import org.apache.camel.component.kafka.consumer.support.TopicHelper;
@@ -24,6 +23,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class DefaultSubscribeAdapter implements SubscribeAdapter {
+
     private static final Logger LOG = 
LoggerFactory.getLogger(DefaultSubscribeAdapter.class);
 
     @Override
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/subcription/SubscribeAdapter.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/subcription/SubscribeAdapter.java
index ac257f535af..a7b3e4a94bb 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/subcription/SubscribeAdapter.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/subcription/SubscribeAdapter.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.component.kafka.consumer.support.subcription;
 
 import org.apache.kafka.clients.consumer.Consumer;
@@ -24,6 +23,7 @@ import 
org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
  * A pluggable adapter for handling custom subscriptions
  */
 public interface SubscribeAdapter {
+
     /**
      * Handle the subscription to a Kafka topic
      *
diff --git 
a/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java
 
b/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java
index d230f7fd06e..6812bcb72d4 100644
--- 
a/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java
+++ 
b/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java
@@ -265,6 +265,8 @@ public class LocalCliConnector extends ServiceSupport 
implements CliConnector, C
                 doActionTransformTask(root);
             } else if ("bean".equals(action)) {
                 doActionBeanTask(root);
+            } else if ("kafka".equals(action)) {
+                doActionKafkaTask();
             }
         } catch (Exception e) {
             // ignore
@@ -610,6 +612,8 @@ public class LocalCliConnector extends ServiceSupport 
implements CliConnector, C
                                 .getMap("exception"));
                 IOHelper.writeText(jo.toJson(), outputFile);
             }
+        } else {
+            IOHelper.writeText("{}", outputFile);
         }
     }
 
@@ -625,6 +629,8 @@ public class LocalCliConnector extends ServiceSupport 
implements CliConnector, C
                     Map.of("filter", filter, "limit", limit, "browse", 
browse));
             LOG.trace("Updating output file: {}", outputFile);
             IOHelper.writeText(json.toJson(), outputFile);
+        } else {
+            IOHelper.writeText("{}", outputFile);
         }
     }
 
@@ -635,6 +641,8 @@ public class LocalCliConnector extends ServiceSupport 
implements CliConnector, C
             JsonObject json = (JsonObject) dc.call(DevConsole.MediaType.JSON);
             LOG.trace("Updating output file: {}", outputFile);
             IOHelper.writeText(json.toJson(), outputFile);
+        } else {
+            IOHelper.writeText("{}", outputFile);
         }
     }
 
@@ -646,6 +654,8 @@ public class LocalCliConnector extends ServiceSupport 
implements CliConnector, C
             JsonObject json = (JsonObject) dc.call(DevConsole.MediaType.JSON, 
Map.of("stacktrace", stacktrace));
             LOG.trace("Updating output file: {}", outputFile);
             IOHelper.writeText(json.toJson(), outputFile);
+        } else {
+            IOHelper.writeText("{}", outputFile);
         }
     }
 
@@ -661,6 +671,8 @@ public class LocalCliConnector extends ServiceSupport 
implements CliConnector, C
                             Map.of("filter", filter, "format", format, 
"uriAsParameters", uriAsParameters));
             LOG.trace("Updating output file: {}", outputFile);
             IOHelper.writeText(json.toJson(), outputFile);
+        } else {
+            IOHelper.writeText("{}", outputFile);
         }
     }
 
@@ -672,6 +684,8 @@ public class LocalCliConnector extends ServiceSupport 
implements CliConnector, C
             JsonObject json = (JsonObject) dc.call(DevConsole.MediaType.JSON, 
Map.of("filter", filter));
             LOG.trace("Updating output file: {}", outputFile);
             IOHelper.writeText(json.toJson(), outputFile);
+        } else {
+            IOHelper.writeText("{}", outputFile);
         }
     }
 
@@ -682,6 +696,8 @@ public class LocalCliConnector extends ServiceSupport 
implements CliConnector, C
             JsonObject json = (JsonObject) dc.call(DevConsole.MediaType.JSON, 
Map.of(Exchange.HTTP_PATH, "/*"));
             LOG.trace("Updating output file: {}", outputFile);
             IOHelper.writeText(json.toJson(), outputFile);
+        } else {
+            IOHelper.writeText("{}", outputFile);
         }
     }
 
@@ -692,6 +708,20 @@ public class LocalCliConnector extends ServiceSupport 
implements CliConnector, C
             JsonObject json = (JsonObject) dc.call(DevConsole.MediaType.JSON, 
Map.of("stackTrace", "true"));
             LOG.trace("Updating output file: {}", outputFile);
             IOHelper.writeText(json.toJson(), outputFile);
+        } else {
+            IOHelper.writeText("{}", outputFile);
+        }
+    }
+
+    private void doActionKafkaTask() throws IOException {
+        DevConsole dc = 
camelContext.getCamelContextExtension().getContextPlugin(DevConsoleRegistry.class)
+                .resolveById("kafka");
+        if (dc != null) {
+            JsonObject json = (JsonObject) dc.call(DevConsole.MediaType.JSON, 
Map.of("committed", "true"));
+            LOG.trace("Updating output file: {}", outputFile);
+            IOHelper.writeText(json.toJson(), outputFile);
+        } else {
+            IOHelper.writeText("{}", outputFile);
         }
     }
 
@@ -738,6 +768,8 @@ public class LocalCliConnector extends ServiceSupport 
implements CliConnector, C
                     Map.of("command", cmd, "breakpoint", bp, "history", 
history));
             LOG.trace("Updating output file: {}", outputFile);
             IOHelper.writeText(json.toJson(), outputFile);
+        } else {
+            IOHelper.writeText("{}", outputFile);
         }
     }
 
@@ -1016,6 +1048,13 @@ public class LocalCliConnector extends ServiceSupport 
implements CliConnector, C
                         root.put("rests", json);
                     }
                 }
+                DevConsole dc20 = dcr.resolveById("kafka");
+                if (dc20 != null) {
+                    JsonObject json = (JsonObject) 
dc20.call(DevConsole.MediaType.JSON);
+                    if (json != null && !json.isEmpty()) {
+                        root.put("kafka", json);
+                    }
+                }
             }
             // various details
             JsonObject mem = collectMemory();
diff --git 
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelJBangMain.java
 
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelJBangMain.java
index d39848eee04..8b74f5f5e93 100644
--- 
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelJBangMain.java
+++ 
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelJBangMain.java
@@ -102,6 +102,7 @@ public class CamelJBangMain implements Callable<Integer> {
                         .addSubcommand("service", new CommandLine(new 
ListService(main)))
                         .addSubcommand("rest", new CommandLine(new 
ListRest(main)))
                         .addSubcommand("platform-http", new CommandLine(new 
ListPlatformHttp(main)))
+                        .addSubcommand("kafka", new CommandLine(new 
ListKafka(main)))
                         .addSubcommand("source", new CommandLine(new 
CamelSourceAction(main)))
                         .addSubcommand("route-dump", new CommandLine(new 
CamelRouteDumpAction(main)))
                         .addSubcommand("startup-recorder", new CommandLine(new 
CamelStartupRecorderAction(main)))
diff --git 
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListHealth.java
 
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListHealth.java
index 628ee3cd7e8..93769abf532 100644
--- 
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListHealth.java
+++ 
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListHealth.java
@@ -132,26 +132,20 @@ public class ListHealth extends ProcessWatchCommand {
                                 String time = d.getString("invocation.time");
                                 if (time != null) {
                                     ZonedDateTime zdt = 
ZonedDateTime.parse(time);
-                                    if (zdt != null) {
-                                        long delta = 
Math.abs(ZonedDateTime.now().until(zdt, ChronoUnit.MILLIS));
-                                        row.sinceLast = 
TimeUtils.printAge(delta);
-                                    }
+                                    long delta = 
Math.abs(ZonedDateTime.now().until(zdt, ChronoUnit.MILLIS));
+                                    row.sinceLast = TimeUtils.printAge(delta);
                                 }
                                 time = d.getString("success.start.time");
                                 if (time != null) {
                                     ZonedDateTime zdt = 
ZonedDateTime.parse(time);
-                                    if (zdt != null) {
-                                        long delta = 
Math.abs(ZonedDateTime.now().until(zdt, ChronoUnit.MILLIS));
-                                        row.sinceStartSuccess = 
TimeUtils.printAge(delta);
-                                    }
+                                    long delta = 
Math.abs(ZonedDateTime.now().until(zdt, ChronoUnit.MILLIS));
+                                    row.sinceStartSuccess = 
TimeUtils.printAge(delta);
                                 }
                                 time = d.getString("failure.start.time");
                                 if (time != null) {
                                     ZonedDateTime zdt = 
ZonedDateTime.parse(time);
-                                    if (zdt != null) {
-                                        long delta = 
Math.abs(ZonedDateTime.now().until(zdt, ChronoUnit.MILLIS));
-                                        row.sinceStartFailure = 
TimeUtils.printAge(delta);
-                                    }
+                                    long delta = 
Math.abs(ZonedDateTime.now().until(zdt, ChronoUnit.MILLIS));
+                                    row.sinceStartFailure = 
TimeUtils.printAge(delta);
                                 }
                                 for (Map.Entry<String, Object> entry : 
d.entrySet()) {
                                     String k = entry.getKey();
diff --git 
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListKafka.java
 
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListKafka.java
new file mode 100644
index 00000000000..eb20255c43c
--- /dev/null
+++ 
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListKafka.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.dsl.jbang.core.commands.process;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.StringJoiner;
+
+import com.github.freva.asciitable.AsciiTable;
+import com.github.freva.asciitable.Column;
+import com.github.freva.asciitable.HorizontalAlign;
+import com.github.freva.asciitable.OverflowBehaviour;
+import org.apache.camel.dsl.jbang.core.commands.CamelJBangMain;
+import org.apache.camel.dsl.jbang.core.common.PidNameAgeCompletionCandidates;
+import org.apache.camel.dsl.jbang.core.common.ProcessHelper;
+import org.apache.camel.util.FileUtil;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.StopWatch;
+import org.apache.camel.util.StringHelper;
+import org.apache.camel.util.TimeUtils;
+import org.apache.camel.util.json.JsonArray;
+import org.apache.camel.util.json.JsonObject;
+import org.apache.camel.util.json.Jsoner;
+import picocli.CommandLine;
+import picocli.CommandLine.Command;
+
+@Command(name = "kafka",
+         description = "List Kafka consumers of Camel integrations", 
sortOptions = false)
+public class ListKafka extends ProcessWatchCommand {
+
+    @CommandLine.Parameters(description = "Name or pid of running Camel 
integration", arity = "0..1")
+    String name = "*";
+
+    @CommandLine.Option(names = { "--sort" }, completionCandidates = 
PidNameAgeCompletionCandidates.class,
+                        description = "Sort by pid, name or age", defaultValue 
= "pid")
+    String sort;
+
+    @CommandLine.Option(names = { "--metadata" },
+                        description = "Show group metadata")
+    boolean metadata;
+
+    @CommandLine.Option(names = { "--committed" },
+                        description = "Show committed offset (slower due to 
sync call to Kafka brokers)")
+    boolean committed;
+
+    @CommandLine.Option(names = { "--short-uri" },
+                        description = "List endpoint URI without query 
parameters (short)")
+    boolean shortUri;
+
+    @CommandLine.Option(names = { "--wide-uri" },
+                        description = "List endpoint URI in full details")
+    boolean wideUri;
+
+    public ListKafka(CamelJBangMain main) {
+        super(main);
+    }
+
+    @Override
+    protected void autoClearScreen() {
+        // do not auto clear as it can be slow when fetching committed details
+    }
+
+    @Override
+    public Integer doProcessWatchCall() throws Exception {
+        List<Row> rows = new ArrayList<>();
+
+        List<Long> pids = findPids(name);
+        ProcessHandle.allProcesses()
+                .filter(ph -> pids.contains(ph.pid()))
+                .forEach(ph -> {
+                    JsonObject root = loadStatus(ph.pid());
+                    // there must be a status file for the running Camel 
integration
+                    if (root != null) {
+                        Row copy = new Row();
+                        JsonObject context = (JsonObject) root.get("context");
+                        if (context == null) {
+                            return;
+                        }
+                        copy.name = context.getString("name");
+                        if ("CamelJBang".equals(copy.name)) {
+                            copy.name = ProcessHelper.extractName(root, ph);
+                        }
+                        copy.pid = Long.toString(ph.pid());
+                        copy.uptime = extractSince(ph);
+                        copy.age = TimeUtils.printSince(copy.uptime);
+
+                        JsonObject jo = (JsonObject) root.get("kafka");
+                        if (jo != null) {
+                            if (committed) {
+                                // committed offsets were requested, so trigger 
an on-demand action to fetch the data
+                                // ensure output file is deleted before 
executing action
+                                File outputFile = 
getOutputFile(Long.toString(ph.pid()));
+                                FileUtil.deleteFile(outputFile);
+
+                                JsonObject root2 = new JsonObject();
+                                root2.put("action", "kafka");
+                                File file = 
getActionFile(Long.toString(ph.pid()));
+                                try {
+                                    IOHelper.writeText(root2.toJson(), file);
+                                } catch (Exception e) {
+                                    // ignore
+                                }
+                                jo = waitForOutputFile(outputFile);
+                            }
+                            JsonArray arr = jo != null ? (JsonArray) 
jo.get("kafkaConsumers") : null;
+                            if (arr != null) {
+                                for (int i = 0; i < arr.size(); i++) {
+                                    Row row = copy.copy();
+                                    jo = (JsonObject) arr.get(i);
+                                    row.routeId = jo.getString("routeId");
+                                    row.uri = jo.getString("uri");
+                                    row.state = jo.getString("state");
+
+                                    JsonArray wa = (JsonArray) 
jo.get("workers");
+                                    if (wa != null) {
+                                        for (int j = 0; j < wa.size(); j++) {
+                                            JsonObject wo = (JsonObject) 
wa.get(j);
+                                            row.threadId = 
wo.getString("threadId");
+                                            row.state = wo.getString("state");
+                                            row.lastError = 
wo.getString("lastError");
+                                            row.groupId = 
wo.getString("groupId");
+                                            row.groupInstanceId = 
wo.getString("groupInstanceId");
+                                            row.memberId = 
wo.getString("memberId");
+                                            row.generationId = 
wo.getIntegerOrDefault("generationId", 0);
+                                            row.lastTopic = 
wo.getString("lastTopic");
+                                            row.lastPartition = 
wo.getIntegerOrDefault("lastPartition", 0);
+                                            row.lastOffset = 
wo.getLongOrDefault("lastOffset", 0);
+                                            if (committed) {
+                                                JsonArray ca = (JsonArray) 
wo.get("committed");
+                                                if (ca != null) {
+                                                    JsonObject found = null;
+                                                    for (int k = 0; k < 
ca.size(); k++) {
+                                                        JsonObject co = 
(JsonObject) ca.get(k);
+                                                        if (row.lastTopic == 
null
+                                                                || 
(row.lastTopic.equals(co.getString("topic"))
+                                                                        && 
row.lastPartition == co.getInteger("partition"))) {
+                                                            found = co;
+                                                            break;
+                                                        }
+                                                    }
+                                                    if (found != null) {
+                                                        row.lastTopic = 
found.getString("topic");
+                                                        row.lastPartition = 
found.getIntegerOrDefault("partition", 0);
+                                                        row.committedOffset = 
found.getLongOrDefault("offset", 0);
+                                                        row.committedEpoch = 
found.getLongOrDefault("epoch", 0);
+                                                    }
+                                                }
+                                            }
+                                            rows.add(row);
+                                            row = row.copy();
+                                        }
+                                    } else {
+                                        rows.add(row);
+                                    }
+                                }
+                            }
+                        }
+                    }
+                });
+
+        // sort rows
+        rows.sort(this::sortRow);
+
+        // clear before writing new data
+        if (watch) {
+            clearScreen();
+        }
+
+        if (!rows.isEmpty()) {
+            printer().println(AsciiTable.getTable(AsciiTable.NO_BORDERS, rows, 
Arrays.asList(
+                    new 
Column().header("PID").headerAlign(HorizontalAlign.CENTER).with(r -> r.pid),
+                    new 
Column().header("NAME").dataAlign(HorizontalAlign.LEFT).maxWidth(30, 
OverflowBehaviour.ELLIPSIS_RIGHT)
+                            .with(r -> r.name),
+                    new 
Column().header("ROUTE").dataAlign(HorizontalAlign.LEFT).with(this::getRouteId),
+                    new 
Column().header("METADATA").visible(metadata).dataAlign(HorizontalAlign.LEFT).with(this::getMetadata),
+                    new 
Column().header("STATE").dataAlign(HorizontalAlign.LEFT).with(this::getState),
+                    new 
Column().header("TOPIC").dataAlign(HorizontalAlign.RIGHT).with(r -> 
r.lastTopic),
+                    new 
Column().header("PARTITION").dataAlign(HorizontalAlign.RIGHT).with(r -> "" + 
r.lastPartition),
+                    new 
Column().header("OFFSET").dataAlign(HorizontalAlign.RIGHT).with(r -> "" + 
r.lastOffset),
+                    new 
Column().header("COMMITTED").visible(committed).dataAlign(HorizontalAlign.RIGHT)
+                            .with(this::getCommitted),
+                    new 
Column().header("ERROR").dataAlign(HorizontalAlign.LEFT)
+                            .maxWidth(60, OverflowBehaviour.NEWLINE)
+                            .with(this::getLastError),
+                    new 
Column().header("ENDPOINT").visible(!wideUri).dataAlign(HorizontalAlign.LEFT)
+                            .maxWidth(90, OverflowBehaviour.ELLIPSIS_RIGHT)
+                            .with(this::getUri),
+                    new 
Column().header("ENDPOINT").visible(wideUri).dataAlign(HorizontalAlign.LEFT)
+                            .maxWidth(140, OverflowBehaviour.NEWLINE)
+                            .with(this::getUri))));
+        }
+
+        return 0;
+    }
+
+    protected int sortRow(Row o1, Row o2) {
+        String s = sort;
+        int negate = 1;
+        if (s.startsWith("-")) {
+            s = s.substring(1);
+            negate = -1;
+        }
+        switch (s) {
+            case "pid":
+                return Long.compare(Long.parseLong(o1.pid), 
Long.parseLong(o2.pid)) * negate;
+            case "name":
+                return o1.name.compareToIgnoreCase(o2.name) * negate;
+            case "age":
+                return Long.compare(o1.uptime, o2.uptime) * negate;
+            default:
+                return 0;
+        }
+    }
+
+    private String getRouteId(Row r) {
+        if (r.routeId != null) {
+            return r.routeId;
+        }
+        return "";
+    }
+
+    private String getLastError(Row r) {
+        if (r.lastError != null) {
+            return r.lastError;
+        }
+        return "";
+    }
+
+    private String getUri(Row r) {
+        String u = r.uri;
+        if (shortUri) {
+            int pos = u.indexOf('?');
+            if (pos > 0) {
+                u = u.substring(0, pos);
+            }
+        }
+        return u;
+    }
+
+    private String getMetadata(Row r) {
+        StringJoiner sj = new StringJoiner(" ");
+        if (r.groupId != null && !r.groupId.isEmpty()) {
+            sj.add("groupId=" + r.groupId);
+        }
+        if (r.groupInstanceId != null && !r.groupInstanceId.isEmpty()) {
+            sj.add("groupInstanceId=" + r.groupInstanceId);
+        }
+        if (r.memberId != null && !r.memberId.isEmpty()) {
+            sj.add("memberId=" + r.memberId);
+        }
+        if (r.generationId > 0) {
+            sj.add("generationId=" + r.generationId);
+        }
+        return sj.toString();
+    }
+
+    private String getState(Row r) {
+        return StringHelper.capitalize(r.state.toLowerCase(Locale.ROOT));
+    }
+
+    private String getCommitted(Row r) {
+        if (r.committedEpoch > 0) {
+            String age = TimeUtils.printSince(r.committedEpoch);
+            return r.committedOffset + " (" + age + ")";
+        } else {
+            return "" + r.committedOffset;
+        }
+    }
+
+    private static class Row implements Cloneable {
+        String pid;
+        String name;
+        String age;
+        long uptime;
+        String routeId;
+        String uri;
+        // worker
+        String threadId;
+        String state;
+        String lastError;
+        String groupId;
+        String groupInstanceId;
+        String memberId;
+        int generationId;
+        String lastTopic;
+        int lastPartition;
+        long lastOffset;
+        long committedOffset;
+        long committedEpoch;
+
+        Row copy() {
+            try {
+                return (Row) clone();
+            } catch (CloneNotSupportedException e) {
+                return null;
+            }
+        }
+    }
+
+    private JsonObject waitForOutputFile(File outputFile) {
+        JsonObject answer = null;
+
+        StopWatch watch = new StopWatch();
+        while (watch.taken() < 10000 && answer == null) {
+            try {
+                // give time for response to be ready
+                Thread.sleep(100);
+
+                if (outputFile.exists()) {
+                    FileInputStream fis = new FileInputStream(outputFile);
+                    String text = IOHelper.loadText(fis);
+                    IOHelper.close(fis);
+                    answer = (JsonObject) Jsoner.deserialize(text);
+                }
+            } catch (Exception e) {
+                // ignore
+            }
+        }
+        return answer;
+    }
+
+}
diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ProcessWatchCommand.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ProcessWatchCommand.java
index c1a3d377403..5ac45dabdfd 100644
--- a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ProcessWatchCommand.java
+++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ProcessWatchCommand.java
@@ -39,7 +39,7 @@ abstract class ProcessWatchCommand extends ProcessBaseCommand {
         int exit;
         if (watch) {
             do {
-                clearScreen();
+                autoClearScreen();
                 exit = doProcessWatchCall();
                 if (exit == 0) {
                     // use 2-sec delay in watch mode
@@ -52,6 +52,10 @@ abstract class ProcessWatchCommand extends ProcessBaseCommand {
         return exit;
     }
 
+    protected void autoClearScreen() {
+        clearScreen();
+    }
+
     protected void clearScreen() {
         AnsiConsole.out().print(Ansi.ansi().eraseScreen().cursor(1, 1));
     }
diff --git a/dsl/camel-jbang/camel-jbang-core/src/test/java/org/apache/camel/dsl/jbang/core/commands/version/VersionGetTest.java b/dsl/camel-jbang/camel-jbang-core/src/test/java/org/apache/camel/dsl/jbang/core/commands/version/VersionGetTest.java
index d41bce2d486..116f10a5a33 100644
--- a/dsl/camel-jbang/camel-jbang-core/src/test/java/org/apache/camel/dsl/jbang/core/commands/version/VersionGetTest.java
+++ b/dsl/camel-jbang/camel-jbang-core/src/test/java/org/apache/camel/dsl/jbang/core/commands/version/VersionGetTest.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.camel.dsl.jbang.core.commands.version;
 
 import java.io.IOException;
@@ -41,7 +40,7 @@ class VersionGetTest extends CamelCommandBaseTest {
         command.doCall();
 
         List<String> lines = printer.getLines();
-        Assertions.assertEquals("JBang version: 0.100", lines.get(0));
+        Assertions.assertTrue(lines.get(0).startsWith("JBang version:"));
         Assertions.assertTrue(lines.get(1).startsWith("Camel JBang version:"));
     }
 
@@ -59,7 +58,7 @@ class VersionGetTest extends CamelCommandBaseTest {
 
         List<String> lines = printer.getLines();
         Assertions.assertEquals(5, lines.size());
-        Assertions.assertEquals("JBang version: 0.101", lines.get(0));
+        Assertions.assertTrue(lines.get(0).startsWith("JBang version:"));
         Assertions.assertTrue(lines.get(1).startsWith("Camel JBang version:"));
         Assertions.assertEquals("User configuration:", lines.get(2));
         Assertions.assertEquals("camel-version = latest", lines.get(3));

Reply via email to