This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 3c957201743 CAMEL-21193: camel-jbang - Add receive command (#15912)
3c957201743 is described below

commit 3c957201743ff8ec01530de27705c4544464d275
Author: Claus Ibsen <[email protected]>
AuthorDate: Thu Oct 10 15:10:57 2024 +0200

    CAMEL-21193: camel-jbang - Add receive command (#15912)
    
    * CAMEL-21193: camel-jbang - Add receive command
---
 .../apache/camel/catalog/dev-consoles.properties   |   1 +
 .../apache/camel/catalog/dev-consoles/receive.json |  15 +
 .../org/apache/camel/impl/engine/DefaultRoute.java |   4 +
 .../impl/console/ReceiveDevConsoleConfigurer.java  |  86 +++
 .../org/apache/camel/dev-console/receive.json      |  15 +
 ...org.apache.camel.impl.console.ReceiveDevConsole |   2 +
 .../services/org/apache/camel/dev-console/receive  |   2 +
 .../org/apache/camel/dev-consoles.properties       |   2 +-
 .../camel/impl/console/ReceiveDevConsole.java      | 345 +++++++++
 .../java/org/apache/camel/util/ObjectHelper.java   |  19 +
 .../modules/ROOT/pages/camel-jbang.adoc            |  65 +-
 .../camel/cli/connector/LocalCliConnector.java     |  64 +-
 .../dsl/jbang/core/commands/CamelCommand.java      |   4 +
 .../dsl/jbang/core/commands/CamelJBangMain.java    |   1 +
 .../core/commands/action/CamelBrowseAction.java    |  10 -
 .../core/commands/action/CamelReceiveAction.java   | 808 +++++++++++++++++++++
 16 files changed, 1429 insertions(+), 14 deletions(-)

diff --git 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles.properties
 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles.properties
index d8d1929214d..280b182c430 100644
--- 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles.properties
+++ 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles.properties
@@ -29,6 +29,7 @@ platform-http
 properties
 protocol
 quartz
+receive
 reload
 resilience4j
 rest
diff --git 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles/receive.json
 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles/receive.json
new file mode 100644
index 00000000000..978a1f23579
--- /dev/null
+++ 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles/receive.json
@@ -0,0 +1,15 @@
+{
+  "console": {
+    "kind": "console",
+    "group": "camel",
+    "name": "receive",
+    "title": "Camel Receive",
+    "description": "Consume messages from endpoints",
+    "deprecated": false,
+    "javaType": "org.apache.camel.impl.console.ReceiveDevConsole",
+    "groupId": "org.apache.camel",
+    "artifactId": "camel-console",
+    "version": "4.9.0-SNAPSHOT"
+  }
+}
+
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java
index 567443d3302..446671ba875 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.camel.CamelContext;
+import org.apache.camel.Channel;
 import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
 import org.apache.camel.ErrorHandlerFactory;
@@ -733,6 +734,9 @@ public class DefaultRoute extends ServiceSupport implements 
Route {
         List<Processor> list = nav.next();
         if (list != null) {
             for (Processor proc : list) {
+                if (proc instanceof Channel channel) {
+                    proc = channel.getNextProcessor();
+                }
                 String id = null;
                 if (proc instanceof IdAware idAware) {
                     id = idAware.getId();
diff --git 
a/core/camel-console/src/generated/java/org/apache/camel/impl/console/ReceiveDevConsoleConfigurer.java
 
b/core/camel-console/src/generated/java/org/apache/camel/impl/console/ReceiveDevConsoleConfigurer.java
new file mode 100644
index 00000000000..0129a9fae6a
--- /dev/null
+++ 
b/core/camel-console/src/generated/java/org/apache/camel/impl/console/ReceiveDevConsoleConfigurer.java
@@ -0,0 +1,86 @@
+/* Generated by camel build tools - do NOT edit this file! */
+package org.apache.camel.impl.console;
+
+import javax.annotation.processing.Generated;
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.spi.ExtendedPropertyConfigurerGetter;
+import org.apache.camel.spi.PropertyConfigurerGetter;
+import org.apache.camel.spi.ConfigurerStrategy;
+import org.apache.camel.spi.GeneratedPropertyConfigurer;
+import org.apache.camel.util.CaseInsensitiveMap;
+import org.apache.camel.impl.console.ReceiveDevConsole;
+
+/**
+ * Generated by camel build tools - do NOT edit this file!
+ */
+@Generated("org.apache.camel.maven.packaging.GenerateConfigurerMojo")
+@SuppressWarnings("unchecked")
+public class ReceiveDevConsoleConfigurer extends 
org.apache.camel.support.component.PropertyConfigurerSupport implements 
GeneratedPropertyConfigurer, ExtendedPropertyConfigurerGetter {
+
+    private static final Map<String, Object> ALL_OPTIONS;
+    static {
+        Map<String, Object> map = new CaseInsensitiveMap();
+        map.put("BodyMaxChars", int.class);
+        map.put("CamelContext", org.apache.camel.CamelContext.class);
+        map.put("Capacity", int.class);
+        map.put("RemoveOnDump", boolean.class);
+        ALL_OPTIONS = map;
+        
ConfigurerStrategy.addBootstrapConfigurerClearer(ReceiveDevConsoleConfigurer::clearBootstrapConfigurers);
+    }
+
+    @Override
+    public boolean configure(CamelContext camelContext, Object obj, String 
name, Object value, boolean ignoreCase) {
+        org.apache.camel.impl.console.ReceiveDevConsole target = 
(org.apache.camel.impl.console.ReceiveDevConsole) obj;
+        switch (ignoreCase ? name.toLowerCase() : name) {
+        case "bodymaxchars":
+        case "bodyMaxChars": target.setBodyMaxChars(property(camelContext, 
int.class, value)); return true;
+        case "camelcontext":
+        case "camelContext": target.setCamelContext(property(camelContext, 
org.apache.camel.CamelContext.class, value)); return true;
+        case "capacity": target.setCapacity(property(camelContext, int.class, 
value)); return true;
+        case "removeondump":
+        case "removeOnDump": target.setRemoveOnDump(property(camelContext, 
boolean.class, value)); return true;
+        default: return false;
+        }
+    }
+
+    @Override
+    public Map<String, Object> getAllOptions(Object target) {
+        return ALL_OPTIONS;
+    }
+
+    public static void clearBootstrapConfigurers() {
+        ALL_OPTIONS.clear();
+    }
+
+    @Override
+    public Class<?> getOptionType(String name, boolean ignoreCase) {
+        switch (ignoreCase ? name.toLowerCase() : name) {
+        case "bodymaxchars":
+        case "bodyMaxChars": return int.class;
+        case "camelcontext":
+        case "camelContext": return org.apache.camel.CamelContext.class;
+        case "capacity": return int.class;
+        case "removeondump":
+        case "removeOnDump": return boolean.class;
+        default: return null;
+        }
+    }
+
+    @Override
+    public Object getOptionValue(Object obj, String name, boolean ignoreCase) {
+        org.apache.camel.impl.console.ReceiveDevConsole target = 
(org.apache.camel.impl.console.ReceiveDevConsole) obj;
+        switch (ignoreCase ? name.toLowerCase() : name) {
+        case "bodymaxchars":
+        case "bodyMaxChars": return target.getBodyMaxChars();
+        case "camelcontext":
+        case "camelContext": return target.getCamelContext();
+        case "capacity": return target.getCapacity();
+        case "removeondump":
+        case "removeOnDump": return target.isRemoveOnDump();
+        default: return null;
+        }
+    }
+}
+
diff --git 
a/core/camel-console/src/generated/resources/META-INF/org/apache/camel/dev-console/receive.json
 
b/core/camel-console/src/generated/resources/META-INF/org/apache/camel/dev-console/receive.json
new file mode 100644
index 00000000000..978a1f23579
--- /dev/null
+++ 
b/core/camel-console/src/generated/resources/META-INF/org/apache/camel/dev-console/receive.json
@@ -0,0 +1,15 @@
+{
+  "console": {
+    "kind": "console",
+    "group": "camel",
+    "name": "receive",
+    "title": "Camel Receive",
+    "description": "Consume messages from endpoints",
+    "deprecated": false,
+    "javaType": "org.apache.camel.impl.console.ReceiveDevConsole",
+    "groupId": "org.apache.camel",
+    "artifactId": "camel-console",
+    "version": "4.9.0-SNAPSHOT"
+  }
+}
+
diff --git 
a/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/configurer/org.apache.camel.impl.console.ReceiveDevConsole
 
b/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/configurer/org.apache.camel.impl.console.ReceiveDevConsole
new file mode 100644
index 00000000000..3a11d5ff2c9
--- /dev/null
+++ 
b/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/configurer/org.apache.camel.impl.console.ReceiveDevConsole
@@ -0,0 +1,2 @@
+# Generated by camel build tools - do NOT edit this file!
+class=org.apache.camel.impl.console.ReceiveDevConsoleConfigurer
diff --git 
a/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-console/receive
 
b/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-console/receive
new file mode 100644
index 00000000000..8454801f9f9
--- /dev/null
+++ 
b/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-console/receive
@@ -0,0 +1,2 @@
+# Generated by camel build tools - do NOT edit this file!
+class=org.apache.camel.impl.console.ReceiveDevConsole
diff --git 
a/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-consoles.properties
 
b/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-consoles.properties
index fd1b9890684..8d3e19577ae 100644
--- 
a/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-consoles.properties
+++ 
b/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-consoles.properties
@@ -1,5 +1,5 @@
 # Generated by camel build tools - do NOT edit this file!
-dev-consoles=bean blocked browse circuit-breaker consumer context debug 
endpoint event gc health inflight java-security jvm log memory properties 
reload rest route route-controller route-dump service source startup-recorder 
thread top trace transformers type-converters variables
+dev-consoles=bean blocked browse circuit-breaker consumer context debug 
endpoint event gc health inflight java-security jvm log memory properties 
receive reload rest route route-controller route-dump service source 
startup-recorder thread top trace transformers type-converters variables
 groupId=org.apache.camel
 artifactId=camel-console
 version=4.9.0-SNAPSHOT
diff --git 
a/core/camel-console/src/main/java/org/apache/camel/impl/console/ReceiveDevConsole.java
 
b/core/camel-console/src/main/java/org/apache/camel/impl/console/ReceiveDevConsole.java
new file mode 100644
index 00000000000..ff3a3215e7b
--- /dev/null
+++ 
b/core/camel-console/src/main/java/org/apache/camel/impl/console/ReceiveDevConsole.java
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.console;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Consumer;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.spi.Configurer;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.annotations.DevConsole;
+import org.apache.camel.support.ExceptionHelper;
+import org.apache.camel.support.MessageHelper;
+import org.apache.camel.support.PatternHelper;
+import org.apache.camel.support.console.AbstractDevConsole;
+import org.apache.camel.support.service.ServiceHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.json.JsonArray;
+import org.apache.camel.util.json.JsonObject;
+import org.apache.camel.util.json.Jsoner;
+
+@DevConsole(name = "receive", displayName = "Camel Receive", description = 
"Consume messages from endpoints")
+@Configurer(bootstrap = true, extended = true)
+public class ReceiveDevConsole extends AbstractDevConsole {
+
+    @Metadata(defaultValue = "100",
+              description = "Maximum capacity of last number of messages to 
capture (capacity must be between 50 and 1000)")
+    private int capacity = 100;
+    @Metadata(defaultValue = "32768", label = "advanced",
+              description = "To limit the message body to a maximum size in 
the received message. Use 0 or negative value to use unlimited size.")
+    private int bodyMaxChars = 32 * 1024;
+    @Metadata(defaultValue = "true", label = "advanced",
+              description = "Whether all received messages should be removed 
when dumping. By default, the messages are removed, which means that dumping 
will not contain previous dumped messages.")
+    private boolean removeOnDump = true;
+
+    /**
+     * Whether to enable or disable receive mode
+     */
+    public static final String ENABLED = "enabled";
+
+    /**
+     * Whether to dump received messages
+     */
+    public static final String DUMP = "dump";
+
+    /**
+     * Endpoint for where to receive messages (can also refer to a route id, 
endpoint pattern).
+     */
+    public static final String ENDPOINT = "endpoint";
+
+    private final List<Consumer> consumers = new ArrayList<>();
+    private final AtomicBoolean enabled = new AtomicBoolean();
+    private final AtomicLong uuid = new AtomicLong();
+    private Queue<JsonObject> queue;
+    private long firstTimestamp;
+    private long lastTimestamp;
+
+    public ReceiveDevConsole() {
+        super("camel", "receive", "Camel Receive", "Consume messages from 
endpoints");
+    }
+
+    public int getCapacity() {
+        return capacity;
+    }
+
+    public void setCapacity(int capacity) {
+        this.capacity = capacity;
+    }
+
+    public int getBodyMaxChars() {
+        return bodyMaxChars;
+    }
+
+    public void setBodyMaxChars(int bodyMaxChars) {
+        this.bodyMaxChars = bodyMaxChars;
+    }
+
+    public boolean isRemoveOnDump() {
+        return removeOnDump;
+    }
+
+    public void setRemoveOnDump(boolean removeOnDump) {
+        this.removeOnDump = removeOnDump;
+    }
+
+    @Override
+    protected void doInit() throws Exception {
+        if (capacity > 1000 || capacity < 50) {
+            throw new IllegalArgumentException("Capacity must be between 50 
and 1000");
+        }
+        this.queue = new LinkedBlockingQueue<>(capacity);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        stopConsumers();
+    }
+
+    protected void stopConsumers() {
+        for (Consumer c : consumers) {
+            ServiceHelper.stopAndShutdownServices(c);
+        }
+        consumers.clear();
+    }
+
+    protected String doCallText(Map<String, Object> options) {
+        StringBuilder sb = new StringBuilder();
+
+        String dump = (String) options.get(DUMP);
+        if ("true".equals(dump)) {
+            JsonArray arr = new JsonArray();
+            arr.addAll(queue);
+            if (removeOnDump) {
+                queue.clear();
+            }
+            JsonObject jo = (JsonObject) arr.get(0);
+            firstTimestamp = jo.getLongOrDefault("timestamp", 0);
+            jo = (JsonObject) arr.get(arr.size() - 1);
+            lastTimestamp = jo.getLongOrDefault("timestamp", 0);
+            String json = arr.toJson();
+            sb.append(json).append("\n");
+            return sb.toString();
+        }
+
+        String enabled = (String) options.get(ENABLED);
+        if ("false".equals(enabled)) {
+            // turn off all consumers
+            stopConsumers();
+            this.enabled.set(false);
+            sb.append("Enabled: ").append("false").append("\n");
+            return sb.toString();
+        }
+
+        String pattern = (String) options.get(ENDPOINT);
+        if (pattern != null) {
+            try {
+                Endpoint target = findMatchingEndpoint(getCamelContext(), 
pattern);
+                if (target != null) {
+                    sb.append("Starting to receive messages from: 
").append(target.getEndpointUri());
+                    Consumer consumer = createConsumer(target);
+                    if (!consumers.contains(consumer)) {
+                        consumers.add(consumer);
+                        ServiceHelper.startService(consumer);
+                    }
+                }
+                this.enabled.set(true);
+            } catch (Exception e) {
+                sb.append("Error starting to receive messages due to: 
").append(e.getMessage());
+            }
+        }
+
+        sb.append("Enabled: ").append(this.enabled.get()).append("\n");
+        sb.append("Total: ").append(this.uuid.get()).append("\n");
+        for (Consumer c : consumers) {
+            sb.append("    ").append(c.getEndpoint().toString()).append("\n");
+        }
+        return sb.toString();
+    }
+
+    protected JsonObject doCallJson(Map<String, Object> options) {
+        JsonObject root = new JsonObject();
+
+        String dump = (String) options.get(DUMP);
+        if ("true".equals(dump)) {
+            JsonArray arr = new JsonArray();
+            arr.addAll(queue);
+            if (removeOnDump) {
+                queue.clear();
+            }
+            root.put("messages", arr);
+            JsonObject jo = (JsonObject) arr.get(0);
+            firstTimestamp = jo.getLongOrDefault("timestamp", 0);
+            jo = (JsonObject) arr.get(arr.size() - 1);
+            lastTimestamp = jo.getLongOrDefault("timestamp", 0);
+            return root;
+        }
+
+        String enabled = (String) options.get(ENABLED);
+        if ("false".equals(enabled)) {
+            // turn off all consumers
+            stopConsumers();
+            this.enabled.set(false);
+            root.put("enabled", false);
+            return root;
+        }
+
+        String pattern = (String) options.get(ENDPOINT);
+        if (pattern != null) {
+            try {
+                Endpoint target = findMatchingEndpoint(getCamelContext(), 
pattern);
+                if (target != null) {
+                    root.put("url", target.getEndpointUri());
+                    Consumer consumer = createConsumer(target);
+                    if (!consumers.contains(consumer)) {
+                        consumers.add(consumer);
+                        ServiceHelper.startService(consumer);
+                    }
+                }
+                this.enabled.set(true);
+            } catch (Exception e) {
+                root.put("error", Jsoner.escape(e.getMessage()));
+                JsonArray arr2 = new JsonArray();
+                final String trace = ExceptionHelper.stackTraceToString(e);
+                root.put("stackTrace", arr2);
+                Collections.addAll(arr2, trace.split("\n"));
+            }
+        }
+
+        root.put("enabled", this.enabled.get());
+        root.put("total", uuid.get());
+        root.put("firstTimestamp", firstTimestamp);
+        root.put("lastTimestamp", lastTimestamp);
+
+        JsonArray arr = new JsonArray();
+        for (Consumer c : consumers) {
+            JsonObject jo = new JsonObject();
+            jo.put("uri", c.getEndpoint().toString());
+            jo.put("remote", c.getEndpoint().isRemote());
+            arr.add(jo);
+        }
+        if (!arr.isEmpty()) {
+            root.put("endpoints", arr);
+        }
+        return root;
+    }
+
+    private Consumer createConsumer(Endpoint target) throws Exception {
+        for (Consumer c : consumers) {
+            if (c.getEndpoint() == target) {
+                return c;
+            }
+        }
+        return target.createConsumer(this::addMessage);
+    }
+
+    private void addMessage(Exchange exchange) {
+        JsonObject json
+                = MessageHelper.dumpAsJSonObject(exchange.getMessage(), false, 
false, true, true, true, true, bodyMaxChars);
+        json.put("uid", uuid.incrementAndGet());
+        json.put("endpointUri", exchange.getFromEndpoint().toString());
+        json.put("remoteEndpoint", exchange.getFromEndpoint().isRemote());
+        lastTimestamp = exchange.getMessage().getMessageTimestamp();
+        json.put("timestamp", lastTimestamp);
+
+        // ensure there is space on the queue by polling until at least single 
slot is free
+        int drain = queue.size() - capacity + 1;
+        if (drain > 0) {
+            for (int i = 0; i < drain; i++) {
+                queue.poll();
+            }
+        }
+        queue.add(json);
+    }
+
+    protected static Endpoint findMatchingEndpoint(CamelContext camelContext, 
String endpoint) {
+        Endpoint target = null;
+        boolean scheme = endpoint.contains(":");
+        boolean pattern = endpoint.endsWith("*");
+        if (!scheme || pattern) {
+            if (!scheme && !endpoint.endsWith("*")) {
+                endpoint = endpoint + "*";
+            }
+            // find all producers for this camel context via JMX mbeans (this 
allows to find also producers created via dynamic EIPs)
+            MBeanServer mbeanServer = 
camelContext.getManagementStrategy().getManagementAgent().getMBeanServer();
+            if (mbeanServer != null) {
+                try {
+                    String jmxDomain
+                            = 
camelContext.getManagementStrategy().getManagementAgent().getMBeanObjectDomainName();
+                    String prefix
+                            = 
camelContext.getManagementStrategy().getManagementAgent().getIncludeHostName() 
? "*/" : "";
+                    ObjectName query = ObjectName.getInstance(
+                            jmxDomain + ":context=" + prefix + 
camelContext.getManagementName() + ",type=producers,*");
+                    Set<ObjectName> set = mbeanServer.queryNames(query, null);
+                    if (set != null && !set.isEmpty()) {
+                        for (ObjectName on : set) {
+                            String uri = (String) mbeanServer.getAttribute(on, 
"EndpointUri");
+                            if (PatternHelper.matchPattern(uri, endpoint)) {
+                                // is the endpoint able to create a consumer
+                                target = camelContext.getEndpoint(uri);
+                                // is the target able to create a consumer
+                                org.apache.camel.spi.UriEndpoint ann
+                                        = 
ObjectHelper.getAnnotationDeep(target, org.apache.camel.spi.UriEndpoint.class);
+                                if (ann != null) {
+                                    if (ann.producerOnly()) {
+                                        // skip if the endpoint cannot consume 
(we need to be able to consume to receive)
+                                        target = null;
+                                    }
+                                    if ("*".equals(endpoint) && !ann.remote()) 
{
+                                        // skip internal when matching 
everything
+                                        target = null;
+                                    }
+                                }
+                                if (target != null) {
+                                    break;
+                                }
+                            }
+                        }
+                    }
+                } catch (Exception e) {
+                    // ignore
+                }
+            }
+        } else {
+            target = camelContext.getEndpoint(endpoint);
+            // is the target able to create a consumer
+            org.apache.camel.spi.UriEndpoint ann
+                    = ObjectHelper.getAnnotationDeep(target, 
org.apache.camel.spi.UriEndpoint.class);
+            if (ann != null) {
+                if (ann.producerOnly()) {
+                    // skip if the endpoint cannot consume (we need to be able 
to consume to receive)
+                    throw new IllegalArgumentException("Cannot consume from 
endpoint: " + endpoint);
+                }
+            }
+        }
+        return target;
+    }
+
+}
diff --git 
a/core/camel-util/src/main/java/org/apache/camel/util/ObjectHelper.java 
b/core/camel-util/src/main/java/org/apache/camel/util/ObjectHelper.java
index e82dc4e21c1..941188187bc 100644
--- a/core/camel-util/src/main/java/org/apache/camel/util/ObjectHelper.java
+++ b/core/camel-util/src/main/java/org/apache/camel/util/ObjectHelper.java
@@ -1103,6 +1103,25 @@ public final class ObjectHelper {
         return instance.getClass().getAnnotation(type);
     }
 
+    /**
+     * Gets the annotation from the given instance (searching super classes 
also).
+     *
+     * @param  instance the instance
+     * @param  type     the annotation
+     * @return          the annotation, or <tt>null</tt> if the instance does 
not have the given annotation
+     */
+    public static <A extends java.lang.annotation.Annotation> A 
getAnnotationDeep(Object instance, Class<A> type) {
+        Class<?> clazz = instance.getClass();
+        while (clazz != Object.class) {
+            A ann = clazz.getAnnotation(type);
+            if (ann != null) {
+                return ann;
+            }
+            clazz = clazz.getSuperclass();
+        }
+        return null;
+    }
+
     /**
      * Converts the given value to the required type or throw a meaningful 
exception
      */
diff --git a/docs/user-manual/modules/ROOT/pages/camel-jbang.adoc 
b/docs/user-manual/modules/ROOT/pages/camel-jbang.adoc
index ea752eeb077..f25867ed324 100644
--- a/docs/user-manual/modules/ROOT/pages/camel-jbang.adoc
+++ b/docs/user-manual/modules/ROOT/pages/camel-jbang.adoc
@@ -1399,11 +1399,74 @@ For example to poll a message from a ActiveMQ queue 
named cheese you can do:
 
 [source,bash]
 ----
-$ camel cmd send --poll --endpoint=activemq:cheese
+$ camel cmd send --poll --endpoint='activemq:cheese'
 ----
 
 When you poll then you do not send any payload (body or headers).
 
+=== Receiving messages via Camel
+
+*Available since Camel 4.9*
+
+When building a prototype integration with Camel JBang, you may route messages 
to external systems.
+To know whether messages are being routed correctly, you may use system 
consoles (such as SQL prompts, web consoles, CLI tools etc.) to look inside these systems
+and see which messages have arrived.
+
+The Camel JBang now comes with a new command to receive messages from remote 
endpoints.
+This can be used to quickly look at, or tail in the terminal, the messages that an 
external system has received.
+Camel does this by consuming the messages (if the component has support for 
consumer) and then letting Camel JBang dump the messages from the CLI.
+
+For example to start dumping all messages from ActiveMQ in one command, you 
can do:
+
+[source,bash]
+----
+$ camel cmd receive --endpoint='activemq:cheese'
+----
+
+You can also use pattern syntax for the endpoint, so suppose you have the 
following route:
+
+[source,java]
+----
+from("ftp:myserver:1234/foo")
+  .to("log:order")
+  .to("activemq:orders");
+----
+
+Then you can tell Camel to automatically start receiving messages with:
+
+[source,bash]
+----
+$ camel cmd receive --action=start
+----
+
+TIP: You can enable and disable this mode with `--action=start` and 
`--action=stop`.
+
+Then Camel will automatically discover from the running integration, all the 
_producers_ and
+find the first _producer_ that is remote and also has consumer support. In the 
example above,
+that is the `activemq` component, and thus Camel will start receiving from 
`activemq:orders`.
+
+You can see the status via:
+
+[source,bash]
+----
+$ camel cmd receive
+ PID   NAME   AGE   STATUS    TOTAL  SINCE  ENDPOINT
+  4364  foo   1m33s  Enabled     18     2s  activemq://orders
+----
+
+You can then dump all the received messages with:
+
+[source,bash]
+----
+$ camel cmd receive --action=dump
+----
+
+This will dump all the messages, and continue to dump new incoming messages. 
Use (ctrl + c) to break and exit.
+You can turn follow off with `--follow=false`.
+
+TIP: Use `camel cmd receive --help` to see all the various options for this 
command.
+
+
 === Controlling local Camel integrations
 
 To list the currently running Camel integrations, you use the `ps` command:
diff --git 
a/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java
 
b/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java
index 4a74f68d444..279eecc1f0c 100644
--- 
a/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java
+++ 
b/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java
@@ -114,8 +114,10 @@ public class LocalCliConnector extends ServiceSupport 
implements CliConnector, C
     private File actionFile;
     private File outputFile;
     private File traceFile;
+    private long traceFilePos;   // keep track of trace offset
     private File debugFile;
-    private long traceFilePos; // keep track of trace offset
+    private File receiveFile;
+    private long receiveFilePos; // keep track of receive offset
     private byte[] lastSource;
     private ExpressionDefinition lastSourceExpression;
 
@@ -180,6 +182,7 @@ public class LocalCliConnector extends ServiceSupport 
implements CliConnector, C
             outputFile = createLockFile(lockFile.getName() + "-output.json");
             traceFile = createLockFile(lockFile.getName() + "-trace.json");
             debugFile = createLockFile(lockFile.getName() + "-debug.json");
+            receiveFile = createLockFile(lockFile.getName() + "-receive.json");
             executor.scheduleWithFixedDelay(this::task, 0, delay, 
TimeUnit.MILLISECONDS);
             LOG.info("Camel JBang CLI enabled");
         } else {
@@ -223,7 +226,7 @@ public class LocalCliConnector extends ServiceSupport 
implements CliConnector, C
         actionTask();
         statusTask();
         // only run this every 2nd time as gathering this data has more 
overhead
-        // and are only needed when doing tracing or debugging
+        // and are only needed when doing tracing/debugging/receive
         if (++counter % 2 == 0) {
             traceTask();
         }
@@ -280,6 +283,8 @@ public class LocalCliConnector extends ServiceSupport 
implements CliConnector, C
                 doActionTraceTask(root);
             } else if ("browse".equals(action)) {
                 doActionBrowseTask(root);
+            } else if ("receive".equals(action)) {
+                doActionReceiveTask(root);
             }
         } catch (Exception e) {
             // ignore
@@ -811,6 +816,24 @@ public class LocalCliConnector extends ServiceSupport 
implements CliConnector, C
         }
     }
 
+    /**
+     * Handles the {@code receive} action requested via the action file: receiving is enabled when the
+     * request carries an {@code endpoint} and disabled otherwise. The reply from the {@code receive}
+     * dev console is written to the output file.
+     *
+     * @param  root        the action request
+     * @throws IOException if the output file cannot be written
+     */
+    private void doActionReceiveTask(JsonObject root) throws IOException {
+        DevConsole dc = camelContext.getCamelContextExtension().getContextPlugin(DevConsoleRegistry.class)
+                .resolveById("receive");
+        JsonObject json = null;
+        if (dc != null) {
+            String endpoint = root.getString("endpoint");
+            if (endpoint != null) {
+                json = (JsonObject) dc.call(DevConsole.MediaType.JSON, Map.of("enabled", "true", "endpoint", endpoint));
+            } else {
+                json = (JsonObject) dc.call(DevConsole.MediaType.JSON, Map.of("enabled", "false"));
+            }
+        }
+        LOG.trace("Updating output file: {}", outputFile);
+        // always write a reply (an empty object when the console is missing or returned nothing),
+        // as the CLI reads the output file after submitting the action
+        IOHelper.writeText(json != null ? json.toJson() : "{}", outputFile);
+    }
+
     private void doActionBeanTask(JsonObject root) throws IOException {
         String filter = root.getStringOrDefault("filter", "");
         String properties = root.getStringOrDefault("properties", "true");
@@ -1132,6 +1155,13 @@ public class LocalCliConnector extends ServiceSupport 
implements CliConnector, C
                         root.put("main-configuration", json);
                     }
                 }
+                DevConsole dc23 = dcr.resolveById("receive");
+                if (dc23 != null) {
+                    JsonObject json = (JsonObject) 
dc23.call(DevConsole.MediaType.JSON);
+                    if (json != null && !json.isEmpty()) {
+                        root.put("receive", json);
+                    }
+                }
             }
             // various details
             JsonObject mem = collectMemory();
@@ -1206,6 +1236,33 @@ public class LocalCliConnector extends ServiceSupport 
implements CliConnector, C
             LOG.trace("Error updating debug file: {} due to: {}. This 
exception is ignored.",
                     debugFile, e.getMessage(), e);
         }
+        // append newly received messages (if any) to the receive file
+        try {
+            DevConsole dc14 = camelContext.getCamelContextExtension().getContextPlugin(DevConsoleRegistry.class)
+                    .resolveById("receive");
+            if (dc14 != null) {
+                JsonObject json = (JsonObject) dc14.call(DevConsole.MediaType.JSON, Map.of("dump", "true"));
+                JsonArray arr = json != null ? json.getCollection("messages") : null;
+                // check for null before touching the array (the reply may have no messages section)
+                if (arr != null) {
+                    // filter based on last uid
+                    if (receiveFilePos > 0) {
+                        arr.removeIf(r -> {
+                            JsonObject jo = (JsonObject) r;
+                            return jo.getLong("uid") <= receiveFilePos;
+                        });
+                    }
+                    if (!arr.isEmpty()) {
+                        // store messages in a special file
+                        LOG.trace("Updating receive file: {}", receiveFile);
+                        String data = json.toJson() + System.lineSeparator();
+                        IOHelper.appendText(data, receiveFile);
+                        // remember the uid of the last message written, used for filtering next time
+                        JsonObject last = arr.getMap(arr.size() - 1);
+                        receiveFilePos = last.getLong("uid");
+                    }
+                }
+            }
+        } catch (Exception e) {
+            // ignore
+            LOG.trace("Error updating receive file: {} due to: {}. This exception is ignored.",
+                    receiveFile, e.getMessage(), e);
+        }
     }
 
     private JsonObject collectMemory() {
@@ -1331,6 +1388,9 @@ public class LocalCliConnector extends ServiceSupport 
implements CliConnector, C
         if (debugFile != null) {
             FileUtil.deleteFile(debugFile);
         }
+        if (receiveFile != null) {
+            FileUtil.deleteFile(receiveFile);
+        }
         if (executor != null) {
             camelContext.getExecutorServiceManager().shutdown(executor);
             executor = null;
diff --git 
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelCommand.java
 
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelCommand.java
index 41ae9f33f67..7413250e008 100644
--- 
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelCommand.java
+++ 
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelCommand.java
@@ -110,6 +110,10 @@ public abstract class CamelCommand implements 
Callable<Integer> {
         return new File(CommandLineHelper.getCamelDir(), pid + "-trace.json");
     }
 
+    /**
+     * Gets the {@code <pid>-receive.json} file located in the Camel directory.
+     *
+     * @param  pid the process id
+     * @return     the file (may not exist)
+     */
+    public File getReceiveFile(String pid) {
+        String fileName = pid + "-receive.json";
+        return new File(CommandLineHelper.getCamelDir(), fileName);
+    }
+
     public File getDebugFile(String pid) {
         return new File(CommandLineHelper.getCamelDir(), pid + "-debug.json");
     }
diff --git 
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelJBangMain.java
 
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelJBangMain.java
index 58d7b13e44f..2dd90261654 100644
--- 
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelJBangMain.java
+++ 
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelJBangMain.java
@@ -122,6 +122,7 @@ public class CamelJBangMain implements Callable<Integer> {
                         .addSubcommand("reset-stats", new CommandLine(new 
CamelResetStatsAction(main)))
                         .addSubcommand("reload", new CommandLine(new 
CamelReloadAction(main)))
                         .addSubcommand("send", new CommandLine(new 
CamelSendAction(main)))
+                        .addSubcommand("receive", new CommandLine(new 
CamelReceiveAction(main)))
                         .addSubcommand("browse", new CommandLine(new 
CamelBrowseAction(main)))
                         .addSubcommand("stub", new CommandLine(new 
CamelStubAction(main)))
                         .addSubcommand("thread-dump", new CommandLine(new 
CamelThreadDump(main)))
diff --git 
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelBrowseAction.java
 
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelBrowseAction.java
index 580118f5d60..9076f27f97d 100644
--- 
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelBrowseAction.java
+++ 
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelBrowseAction.java
@@ -212,16 +212,6 @@ public class CamelBrowseAction extends ActionBaseCommand {
         return 0;
     }
 
-    private static long getLongValueFromCollection(JsonArray arr, String key) {
-        for (Object o : arr) {
-            JsonObject jo = (JsonObject) o;
-            if (key.equalsIgnoreCase(jo.getString("key"))) {
-                return jo.getLong("value");
-            }
-        }
-        return 0;
-    }
-
     protected void dumpMessages(List<Row> rows, boolean onlyBody) {
         MessageTableHelper tableHelper = new MessageTableHelper();
         tableHelper.setPretty(pretty);
diff --git 
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelReceiveAction.java
 
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelReceiveAction.java
new file mode 100644
index 00000000000..055b140e8bb
--- /dev/null
+++ 
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelReceiveAction.java
@@ -0,0 +1,808 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.dsl.jbang.core.commands.action;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.LineNumberReader;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.regex.Pattern;
+
+import com.github.freva.asciitable.AsciiTable;
+import com.github.freva.asciitable.Column;
+import com.github.freva.asciitable.HorizontalAlign;
+import com.github.freva.asciitable.OverflowBehaviour;
+import org.apache.camel.catalog.impl.TimePatternConverter;
+import org.apache.camel.dsl.jbang.core.commands.CamelJBangMain;
+import org.apache.camel.dsl.jbang.core.common.PidNameAgeCompletionCandidates;
+import org.apache.camel.dsl.jbang.core.common.ProcessHelper;
+import org.apache.camel.util.FileUtil;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.StopWatch;
+import org.apache.camel.util.StringHelper;
+import org.apache.camel.util.TimeUtils;
+import org.apache.camel.util.URISupport;
+import org.apache.camel.util.json.JsonArray;
+import org.apache.camel.util.json.JsonObject;
+import org.apache.camel.util.json.Jsoner;
+import org.fusesource.jansi.Ansi;
+import picocli.CommandLine;
+
[email protected](name = "receive",
+                     description = "Receive and dump messages from remote 
endpoints", sortOptions = false)
+public class CamelReceiveAction extends ActionBaseCommand {
+
+    private static final int NAME_MAX_WIDTH = 25;
+    private static final int NAME_MIN_WIDTH = 10;
+
+    /**
+     * Completion candidates for the {@code --prefix} option.
+     */
+    public static class PrefixCompletionCandidates implements Iterable<String> {
+
+        private static final List<String> CANDIDATES = List.of("auto", "true", "false");
+
+        public PrefixCompletionCandidates() {
+        }
+
+        @Override
+        public Iterator<String> iterator() {
+            return CANDIDATES.iterator();
+        }
+    }
+
+    /**
+     * Completion candidates for the {@code --action} option.
+     */
+    public static class ActionCompletionCandidates implements Iterable<String> {
+
+        private static final List<String> CANDIDATES = List.of("dump", "start", "stop", "status", "clear");
+
+        public ActionCompletionCandidates() {
+        }
+
+        @Override
+        public Iterator<String> iterator() {
+            return CANDIDATES.iterator();
+        }
+    }
+
+    @CommandLine.Parameters(description = "Name or pid of running Camel 
integration. (default selects all)", arity = "0..1")
+    String name = "*";
+
+    @CommandLine.Option(names = { "--action" }, completionCandidates = 
ActionCompletionCandidates.class,
+                        defaultValue = "status",
+                        description = "Action to start, stop, clear, list 
status, or dump messages")
+    String action;
+
+    // when set, doCall() forces the action to "start" and dumps the received messages afterwards
+    @CommandLine.Option(names = { "--endpoint" },
+                        description = "Endpoint to receive messages from (can be uri or pattern to refer to existing endpoint)")
+    String endpoint;
+
+    @CommandLine.Option(names = { "--sort" }, completionCandidates = 
PidNameAgeCompletionCandidates.class,
+                        description = "Sort by pid, name or age for showing 
status of messages", defaultValue = "pid")
+    String sort;
+
+    @CommandLine.Option(names = { "--follow" }, defaultValue = "true",
+                        description = "Keep following and outputting new 
messages (use ctrl + c to exit).")
+    boolean follow = true;
+
+    @CommandLine.Option(names = { "--prefix" }, defaultValue = "auto",
+                        completionCandidates = 
PrefixCompletionCandidates.class,
+                        description = "Print prefix with running Camel 
integration name. auto=only prefix when running multiple integrations. 
true=always prefix. false=prefix off.")
+    String prefix = "auto";
+
+    @CommandLine.Option(names = { "--tail" }, defaultValue = "-1",
+                        description = "The number of messages from the end to 
show. Use -1 to read from the beginning. Use 0 to read only new lines. Defaults 
to showing all messages from beginning.")
+    int tail = -1;
+
+    @CommandLine.Option(names = { "--since" },
+                        description = "Return messages newer than a relative 
duration like 5s, 2m, or 1h. The value is in seconds if no unit specified.")
+    String since;
+
+    @CommandLine.Option(names = { "--find" },
+                        description = "Find and highlight matching text 
(ignore case).", arity = "0..*")
+    String[] find;
+
+    @CommandLine.Option(names = { "--grep" },
+                        description = "Filter messages to only output matching 
text (ignore case).", arity = "0..*")
+    String[] grep;
+
+    @CommandLine.Option(names = { "--show-headers" }, defaultValue = "true",
+                        description = "Show message headers in received 
messages")
+    boolean showHeaders = true;
+
+    @CommandLine.Option(names = { "--show-body" }, defaultValue = "true",
+                        description = "Show message body in received messages")
+    boolean showBody = true;
+
+    @CommandLine.Option(names = { "--only-body" }, defaultValue = "false",
+                        description = "Show only message body in received 
messages")
+    boolean onlyBody;
+
+    @CommandLine.Option(names = { "--logging-color" }, defaultValue = "true", 
description = "Use colored logging")
+    boolean loggingColor = true;
+
+    @CommandLine.Option(names = { "--compact" }, defaultValue = "true",
+                        description = "Compact output (no empty line 
separating messages)")
+    boolean compact = true;
+
+    @CommandLine.Option(names = { "--short-uri" },
+                        description = "List endpoint URI without query 
parameters (short)")
+    boolean shortUri;
+
+    @CommandLine.Option(names = { "--wide-uri" },
+                        description = "List endpoint URI in full details")
+    boolean wideUri;
+
+    @CommandLine.Option(names = { "--mask" },
+                        description = "Whether to mask endpoint URIs to avoid 
printing sensitive information such as password or access keys")
+    boolean mask;
+
+    @CommandLine.Option(names = { "--pretty" },
+                        description = "Pretty print message body when using 
JSon or XML format")
+    boolean pretty;
+
+    // ANSI replacement template ("$0" on a yellow background); set in doDumpCall() when --find or --grep is used
+    String findAnsi;
+    // widest integration name seen by updatePids(), clamped to NAME_MIN_WIDTH..NAME_MAX_WIDTH
+    private int nameMaxWidth;
+    // NOTE(review): presumably tracks whether the name prefix is printed - usage is not visible here, confirm
+    private boolean prefixShown;
+    // created and configured (pretty, logging color) in doDumpCall()
+    private MessageTableHelper tableHelper;
+    // NOTE(review): presumably the color assigned to each integration name - usage is not visible here, confirm
+    private final Map<String, Ansi.Color> nameColors = new HashMap<>();
+
+    /**
+     * Creates the receive command.
+     *
+     * @param main the main command, handed to the parent class
+     */
+    public CamelReceiveAction(CamelJBangMain main) {
+        super(main);
+    }
+
+    /**
+     * Executes the command.
+     * <p>
+     * The {@code dump} and {@code status} actions are handled locally. The {@code clear} action resets the
+     * receive file of each matching integration. Any other action ({@code start}/{@code stop}) is sent to
+     * each matching integration via its action file, and an error in the reply is printed.
+     * Using {@code --endpoint} implies {@code start} followed by a dump.
+     *
+     * @return the exit code
+     */
+    @Override
+    public Integer doCall() throws Exception {
+        boolean autoDump = false;
+        if (endpoint != null) {
+            // if using --endpoint then action should be start and auto-dump
+            action = "start";
+            autoDump = true;
+        }
+
+        if ("dump".equals(action)) {
+            return doDumpCall();
+        } else if ("status".equals(action)) {
+            return doStatusCall();
+        }
+
+        List<Long> pids = findPids(name);
+        for (long pid : pids) {
+            if ("clear".equals(action)) {
+                File f = getReceiveFile(Long.toString(pid));
+                if (f.exists()) {
+                    IOHelper.writeText("{}", f);
+                }
+            } else {
+                // ensure output file is deleted before executing action
+                File outputFile = getOutputFile(Long.toString(pid));
+                FileUtil.deleteFile(outputFile);
+
+                JsonObject root = new JsonObject();
+                root.put("action", "receive");
+                if ("start".equals(action)) {
+                    root.put("enabled", "true");
+                    if (endpoint != null) {
+                        root.put("endpoint", endpoint);
+                    } else {
+                        root.put("endpoint", "*");
+                    }
+                } else if ("stop".equals(action)) {
+                    root.put("enabled", "false");
+                }
+                File f = getActionFile(Long.toString(pid));
+                IOHelper.writeText(root.toJson(), f);
+
+                JsonObject jo = waitForOutputFile(outputFile);
+                String error = jo != null ? jo.getString("error") : null;
+                if (error != null) {
+                    error = Jsoner.unescape(error);
+                    String url = jo.getString("url");
+                    if (url != null) {
+                        printer().println("Error starting to receive messages from: " + url + " due to: " + error);
+                    } else {
+                        printer().println("Error starting to receive messages due to: " + error);
+                    }
+                    // the reply may not include a stack trace, so only print that section when present
+                    List<String> stackTrace = jo.getCollection("stackTrace");
+                    if (stackTrace != null) {
+                        printer().println(StringHelper.fillChars('-', 120));
+                        printer().println(StringHelper.padString(1, 55) + "STACK-TRACE");
+                        printer().println(StringHelper.fillChars('-', 120));
+                        StringBuilder sb = new StringBuilder();
+                        for (String element : stackTrace) {
+                            sb.append(String.format("\t%s%n", element));
+                        }
+                        printer().println(String.valueOf(sb));
+                    }
+                }
+            }
+        }
+
+        if (autoDump) {
+            return doDumpCall();
+        }
+
+        return 0;
+    }
+
+    /**
+     * Obtains the reply of an action as JSON from the given output file (delegates to getJsonObject).
+     *
+     * @param  outputFile the output file of the integration
+     * @return            the reply, as returned by getJsonObject
+     */
+    protected JsonObject waitForOutputFile(File outputFile) {
+        JsonObject reply = getJsonObject(outputFile);
+        return reply;
+    }
+
+    /**
+     * Prints a table with the receive status of the matching integrations, using one row per endpoint
+     * listed in the status. Integrations whose status has no {@code receive} section are not listed.
+     *
+     * @return always 0
+     */
+    protected Integer doStatusCall() {
+        List<StatusRow> rows = new ArrayList<>();
+
+        List<Long> pids = findPids(name);
+        ProcessHandle.allProcesses()
+                .filter(ph -> pids.contains(ph.pid()))
+                .forEach(ph -> {
+                    JsonObject root = loadStatus(ph.pid());
+                    if (root != null) {
+                        StatusRow row = new StatusRow();
+                        JsonObject context = (JsonObject) root.get("context");
+                        if (context == null) {
+                            return;
+                        }
+                        row.name = context.getString("name");
+                        if ("CamelJBang".equals(row.name)) {
+                            row.name = ProcessHelper.extractName(root, ph);
+                        }
+                        row.pid = Long.toString(ph.pid());
+                        row.uptime = extractSince(ph);
+                        row.age = TimeUtils.printSince(row.uptime);
+                        JsonObject receive = root.getMap("receive");
+                        if (receive != null) {
+                            // use defaults so a missing value does not fail when unboxing null
+                            row.enabled = receive.getBooleanOrDefault("enabled", false);
+                            row.counter = receive.getLongOrDefault("total", 0);
+                            row.firstTimestamp = receive.getLongOrDefault("firstTimestamp", 0);
+                            row.lastTimestamp = receive.getLongOrDefault("lastTimestamp", 0);
+                            JsonArray arr = receive.getCollection("endpoints");
+                            if (arr != null) {
+                                for (Object e : arr) {
+                                    JsonObject ep = (JsonObject) e;
+                                    row.uri = ep.getString("uri");
+                                    if (mask) {
+                                        row.uri = URISupport.sanitizeUri(row.uri);
+                                    }
+                                    rows.add(row);
+                                    // continue with a copy so each endpoint gets its own row
+                                    row = row.copy();
+                                }
+                            } else {
+                                rows.add(row);
+                            }
+                        }
+                    }
+                });
+
+        // sort rows
+        rows.sort(this::sortStatusRow);
+
+        if (!rows.isEmpty()) {
+            // the two ENDPOINT columns are mutually exclusive (toggled by --wide-uri)
+            printer().println(AsciiTable.getTable(AsciiTable.NO_BORDERS, rows, Arrays.asList(
+                    new Column().header("PID").headerAlign(HorizontalAlign.CENTER).with(r -> r.pid),
+                    new Column().header("NAME").dataAlign(HorizontalAlign.LEFT).maxWidth(30, OverflowBehaviour.ELLIPSIS_RIGHT)
+                            .with(r -> r.name),
+                    new Column().header("AGE").headerAlign(HorizontalAlign.CENTER).with(r -> r.age),
+                    new Column().header("STATUS").with(this::getStatus),
+                    new Column().header("TOTAL").with(r -> r.enabled ? "" + r.counter : ""),
+                    new Column().header("SINCE").headerAlign(HorizontalAlign.CENTER)
+                            .with(this::getMessageAgo),
+                    new Column().header("ENDPOINT").visible(!wideUri).dataAlign(HorizontalAlign.LEFT)
+                            .maxWidth(90, OverflowBehaviour.ELLIPSIS_RIGHT)
+                            .with(this::getEndpointUri),
+                    new Column().header("ENDPOINT").visible(wideUri).dataAlign(HorizontalAlign.LEFT)
+                            .maxWidth(140, OverflowBehaviour.NEWLINE)
+                            .with(r -> r.uri))));
+        }
+
+        return 0;
+    }
+
+    /**
+     * Gets the text shown in the STATUS column for the given row.
+     */
+    private String getStatus(StatusRow r) {
+        return r.enabled ? "Enabled" : "Disabled";
+    }
+
+    /**
+     * Compares two status rows according to the {@code --sort} option (pid, name or age).
+     * A leading {@code -} on the option reverses the order; an unknown key leaves the order unchanged.
+     */
+    protected int sortStatusRow(StatusRow o1, StatusRow o2) {
+        boolean reversed = sort.startsWith("-");
+        String key = reversed ? sort.substring(1) : sort;
+        int direction = reversed ? -1 : 1;
+
+        if ("pid".equals(key)) {
+            return Long.compare(Long.parseLong(o1.pid), Long.parseLong(o2.pid)) * direction;
+        } else if ("name".equals(key)) {
+            return o1.name.compareToIgnoreCase(o2.name) * direction;
+        } else if ("age".equals(key)) {
+            return Long.compare(o1.uptime, o2.uptime) * direction;
+        }
+        return 0;
+    }
+
+    /**
+     * Dumps received messages of the matching integrations.
+     * <p>
+     * Messages already stored in the receive files are dumped first (unless {@code tail} is 0).
+     * When {@code follow} is enabled the receive files are then polled for new lines, which are
+     * dumped as they arrive, until dumping reports there is nothing more to do.
+     *
+     * @return always 0
+     */
+    protected Integer doDumpCall() throws Exception {
+        // setup table helper
+        tableHelper = new MessageTableHelper();
+        tableHelper.setPretty(pretty);
+        tableHelper.setLoggingColor(loggingColor);
+
+        Map<Long, Pid> pids = new LinkedHashMap<>();
+
+        // find new pids
+        updatePids(pids);
+        if (!pids.isEmpty()) {
+            // read existing received files (skip by tail/since)
+            if (find != null) {
+                findAnsi = 
Ansi.ansi().fg(Ansi.Color.BLACK).bg(Ansi.Color.YELLOW).a("$0").reset().toString();
+                // quote the user text so it is matched literally and not as a regular expression
+                for (int i = 0; i < find.length; i++) {
+                    String f = find[i];
+                    f = Pattern.quote(f);
+                    find[i] = f;
+                }
+            }
+            if (grep != null) {
+                findAnsi = 
Ansi.ansi().fg(Ansi.Color.BLACK).bg(Ansi.Color.YELLOW).a("$0").reset().toString();
+                // quote the user text so it is matched literally and not as a regular expression
+                for (int i = 0; i < grep.length; i++) {
+                    String f = grep[i];
+                    f = Pattern.quote(f);
+                    grep[i] = f;
+                }
+            }
+            // messages older than this point in time are to be skipped (null = no limit)
+            Date limit = null;
+            if (since != null) {
+                long millis;
+                if (StringHelper.isDigit(since)) {
+                    // is in seconds by default
+                    millis = TimePatternConverter.toMilliSeconds(since) * 1000;
+                } else {
+                    millis = TimePatternConverter.toMilliSeconds(since);
+                }
+                limit = new Date(System.currentTimeMillis() - millis);
+            }
+            // dump existing messages
+            if (tail != 0) {
+                tailReceiveFiles(pids, tail);
+                dumpReceiveFiles(pids, tail, limit);
+            }
+        }
+
+        if (follow) {
+            // waitMessage: print the waiting message only once while no integration is found
+            boolean waitMessage = true;
+            StopWatch watch = new StopWatch();
+            boolean more = true;
+            // init: print the waiting message only once before the first new lines arrive
+            boolean init = true;
+            do {
+                if (pids.isEmpty()) {
+                    if (waitMessage) {
+                        printer().println("Waiting for messages ...");
+                        waitMessage = false;
+                    }
+                    Thread.sleep(500);
+                    updatePids(pids);
+                } else {
+                    waitMessage = true;
+                    // refresh the running integrations at most every 500 millis
+                    if (watch.taken() > 500) {
+                        // check for new messages
+                        updatePids(pids);
+                        watch.restart();
+                    }
+                    int lines = readReceiveFiles(pids);
+                    if (lines > 0) {
+                        more = dumpReceiveFiles(pids, 0, null);
+                        init = false;
+                    } else if (lines == 0) {
+                        if (init) {
+                            printer().println("Waiting for messages ...");
+                            init = false;
+                        }
+                        Thread.sleep(100);
+                    } else {
+                        // readReceiveFiles counts up from 0, so a negative value is not expected here
+                        break;
+                    }
+                }
+            } while (more);
+        }
+
+        return 0;
+    }
+
+    /**
+     * Reads the existing, non-empty receive file of each pid into its fifo queue.
+     * When {@code tail} is positive the queue is bounded so only the last {@code tail} lines are kept.
+     * The reader is kept on the pid for later reads.
+     */
+    private void tailReceiveFiles(Map<Long, Pid> pids, int tail) throws Exception {
+        for (Pid pid : pids.values()) {
+            File file = getReceiveFile(pid.pid);
+            if (!file.exists() || file.length() <= 0) {
+                continue;
+            }
+            pid.reader = new LineNumberReader(new FileReader(file));
+            if (tail > 0) {
+                pid.fifo = new ArrayBlockingQueue<>(tail);
+            } else {
+                pid.fifo = new ArrayDeque<>();
+            }
+            for (String line = pid.reader.readLine(); line != null; line = pid.reader.readLine()) {
+                // a full (bounded) queue rejects the line, so evict the oldest until it fits
+                while (!pid.fifo.offer(line)) {
+                    pid.fifo.poll();
+                }
+            }
+        }
+    }
+
+    /**
+     * Synchronizes the given rows with the running integrations that match the name: newly found
+     * integrations are added, and integrations that are no longer matched are removed (closing their reader).
+     * Also tracks the widest integration name seen (within NAME_MIN_WIDTH..NAME_MAX_WIDTH).
+     *
+     * @param rows the known integrations keyed by pid, updated in place
+     */
+    private void updatePids(Map<Long, Pid> rows) {
+        List<Long> pids = findPids(name);
+        ProcessHandle.allProcesses()
+                .filter(ph -> pids.contains(ph.pid()))
+                .forEach(ph -> {
+                    JsonObject root = loadStatus(ph.pid());
+                    if (root != null) {
+                        Pid row = new Pid();
+                        row.pid = Long.toString(ph.pid());
+                        JsonObject context = (JsonObject) root.get("context");
+                        if (context == null) {
+                            return;
+                        }
+                        row.name = context.getString("name");
+                        if ("CamelJBang".equals(row.name)) {
+                            row.name = ProcessHelper.extractName(root, ph);
+                        }
+                        int len = row.name.length();
+                        if (len < NAME_MIN_WIDTH) {
+                            len = NAME_MIN_WIDTH;
+                        }
+                        if (len > NAME_MAX_WIDTH) {
+                            len = NAME_MAX_WIDTH;
+                        }
+                        if (len > nameMaxWidth) {
+                            nameMaxWidth = len;
+                        }
+                        // keep the existing row (with its reader and queue) when already known
+                        if (!rows.containsKey(ph.pid())) {
+                            rows.put(ph.pid(), row);
+                        }
+                    }
+                });
+
+        // remove pids that are no longer active from the rows,
+        // and close their readers so the file handles are not leaked
+        Iterator<Map.Entry<Long, Pid>> it = rows.entrySet().iterator();
+        while (it.hasNext()) {
+            Map.Entry<Long, Pid> entry = it.next();
+            if (!pids.contains(entry.getKey())) {
+                Pid gone = entry.getValue();
+                if (gone.reader != null) {
+                    IOHelper.close(gone.reader);
+                    gone.reader = null;
+                }
+                it.remove();
+            }
+        }
+    }
+
+    /**
+     * Reads all lines currently available from the receive file of each pid and queues them on its fifo.
+     * A reader is opened on first use; when {@code tail} is 0 it is forwarded past the existing content.
+     *
+     * @return the number of lines read across all pids
+     */
+    private int readReceiveFiles(Map<Long, Pid> pids) throws Exception {
+        int count = 0;
+
+        for (Pid pid : pids.values()) {
+            if (pid.reader == null) {
+                File file = getReceiveFile(pid.pid);
+                if (file.exists()) {
+                    pid.reader = new LineNumberReader(new FileReader(file));
+                    if (tail == 0) {
+                        // only read new lines so forward to end of reader
+                        pid.reader.skip(file.length());
+                    }
+                }
+            }
+            if (pid.reader == null) {
+                continue;
+            }
+            while (true) {
+                String next;
+                try {
+                    next = pid.reader.readLine();
+                } catch (IOException ignored) {
+                    // ignore
+                    break;
+                }
+                if (next == null) {
+                    break;
+                }
+                count++;
+                // switch fifo to be unlimited as we use it for new messages
+                if (pid.fifo == null || pid.fifo instanceof ArrayBlockingQueue) {
+                    pid.fifo = new ArrayDeque<>();
+                }
+                pid.fifo.offer(next);
+            }
+        }
+
+        return count;
+    }
+
+    /**
+     * Parses one line of a receive file (a JSON object with a {@code messages} array) into rows.
+     * Exchange details are stripped from each message, and headers/body are removed according to
+     * the {@code --only-body}, {@code --show-headers} and {@code --show-body} options.
+     *
+     * @param  pid  the integration the line belongs to
+     * @param  line the line to parse
+     * @return      the parsed rows (empty if there are no messages), or {@code null} if the line cannot be parsed
+     */
+    private List<Row> parseReceiveLine(Pid pid, String line) {
+        JsonObject root;
+        try {
+            root = (JsonObject) Jsoner.deserialize(line);
+        } catch (Exception e) {
+            // ignore
+            return null;
+        }
+        if (root == null) {
+            return null;
+        }
+
+        List<Row> answer = new ArrayList<>();
+        JsonArray messages = root.getCollection("messages");
+        if (messages == null) {
+            return answer;
+        }
+        for (Object element : messages) {
+            JsonObject jo = (JsonObject) element;
+            Row row = new Row(pid);
+            row.pid = pid.pid;
+            row.name = pid.name;
+            row.uid = jo.getLong("uid");
+
+            String uri = jo.getString("endpointUri");
+            if (uri != null) {
+                row.endpoint = new JsonObject();
+                row.endpoint.put("endpoint", mask ? URISupport.sanitizeUri(uri) : uri);
+                row.endpoint.put("remote", jo.getBooleanOrDefault("remoteEndpoint", true));
+            }
+            JsonObject service = jo.getMap("endpointService");
+            if (service != null) {
+                row.endpointService = service;
+            }
+            Long timestamp = jo.getLong("timestamp");
+            if (timestamp != null) {
+                row.timestamp = timestamp;
+            }
+
+            row.message = jo.getMap("message");
+            row.message.remove("exchangeId");
+            row.message.remove("exchangePattern");
+            row.message.remove("exchangeProperties");
+            if (onlyBody || !showHeaders) {
+                row.message.remove("headers");
+            }
+            if (onlyBody) {
+                row.message.remove("messageType");
+            } else if (!showBody) {
+                row.message.remove("body");
+            }
+            answer.add(row);
+        }
+        return answer;
+    }
+
+    /**
+     * Drains the buffered lines of every pid, parses them into rows and prints them.
+     * <p>
+     * When the lines originate from more than one integration the rows are ordered by timestamp; a row without a
+     * timestamp borrows the last timestamp seen for the same integration. If <tt>tail</tt> is positive only the
+     * last <tt>tail</tt> rows are printed.
+     *
+     * @param  pids  the processes whose buffered lines should be dumped
+     * @param  tail  if positive, the maximum number of rows (taken from the end) to print
+     * @param  limit only rows at or after this time are printed; {@code null} for no limit
+     * @return       always <tt>true</tt>
+     */
+    private boolean dumpReceiveFiles(Map<Long, Pid> pids, int tail, Date limit) {
+        Set<String> names = new HashSet<>();
+        List<Row> rows = new ArrayList<>();
+        for (Pid pid : pids.values()) {
+            Queue<String> queue = pid.fifo;
+            if (queue != null) {
+                for (String l : queue) {
+                    names.add(pid.name);
+                    List<Row> parsed = parseReceiveLine(pid, l);
+                    if (parsed != null && !parsed.isEmpty()) {
+                        rows.addAll(parsed);
+                    }
+                }
+                // the lines have been consumed
+                queue.clear();
+            }
+        }
+
+        // only sort if there are multiple Camels running
+        if (names.size() > 1) {
+            // sort lines
+            final Map<String, Long> lastTimestamp = new HashMap<>();
+            rows.sort((r1, r2) -> {
+                long t1 = r1.timestamp;
+                long t2 = r2.timestamp;
+                // fallback to the last known timestamp of the same integration
+                // (0 when none has been seen yet, never null)
+                if (t1 == 0) {
+                    t1 = lastTimestamp.getOrDefault(r1.name, 0L);
+                }
+                if (t2 == 0) {
+                    t2 = lastTimestamp.getOrDefault(r2.name, 0L);
+                }
+                if (t1 == 0 && t2 == 0) {
+                    return 0;
+                } else if (t1 == 0) {
+                    return -1;
+                } else if (t2 == 0) {
+                    return 1;
+                }
+                lastTimestamp.put(r1.name, t1);
+                lastTimestamp.put(r2.name, t2);
+                return Long.compare(t1, t2);
+            });
+        }
+        if (tail > 0) {
+            // cut according to tail
+            int pos = rows.size() - tail;
+            if (pos > 0) {
+                rows = rows.subList(pos, rows.size());
+            }
+        }
+
+        for (Row r : rows) {
+            printDump(r.name, pids.size(), r, limit);
+        }
+        return true;
+    }
+
+    /**
+     * Whether the given text passes the grep filter.
+     *
+     * @param  line the text to check
+     * @return      <tt>true</tt> if no grep filter is configured, or at least one grep expression is found in the
+     *              text (case-insensitive), <tt>false</tt> otherwise
+     */
+    private boolean isValidGrep(String line) {
+        boolean accepted = grep == null;
+        if (!accepted) {
+            for (String expr : grep) {
+                Pattern pattern = Pattern.compile("(?i)" + expr);
+                if (pattern.matcher(line).find()) {
+                    accepted = true;
+                    break;
+                }
+            }
+        }
+        return accepted;
+    }
+
+    /**
+     * Whether a message with the given timestamp is recent enough to be shown.
+     *
+     * @param  limit     the earliest time to accept, or {@code null} for no limit
+     * @param  timestamp the message timestamp in epoch millis, or 0 if unknown
+     * @return           <tt>true</tt> if there is no limit, the timestamp is unknown, or the timestamp is not
+     *                   before the limit
+     */
+    private boolean isValidSince(Date limit, long timestamp) {
+        boolean unrestricted = limit == null || timestamp == 0;
+        return unrestricted || !new Date(timestamp).before(limit);
+    }
+
+    /**
+     * Prints a single received message: a header line with the message uid, followed by the message rendered as a
+     * table. Each printed line is optionally prefixed with the padded (and optionally colored) name.
+     * <p>
+     * Nothing is printed if the row is older than the limit or does not match the grep filter.
+     *
+     * @param name  the name to use as line prefix
+     * @param pids  number of processes being dumped; with prefix "auto" the prefix is hidden when this is at most 1
+     * @param row   the message to print
+     * @param limit only rows at or after this time are printed; {@code null} for no limit
+     */
+    protected void printDump(String name, int pids, Row row, Date limit) {
+        // once a prefix has been shown we keep showing it, otherwise the prefix option decides
+        if (!prefixShown && ("false".equals(prefix) || "auto".equals(prefix) && pids <= 1)) {
+            name = null;
+        }
+        prefixShown = name != null;
+
+        String data = getDataAsTable(row);
+        if (!isValidSince(limit, row.timestamp) || !isValidGrep(data)) {
+            return;
+        }
+
+        String nameWithPrefix = null;
+        if (name != null) {
+            if (loggingColor) {
+                Ansi.Color color = nameColors.get(name);
+                if (color == null) {
+                    // first time this name is seen: grab a new color (cycling through 6 of them)
+                    color = Ansi.Color.values()[(nameColors.size() % 6) + 1];
+                    nameColors.put(name, color);
+                }
+                String padded = String.format("%-" + nameMaxWidth + "s", name);
+                nameWithPrefix = Ansi.ansi().fg(color).a(padded).a("| ").reset().toString();
+            } else {
+                nameWithPrefix = String.format("%-" + nameMaxWidth + "s", name) + "| ";
+            }
+            printer().print(nameWithPrefix);
+        }
+
+        // header
+        String header = String.format("Received Message: (%s)", row.uid);
+        printer().println(loggingColor ? Ansi.ansi().fgGreen().a(header).reset().toString() : header);
+
+        String[] lines = data.split(System.lineSeparator());
+        if (lines.length == 0) {
+            return;
+        }
+        for (String text : lines) {
+            // replace every find/grep match (case-insensitive) using the findAnsi replacement
+            if (find != null) {
+                for (String f : find) {
+                    text = text.replaceAll("(?i)" + f, findAnsi);
+                }
+            }
+            if (grep != null) {
+                for (String g : grep) {
+                    text = text.replaceAll("(?i)" + g, findAnsi);
+                }
+            }
+            if (nameWithPrefix != null) {
+                printer().print(nameWithPrefix);
+            }
+            printer().print(" ");
+            printer().println(text);
+        }
+        if (compact) {
+            return;
+        }
+        // separate messages by a line that is empty (apart from the prefix)
+        if (nameWithPrefix != null) {
+            printer().println(nameWithPrefix);
+        } else {
+            printer().println();
+        }
+    }
+
+    /**
+     * Renders the endpoint, endpoint service and message of the row as a table via the table helper.
+     * No exchange id, exchange pattern or cause is passed on.
+     */
+    private String getDataAsTable(Row r) {
+        JsonObject endpoint = r.endpoint;
+        JsonObject service = r.endpointService;
+        JsonObject message = r.message;
+        return tableHelper.getDataAsTable(null, null, endpoint, service, message, null);
+    }
+
+    /**
+     * Returns the endpoint uri of the row; when short uris are enabled everything from the first <tt>?</tt>
+     * onwards is cut off.
+     */
+    protected String getEndpointUri(StatusRow r) {
+        String uri = r.uri;
+        if (!shortUri) {
+            return uri;
+        }
+        int query = uri.indexOf('?');
+        return query > 0 ? uri.substring(0, query) : uri;
+    }
+
+    /**
+     * Returns the time since the last message of the row as printed by {@link TimeUtils#printSince(long)}, or an
+     * empty string when the row has no last timestamp.
+     */
+    protected String getMessageAgo(StatusRow r) {
+        return r.lastTimestamp > 0 ? TimeUtils.printSince(r.lastTimestamp) : "";
+    }
+
+    private static class Pid { // per-process state used while reading and dumping received messages
+        String pid; // copied to Row.pid; presumably the process id as text - confirm against the code that fills it
+        String name; // copied to Row.name and used as the line prefix when printing
+        Queue<String> fifo; // lines read from the reader that are waiting to be parsed and printed
+        LineNumberReader reader; // reader the lines are read from
+    }
+
+    /**
+     * A single received message, as parsed from one entry of the "messages" array of a receive line.
+     */
+    private static class Row {
+        // the process this message was read from
+        Pid parent;
+        // pid and name copied from the parent
+        String pid;
+        String name;
+        // uid of the message, shown in the printed header
+        long uid;
+        // epoch millis of the message, or 0 if unknown
+        long timestamp;
+        // holds the "endpoint" uri and "remote" flag; null if the message had no endpoint uri
+        JsonObject endpoint;
+        JsonObject endpointService;
+        // the message with the parts that should not be shown removed
+        JsonObject message;
+
+        Row(Pid owner) {
+            parent = owner;
+        }
+    }
+
+    /**
+     * Status of the receive function for one endpoint of a running process.
+     * <p>
+     * Implements {@link Cloneable} so that {@link #copy()} works: without it {@link Object#clone()} always throws
+     * {@link CloneNotSupportedException} and the copy would always be {@code null}.
+     */
+    private static class StatusRow implements Cloneable {
+        String pid;
+        String name;
+        String age;
+        long uptime;
+        boolean enabled;
+        long counter;
+        long firstTimestamp;
+        long lastTimestamp;
+        String uri;
+
+        /**
+         * Creates a shallow copy of this row (all fields are primitives or immutable strings).
+         *
+         * @return the copy
+         */
+        StatusRow copy() {
+            try {
+                return (StatusRow) clone();
+            } catch (CloneNotSupportedException e) {
+                // cannot happen as this class is Cloneable
+                return null;
+            }
+        }
+    }
+
+}

Reply via email to