This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch rcmd in repository https://gitbox.apache.org/repos/asf/camel.git
commit 47c1d35e2259c40a2dfda64a4fe3f8f17a509d4b Author: Claus Ibsen <[email protected]> AuthorDate: Thu Oct 10 12:16:12 2024 +0200 CAMEL-21193: camel-jbang - Add listen command --- .../camel/impl/console/ReceiveDevConsole.java | 198 ++++++++++----------- .../core/commands/action/CamelListenAction.java | 36 +++- 2 files changed, 127 insertions(+), 107 deletions(-) diff --git a/core/camel-console/src/main/java/org/apache/camel/impl/console/ReceiveDevConsole.java b/core/camel-console/src/main/java/org/apache/camel/impl/console/ReceiveDevConsole.java index c9a7e291cd6..79898f98aa4 100644 --- a/core/camel-console/src/main/java/org/apache/camel/impl/console/ReceiveDevConsole.java +++ b/core/camel-console/src/main/java/org/apache/camel/impl/console/ReceiveDevConsole.java @@ -17,6 +17,7 @@ package org.apache.camel.impl.console; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Queue; @@ -29,17 +30,13 @@ import javax.management.MBeanServer; import javax.management.ObjectName; import org.apache.camel.CamelContext; -import org.apache.camel.Channel; import org.apache.camel.Consumer; import org.apache.camel.Endpoint; -import org.apache.camel.EndpointAware; import org.apache.camel.Exchange; -import org.apache.camel.Navigate; -import org.apache.camel.Processor; import org.apache.camel.spi.Configurer; -import org.apache.camel.spi.IdAware; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.annotations.DevConsole; +import org.apache.camel.support.ExceptionHelper; import org.apache.camel.support.MessageHelper; import org.apache.camel.support.PatternHelper; import org.apache.camel.support.console.AbstractDevConsole; @@ -47,6 +44,7 @@ import org.apache.camel.support.service.ServiceHelper; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.json.JsonArray; import org.apache.camel.util.json.JsonObject; +import org.apache.camel.util.json.Jsoner; @DevConsole(name = "receive", displayName = "Camel Receive", description = "Consume messages from endpoints") @Configurer(bootstrap = true, extended = true) @@ -78,7 +76,6 @@ public class ReceiveDevConsole extends AbstractDevConsole { public static final String ENDPOINT = "endpoint"; private final List<Consumer> consumers = new ArrayList<>(); - private final AtomicBoolean enabled = new AtomicBoolean(); private final AtomicLong uuid = new AtomicLong(); private Queue<JsonObject> queue; @@ -121,6 +118,11 @@ public class ReceiveDevConsole extends AbstractDevConsole { this.queue = new LinkedBlockingQueue<>(capacity); } + @Override + protected void doStop() throws Exception { + stopConsumers(); + } + protected void stopConsumers() { for (Consumer c : consumers) { ServiceHelper.stopAndShutdownServices(c); @@ -162,7 +164,7 @@ public class ReceiveDevConsole extends AbstractDevConsole { Endpoint target = findMatchingEndpoint(getCamelContext(), pattern); if (target != null) { try { - Consumer consumer = createConsumer(getCamelContext(), target); + Consumer consumer = createConsumer(target); if (!consumers.contains(consumer)) { consumers.add(consumer); ServiceHelper.startService(consumer); @@ -181,7 +183,74 @@ public class ReceiveDevConsole extends AbstractDevConsole { return sb.toString(); } - private Consumer createConsumer(CamelContext camelContext, Endpoint target) throws Exception { + protected JsonObject doCallJson(Map<String, Object> options) { + JsonObject root = new JsonObject(); + + String dump = (String) options.get(DUMP); + if ("true".equals(dump)) { + JsonArray arr = new JsonArray(); + arr.addAll(queue); + if (removeOnDump) { + queue.clear(); + } + root.put("messages", arr); + JsonObject jo = (JsonObject) arr.get(0); + firstTimestamp = jo.getLongOrDefault("timestamp", 0); + jo = (JsonObject) arr.get(arr.size() - 1); + lastTimestamp = jo.getLongOrDefault("timestamp", 0); + return root; + } + + String enabled = (String) options.get(ENABLED); + if ("false".equals(enabled)) { + // turn off all consumers + stopConsumers(); + this.enabled.set(false); + root.put("enabled", false); + return root; + } + + String pattern = (String) options.get(ENDPOINT); + if (pattern != null) { + try { + Endpoint target = findMatchingEndpoint(getCamelContext(), pattern); + if (target != null) { + root.put("url", target.getEndpointUri()); + Consumer consumer = createConsumer(target); + if (!consumers.contains(consumer)) { + consumers.add(consumer); + ServiceHelper.startService(consumer); + } + } + this.enabled.set(true); + } catch (Exception e) { + root.put("error", Jsoner.escape(e.getMessage())); + JsonArray arr2 = new JsonArray(); + final String trace = ExceptionHelper.stackTraceToString(e); + root.put("stackTrace", arr2); + Collections.addAll(arr2, trace.split("\n")); + } + } + + root.put("enabled", this.enabled.get()); + root.put("total", uuid.get()); + root.put("firstTimestamp", firstTimestamp); + root.put("lastTimestamp", lastTimestamp); + + JsonArray arr = new JsonArray(); + for (Consumer c : consumers) { + JsonObject jo = new JsonObject(); + jo.put("uri", c.getEndpoint().toString()); + jo.put("remote", c.getEndpoint().isRemote()); + arr.add(jo); + } + if (!arr.isEmpty()) { + root.put("endpoints", arr); + } + return root; + } + + private Consumer createConsumer(Endpoint target) throws Exception { for (Consumer c : consumers) { if (c.getEndpoint() == target) { return c; @@ -209,7 +278,7 @@ public class ReceiveDevConsole extends AbstractDevConsole { queue.add(json); } - protected Endpoint findMatchingEndpoint(CamelContext camelContext, String endpoint) { + protected static Endpoint findMatchingEndpoint(CamelContext camelContext, String endpoint) { Endpoint target = null; boolean scheme = endpoint.contains(":"); boolean pattern = endpoint.endsWith("*"); @@ -217,23 +286,23 @@ public class ReceiveDevConsole extends AbstractDevConsole { if (!scheme && !endpoint.endsWith("*")) { endpoint = endpoint + "*"; } - MBeanServer mbeanServer = getCamelContext().getManagementStrategy().getManagementAgent().getMBeanServer(); + // find all producers for this camel context via JMX mbeans (this allows to find also producers created via dynamic EIPs) + MBeanServer mbeanServer = camelContext.getManagementStrategy().getManagementAgent().getMBeanServer(); if (mbeanServer != null) { try { - // find all producers for this camel context mbean String jmxDomain - = getCamelContext().getManagementStrategy().getManagementAgent().getMBeanObjectDomainName(); + = camelContext.getManagementStrategy().getManagementAgent().getMBeanObjectDomainName(); String prefix - = getCamelContext().getManagementStrategy().getManagementAgent().getIncludeHostName() ? "*/" : ""; + = camelContext.getManagementStrategy().getManagementAgent().getIncludeHostName() ? "*/" : ""; ObjectName query = ObjectName.getInstance( - jmxDomain + ":context=" + prefix + getCamelContext().getManagementName() + ",type=producers,*"); + jmxDomain + ":context=" + prefix + camelContext.getManagementName() + ",type=producers,*"); Set<ObjectName> set = mbeanServer.queryNames(query, null); if (set != null && !set.isEmpty()) { for (ObjectName on : set) { String uri = (String) mbeanServer.getAttribute(on, "EndpointUri"); if (PatternHelper.matchPattern(uri, endpoint)) { // is the endpoint able to create a consumer - target = getCamelContext().getEndpoint(uri); + target = camelContext.getEndpoint(uri); // is the target able to create a consumer org.apache.camel.spi.UriEndpoint ann = ObjectHelper.getAnnotationDeep(target, org.apache.camel.spi.UriEndpoint.class); @@ -259,100 +328,17 @@ public class ReceiveDevConsole extends AbstractDevConsole { } } else { target = camelContext.getEndpoint(endpoint); - } - return target; - } - - protected JsonObject doCallJson(Map<String, Object> options) { - JsonObject root = new JsonObject(); - - String dump = (String) options.get(DUMP); - if ("true".equals(dump)) { - JsonArray arr = new JsonArray(); - arr.addAll(queue); - if (removeOnDump) { - queue.clear(); - } - root.put("messages", arr); - JsonObject jo = (JsonObject) arr.get(0); - firstTimestamp = jo.getLongOrDefault("timestamp", 0); - jo = (JsonObject) arr.get(arr.size() - 1); - lastTimestamp = jo.getLongOrDefault("timestamp", 0); - return root; - } - - String enabled = (String) options.get(ENABLED); - if ("false".equals(enabled)) { - // turn off all consumers - stopConsumers(); - this.enabled.set(false); - root.put("enabled", false); - return root; - } - - String pattern = (String) options.get(ENDPOINT); - if (pattern != null) { - this.enabled.set(true); - Endpoint target = findMatchingEndpoint(getCamelContext(), pattern); - if (target != null) { - try { - Consumer consumer = createConsumer(getCamelContext(), target); - if (!consumers.contains(consumer)) { - consumers.add(consumer); - ServiceHelper.startService(consumer); - } - } catch (Exception e) { - // ignore - } - } - } - - root.put("enabled", this.enabled.get()); - root.put("total", uuid.get()); - root.put("firstTimestamp", firstTimestamp); - root.put("lastTimestamp", lastTimestamp); - - JsonArray arr = new JsonArray(); - for (Consumer c : consumers) { - JsonObject jo = new JsonObject(); - jo.put("uri", c.getEndpoint().toString()); - jo.put("remote", c.getEndpoint().isRemote()); - arr.add(jo); - } - root.put("endpoints", arr); - return root; - } - - @Override - protected void doStop() throws Exception { - super.doStop(); - stopConsumers(); - } - - @SuppressWarnings("unchecked") - private void doFilter(String pattern, Navigate<Processor> nav, List<EndpointAware> match) { - List<Processor> list = nav.next(); - if (list != null) { - for (Processor proc : list) { - if (proc instanceof Channel channel) { - proc = channel.getNextProcessor(); - } - if (proc instanceof EndpointAware ea) { - String id = null; - if (proc instanceof IdAware idAware) { - id = idAware.getId(); - } - String uri = ea.getEndpoint().getEndpointUri(); - if (PatternHelper.matchPattern(id, pattern) || PatternHelper.matchPattern(uri, pattern)) { - match.add(ea); - } - } - if (proc instanceof Navigate) { - Navigate<Processor> child = (Navigate<Processor>) proc; - doFilter(pattern, child, match); + // is the target able to create a consumer + org.apache.camel.spi.UriEndpoint ann + = ObjectHelper.getAnnotationDeep(target, org.apache.camel.spi.UriEndpoint.class); + if (ann != null) { + if (ann.producerOnly()) { + // skip if the endpoint cannot consume (we need to be able to consume to receive) + throw new IllegalArgumentException("Cannot consume from endpoint: " + endpoint); } } } + return target; } } diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelListenAction.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelListenAction.java index b09bb50809e..f392ea3d6a6 100644 --- a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelListenAction.java +++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelListenAction.java @@ -43,8 +43,10 @@ import com.github.freva.asciitable.HorizontalAlign; import com.github.freva.asciitable.OverflowBehaviour; import org.apache.camel.catalog.impl.TimePatternConverter; import org.apache.camel.dsl.jbang.core.commands.CamelJBangMain; +import org.apache.camel.dsl.jbang.core.common.JSonHelper; import org.apache.camel.dsl.jbang.core.common.PidNameAgeCompletionCandidates; import org.apache.camel.dsl.jbang.core.common.ProcessHelper; +import org.apache.camel.util.FileUtil; import org.apache.camel.util.IOHelper; import org.apache.camel.util.StopWatch; import org.apache.camel.util.StringHelper; @@ -195,6 +197,10 @@ public class CamelListenAction extends ActionBaseCommand { IOHelper.writeText("{}", f); } } else { + // ensure output file is deleted before executing action + File outputFile = getOutputFile(Long.toString(pid)); + FileUtil.deleteFile(outputFile); + JsonObject root = new JsonObject(); root.put("action", "receive"); if ("start".equals(action)) { @@ -209,12 +215,40 @@ public class CamelListenAction extends ActionBaseCommand { } File f = getActionFile(Long.toString(pid)); IOHelper.writeText(root.toJson(), f); + + JsonObject jo = waitForOutputFile(outputFile); + if (jo != null) { + String error = jo.getString("error"); + if (error != null) { + error = Jsoner.unescape(error); + String url = jo.getString("url"); + List<String> stackTrace = jo.getCollection("stackTrace"); + if (url != null) { + printer().println("Error starting listening on: " + url + " due to: " + error); + + } else { + printer().println("Error starting listening due to: " + error); + } + printer().println(StringHelper.fillChars('-', 120)); + printer().println(StringHelper.padString(1, 55) + "STACK-TRACE"); + printer().println(StringHelper.fillChars('-', 120)); + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < stackTrace.size(); i++) { + sb.append(String.format("\t%s%n", stackTrace.get(i))); + } + printer().println(String.valueOf(sb)); + } + } } } return 0; } + protected JsonObject waitForOutputFile(File outputFile) { + return getJsonObject(outputFile); + } + protected Integer doStatusCall() { List<StatusRow> rows = new ArrayList<>(); @@ -270,7 +304,7 @@ public class CamelListenAction extends ActionBaseCommand { .with(r -> r.name), new Column().header("AGE").headerAlign(HorizontalAlign.CENTER).with(r -> r.age), new Column().header("STATUS").with(this::getStatus), - new Column().header("TOTAL").with(r -> "" + r.counter), + new Column().header("TOTAL").with(r -> r.enabled ? "" + r.counter : ""), new Column().header("SINCE").headerAlign(HorizontalAlign.CENTER) .with(this::getMessageAgo), new Column().header("ENDPOINT").visible(!wideUri).dataAlign(HorizontalAlign.LEFT)
