This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch rcmd
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 5126aad2a8c8446d00103d8e31bcdac058d71ce5
Author: Claus Ibsen <[email protected]>
AuthorDate: Wed Oct 9 16:26:45 2024 +0200

    CAMEL-21193: camel-jbang - Add listen command
---
 .../impl/console/ReceiveDevConsoleConfigurer.java  |  86 ++++++
 .../org/apache/camel/dev-console/receive.json      |  15 ++
 ...org.apache.camel.impl.console.ReceiveDevConsole |   2 +
 .../services/org/apache/camel/dev-console/receive  |   2 +
 .../org/apache/camel/dev-consoles.properties       |   2 +-
 .../camel/impl/console/ReceiveDevConsole.java      | 282 ++++++++++++++++++++
 .../camel/cli/connector/LocalCliConnector.java     |  57 +++-
 .../dsl/jbang/core/commands/CamelCommand.java      |   4 +
 .../core/commands/action/CamelBrowseAction.java    |  10 -
 .../core/commands/action/CamelReceiveAction.java   | 296 +++++++++++++++++++++
 10 files changed, 743 insertions(+), 13 deletions(-)

diff --git 
a/core/camel-console/src/generated/java/org/apache/camel/impl/console/ReceiveDevConsoleConfigurer.java
 
b/core/camel-console/src/generated/java/org/apache/camel/impl/console/ReceiveDevConsoleConfigurer.java
new file mode 100644
index 00000000000..0129a9fae6a
--- /dev/null
+++ 
b/core/camel-console/src/generated/java/org/apache/camel/impl/console/ReceiveDevConsoleConfigurer.java
@@ -0,0 +1,86 @@
+/* Generated by camel build tools - do NOT edit this file! */
+package org.apache.camel.impl.console;
+
+import javax.annotation.processing.Generated;
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.spi.ExtendedPropertyConfigurerGetter;
+import org.apache.camel.spi.PropertyConfigurerGetter;
+import org.apache.camel.spi.ConfigurerStrategy;
+import org.apache.camel.spi.GeneratedPropertyConfigurer;
+import org.apache.camel.util.CaseInsensitiveMap;
+import org.apache.camel.impl.console.ReceiveDevConsole;
+
+/**
+ * Generated by camel build tools - do NOT edit this file!
+ */
+@Generated("org.apache.camel.maven.packaging.GenerateConfigurerMojo")
+@SuppressWarnings("unchecked")
+public class ReceiveDevConsoleConfigurer extends 
org.apache.camel.support.component.PropertyConfigurerSupport implements 
GeneratedPropertyConfigurer, ExtendedPropertyConfigurerGetter {
+
+    private static final Map<String, Object> ALL_OPTIONS;
+    static {
+        Map<String, Object> map = new CaseInsensitiveMap();
+        map.put("BodyMaxChars", int.class);
+        map.put("CamelContext", org.apache.camel.CamelContext.class);
+        map.put("Capacity", int.class);
+        map.put("RemoveOnDump", boolean.class);
+        ALL_OPTIONS = map;
+        
ConfigurerStrategy.addBootstrapConfigurerClearer(ReceiveDevConsoleConfigurer::clearBootstrapConfigurers);
+    }
+
+    @Override
+    public boolean configure(CamelContext camelContext, Object obj, String 
name, Object value, boolean ignoreCase) {
+        org.apache.camel.impl.console.ReceiveDevConsole target = 
(org.apache.camel.impl.console.ReceiveDevConsole) obj;
+        switch (ignoreCase ? name.toLowerCase() : name) {
+        case "bodymaxchars":
+        case "bodyMaxChars": target.setBodyMaxChars(property(camelContext, 
int.class, value)); return true;
+        case "camelcontext":
+        case "camelContext": target.setCamelContext(property(camelContext, 
org.apache.camel.CamelContext.class, value)); return true;
+        case "capacity": target.setCapacity(property(camelContext, int.class, 
value)); return true;
+        case "removeondump":
+        case "removeOnDump": target.setRemoveOnDump(property(camelContext, 
boolean.class, value)); return true;
+        default: return false;
+        }
+    }
+
+    @Override
+    public Map<String, Object> getAllOptions(Object target) {
+        return ALL_OPTIONS;
+    }
+
+    public static void clearBootstrapConfigurers() {
+        ALL_OPTIONS.clear();
+    }
+
+    @Override
+    public Class<?> getOptionType(String name, boolean ignoreCase) {
+        switch (ignoreCase ? name.toLowerCase() : name) {
+        case "bodymaxchars":
+        case "bodyMaxChars": return int.class;
+        case "camelcontext":
+        case "camelContext": return org.apache.camel.CamelContext.class;
+        case "capacity": return int.class;
+        case "removeondump":
+        case "removeOnDump": return boolean.class;
+        default: return null;
+        }
+    }
+
+    @Override
+    public Object getOptionValue(Object obj, String name, boolean ignoreCase) {
+        org.apache.camel.impl.console.ReceiveDevConsole target = 
(org.apache.camel.impl.console.ReceiveDevConsole) obj;
+        switch (ignoreCase ? name.toLowerCase() : name) {
+        case "bodymaxchars":
+        case "bodyMaxChars": return target.getBodyMaxChars();
+        case "camelcontext":
+        case "camelContext": return target.getCamelContext();
+        case "capacity": return target.getCapacity();
+        case "removeondump":
+        case "removeOnDump": return target.isRemoveOnDump();
+        default: return null;
+        }
+    }
+}
+
diff --git 
a/core/camel-console/src/generated/resources/META-INF/org/apache/camel/dev-console/receive.json
 
b/core/camel-console/src/generated/resources/META-INF/org/apache/camel/dev-console/receive.json
new file mode 100644
index 00000000000..978a1f23579
--- /dev/null
+++ 
b/core/camel-console/src/generated/resources/META-INF/org/apache/camel/dev-console/receive.json
@@ -0,0 +1,15 @@
+{
+  "console": {
+    "kind": "console",
+    "group": "camel",
+    "name": "receive",
+    "title": "Camel Receive",
+    "description": "Consume messages from endpoints",
+    "deprecated": false,
+    "javaType": "org.apache.camel.impl.console.ReceiveDevConsole",
+    "groupId": "org.apache.camel",
+    "artifactId": "camel-console",
+    "version": "4.9.0-SNAPSHOT"
+  }
+}
+
diff --git 
a/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/configurer/org.apache.camel.impl.console.ReceiveDevConsole
 
b/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/configurer/org.apache.camel.impl.console.ReceiveDevConsole
new file mode 100644
index 00000000000..3a11d5ff2c9
--- /dev/null
+++ 
b/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/configurer/org.apache.camel.impl.console.ReceiveDevConsole
@@ -0,0 +1,2 @@
+# Generated by camel build tools - do NOT edit this file!
+class=org.apache.camel.impl.console.ReceiveDevConsoleConfigurer
diff --git 
a/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-console/receive
 
b/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-console/receive
new file mode 100644
index 00000000000..8454801f9f9
--- /dev/null
+++ 
b/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-console/receive
@@ -0,0 +1,2 @@
+# Generated by camel build tools - do NOT edit this file!
+class=org.apache.camel.impl.console.ReceiveDevConsole
diff --git 
a/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-consoles.properties
 
b/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-consoles.properties
index fd1b9890684..8d3e19577ae 100644
--- 
a/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-consoles.properties
+++ 
b/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-consoles.properties
@@ -1,5 +1,5 @@
 # Generated by camel build tools - do NOT edit this file!
-dev-consoles=bean blocked browse circuit-breaker consumer context debug 
endpoint event gc health inflight java-security jvm log memory properties 
reload rest route route-controller route-dump service source startup-recorder 
thread top trace transformers type-converters variables
+dev-consoles=bean blocked browse circuit-breaker consumer context debug 
endpoint event gc health inflight java-security jvm log memory properties 
receive reload rest route route-controller route-dump service source 
startup-recorder thread top trace transformers type-converters variables
 groupId=org.apache.camel
 artifactId=camel-console
 version=4.9.0-SNAPSHOT
diff --git 
a/core/camel-console/src/main/java/org/apache/camel/impl/console/ReceiveDevConsole.java
 
b/core/camel-console/src/main/java/org/apache/camel/impl/console/ReceiveDevConsole.java
new file mode 100644
index 00000000000..d1cd9b09c85
--- /dev/null
+++ 
b/core/camel-console/src/main/java/org/apache/camel/impl/console/ReceiveDevConsole.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.console;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Consumer;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Route;
+import org.apache.camel.spi.Configurer;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.annotations.DevConsole;
+import org.apache.camel.support.EndpointHelper;
+import org.apache.camel.support.MessageHelper;
+import org.apache.camel.support.console.AbstractDevConsole;
+import org.apache.camel.support.service.ServiceHelper;
+import org.apache.camel.util.json.JsonArray;
+import org.apache.camel.util.json.JsonObject;
+
+@DevConsole(name = "receive", displayName = "Camel Receive", description = 
"Consume messages from endpoints")
+@Configurer(bootstrap = true, extended = true)
+public class ReceiveDevConsole extends AbstractDevConsole {
+
+    @Metadata(defaultValue = "100",
+              description = "Maximum capacity of last number of messages to 
capture (capacity must be between 50 and 1000)")
+    private int capacity = 100;
+    @Metadata(defaultValue = "32768", label = "advanced",
+              description = "To limit the message body to a maximum size in 
the received message. Use 0 or negative value to use unlimited size.")
+    private int bodyMaxChars = 32 * 1024;
+    @Metadata(defaultValue = "true", label = "advanced",
+              description = "Whether all received messages should be removed 
when dumping. By default, the messages are removed, which means that dumping 
will not contain previous dumped messages.")
+    private boolean removeOnDump = true;
+
+    /**
+     * Whether to enable or disable receive mode
+     */
+    public static final String ENABLED = "enabled";
+
+    /**
+     * Whether to dump received messages
+     */
+    public static final String DUMP = "dump";
+
+    /**
+     * Endpoint for where to receive messages (can also refer to a route id, 
endpoint pattern).
+     */
+    public static final String ENDPOINT = "endpoint";
+
+    private final List<Consumer> consumers = new ArrayList<>();
+
+    private final AtomicBoolean enabled = new AtomicBoolean();
+    private final AtomicLong uuid = new AtomicLong();
+    private Queue<JsonObject> queue;
+
+    public ReceiveDevConsole() {
+        super("camel", "receive", "Camel Receive", "Consume messages from 
endpoints");
+    }
+
+    public int getCapacity() {
+        return capacity;
+    }
+
+    public void setCapacity(int capacity) {
+        this.capacity = capacity;
+    }
+
+    public int getBodyMaxChars() {
+        return bodyMaxChars;
+    }
+
+    public void setBodyMaxChars(int bodyMaxChars) {
+        this.bodyMaxChars = bodyMaxChars;
+    }
+
+    public boolean isRemoveOnDump() {
+        return removeOnDump;
+    }
+
+    public void setRemoveOnDump(boolean removeOnDump) {
+        this.removeOnDump = removeOnDump;
+    }
+
+    /**
+     * Validates the configured capacity and creates the bounded queue that holds the received messages.
+     *
+     * @throws IllegalArgumentException if the capacity is not between 50 and 1000 (both inclusive)
+     */
+    @Override
+    protected void doInit() throws Exception {
+        boolean withinRange = capacity >= 50 && capacity <= 1000;
+        if (!withinRange) {
+            throw new IllegalArgumentException("Capacity must be between 50 and 1000");
+        }
+        queue = new LinkedBlockingQueue<>(capacity);
+    }
+
+    /**
+     * Stops and shuts down every consumer registered in this console, and then forgets them.
+     */
+    protected void stopConsumers() {
+        consumers.forEach(c -> ServiceHelper.stopAndShutdownServices(c));
+        consumers.clear();
+    }
+
+    /**
+     * Handles a console call in text mode.
+     * <p>
+     * The options are checked in this order: {@code dump=true} outputs the captured messages as a JSON array,
+     * {@code enabled=false} stops all consumers, and {@code endpoint} starts receiving from the matching endpoint.
+     * Unless dumping or disabling, the output is a status with the enabled flag, the total number of received
+     * messages, and the endpoints being consumed.
+     *
+     * @param  options the call options (see {@link #DUMP}, {@link #ENABLED} and {@link #ENDPOINT})
+     * @return         the text output
+     */
+    protected String doCallText(Map<String, Object> options) {
+        StringBuilder sb = new StringBuilder();
+
+        String dump = (String) options.get(DUMP);
+        if ("true".equals(dump)) {
+            JsonArray arr = new JsonArray();
+            if (removeOnDump) {
+                // take the messages out one at a time: copying the queue and then calling clear()
+                // would drop any message a consumer adds between the copy and the clear
+                for (JsonObject msg = queue.poll(); msg != null; msg = queue.poll()) {
+                    arr.add(msg);
+                }
+            } else {
+                arr.addAll(queue);
+            }
+            sb.append(arr.toJson()).append("\n");
+            return sb.toString();
+        }
+
+        String enabled = (String) options.get(ENABLED);
+        if ("false".equals(enabled)) {
+            // turn off all consumers
+            stopConsumers();
+            this.enabled.set(false);
+            sb.append("Enabled: ").append("false").append("\n");
+            return sb.toString();
+        }
+
+        String pattern = (String) options.get(ENDPOINT);
+        if (pattern != null) {
+            this.enabled.set(true);
+            Endpoint target = findMatchingEndpoint(getCamelContext(), pattern);
+            if (target != null) {
+                try {
+                    // createConsumer returns the already registered consumer when this endpoint is being consumed
+                    Consumer consumer = createConsumer(getCamelContext(), target);
+                    if (!consumers.contains(consumer)) {
+                        consumers.add(consumer);
+                        ServiceHelper.startService(consumer);
+                    }
+                } catch (Exception e) {
+                    // best effort: a failure to create or start the consumer is not reported to the caller
+                }
+            }
+        }
+
+        // status: the total is the number of messages received so far (the sequence counter)
+        sb.append("Enabled: ").append(this.enabled.get()).append("\n");
+        sb.append("Total: ").append(this.uuid.get()).append("\n");
+        for (Consumer c : consumers) {
+            sb.append("    ").append(c.getEndpoint().toString()).append("\n");
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Returns the consumer to use for the given endpoint.
+     * <p>
+     * A consumer already registered for this very endpoint instance (identity comparison) is reused, otherwise a
+     * new consumer is created on the endpoint with {@link #addMessage(Exchange)} as its processor.
+     *
+     * @param  camelContext the camel context (not used)
+     * @param  target       the endpoint to consume from
+     * @return              the existing or newly created consumer
+     * @throws Exception    if the endpoint fails to create the consumer
+     */
+    private Consumer createConsumer(CamelContext camelContext, Endpoint target) throws Exception {
+        Consumer existing = consumers.stream()
+                .filter(c -> c.getEndpoint() == target)
+                .findFirst()
+                .orElse(null);
+        if (existing != null) {
+            return existing;
+        }
+        return target.createConsumer(this::addMessage);
+    }
+
+    /**
+     * Captures a received message: dumps it as JSON, tags it with the next sequence number, and stores it in the
+     * bounded queue, evicting the oldest captured message(s) when the queue is full.
+     *
+     * @param exchange the exchange received by one of the consumers created by this console
+     */
+    private void addMessage(Exchange exchange) {
+        JsonObject json
+                = MessageHelper.dumpAsJSonObject(exchange.getMessage(), false, false, true, true, true, true, bodyMaxChars);
+        // the sequence number is stored as "uid", which is the key the CLI connector reads
+        // to keep track of the last message it has written to the receive file
+        json.put("uid", uuid.incrementAndGet());
+
+        // offer returns false (instead of throwing as add does) when the queue is full, for example when
+        // another consumer thread took the last free slot; then evict the oldest message and try again
+        while (!queue.offer(json)) {
+            queue.poll();
+        }
+    }
+
+    /**
+     * Finds the endpoint to receive messages from.
+     * <p>
+     * A value with a scheme that is not a pattern is resolved directly via the camel context. Otherwise the value is
+     * matched against the routes: first against the uri of each route's endpoint, and then against the route ids.
+     *
+     * @param  camelContext the camel context
+     * @param  endpoint     an endpoint uri, a pattern (ending with {@code *}), or a name without scheme such as a
+     *                      route id (matched as a prefix)
+     * @return              the endpoint, or {@code null} if no route matched
+     */
+    protected Endpoint findMatchingEndpoint(CamelContext camelContext, String endpoint) {
+        // TODO: find all processors that are endpoint aware and match pattern
+
+        boolean scheme = endpoint.contains(":");
+        boolean pattern = endpoint.endsWith("*");
+        if (scheme && !pattern) {
+            // a plain uri is resolved directly
+            return camelContext.getEndpoint(endpoint);
+        }
+
+        // a name without scheme is matched as a prefix; the wildcard is only appended when
+        // missing, so an existing trailing * is not doubled
+        if (!scheme && !pattern) {
+            endpoint = endpoint + "*";
+        }
+
+        // first choice: a route whose own endpoint uri matches
+        for (Route route : camelContext.getRoutes()) {
+            Endpoint e = route.getEndpoint();
+            if (EndpointHelper.matchEndpoint(camelContext, e.getEndpointUri(), endpoint)) {
+                return e;
+            }
+        }
+        // second choice: it may refer to a route id
+        for (Route route : camelContext.getRoutes()) {
+            if (EndpointHelper.matchEndpoint(camelContext, route.getRouteId(), endpoint)) {
+                return route.getEndpoint();
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Handles a console call in JSON mode.
+     * <p>
+     * The options are checked in this order: {@code dump=true} returns the captured messages ({@code messages}),
+     * {@code enabled=false} stops all consumers, and {@code endpoint} starts receiving from the matching endpoint.
+     * Unless dumping or disabling, the result is a status with {@code enabled}, {@code total} (number of received
+     * messages) and {@code endpoints} (the endpoints being consumed).
+     *
+     * @param  options the call options (see {@link #DUMP}, {@link #ENABLED} and {@link #ENDPOINT})
+     * @return         the JSON output
+     */
+    protected JsonObject doCallJson(Map<String, Object> options) {
+        JsonObject root = new JsonObject();
+
+        String dump = (String) options.get(DUMP);
+        if ("true".equals(dump)) {
+            JsonArray messages = new JsonArray();
+            if (removeOnDump) {
+                // take the messages out one at a time: copying the queue and then calling clear()
+                // would drop any message a consumer adds between the copy and the clear
+                for (JsonObject msg = queue.poll(); msg != null; msg = queue.poll()) {
+                    messages.add(msg);
+                }
+            } else {
+                messages.addAll(queue);
+            }
+            root.put("messages", messages);
+            return root;
+        }
+
+        String enabled = (String) options.get(ENABLED);
+        if ("false".equals(enabled)) {
+            // turn off all consumers
+            stopConsumers();
+            this.enabled.set(false);
+            root.put("enabled", false);
+            return root;
+        }
+
+        String pattern = (String) options.get(ENDPOINT);
+        if (pattern != null) {
+            this.enabled.set(true);
+            Endpoint target = findMatchingEndpoint(getCamelContext(), pattern);
+            if (target != null) {
+                try {
+                    // createConsumer returns the already registered consumer when this endpoint is being consumed
+                    Consumer consumer = createConsumer(getCamelContext(), target);
+                    if (!consumers.contains(consumer)) {
+                        consumers.add(consumer);
+                        ServiceHelper.startService(consumer);
+                    }
+                } catch (Exception e) {
+                    // best effort: a failure to create or start the consumer is not reported to the caller
+                }
+            }
+        }
+
+        // report the actual state (as the text output does) instead of a hard-coded true,
+        // which is wrong when the console is called without any option while disabled
+        root.put("enabled", this.enabled.get());
+        root.put("total", uuid.get());
+        JsonArray endpoints = new JsonArray();
+        for (Consumer c : consumers) {
+            JsonObject jo = new JsonObject();
+            jo.put("uri", c.getEndpoint().toString());
+            endpoints.add(jo);
+        }
+        root.put("endpoints", endpoints);
+        return root;
+    }
+
+}
diff --git 
a/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java
 
b/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java
index 4a74f68d444..e7d6d3620e5 100644
--- 
a/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java
+++ 
b/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java
@@ -114,8 +114,10 @@ public class LocalCliConnector extends ServiceSupport 
implements CliConnector, C
     private File actionFile;
     private File outputFile;
     private File traceFile;
+    private long traceFilePos;   // keep track of trace offset
     private File debugFile;
-    private long traceFilePos; // keep track of trace offset
+    private File receiveFile;
+    private long receiveFilePos; // keep track of receive offset
     private byte[] lastSource;
     private ExpressionDefinition lastSourceExpression;
 
@@ -180,6 +182,7 @@ public class LocalCliConnector extends ServiceSupport 
implements CliConnector, C
             outputFile = createLockFile(lockFile.getName() + "-output.json");
             traceFile = createLockFile(lockFile.getName() + "-trace.json");
             debugFile = createLockFile(lockFile.getName() + "-debug.json");
+            receiveFile = createLockFile(lockFile.getName() + "-receive.json");
             executor.scheduleWithFixedDelay(this::task, 0, delay, 
TimeUnit.MILLISECONDS);
             LOG.info("Camel JBang CLI enabled");
         } else {
@@ -223,7 +226,7 @@ public class LocalCliConnector extends ServiceSupport 
implements CliConnector, C
         actionTask();
         statusTask();
         // only run this every 2nd time as gathering this data has more 
overhead
-        // and are only needed when doing tracing or debugging
+        // and are only needed when doing tracing/debugging/receive
         if (++counter % 2 == 0) {
             traceTask();
         }
@@ -280,6 +283,8 @@ public class LocalCliConnector extends ServiceSupport 
implements CliConnector, C
                 doActionTraceTask(root);
             } else if ("browse".equals(action)) {
                 doActionBrowseTask(root);
+            } else if ("receive".equals(action)) {
+                doActionReceiveTask(root);
             }
         } catch (Exception e) {
             // ignore
@@ -811,6 +816,24 @@ public class LocalCliConnector extends ServiceSupport 
implements CliConnector, C
         }
     }
 
+    /**
+     * Handles the "receive" action by calling the receive dev console and writing its JSON answer to the output
+     * file. An endpoint in the action enables receiving from that endpoint; no endpoint disables receiving.
+     *
+     * @param  root        the action request
+     * @throws IOException if the output file cannot be written
+     */
+    private void doActionReceiveTask(JsonObject root) throws IOException {
+        DevConsole dc = camelContext.getCamelContextExtension().getContextPlugin(DevConsoleRegistry.class)
+                .resolveById("receive");
+        if (dc == null) {
+            // no receive console available, so answer with an empty JSON object
+            IOHelper.writeText("{}", outputFile);
+            return;
+        }
+
+        String endpoint = root.getString("endpoint");
+        JsonObject json = (JsonObject) dc.call(DevConsole.MediaType.JSON,
+                endpoint != null
+                        ? Map.of("enabled", "true", "endpoint", endpoint)
+                        : Map.of("enabled", "false"));
+        LOG.trace("Updating output file: {}", outputFile);
+        IOHelper.writeText(json.toJson(), outputFile);
+    }
+
     private void doActionBeanTask(JsonObject root) throws IOException {
         String filter = root.getStringOrDefault("filter", "");
         String properties = root.getStringOrDefault("properties", "true");
@@ -1206,6 +1229,33 @@ public class LocalCliConnector extends ServiceSupport 
implements CliConnector, C
             LOG.trace("Error updating debug file: {} due to: {}. This 
exception is ignored.",
                     debugFile, e.getMessage(), e);
         }
+        try {
+            // dump the messages captured by the receive dev console (if present) and append them to the receive file
+            DevConsole dc14 = camelContext.getCamelContextExtension().getContextPlugin(DevConsoleRegistry.class)
+                    .resolveById("receive");
+            if (dc14 != null) {
+                JsonObject json = (JsonObject) dc14.call(DevConsole.MediaType.JSON, Map.of("dump", "true"));
+                JsonArray arr = json.getCollection("messages");
+                // guard before touching the array, as the answer may not contain any messages
+                if (arr != null) {
+                    // filter based on last uid (messages without an uid are kept)
+                    if (receiveFilePos > 0) {
+                        arr.removeIf(r -> {
+                            Long uid = ((JsonObject) r).getLong("uid");
+                            return uid != null && uid <= receiveFilePos;
+                        });
+                    }
+                    if (!arr.isEmpty()) {
+                        // store messages in a special file
+                        LOG.trace("Updating receive file: {}", receiveFile);
+                        String data = json.toJson() + System.lineSeparator();
+                        IOHelper.appendText(data, receiveFile);
+                        // remember the uid of the last written message; the position is left as-is
+                        // when the message has no uid (avoids unboxing a null)
+                        JsonObject last = arr.getMap(arr.size() - 1);
+                        Long lastUid = last.getLong("uid");
+                        if (lastUid != null) {
+                            receiveFilePos = lastUid;
+                        }
+                    }
+                }
+            }
+        } catch (Exception e) {
+            // ignore
+            LOG.trace("Error updating receive file: {} due to: {}. This exception is ignored.",
+                    receiveFile, e.getMessage(), e);
+        }
     }
 
     private JsonObject collectMemory() {
@@ -1331,6 +1381,9 @@ public class LocalCliConnector extends ServiceSupport 
implements CliConnector, C
         if (debugFile != null) {
             FileUtil.deleteFile(debugFile);
         }
+        if (receiveFile != null) {
+            FileUtil.deleteFile(receiveFile);
+        }
         if (executor != null) {
             camelContext.getExecutorServiceManager().shutdown(executor);
             executor = null;
diff --git 
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelCommand.java
 
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelCommand.java
index 41ae9f33f67..7413250e008 100644
--- 
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelCommand.java
+++ 
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelCommand.java
@@ -110,6 +110,10 @@ public abstract class CamelCommand implements 
Callable<Integer> {
         return new File(CommandLineHelper.getCamelDir(), pid + "-trace.json");
     }
 
+    /**
+     * Returns the file named {@code <pid>-receive.json} located in the directory given by
+     * {@code CommandLineHelper.getCamelDir()}.
+     */
+    public File getReceiveFile(String pid) {
+        return new File(CommandLineHelper.getCamelDir(), pid + 
"-receive.json");
+    }
+
     public File getDebugFile(String pid) {
         return new File(CommandLineHelper.getCamelDir(), pid + "-debug.json");
     }
diff --git 
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelBrowseAction.java
 
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelBrowseAction.java
index 580118f5d60..9076f27f97d 100644
--- 
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelBrowseAction.java
+++ 
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelBrowseAction.java
@@ -212,16 +212,6 @@ public class CamelBrowseAction extends ActionBaseCommand {
         return 0;
     }
 
-    private static long getLongValueFromCollection(JsonArray arr, String key) {
-        for (Object o : arr) {
-            JsonObject jo = (JsonObject) o;
-            if (key.equalsIgnoreCase(jo.getString("key"))) {
-                return jo.getLong("value");
-            }
-        }
-        return 0;
-    }
-
     protected void dumpMessages(List<Row> rows, boolean onlyBody) {
         MessageTableHelper tableHelper = new MessageTableHelper();
         tableHelper.setPretty(pretty);
diff --git 
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelReceiveAction.java
 
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelReceiveAction.java
new file mode 100644
index 00000000000..b9f859fb4f2
--- /dev/null
+++ 
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelReceiveAction.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.dsl.jbang.core.commands.action;
+
+import org.apache.camel.dsl.jbang.core.commands.CamelJBangMain;
+import org.apache.camel.util.FileUtil;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.StopWatch;
+import org.apache.camel.util.TimeUtils;
+import org.apache.camel.util.json.JsonObject;
+import org.apache.camel.util.json.Jsoner;
+import org.fusesource.jansi.Ansi;
+import org.fusesource.jansi.AnsiConsole;
+import picocli.CommandLine;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.LineNumberReader;
+import java.text.SimpleDateFormat;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+
+@CommandLine.Command(name = "receive",
+                     description = "Receive messages from endpoints", sortOptions = false)
+public class CamelReceiveAction extends ActionBaseCommand {
+
+    public static class ActionCompletionCandidates implements Iterable<String> 
{
+
+        public ActionCompletionCandidates() {
+        }
+
+        @Override
+        public Iterator<String> iterator() {
+            return List.of("dump", "start", "stop").iterator();
+        }
+    }
+
+    // Command line parameters and options. The show-* help texts say "received messages"
+    // because this command displays messages received from an endpoint.
+
+    @CommandLine.Parameters(description = "Name or pid of running Camel integration", arity = "0..1")
+    String name = "*";
+
+    @CommandLine.Option(names = { "--action" }, completionCandidates = ActionCompletionCandidates.class,
+                        defaultValue = "dump",
+                        description = "Action to start, stop, or dump messages")
+    String action;
+
+    @CommandLine.Option(names = { "--endpoint" },
+                        description = "Endpoint where to receive the messages from (can be uri, pattern, or refer to a route id)")
+    String endpoint;
+
+    @CommandLine.Option(names = { "--output-file" },
+                        description = "Saves messages received to the file with the given name (override if exists)")
+    String outputFile;
+
+    @CommandLine.Option(names = { "--tail" }, defaultValue = "-1",
+                        description = "The number of messages from the end of the receive to show. Use -1 to read from the beginning. Use 0 to read only new lines. Defaults to showing all messages from beginning.")
+    int tail = -1;
+
+    @CommandLine.Option(names = { "--show-exchange-properties" }, defaultValue = "false",
+                        description = "Show exchange properties in received messages")
+    boolean showExchangeProperties;
+
+    @CommandLine.Option(names = { "--show-headers" }, defaultValue = "true",
+                        description = "Show message headers in received messages")
+    boolean showHeaders = true;
+
+    @CommandLine.Option(names = { "--show-body" }, defaultValue = "true",
+                        description = "Show message body in received messages")
+    boolean showBody = true;
+
+    @CommandLine.Option(names = { "--show-exception" }, defaultValue = "true",
+                        description = "Show exception and stacktrace for failed messages")
+    boolean showException = true;
+
+    @CommandLine.Option(names = { "--logging-color" }, defaultValue = "true", description = "Use colored logging")
+    boolean loggingColor = true;
+
+    @CommandLine.Option(names = { "--pretty" },
+                        description = "Pretty print message body when using JSon or XML format")
+    boolean pretty;
+
+    @CommandLine.Option(names = { "--follow" }, defaultValue = "true",
+                        description = "Keep following and outputting new received messages (use ctrl + c to exit).")
+    boolean follow = true;
+
+    // pid of the single integration this command operates on; assigned in doDumpCall
+    private volatile long pid;
+
+    // NOTE(review): never assigned in the code shown here; confirm where it is meant to be created
+    private MessageTableHelper tableHelper;
+
+    private static class Pid { // mutable read state for one integration's receive file
+        String pid; // process id as text; doDumpCall fills it from the resolved pid
+        String name; // NOTE(review): neither assigned nor read in the code shown here - confirm it is needed
+        Queue<String> fifo; // lines read from the receive file; bounded while tailing, unbounded once following
+        int depth; // NOTE(review): unused in the code shown here - confirm intent
+        LineNumberReader reader; // reader on the receive file, kept open so later reads continue where the last one stopped
+    }
+
+    public CamelReceiveAction(CamelJBangMain main) { // forwards the given main object to the ActionBaseCommand constructor
+        super(main);
+    }
+
+    /**
+     * Dispatches on the {@code --action} option: {@code dump} shows received messages, and every
+     * other value is handled as a start/stop request.
+     */
+    @Override
+    public Integer doCall() throws Exception {
+        return "dump".equals(action) ? doDumpCall() : doStartStopCall(action);
+    }
+
+    /**
+     * Asks the running integration to start or stop receiving messages from the configured endpoint.
+     * <p>
+     * The request is written as JSON to the action file of the target process, after which this method
+     * waits for the output file of that process to appear and then removes it again.
+     *
+     * @param  action    {@code start} enables receiving; any other value disables it
+     * @return           {@code 0} when the request was written (or no single integration matched),
+     *                   {@code 1} when the action file could not be written
+     * @throws Exception if resolving the running integrations fails
+     */
+    protected Integer doStartStopCall(String action) throws Exception {
+        // resolve the target process first: the pid field is only assigned by the dump action,
+        // so without this the request would be written for pid 0
+        List<Long> pids = findPids(name);
+        if (pids.isEmpty()) {
+            return 0;
+        } else if (pids.size() > 1) {
+            printer().println("Name or pid " + name + " matches " + pids.size()
+                              + " running Camel integrations. Specify a name or PID that matches exactly one.");
+            return 0;
+        }
+        this.pid = pids.get(0);
+        String id = Long.toString(this.pid);
+
+        // ensure output file is deleted before executing action
+        // (named response so it does not shadow the --output-file option field)
+        File response = getOutputFile(id);
+        FileUtil.deleteFile(response);
+
+        JsonObject root = new JsonObject();
+        root.put("action", "receive");
+        root.put("endpoint", endpoint);
+        root.put("enabled", "start".equals(action) ? "true" : "false");
+
+        File f = getActionFile(id);
+        try {
+            IOHelper.writeText(root.toJson(), f);
+        } catch (Exception e) {
+            // nothing was requested, so waiting for a response would only stall until the timeout
+            printer().println("Cannot write action file " + f + " due to: " + e.getMessage());
+            return 1;
+        }
+
+        try {
+            waitForOutputFile(response);
+        } finally {
+            // delete output file after use
+            FileUtil.deleteFile(response);
+        }
+
+        return 0;
+    }
+
+    /**
+     * Shows the messages received by exactly one running integration.
+     * <p>
+     * Unless {@code --tail} is {@code 0}, the lines already present in the receive file are loaded and
+     * shown first. The receive file is then polled for new lines, and the loop continues for as long as
+     * {@code --follow} is enabled or the last dump reported more to come.
+     *
+     * @return           always {@code 0}
+     * @throws Exception if resolving the integration or reading its receive file fails
+     */
+    protected Integer doDumpCall() throws Exception {
+        List<Long> pids = findPids(name);
+        if (pids.isEmpty()) {
+            return 0;
+        } else if (pids.size() > 1) {
+            printer().println("Name or pid " + name + " matches " + pids.size()
+                              + " running Camel integrations. Specify a name or PID that matches exactly one.");
+            return 0;
+        }
+
+        this.pid = pids.get(0);
+
+        Pid pid = new Pid();
+        pid.pid = Long.toString(this.pid);
+
+        if (tail != 0) {
+            // load what is already in the receive file and show it right away: readReceiveFiles
+            // replaces a bounded queue as soon as a new line arrives, which would drop these lines
+            tailTraceFiles(Map.of(pids.get(0), pid), tail);
+            if (pid.fifo != null) {
+                dumpReceivedFiles(pid);
+            }
+        }
+
+        boolean more = true;
+        do {
+            int lines = readReceiveFiles(pid);
+            if (lines > 0) {
+                more = dumpReceivedFiles(pid);
+            } else if (lines == 0) {
+                // nothing new yet, so back off briefly before polling the file again
+                Thread.sleep(100);
+            } else {
+                // a negative count is treated as a request to stop
+                break;
+            }
+        } while (follow || more);
+
+        return 0;
+    }
+
+    /**
+     * Reads every line currently available from the receive file of the given process and appends
+     * it to the process' queue.
+     * <p>
+     * The reader is opened on first use once the file exists, and is positioned at the end of the
+     * file when {@code --tail} is {@code 0}.
+     *
+     * @param  pid       read state of the process; its reader and queue are updated in place
+     * @return           the number of lines read by this call ({@code 0} when the file is absent or has no new lines)
+     * @throws Exception if the receive file cannot be opened or skipped
+     */
+    private int readReceiveFiles(Pid pid) throws Exception {
+        if (pid.reader == null) {
+            File source = getReceiveFile(pid.pid);
+            if (source.exists()) {
+                pid.reader = new LineNumberReader(new FileReader(source));
+                if (tail == 0) {
+                    // only read new lines so forward to end of reader
+                    pid.reader.skip(source.length());
+                }
+            }
+        }
+        if (pid.reader == null) {
+            return 0;
+        }
+
+        int count = 0;
+        while (true) {
+            String line;
+            try {
+                line = pid.reader.readLine();
+            } catch (IOException e) {
+                // a failed read is handled the same way as having no more lines
+                break;
+            }
+            if (line == null) {
+                break;
+            }
+            count++;
+            // switch fifo to be unlimited as we use it for new traces
+            if (pid.fifo == null || pid.fifo instanceof ArrayBlockingQueue) {
+                pid.fifo = new ArrayDeque<>();
+            }
+            pid.fifo.offer(line);
+        }
+
+        return count;
+    }
+
+    private boolean dumpReceivedFiles(Pid pid) { // NOTE(review): unfinished - the body has no return statement, so it does not compile as written
+        List<Row> rows = new ArrayList<>(); // never filled or read in the code shown here
+
+        Queue<String> queue = pid.fifo; // the lines buffered for this pid by readReceiveFiles / tailTraceFiles; may be null if no file was read
+
+        // NOTE(review): doDumpCall calls dumpReceivedFiles(pids, 0, null), which does not match this one-argument signature
+
+
+    }
+
+    /**
+     * Loads the existing content of each process' receive file into that process' queue.
+     * <p>
+     * With a positive {@code tail} only the last {@code tail} lines are retained; otherwise all lines
+     * are kept. The reader stays open on the process state so later reads continue after the last line.
+     *
+     * @param  pids      read state per process; reader and queue are assigned for each existing file
+     * @param  tail      maximum number of lines to retain, or zero/negative for no limit
+     * @throws Exception if a receive file cannot be opened or read
+     */
+    private void tailTraceFiles(Map<Long, Pid> pids, int tail) throws Exception {
+        for (Pid pid : pids.values()) {
+            File source = getReceiveFile(pid.pid);
+            if (!source.exists()) {
+                continue;
+            }
+            pid.reader = new LineNumberReader(new FileReader(source));
+            if (tail > 0) {
+                pid.fifo = new ArrayBlockingQueue<>(tail);
+            } else {
+                pid.fifo = new ArrayDeque<>();
+            }
+            for (String line = pid.reader.readLine(); line != null; line = pid.reader.readLine()) {
+                // a full bounded queue rejects the offer, so evict the oldest line until it fits
+                while (!pid.fifo.offer(line)) {
+                    pid.fifo.poll();
+                }
+            }
+        }
+    }
+
+    /**
+     * Polls for the given output file for up to five seconds and parses its content as JSON.
+     *
+     * @param  outputFile the file expected to be written in response to an action
+     * @return            the parsed JSON object, or {@code null} if the file did not become readable
+     *                    within the timeout or the waiting thread was interrupted
+     */
+    protected JsonObject waitForOutputFile(File outputFile) {
+        StopWatch watch = new StopWatch();
+        while (watch.taken() < 5000) {
+            try {
+                // give time for response to be ready
+                Thread.sleep(20);
+            } catch (InterruptedException e) {
+                // restore the flag and give up: with the flag set every further sleep would fail
+                // immediately and the loop would spin until the timeout
+                Thread.currentThread().interrupt();
+                return null;
+            }
+
+            if (!outputFile.exists()) {
+                continue;
+            }
+            // try-with-resources closes the stream even when loading or parsing fails
+            try (FileInputStream fis = new FileInputStream(outputFile)) {
+                String text = IOHelper.loadText(fis);
+                return (JsonObject) Jsoner.deserialize(text);
+            } catch (Exception e) {
+                // ignore and retry until the timeout, e.g. the file may not be fully written yet
+            }
+        }
+        return null;
+    }
+
+    private static class Row { // NOTE(review): presumably one received message; only used as a list element type in the code shown here
+        String pid; // NOTE(review): presumably the owning process id - confirm
+        long uid; // NOTE(review): presumably a per-message id - confirm against the receive file format
+        JsonObject message; // NOTE(review): presumably the message as JSON - confirm
+    }
+
+}


Reply via email to