This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 3c957201743 CAMEL-21193: camel-jbang - Add receive command (#15912)
3c957201743 is described below
commit 3c957201743ff8ec01530de27705c4544464d275
Author: Claus Ibsen <[email protected]>
AuthorDate: Thu Oct 10 15:10:57 2024 +0200
CAMEL-21193: camel-jbang - Add receive command (#15912)
* CAMEL-21193: camel-jbang - Add receive command
---
.../apache/camel/catalog/dev-consoles.properties | 1 +
.../apache/camel/catalog/dev-consoles/receive.json | 15 +
.../org/apache/camel/impl/engine/DefaultRoute.java | 4 +
.../impl/console/ReceiveDevConsoleConfigurer.java | 86 +++
.../org/apache/camel/dev-console/receive.json | 15 +
...org.apache.camel.impl.console.ReceiveDevConsole | 2 +
.../services/org/apache/camel/dev-console/receive | 2 +
.../org/apache/camel/dev-consoles.properties | 2 +-
.../camel/impl/console/ReceiveDevConsole.java | 345 +++++++++
.../java/org/apache/camel/util/ObjectHelper.java | 19 +
.../modules/ROOT/pages/camel-jbang.adoc | 65 +-
.../camel/cli/connector/LocalCliConnector.java | 64 +-
.../dsl/jbang/core/commands/CamelCommand.java | 4 +
.../dsl/jbang/core/commands/CamelJBangMain.java | 1 +
.../core/commands/action/CamelBrowseAction.java | 10 -
.../core/commands/action/CamelReceiveAction.java | 808 +++++++++++++++++++++
16 files changed, 1429 insertions(+), 14 deletions(-)
diff --git
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles.properties
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles.properties
index d8d1929214d..280b182c430 100644
---
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles.properties
+++
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles.properties
@@ -29,6 +29,7 @@ platform-http
properties
protocol
quartz
+receive
reload
resilience4j
rest
diff --git
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles/receive.json
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles/receive.json
new file mode 100644
index 00000000000..978a1f23579
--- /dev/null
+++
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles/receive.json
@@ -0,0 +1,15 @@
+{
+ "console": {
+ "kind": "console",
+ "group": "camel",
+ "name": "receive",
+ "title": "Camel Receive",
+ "description": "Consume messages from endpoints",
+ "deprecated": false,
+ "javaType": "org.apache.camel.impl.console.ReceiveDevConsole",
+ "groupId": "org.apache.camel",
+ "artifactId": "camel-console",
+ "version": "4.9.0-SNAPSHOT"
+ }
+}
+
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java
index 567443d3302..446671ba875 100644
---
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.camel.CamelContext;
+import org.apache.camel.Channel;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.ErrorHandlerFactory;
@@ -733,6 +734,9 @@ public class DefaultRoute extends ServiceSupport implements
Route {
List<Processor> list = nav.next();
if (list != null) {
for (Processor proc : list) {
+ if (proc instanceof Channel channel) {
+ proc = channel.getNextProcessor();
+ }
String id = null;
if (proc instanceof IdAware idAware) {
id = idAware.getId();
diff --git
a/core/camel-console/src/generated/java/org/apache/camel/impl/console/ReceiveDevConsoleConfigurer.java
b/core/camel-console/src/generated/java/org/apache/camel/impl/console/ReceiveDevConsoleConfigurer.java
new file mode 100644
index 00000000000..0129a9fae6a
--- /dev/null
+++
b/core/camel-console/src/generated/java/org/apache/camel/impl/console/ReceiveDevConsoleConfigurer.java
@@ -0,0 +1,86 @@
+/* Generated by camel build tools - do NOT edit this file! */
+package org.apache.camel.impl.console;
+
+import javax.annotation.processing.Generated;
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.spi.ExtendedPropertyConfigurerGetter;
+import org.apache.camel.spi.PropertyConfigurerGetter;
+import org.apache.camel.spi.ConfigurerStrategy;
+import org.apache.camel.spi.GeneratedPropertyConfigurer;
+import org.apache.camel.util.CaseInsensitiveMap;
+import org.apache.camel.impl.console.ReceiveDevConsole;
+
+/**
+ * Generated by camel build tools - do NOT edit this file!
+ */
+@Generated("org.apache.camel.maven.packaging.GenerateConfigurerMojo")
+@SuppressWarnings("unchecked")
+public class ReceiveDevConsoleConfigurer extends
org.apache.camel.support.component.PropertyConfigurerSupport implements
GeneratedPropertyConfigurer, ExtendedPropertyConfigurerGetter {
+
+ private static final Map<String, Object> ALL_OPTIONS;
+ static {
+ Map<String, Object> map = new CaseInsensitiveMap();
+ map.put("BodyMaxChars", int.class);
+ map.put("CamelContext", org.apache.camel.CamelContext.class);
+ map.put("Capacity", int.class);
+ map.put("RemoveOnDump", boolean.class);
+ ALL_OPTIONS = map;
+
ConfigurerStrategy.addBootstrapConfigurerClearer(ReceiveDevConsoleConfigurer::clearBootstrapConfigurers);
+ }
+
+ @Override
+ public boolean configure(CamelContext camelContext, Object obj, String
name, Object value, boolean ignoreCase) {
+ org.apache.camel.impl.console.ReceiveDevConsole target =
(org.apache.camel.impl.console.ReceiveDevConsole) obj;
+ switch (ignoreCase ? name.toLowerCase() : name) {
+ case "bodymaxchars":
+ case "bodyMaxChars": target.setBodyMaxChars(property(camelContext,
int.class, value)); return true;
+ case "camelcontext":
+ case "camelContext": target.setCamelContext(property(camelContext,
org.apache.camel.CamelContext.class, value)); return true;
+ case "capacity": target.setCapacity(property(camelContext, int.class,
value)); return true;
+ case "removeondump":
+ case "removeOnDump": target.setRemoveOnDump(property(camelContext,
boolean.class, value)); return true;
+ default: return false;
+ }
+ }
+
+ @Override
+ public Map<String, Object> getAllOptions(Object target) {
+ return ALL_OPTIONS;
+ }
+
+ public static void clearBootstrapConfigurers() {
+ ALL_OPTIONS.clear();
+ }
+
+ @Override
+ public Class<?> getOptionType(String name, boolean ignoreCase) {
+ switch (ignoreCase ? name.toLowerCase() : name) {
+ case "bodymaxchars":
+ case "bodyMaxChars": return int.class;
+ case "camelcontext":
+ case "camelContext": return org.apache.camel.CamelContext.class;
+ case "capacity": return int.class;
+ case "removeondump":
+ case "removeOnDump": return boolean.class;
+ default: return null;
+ }
+ }
+
+ @Override
+ public Object getOptionValue(Object obj, String name, boolean ignoreCase) {
+ org.apache.camel.impl.console.ReceiveDevConsole target =
(org.apache.camel.impl.console.ReceiveDevConsole) obj;
+ switch (ignoreCase ? name.toLowerCase() : name) {
+ case "bodymaxchars":
+ case "bodyMaxChars": return target.getBodyMaxChars();
+ case "camelcontext":
+ case "camelContext": return target.getCamelContext();
+ case "capacity": return target.getCapacity();
+ case "removeondump":
+ case "removeOnDump": return target.isRemoveOnDump();
+ default: return null;
+ }
+ }
+}
+
diff --git
a/core/camel-console/src/generated/resources/META-INF/org/apache/camel/dev-console/receive.json
b/core/camel-console/src/generated/resources/META-INF/org/apache/camel/dev-console/receive.json
new file mode 100644
index 00000000000..978a1f23579
--- /dev/null
+++
b/core/camel-console/src/generated/resources/META-INF/org/apache/camel/dev-console/receive.json
@@ -0,0 +1,15 @@
+{
+ "console": {
+ "kind": "console",
+ "group": "camel",
+ "name": "receive",
+ "title": "Camel Receive",
+ "description": "Consume messages from endpoints",
+ "deprecated": false,
+ "javaType": "org.apache.camel.impl.console.ReceiveDevConsole",
+ "groupId": "org.apache.camel",
+ "artifactId": "camel-console",
+ "version": "4.9.0-SNAPSHOT"
+ }
+}
+
diff --git
a/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/configurer/org.apache.camel.impl.console.ReceiveDevConsole
b/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/configurer/org.apache.camel.impl.console.ReceiveDevConsole
new file mode 100644
index 00000000000..3a11d5ff2c9
--- /dev/null
+++
b/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/configurer/org.apache.camel.impl.console.ReceiveDevConsole
@@ -0,0 +1,2 @@
+# Generated by camel build tools - do NOT edit this file!
+class=org.apache.camel.impl.console.ReceiveDevConsoleConfigurer
diff --git
a/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-console/receive
b/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-console/receive
new file mode 100644
index 00000000000..8454801f9f9
--- /dev/null
+++
b/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-console/receive
@@ -0,0 +1,2 @@
+# Generated by camel build tools - do NOT edit this file!
+class=org.apache.camel.impl.console.ReceiveDevConsole
diff --git
a/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-consoles.properties
b/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-consoles.properties
index fd1b9890684..8d3e19577ae 100644
---
a/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-consoles.properties
+++
b/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-consoles.properties
@@ -1,5 +1,5 @@
# Generated by camel build tools - do NOT edit this file!
-dev-consoles=bean blocked browse circuit-breaker consumer context debug
endpoint event gc health inflight java-security jvm log memory properties
reload rest route route-controller route-dump service source startup-recorder
thread top trace transformers type-converters variables
+dev-consoles=bean blocked browse circuit-breaker consumer context debug
endpoint event gc health inflight java-security jvm log memory properties
receive reload rest route route-controller route-dump service source
startup-recorder thread top trace transformers type-converters variables
groupId=org.apache.camel
artifactId=camel-console
version=4.9.0-SNAPSHOT
diff --git
a/core/camel-console/src/main/java/org/apache/camel/impl/console/ReceiveDevConsole.java
b/core/camel-console/src/main/java/org/apache/camel/impl/console/ReceiveDevConsole.java
new file mode 100644
index 00000000000..ff3a3215e7b
--- /dev/null
+++
b/core/camel-console/src/main/java/org/apache/camel/impl/console/ReceiveDevConsole.java
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.console;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Consumer;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.spi.Configurer;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.annotations.DevConsole;
+import org.apache.camel.support.ExceptionHelper;
+import org.apache.camel.support.MessageHelper;
+import org.apache.camel.support.PatternHelper;
+import org.apache.camel.support.console.AbstractDevConsole;
+import org.apache.camel.support.service.ServiceHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.json.JsonArray;
+import org.apache.camel.util.json.JsonObject;
+import org.apache.camel.util.json.Jsoner;
+
+@DevConsole(name = "receive", displayName = "Camel Receive", description =
"Consume messages from endpoints")
+@Configurer(bootstrap = true, extended = true)
+public class ReceiveDevConsole extends AbstractDevConsole {
+
+ @Metadata(defaultValue = "100",
+ description = "Maximum capacity of last number of messages to
capture (capacity must be between 50 and 1000)")
+ private int capacity = 100;
+ @Metadata(defaultValue = "32768", label = "advanced",
+ description = "To limit the message body to a maximum size in
the received message. Use 0 or negative value to use unlimited size.")
+ private int bodyMaxChars = 32 * 1024;
+ @Metadata(defaultValue = "true", label = "advanced",
+ description = "Whether all received messages should be removed
when dumping. By default, the messages are removed, which means that dumping
will not contain previous dumped messages.")
+ private boolean removeOnDump = true;
+
+ /**
+ * Whether to enable or disable receive mode
+ */
+ public static final String ENABLED = "enabled";
+
+ /**
+ * Whether to dump received messages
+ */
+ public static final String DUMP = "dump";
+
+ /**
+ * Endpoint for where to receive messages (can also refer to a route id,
endpoint pattern).
+ */
+ public static final String ENDPOINT = "endpoint";
+
+ private final List<Consumer> consumers = new ArrayList<>();
+ private final AtomicBoolean enabled = new AtomicBoolean();
+ private final AtomicLong uuid = new AtomicLong();
+ private Queue<JsonObject> queue;
+ private long firstTimestamp;
+ private long lastTimestamp;
+
+ public ReceiveDevConsole() {
+ super("camel", "receive", "Camel Receive", "Consume messages from
endpoints");
+ }
+
+ public int getCapacity() {
+ return capacity;
+ }
+
+ public void setCapacity(int capacity) {
+ this.capacity = capacity;
+ }
+
+ public int getBodyMaxChars() {
+ return bodyMaxChars;
+ }
+
+ public void setBodyMaxChars(int bodyMaxChars) {
+ this.bodyMaxChars = bodyMaxChars;
+ }
+
+ public boolean isRemoveOnDump() {
+ return removeOnDump;
+ }
+
+ public void setRemoveOnDump(boolean removeOnDump) {
+ this.removeOnDump = removeOnDump;
+ }
+
+ @Override
+ protected void doInit() throws Exception {
+ if (capacity > 1000 || capacity < 50) {
+ throw new IllegalArgumentException("Capacity must be between 50
and 1000");
+ }
+ this.queue = new LinkedBlockingQueue<>(capacity);
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ stopConsumers();
+ }
+
+ protected void stopConsumers() {
+ for (Consumer c : consumers) {
+ ServiceHelper.stopAndShutdownServices(c);
+ }
+ consumers.clear();
+ }
+
+ protected String doCallText(Map<String, Object> options) {
+ StringBuilder sb = new StringBuilder();
+
+ String dump = (String) options.get(DUMP);
+ if ("true".equals(dump)) {
+ JsonArray arr = new JsonArray();
+ arr.addAll(queue);
+ if (removeOnDump) {
+ queue.clear();
+ }
+ JsonObject jo = (JsonObject) arr.get(0);
+ firstTimestamp = jo.getLongOrDefault("timestamp", 0);
+ jo = (JsonObject) arr.get(arr.size() - 1);
+ lastTimestamp = jo.getLongOrDefault("timestamp", 0);
+ String json = arr.toJson();
+ sb.append(json).append("\n");
+ return sb.toString();
+ }
+
+ String enabled = (String) options.get(ENABLED);
+ if ("false".equals(enabled)) {
+ // turn off all consumers
+ stopConsumers();
+ this.enabled.set(false);
+ sb.append("Enabled: ").append("false").append("\n");
+ return sb.toString();
+ }
+
+ String pattern = (String) options.get(ENDPOINT);
+ if (pattern != null) {
+ try {
+ Endpoint target = findMatchingEndpoint(getCamelContext(),
pattern);
+ if (target != null) {
+ sb.append("Starting to receive messages from:
").append(target.getEndpointUri());
+ Consumer consumer = createConsumer(target);
+ if (!consumers.contains(consumer)) {
+ consumers.add(consumer);
+ ServiceHelper.startService(consumer);
+ }
+ }
+ this.enabled.set(true);
+ } catch (Exception e) {
+ sb.append("Error starting to receive messages due to:
").append(e.getMessage());
+ }
+ }
+
+ sb.append("Enabled: ").append(this.enabled.get()).append("\n");
+ sb.append("Total: ").append(this.uuid.get()).append("\n");
+ for (Consumer c : consumers) {
+ sb.append(" ").append(c.getEndpoint().toString()).append("\n");
+ }
+ return sb.toString();
+ }
+
+ protected JsonObject doCallJson(Map<String, Object> options) {
+ JsonObject root = new JsonObject();
+
+ String dump = (String) options.get(DUMP);
+ if ("true".equals(dump)) {
+ JsonArray arr = new JsonArray();
+ arr.addAll(queue);
+ if (removeOnDump) {
+ queue.clear();
+ }
+ root.put("messages", arr);
+ JsonObject jo = (JsonObject) arr.get(0);
+ firstTimestamp = jo.getLongOrDefault("timestamp", 0);
+ jo = (JsonObject) arr.get(arr.size() - 1);
+ lastTimestamp = jo.getLongOrDefault("timestamp", 0);
+ return root;
+ }
+
+ String enabled = (String) options.get(ENABLED);
+ if ("false".equals(enabled)) {
+ // turn off all consumers
+ stopConsumers();
+ this.enabled.set(false);
+ root.put("enabled", false);
+ return root;
+ }
+
+ String pattern = (String) options.get(ENDPOINT);
+ if (pattern != null) {
+ try {
+ Endpoint target = findMatchingEndpoint(getCamelContext(),
pattern);
+ if (target != null) {
+ root.put("url", target.getEndpointUri());
+ Consumer consumer = createConsumer(target);
+ if (!consumers.contains(consumer)) {
+ consumers.add(consumer);
+ ServiceHelper.startService(consumer);
+ }
+ }
+ this.enabled.set(true);
+ } catch (Exception e) {
+ root.put("error", Jsoner.escape(e.getMessage()));
+ JsonArray arr2 = new JsonArray();
+ final String trace = ExceptionHelper.stackTraceToString(e);
+ root.put("stackTrace", arr2);
+ Collections.addAll(arr2, trace.split("\n"));
+ }
+ }
+
+ root.put("enabled", this.enabled.get());
+ root.put("total", uuid.get());
+ root.put("firstTimestamp", firstTimestamp);
+ root.put("lastTimestamp", lastTimestamp);
+
+ JsonArray arr = new JsonArray();
+ for (Consumer c : consumers) {
+ JsonObject jo = new JsonObject();
+ jo.put("uri", c.getEndpoint().toString());
+ jo.put("remote", c.getEndpoint().isRemote());
+ arr.add(jo);
+ }
+ if (!arr.isEmpty()) {
+ root.put("endpoints", arr);
+ }
+ return root;
+ }
+
+ private Consumer createConsumer(Endpoint target) throws Exception {
+ for (Consumer c : consumers) {
+ if (c.getEndpoint() == target) {
+ return c;
+ }
+ }
+ return target.createConsumer(this::addMessage);
+ }
+
+ private void addMessage(Exchange exchange) {
+ JsonObject json
+ = MessageHelper.dumpAsJSonObject(exchange.getMessage(), false,
false, true, true, true, true, bodyMaxChars);
+ json.put("uid", uuid.incrementAndGet());
+ json.put("endpointUri", exchange.getFromEndpoint().toString());
+ json.put("remoteEndpoint", exchange.getFromEndpoint().isRemote());
+ lastTimestamp = exchange.getMessage().getMessageTimestamp();
+ json.put("timestamp", lastTimestamp);
+
+ // ensure there is space on the queue by polling until at least single
slot is free
+ int drain = queue.size() - capacity + 1;
+ if (drain > 0) {
+ for (int i = 0; i < drain; i++) {
+ queue.poll();
+ }
+ }
+ queue.add(json);
+ }
+
+ protected static Endpoint findMatchingEndpoint(CamelContext camelContext,
String endpoint) {
+ Endpoint target = null;
+ boolean scheme = endpoint.contains(":");
+ boolean pattern = endpoint.endsWith("*");
+ if (!scheme || pattern) {
+ if (!scheme && !endpoint.endsWith("*")) {
+ endpoint = endpoint + "*";
+ }
+ // find all producers for this camel context via JMX mbeans (this
allows to find also producers created via dynamic EIPs)
+ MBeanServer mbeanServer =
camelContext.getManagementStrategy().getManagementAgent().getMBeanServer();
+ if (mbeanServer != null) {
+ try {
+ String jmxDomain
+ =
camelContext.getManagementStrategy().getManagementAgent().getMBeanObjectDomainName();
+ String prefix
+ =
camelContext.getManagementStrategy().getManagementAgent().getIncludeHostName()
? "*/" : "";
+ ObjectName query = ObjectName.getInstance(
+ jmxDomain + ":context=" + prefix +
camelContext.getManagementName() + ",type=producers,*");
+ Set<ObjectName> set = mbeanServer.queryNames(query, null);
+ if (set != null && !set.isEmpty()) {
+ for (ObjectName on : set) {
+ String uri = (String) mbeanServer.getAttribute(on,
"EndpointUri");
+ if (PatternHelper.matchPattern(uri, endpoint)) {
+ // is the endpoint able to create a consumer
+ target = camelContext.getEndpoint(uri);
+ // is the target able to create a consumer
+ org.apache.camel.spi.UriEndpoint ann
+ =
ObjectHelper.getAnnotationDeep(target, org.apache.camel.spi.UriEndpoint.class);
+ if (ann != null) {
+ if (ann.producerOnly()) {
+ // skip if the endpoint cannot consume
(we need to be able to consume to receive)
+ target = null;
+ }
+ if ("*".equals(endpoint) && !ann.remote())
{
+ // skip internal when matching
everything
+ target = null;
+ }
+ }
+ if (target != null) {
+ break;
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+ } else {
+ target = camelContext.getEndpoint(endpoint);
+ // is the target able to create a consumer
+ org.apache.camel.spi.UriEndpoint ann
+ = ObjectHelper.getAnnotationDeep(target,
org.apache.camel.spi.UriEndpoint.class);
+ if (ann != null) {
+ if (ann.producerOnly()) {
+ // skip if the endpoint cannot consume (we need to be able
to consume to receive)
+ throw new IllegalArgumentException("Cannot consume from
endpoint: " + endpoint);
+ }
+ }
+ }
+ return target;
+ }
+
+}
diff --git
a/core/camel-util/src/main/java/org/apache/camel/util/ObjectHelper.java
b/core/camel-util/src/main/java/org/apache/camel/util/ObjectHelper.java
index e82dc4e21c1..941188187bc 100644
--- a/core/camel-util/src/main/java/org/apache/camel/util/ObjectHelper.java
+++ b/core/camel-util/src/main/java/org/apache/camel/util/ObjectHelper.java
@@ -1103,6 +1103,25 @@ public final class ObjectHelper {
return instance.getClass().getAnnotation(type);
}
+ /**
+ * Gets the annotation from the given instance (searching super classes
also).
+ *
+ * @param instance the instance
+ * @param type the annotation
+ * @return the annotation, or <tt>null</tt> if the instance does
not have the given annotation
+ */
+ public static <A extends java.lang.annotation.Annotation> A
getAnnotationDeep(Object instance, Class<A> type) {
+ Class<?> clazz = instance.getClass();
+ while (clazz != Object.class) {
+ A ann = clazz.getAnnotation(type);
+ if (ann != null) {
+ return ann;
+ }
+ clazz = clazz.getSuperclass();
+ }
+ return null;
+ }
+
/**
* Converts the given value to the required type or throw a meaningful
exception
*/
diff --git a/docs/user-manual/modules/ROOT/pages/camel-jbang.adoc
b/docs/user-manual/modules/ROOT/pages/camel-jbang.adoc
index ea752eeb077..f25867ed324 100644
--- a/docs/user-manual/modules/ROOT/pages/camel-jbang.adoc
+++ b/docs/user-manual/modules/ROOT/pages/camel-jbang.adoc
@@ -1399,11 +1399,74 @@ For example to poll a message from a ActiveMQ queue
named cheese you can do:
[source,bash]
----
-$ camel cmd send --poll --endpoint=activemq:cheese
+$ camel cmd send --poll --endpoint='activemq:cheese'
----
When you poll then you do not send any payload (body or headers).
+=== Receiving messages via Camel
+
+*Available since Camel 4.9*
+
+When building a prototype integration with Camel JBang, you may route messages
to external systems.
+To know whether messages are being routed correctly, you may use system
consoles to look inside these systems
+which messages have arrived, such as SQL prompts, web consoles, CLI tools etc.
+
+The Camel JBang now comes with a new command to receive messages from remote
endpoints.
+This can be used to quickly look or tail in terminal the messages that an
external systems has received.
+Camel does this by consuming the messages (if the component has support for
consumer) and then let Camel JBang dump the messages from the CLI.
+
+For example to start dumping all messages from ActiveMQ in one command, you
can do:
+
+[source,bash]
+----
+$ camel cmd receive --endpoint='activemq:cheese'
+----
+
+You can also use pattern syntax for the endpoint, so suppose you have the
following route:
+
+[source,java]
+----
+from("ftp:myserver:1234/foo")
+ .to("log:order")
+ .to("activemq:orders");
+----
+
+Then you can tell Camel to automatic start receiving messages with:
+
+[source,bash]
+----
+$ camel cmd receive --action=start
+----
+
+TIP: You can enable and disable this mode with `--action=start` and
`--action-stop`.
+
+Then Camel will automatically discover from the running integration, all the
_producers_ and
+find the first _producer_ that is remote and also has consumer support. In the
example above,
+that is the `activemq` component, and thus Camel will start receive from
`activemq:orders`.
+
+You can see the status via:
+
+[source,bash]
+----
+$ camel cmd receive
+ PID NAME AGE STATUS TOTAL SINCE ENDPOINT
+ 4364 foo 1m33s Enabled 18 2s activemq://orders
+----
+
+You can then dump all the received messages with:
+
+[source,bash]
+----
+$ camel cmd receive --action=dump
+----
+
+This will dump all the messages, and continue to dump new incoming messages.
Use (ctrl + c) to break and exit.
+You can turn follow off with `--follow=false`.
+
+TIP: Use `camel cmd receive --help` to see all the various options for this
command.
+
+
=== Controlling local Camel integrations
To list the currently running Camel integrations, you use the `ps` command:
diff --git
a/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java
b/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java
index 4a74f68d444..279eecc1f0c 100644
---
a/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java
+++
b/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java
@@ -114,8 +114,10 @@ public class LocalCliConnector extends ServiceSupport
implements CliConnector, C
private File actionFile;
private File outputFile;
private File traceFile;
+ private long traceFilePos; // keep track of trace offset
private File debugFile;
- private long traceFilePos; // keep track of trace offset
+ private File receiveFile;
+ private long receiveFilePos; // keep track of receive offset
private byte[] lastSource;
private ExpressionDefinition lastSourceExpression;
@@ -180,6 +182,7 @@ public class LocalCliConnector extends ServiceSupport
implements CliConnector, C
outputFile = createLockFile(lockFile.getName() + "-output.json");
traceFile = createLockFile(lockFile.getName() + "-trace.json");
debugFile = createLockFile(lockFile.getName() + "-debug.json");
+ receiveFile = createLockFile(lockFile.getName() + "-receive.json");
executor.scheduleWithFixedDelay(this::task, 0, delay,
TimeUnit.MILLISECONDS);
LOG.info("Camel JBang CLI enabled");
} else {
@@ -223,7 +226,7 @@ public class LocalCliConnector extends ServiceSupport
implements CliConnector, C
actionTask();
statusTask();
// only run this every 2nd time as gathering this data has more
overhead
- // and are only needed when doing tracing or debugging
+ // and are only needed when doing tracing/debugging/receive
if (++counter % 2 == 0) {
traceTask();
}
@@ -280,6 +283,8 @@ public class LocalCliConnector extends ServiceSupport
implements CliConnector, C
doActionTraceTask(root);
} else if ("browse".equals(action)) {
doActionBrowseTask(root);
+ } else if ("receive".equals(action)) {
+ doActionReceiveTask(root);
}
} catch (Exception e) {
// ignore
@@ -811,6 +816,24 @@ public class LocalCliConnector extends ServiceSupport
implements CliConnector, C
}
}
+ private void doActionReceiveTask(JsonObject root) throws IOException {
+ DevConsole dc =
camelContext.getCamelContextExtension().getContextPlugin(DevConsoleRegistry.class)
+ .resolveById("receive");
+ if (dc != null) {
+ JsonObject json;
+ String endpoint = root.getString("endpoint");
+ if (endpoint != null) {
+ json = (JsonObject) dc.call(DevConsole.MediaType.JSON,
Map.of("enabled", "true", "endpoint", endpoint));
+ } else {
+ json = (JsonObject) dc.call(DevConsole.MediaType.JSON,
Map.of("enabled", "false"));
+ }
+ LOG.trace("Updating output file: {}", outputFile);
+ IOHelper.writeText(json.toJson(), outputFile);
+ } else {
+ IOHelper.writeText("{}", outputFile);
+ }
+ }
+
private void doActionBeanTask(JsonObject root) throws IOException {
String filter = root.getStringOrDefault("filter", "");
String properties = root.getStringOrDefault("properties", "true");
@@ -1132,6 +1155,13 @@ public class LocalCliConnector extends ServiceSupport
implements CliConnector, C
root.put("main-configuration", json);
}
}
+ DevConsole dc23 = dcr.resolveById("receive");
+ if (dc23 != null) {
+ JsonObject json = (JsonObject)
dc23.call(DevConsole.MediaType.JSON);
+ if (json != null && !json.isEmpty()) {
+ root.put("receive", json);
+ }
+ }
}
// various details
JsonObject mem = collectMemory();
@@ -1206,6 +1236,33 @@ public class LocalCliConnector extends ServiceSupport
implements CliConnector, C
LOG.trace("Error updating debug file: {} due to: {}. This
exception is ignored.",
debugFile, e.getMessage(), e);
}
+ try {
+ DevConsole dc14 =
camelContext.getCamelContextExtension().getContextPlugin(DevConsoleRegistry.class)
+ .resolveById("receive");
+ if (dc14 != null) {
+ JsonObject json = (JsonObject)
dc14.call(DevConsole.MediaType.JSON, Map.of("dump", "true"));
+ JsonArray arr = json.getCollection("messages");
+ // filter based on last uid
+ if (receiveFilePos > 0) {
+ arr.removeIf(r -> {
+ JsonObject jo = (JsonObject) r;
+ return jo.getLong("uid") <= receiveFilePos;
+ });
+ }
+ if (arr != null && !arr.isEmpty()) {
+ // store messages in a special file
+ LOG.trace("Updating receive file: {}", receiveFile);
+ String data = json.toJson() + System.lineSeparator();
+ IOHelper.appendText(data, receiveFile);
+ json = arr.getMap(arr.size() - 1);
+ receiveFilePos = json.getLong("uid");
+ }
+ }
+ } catch (Exception e) {
+ // ignore
+ LOG.trace("Error updating receive file: {} due to: {}. This
exception is ignored.",
+ receiveFile, e.getMessage(), e);
+ }
}
private JsonObject collectMemory() {
@@ -1331,6 +1388,9 @@ public class LocalCliConnector extends ServiceSupport
implements CliConnector, C
if (debugFile != null) {
FileUtil.deleteFile(debugFile);
}
+ if (receiveFile != null) {
+ FileUtil.deleteFile(receiveFile);
+ }
if (executor != null) {
camelContext.getExecutorServiceManager().shutdown(executor);
executor = null;
diff --git
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelCommand.java
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelCommand.java
index 41ae9f33f67..7413250e008 100644
---
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelCommand.java
+++
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelCommand.java
@@ -110,6 +110,10 @@ public abstract class CamelCommand implements
Callable<Integer> {
return new File(CommandLineHelper.getCamelDir(), pid + "-trace.json");
}
+ public File getReceiveFile(String pid) {
+ return new File(CommandLineHelper.getCamelDir(), pid +
"-receive.json");
+ }
+
public File getDebugFile(String pid) {
return new File(CommandLineHelper.getCamelDir(), pid + "-debug.json");
}
diff --git
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelJBangMain.java
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelJBangMain.java
index 58d7b13e44f..2dd90261654 100644
---
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelJBangMain.java
+++
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelJBangMain.java
@@ -122,6 +122,7 @@ public class CamelJBangMain implements Callable<Integer> {
.addSubcommand("reset-stats", new CommandLine(new
CamelResetStatsAction(main)))
.addSubcommand("reload", new CommandLine(new
CamelReloadAction(main)))
.addSubcommand("send", new CommandLine(new
CamelSendAction(main)))
+ .addSubcommand("receive", new CommandLine(new
CamelReceiveAction(main)))
.addSubcommand("browse", new CommandLine(new
CamelBrowseAction(main)))
.addSubcommand("stub", new CommandLine(new
CamelStubAction(main)))
.addSubcommand("thread-dump", new CommandLine(new
CamelThreadDump(main)))
diff --git
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelBrowseAction.java
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelBrowseAction.java
index 580118f5d60..9076f27f97d 100644
---
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelBrowseAction.java
+++
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelBrowseAction.java
@@ -212,16 +212,6 @@ public class CamelBrowseAction extends ActionBaseCommand {
return 0;
}
- private static long getLongValueFromCollection(JsonArray arr, String key) {
- for (Object o : arr) {
- JsonObject jo = (JsonObject) o;
- if (key.equalsIgnoreCase(jo.getString("key"))) {
- return jo.getLong("value");
- }
- }
- return 0;
- }
-
protected void dumpMessages(List<Row> rows, boolean onlyBody) {
MessageTableHelper tableHelper = new MessageTableHelper();
tableHelper.setPretty(pretty);
diff --git
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelReceiveAction.java
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelReceiveAction.java
new file mode 100644
index 00000000000..055b140e8bb
--- /dev/null
+++
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelReceiveAction.java
@@ -0,0 +1,808 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.dsl.jbang.core.commands.action;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.LineNumberReader;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.regex.Pattern;
+
+import com.github.freva.asciitable.AsciiTable;
+import com.github.freva.asciitable.Column;
+import com.github.freva.asciitable.HorizontalAlign;
+import com.github.freva.asciitable.OverflowBehaviour;
+import org.apache.camel.catalog.impl.TimePatternConverter;
+import org.apache.camel.dsl.jbang.core.commands.CamelJBangMain;
+import org.apache.camel.dsl.jbang.core.common.PidNameAgeCompletionCandidates;
+import org.apache.camel.dsl.jbang.core.common.ProcessHelper;
+import org.apache.camel.util.FileUtil;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.StopWatch;
+import org.apache.camel.util.StringHelper;
+import org.apache.camel.util.TimeUtils;
+import org.apache.camel.util.URISupport;
+import org.apache.camel.util.json.JsonArray;
+import org.apache.camel.util.json.JsonObject;
+import org.apache.camel.util.json.Jsoner;
+import org.fusesource.jansi.Ansi;
+import picocli.CommandLine;
+
[email protected](name = "receive",
+ description = "Receive and dump messages from remote
endpoints", sortOptions = false)
+public class CamelReceiveAction extends ActionBaseCommand {
+
+ private static final int NAME_MAX_WIDTH = 25;
+ private static final int NAME_MIN_WIDTH = 10;
+
+ public static class PrefixCompletionCandidates implements Iterable<String>
{
+
+ public PrefixCompletionCandidates() {
+ }
+
+ @Override
+ public Iterator<String> iterator() {
+ return List.of("auto", "true", "false").iterator();
+ }
+ }
+
+ public static class ActionCompletionCandidates implements Iterable<String>
{
+
+ public ActionCompletionCandidates() {
+ }
+
+ @Override
+ public Iterator<String> iterator() {
+ return List.of("dump", "start", "stop", "status",
"clear").iterator();
+ }
+ }
+
+ @CommandLine.Parameters(description = "Name or pid of running Camel
integration. (default selects all)", arity = "0..1")
+ String name = "*";
+
+ @CommandLine.Option(names = { "--action" }, completionCandidates =
ActionCompletionCandidates.class,
+ defaultValue = "status",
+ description = "Action to start, stop, clear, list
status, or dump messages")
+ String action;
+
+ @CommandLine.Option(names = { "--endpoint" },
+ description = "Endpoint to browse messages (can be uri
or pattern to refer to existing endpoint)")
+ String endpoint;
+
+ @CommandLine.Option(names = { "--sort" }, completionCandidates =
PidNameAgeCompletionCandidates.class,
+ description = "Sort by pid, name or age for showing
status of messages", defaultValue = "pid")
+ String sort;
+
+ @CommandLine.Option(names = { "--follow" }, defaultValue = "true",
+ description = "Keep following and outputting new
messages (use ctrl + c to exit).")
+ boolean follow = true;
+
+ @CommandLine.Option(names = { "--prefix" }, defaultValue = "auto",
+ completionCandidates =
PrefixCompletionCandidates.class,
+ description = "Print prefix with running Camel
integration name. auto=only prefix when running multiple integrations.
true=always prefix. false=prefix off.")
+ String prefix = "auto";
+
+ @CommandLine.Option(names = { "--tail" }, defaultValue = "-1",
+ description = "The number of messages from the end to
show. Use -1 to read from the beginning. Use 0 to read only new lines. Defaults
to showing all messages from beginning.")
+ int tail = -1;
+
+ @CommandLine.Option(names = { "--since" },
+ description = "Return messages newer than a relative
duration like 5s, 2m, or 1h. The value is in seconds if no unit specified.")
+ String since;
+
+ @CommandLine.Option(names = { "--find" },
+ description = "Find and highlight matching text
(ignore case).", arity = "0..*")
+ String[] find;
+
+ @CommandLine.Option(names = { "--grep" },
+ description = "Filter messages to only output matching
text (ignore case).", arity = "0..*")
+ String[] grep;
+
+ @CommandLine.Option(names = { "--show-headers" }, defaultValue = "true",
+ description = "Show message headers in received
messages")
+ boolean showHeaders = true;
+
+ @CommandLine.Option(names = { "--show-body" }, defaultValue = "true",
+ description = "Show message body in received messages")
+ boolean showBody = true;
+
+ @CommandLine.Option(names = { "--only-body" }, defaultValue = "false",
+ description = "Show only message body in received
messages")
+ boolean onlyBody;
+
+ @CommandLine.Option(names = { "--logging-color" }, defaultValue = "true",
description = "Use colored logging")
+ boolean loggingColor = true;
+
+ @CommandLine.Option(names = { "--compact" }, defaultValue = "true",
+ description = "Compact output (no empty line
separating messages)")
+ boolean compact = true;
+
+ @CommandLine.Option(names = { "--short-uri" },
+ description = "List endpoint URI without query
parameters (short)")
+ boolean shortUri;
+
+ @CommandLine.Option(names = { "--wide-uri" },
+ description = "List endpoint URI in full details")
+ boolean wideUri;
+
+ @CommandLine.Option(names = { "--mask" },
+ description = "Whether to mask endpoint URIs to avoid
printing sensitive information such as password or access keys")
+ boolean mask;
+
+ @CommandLine.Option(names = { "--pretty" },
+ description = "Pretty print message body when using
JSon or XML format")
+ boolean pretty;
+
+ String findAnsi;
+ private int nameMaxWidth;
+ private boolean prefixShown;
+ private MessageTableHelper tableHelper;
+ private final Map<String, Ansi.Color> nameColors = new HashMap<>();
+
+ public CamelReceiveAction(CamelJBangMain main) {
+ super(main);
+ }
+
+ @Override
+ public Integer doCall() throws Exception {
+ boolean autoDump = false;
+ if (endpoint != null) {
+ // if using --endpoint then action should be start and auto-dump
+ action = "start";
+ autoDump = true;
+ }
+
+ if ("dump".equals(action)) {
+ return doDumpCall();
+ } else if ("status".equals(action)) {
+ return doStatusCall();
+ }
+
+ List<Long> pids = findPids(name);
+ for (long pid : pids) {
+ if ("clear".equals(action)) {
+ File f = getReceiveFile("" + pid);
+ if (f.exists()) {
+ IOHelper.writeText("{}", f);
+ }
+ } else {
+ // ensure output file is deleted before executing action
+ File outputFile = getOutputFile(Long.toString(pid));
+ FileUtil.deleteFile(outputFile);
+
+ JsonObject root = new JsonObject();
+ root.put("action", "receive");
+ if ("start".equals(action)) {
+ root.put("enabled", "true");
+ if (endpoint != null) {
+ root.put("endpoint", endpoint);
+ } else {
+ root.put("endpoint", "*");
+ }
+ } else if ("stop".equals(action)) {
+ root.put("enabled", "false");
+ }
+ File f = getActionFile(Long.toString(pid));
+ IOHelper.writeText(root.toJson(), f);
+
+ JsonObject jo = waitForOutputFile(outputFile);
+ if (jo != null) {
+ String error = jo.getString("error");
+ if (error != null) {
+ error = Jsoner.unescape(error);
+ String url = jo.getString("url");
+ List<String> stackTrace =
jo.getCollection("stackTrace");
+ if (url != null) {
+ printer().println("Error starting to receive
messages from: " + url + " due to: " + error);
+
+ } else {
+ printer().println("Error starting to receive
messages due to: " + error);
+ }
+ printer().println(StringHelper.fillChars('-', 120));
+ printer().println(StringHelper.padString(1, 55) +
"STACK-TRACE");
+ printer().println(StringHelper.fillChars('-', 120));
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < stackTrace.size(); i++) {
+ sb.append(String.format("\t%s%n",
stackTrace.get(i)));
+ }
+ printer().println(String.valueOf(sb));
+ }
+ }
+ }
+ }
+
+ if (autoDump) {
+ return doDumpCall();
+ }
+
+ return 0;
+ }
+
+ protected JsonObject waitForOutputFile(File outputFile) {
+ return getJsonObject(outputFile);
+ }
+
+ protected Integer doStatusCall() {
+ List<StatusRow> rows = new ArrayList<>();
+
+ List<Long> pids = findPids(name);
+ ProcessHandle.allProcesses()
+ .filter(ph -> pids.contains(ph.pid()))
+ .forEach(ph -> {
+ JsonObject root = loadStatus(ph.pid());
+ if (root != null) {
+ StatusRow row = new StatusRow();
+ JsonObject context = (JsonObject) root.get("context");
+ if (context == null) {
+ return;
+ }
+ row.name = context.getString("name");
+ if ("CamelJBang".equals(row.name)) {
+ row.name = ProcessHelper.extractName(root, ph);
+ }
+ row.pid = Long.toString(ph.pid());
+ row.uptime = extractSince(ph);
+ row.age = TimeUtils.printSince(row.uptime);
+ JsonObject jo = root.getMap("receive");
+ if (jo != null) {
+ row.enabled = jo.getBoolean("enabled");
+ row.counter = jo.getLong("total");
+ row.firstTimestamp =
jo.getLongOrDefault("firstTimestamp", 0);
+ row.lastTimestamp =
jo.getLongOrDefault("lastTimestamp", 0);
+ JsonArray arr = jo.getCollection("endpoints");
+ if (arr != null) {
+ for (Object e : arr) {
+ jo = (JsonObject) e;
+ row.uri = jo.getString("uri");
+ if (mask) {
+ row.uri =
URISupport.sanitizeUri(row.uri);
+ }
+ rows.add(row);
+ row = row.copy();
+ }
+ } else {
+ rows.add(row);
+ }
+ }
+ }
+ });
+
+ // sort rows
+ rows.sort(this::sortStatusRow);
+
+ if (!rows.isEmpty()) {
+ printer().println(AsciiTable.getTable(AsciiTable.NO_BORDERS, rows,
Arrays.asList(
+ new
Column().header("PID").headerAlign(HorizontalAlign.CENTER).with(r -> r.pid),
+ new
Column().header("NAME").dataAlign(HorizontalAlign.LEFT).maxWidth(30,
OverflowBehaviour.ELLIPSIS_RIGHT)
+ .with(r -> r.name),
+ new
Column().header("AGE").headerAlign(HorizontalAlign.CENTER).with(r -> r.age),
+ new Column().header("STATUS").with(this::getStatus),
+ new Column().header("TOTAL").with(r -> r.enabled ? "" +
r.counter : ""),
+ new
Column().header("SINCE").headerAlign(HorizontalAlign.CENTER)
+ .with(this::getMessageAgo),
+ new
Column().header("ENDPOINT").visible(!wideUri).dataAlign(HorizontalAlign.LEFT)
+ .maxWidth(90, OverflowBehaviour.ELLIPSIS_RIGHT)
+ .with(this::getEndpointUri),
+ new
Column().header("ENDPOINT").visible(wideUri).dataAlign(HorizontalAlign.LEFT)
+ .maxWidth(140, OverflowBehaviour.NEWLINE)
+ .with(r -> r.uri))));
+ }
+
+ return 0;
+ }
+
+ private String getStatus(StatusRow r) {
+ if (r.enabled) {
+ return "Enabled";
+ }
+ return "Disabled";
+ }
+
+ protected int sortStatusRow(StatusRow o1, StatusRow o2) {
+ String s = sort;
+ int negate = 1;
+ if (s.startsWith("-")) {
+ s = s.substring(1);
+ negate = -1;
+ }
+ switch (s) {
+ case "pid":
+ return Long.compare(Long.parseLong(o1.pid),
Long.parseLong(o2.pid)) * negate;
+ case "name":
+ return o1.name.compareToIgnoreCase(o2.name) * negate;
+ case "age":
+ return Long.compare(o1.uptime, o2.uptime) * negate;
+ default:
+ return 0;
+ }
+ }
+
+ protected Integer doDumpCall() throws Exception {
+ // setup table helper
+ tableHelper = new MessageTableHelper();
+ tableHelper.setPretty(pretty);
+ tableHelper.setLoggingColor(loggingColor);
+
+ Map<Long, Pid> pids = new LinkedHashMap<>();
+
+ // find new pids
+ updatePids(pids);
+ if (!pids.isEmpty()) {
+ // read existing received files (skip by tail/since)
+ if (find != null) {
+ findAnsi =
Ansi.ansi().fg(Ansi.Color.BLACK).bg(Ansi.Color.YELLOW).a("$0").reset().toString();
+ for (int i = 0; i < find.length; i++) {
+ String f = find[i];
+ f = Pattern.quote(f);
+ find[i] = f;
+ }
+ }
+ if (grep != null) {
+ findAnsi =
Ansi.ansi().fg(Ansi.Color.BLACK).bg(Ansi.Color.YELLOW).a("$0").reset().toString();
+ for (int i = 0; i < grep.length; i++) {
+ String f = grep[i];
+ f = Pattern.quote(f);
+ grep[i] = f;
+ }
+ }
+ Date limit = null;
+ if (since != null) {
+ long millis;
+ if (StringHelper.isDigit(since)) {
+ // is in seconds by default
+ millis = TimePatternConverter.toMilliSeconds(since) * 1000;
+ } else {
+ millis = TimePatternConverter.toMilliSeconds(since);
+ }
+ limit = new Date(System.currentTimeMillis() - millis);
+ }
+ // dump existing messages
+ if (tail != 0) {
+ tailReceiveFiles(pids, tail);
+ dumpReceiveFiles(pids, tail, limit);
+ }
+ }
+
+ if (follow) {
+ boolean waitMessage = true;
+ StopWatch watch = new StopWatch();
+ boolean more = true;
+ boolean init = true;
+ do {
+ if (pids.isEmpty()) {
+ if (waitMessage) {
+ printer().println("Waiting for messages ...");
+ waitMessage = false;
+ }
+ Thread.sleep(500);
+ updatePids(pids);
+ } else {
+ waitMessage = true;
+ if (watch.taken() > 500) {
+ // check for new messages
+ updatePids(pids);
+ watch.restart();
+ }
+ int lines = readReceiveFiles(pids);
+ if (lines > 0) {
+ more = dumpReceiveFiles(pids, 0, null);
+ init = false;
+ } else if (lines == 0) {
+ if (init) {
+ printer().println("Waiting for messages ...");
+ init = false;
+ }
+ Thread.sleep(100);
+ } else {
+ break;
+ }
+ }
+ } while (more);
+ }
+
+ return 0;
+ }
+
+ private void tailReceiveFiles(Map<Long, Pid> pids, int tail) throws
Exception {
+ for (Pid pid : pids.values()) {
+ File file = getReceiveFile(pid.pid);
+ if (file.exists() && file.length() > 0) {
+ pid.reader = new LineNumberReader(new FileReader(file));
+ String line;
+ if (tail <= 0) {
+ pid.fifo = new ArrayDeque<>();
+ } else {
+ pid.fifo = new ArrayBlockingQueue<>(tail);
+ }
+ do {
+ line = pid.reader.readLine();
+ if (line != null) {
+ while (!pid.fifo.offer(line)) {
+ pid.fifo.poll();
+ }
+ }
+ } while (line != null);
+ }
+ }
+ }
+
+ private void updatePids(Map<Long, Pid> rows) {
+ List<Long> pids = findPids(name);
+ ProcessHandle.allProcesses()
+ .filter(ph -> pids.contains(ph.pid()))
+ .forEach(ph -> {
+ JsonObject root = loadStatus(ph.pid());
+ if (root != null) {
+ Pid row = new Pid();
+ row.pid = Long.toString(ph.pid());
+ JsonObject context = (JsonObject) root.get("context");
+ if (context == null) {
+ return;
+ }
+ row.name = context.getString("name");
+ if ("CamelJBang".equals(row.name)) {
+ row.name = ProcessHelper.extractName(root, ph);
+ }
+ int len = row.name.length();
+ if (len < NAME_MIN_WIDTH) {
+ len = NAME_MIN_WIDTH;
+ }
+ if (len > NAME_MAX_WIDTH) {
+ len = NAME_MAX_WIDTH;
+ }
+ if (len > nameMaxWidth) {
+ nameMaxWidth = len;
+ }
+ if (!rows.containsKey(ph.pid())) {
+ rows.put(ph.pid(), row);
+ }
+ }
+ });
+
+ // remove pids that are no long active from the rows
+ Set<Long> remove = new HashSet<>();
+ for (long pid : rows.keySet()) {
+ if (!pids.contains(pid)) {
+ remove.add(pid);
+ }
+ }
+ for (long pid : remove) {
+ rows.remove(pid);
+ }
+ }
+
+ private int readReceiveFiles(Map<Long, Pid> pids) throws Exception {
+ int lines = 0;
+
+ for (Pid pid : pids.values()) {
+ if (pid.reader == null) {
+ File file = getReceiveFile(pid.pid);
+ if (file.exists()) {
+ pid.reader = new LineNumberReader(new FileReader(file));
+ if (tail == 0) {
+ // only read new lines so forward to end of reader
+ long size = file.length();
+ pid.reader.skip(size);
+ }
+ }
+ }
+ if (pid.reader != null) {
+ String line;
+ do {
+ try {
+ line = pid.reader.readLine();
+ if (line != null) {
+ lines++;
+ // switch fifo to be unlimited as we use it for
new messages
+ if (pid.fifo == null || pid.fifo instanceof
ArrayBlockingQueue) {
+ pid.fifo = new ArrayDeque<>();
+ }
+ pid.fifo.offer(line);
+ }
+ } catch (IOException e) {
+ // ignore
+ line = null;
+ }
+ } while (line != null);
+ }
+ }
+
+ return lines;
+ }
+
+ private List<Row> parseReceiveLine(Pid pid, String line) {
+ JsonObject root = null;
+ try {
+ root = (JsonObject) Jsoner.deserialize(line);
+ } catch (Exception e) {
+ // ignore
+ }
+ if (root != null) {
+ List<Row> rows = new ArrayList<>();
+ JsonArray arr = root.getCollection("messages");
+ if (arr != null) {
+ for (Object o : arr) {
+ Row row = new Row(pid);
+ row.pid = pid.pid;
+ row.name = pid.name;
+ JsonObject jo = (JsonObject) o;
+ row.uid = jo.getLong("uid");
+ String uri = jo.getString("endpointUri");
+ if (uri != null) {
+ row.endpoint = new JsonObject();
+ if (mask) {
+ uri = URISupport.sanitizeUri(uri);
+ }
+ row.endpoint.put("endpoint", uri);
+ row.endpoint.put("remote",
jo.getBooleanOrDefault("remoteEndpoint", true));
+ }
+ JsonObject es = jo.getMap("endpointService");
+ if (es != null) {
+ row.endpointService = es;
+ }
+ Long ts = jo.getLong("timestamp");
+ if (ts != null) {
+ row.timestamp = ts;
+ }
+ row.message = jo.getMap("message");
+ row.message.remove("exchangeId");
+ row.message.remove("exchangePattern");
+ row.message.remove("exchangeProperties");
+ if (onlyBody) {
+ row.message.remove("headers");
+ row.message.remove("messageType");
+ } else {
+ if (!showHeaders) {
+ row.message.remove("headers");
+ }
+ if (!showBody) {
+ row.message.remove("body");
+ }
+ }
+ rows.add(row);
+ }
+ }
+ return rows;
+ }
+ return null;
+ }
+
+ private boolean dumpReceiveFiles(Map<Long, Pid> pids, int tail, Date
limit) {
+ Set<String> names = new HashSet<>();
+ List<Row> rows = new ArrayList<>();
+ for (Pid pid : pids.values()) {
+ Queue<String> queue = pid.fifo;
+ if (queue != null) {
+ for (String l : queue) {
+ names.add(pid.name);
+ List<Row> parsed = parseReceiveLine(pid, l);
+ if (parsed != null && !parsed.isEmpty()) {
+ rows.addAll(parsed);
+ }
+ }
+ pid.fifo.clear();
+ }
+ }
+
+ // only sort if there are multiple Camels running
+ if (names.size() > 1) {
+ // sort lines
+ final Map<String, Long> lastTimestamp = new HashMap<>();
+ rows.sort((r1, r2) -> {
+ long t1 = r1.timestamp;
+ long t2 = r2.timestamp;
+ if (t1 == 0) {
+ t1 = lastTimestamp.get(r1.name);
+ }
+ if (t1 == 0) {
+ t1 = lastTimestamp.get(r2.name);
+ }
+ if (t1 == 0 && t2 == 0) {
+ return 0;
+ } else if (t1 == 0) {
+ return -1;
+ } else if (t2 == 0) {
+ return 1;
+ }
+ lastTimestamp.put(r1.name, t1);
+ lastTimestamp.put(r2.name, t2);
+ return Long.compare(t1, t2);
+ });
+ }
+ if (tail > 0) {
+ // cut according to tail
+ int pos = rows.size() - tail;
+ if (pos > 0) {
+ rows = rows.subList(pos, rows.size());
+ }
+ }
+
+ for (Row r : rows) {
+ printDump(r.name, pids.size(), r, limit);
+ }
+ return true;
+ }
+
+ private boolean isValidGrep(String line) {
+ if (grep == null) {
+ return true;
+ }
+ for (String g : grep) {
+ boolean m = Pattern.compile("(?i)" + g).matcher(line).find();
+ if (m) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private boolean isValidSince(Date limit, long timestamp) {
+ if (limit == null || timestamp == 0) {
+ return true;
+ }
+ Date row = new Date(timestamp);
+ return row.compareTo(limit) >= 0;
+ }
+
+ protected void printDump(String name, int pids, Row row, Date limit) {
+ if (!prefixShown) {
+ // compute whether to show prefix or not
+ if ("false".equals(prefix) || "auto".equals(prefix) && pids <= 1) {
+ name = null;
+ }
+ }
+ prefixShown = name != null;
+
+ String data = getDataAsTable(row);
+ boolean valid = isValidSince(limit, row.timestamp) &&
isValidGrep(data);
+ if (!valid) {
+ return;
+ }
+
+ String nameWithPrefix = null;
+ if (name != null) {
+ if (loggingColor) {
+ Ansi.Color color = nameColors.get(name);
+ if (color == null) {
+ // grab a new color
+ int idx = (nameColors.size() % 6) + 1;
+ color = Ansi.Color.values()[idx];
+ nameColors.put(name, color);
+ }
+ String n = String.format("%-" + nameMaxWidth + "s", name);
+ nameWithPrefix = Ansi.ansi().fg(color).a(n).a("|
").reset().toString();
+ } else {
+ nameWithPrefix = String.format("%-" + nameMaxWidth + "s",
name) + "| ";
+ }
+ printer().print(nameWithPrefix);
+ }
+ // header
+ String header = String.format("Received Message: (%s)", row.uid);
+ if (loggingColor) {
+
printer().println(Ansi.ansi().fgGreen().a(header).reset().toString());
+ } else {
+ printer().println(header);
+ }
+ String[] lines = data.split(System.lineSeparator());
+ if (lines.length > 0) {
+ for (String line : lines) {
+ if (find != null) {
+ for (String f : find) {
+ line = line.replaceAll("(?i)" + f, findAnsi);
+ }
+ }
+ if (grep != null) {
+ for (String g : grep) {
+ line = line.replaceAll("(?i)" + g, findAnsi);
+ }
+ }
+ if (nameWithPrefix != null) {
+ printer().print(nameWithPrefix);
+ }
+ printer().print(" ");
+ printer().println(line);
+ }
+ if (!compact) {
+ if (nameWithPrefix != null) {
+ printer().println(nameWithPrefix);
+ } else {
+ // empty line
+ printer().println();
+ }
+ }
+ }
+ }
+
+ private String getDataAsTable(Row r) {
+ return tableHelper.getDataAsTable(null, null, r.endpoint,
r.endpointService, r.message, null);
+ }
+
+ protected String getEndpointUri(StatusRow r) {
+ String u = r.uri;
+ if (shortUri) {
+ int pos = u.indexOf('?');
+ if (pos > 0) {
+ u = u.substring(0, pos);
+ }
+ }
+ return u;
+ }
+
+ protected String getMessageAgo(StatusRow r) {
+ if (r.lastTimestamp > 0) {
+ return TimeUtils.printSince(r.lastTimestamp);
+ }
+ return "";
+ }
+
+ private static class Pid {
+ String pid;
+ String name;
+ Queue<String> fifo;
+ LineNumberReader reader;
+ }
+
+ private static class Row {
+ Pid parent;
+ String pid;
+ String name;
+ long uid;
+ long timestamp;
+ JsonObject endpoint;
+ JsonObject endpointService;
+ JsonObject message;
+
+ Row(Pid parent) {
+ this.parent = parent;
+ }
+
+ }
+
+ private static class StatusRow {
+ String pid;
+ String name;
+ String age;
+ long uptime;
+ boolean enabled;
+ long counter;
+ long firstTimestamp;
+ long lastTimestamp;
+ String uri;
+
+ StatusRow copy() {
+ try {
+ return (StatusRow) clone();
+ } catch (CloneNotSupportedException e) {
+ return null;
+ }
+ }
+ }
+
+}