This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch rcmd in repository https://gitbox.apache.org/repos/asf/camel.git
commit faa57db7821232fb836efef06347391a35c38711 Author: Claus Ibsen <[email protected]> AuthorDate: Thu Oct 10 06:56:59 2024 +0200 CAMEL-21193: camel-jbang - Add listen command --- .../apache/camel/catalog/dev-consoles.properties | 1 + .../apache/camel/catalog/dev-consoles/receive.json | 15 ++++++ .../org/apache/camel/impl/engine/DefaultRoute.java | 4 ++ .../camel/impl/console/ReceiveDevConsole.java | 62 ++++++++++++++-------- .../core/commands/action/CamelListenAction.java | 8 +-- 5 files changed, 66 insertions(+), 24 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles.properties b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles.properties index d8d1929214d..280b182c430 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles.properties +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles.properties @@ -29,6 +29,7 @@ platform-http properties protocol quartz +receive reload resilience4j rest diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles/receive.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles/receive.json new file mode 100644 index 00000000000..978a1f23579 --- /dev/null +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles/receive.json @@ -0,0 +1,15 @@ +{ + "console": { + "kind": "console", + "group": "camel", + "name": "receive", + "title": "Camel Receive", + "description": "Consume messages from endpoints", + "deprecated": false, + "javaType": "org.apache.camel.impl.console.ReceiveDevConsole", + "groupId": "org.apache.camel", + "artifactId": "camel-console", + "version": "4.9.0-SNAPSHOT" + } +} + diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java index 567443d3302..446671ba875 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java @@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.apache.camel.CamelContext; +import org.apache.camel.Channel; import org.apache.camel.Consumer; import org.apache.camel.Endpoint; import org.apache.camel.ErrorHandlerFactory; @@ -733,6 +734,9 @@ public class DefaultRoute extends ServiceSupport implements Route { List<Processor> list = nav.next(); if (list != null) { for (Processor proc : list) { + if (proc instanceof Channel channel) { + proc = channel.getNextProcessor(); + } String id = null; if (proc instanceof IdAware idAware) { id = idAware.getId(); diff --git a/core/camel-console/src/main/java/org/apache/camel/impl/console/ReceiveDevConsole.java b/core/camel-console/src/main/java/org/apache/camel/impl/console/ReceiveDevConsole.java index 2a74dd49a66..88d32b4e246 100644 --- a/core/camel-console/src/main/java/org/apache/camel/impl/console/ReceiveDevConsole.java +++ b/core/camel-console/src/main/java/org/apache/camel/impl/console/ReceiveDevConsole.java @@ -24,17 +24,21 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import netscape.javascript.JSObject; import org.apache.camel.CamelContext; +import org.apache.camel.Channel; import org.apache.camel.Consumer; import org.apache.camel.Endpoint; +import org.apache.camel.EndpointAware; import org.apache.camel.Exchange; +import org.apache.camel.Navigate; +import org.apache.camel.Processor; import org.apache.camel.Route; import org.apache.camel.spi.Configurer; +import org.apache.camel.spi.IdAware; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.annotations.DevConsole; -import org.apache.camel.support.EndpointHelper; import org.apache.camel.support.MessageHelper; +import org.apache.camel.support.PatternHelper; import org.apache.camel.support.console.AbstractDevConsole; import org.apache.camel.support.service.ServiceHelper; import org.apache.camel.util.json.JsonArray; @@ -202,34 +206,23 @@ public class ReceiveDevConsole extends AbstractDevConsole { } protected Endpoint findMatchingEndpoint(CamelContext camelContext, String endpoint) { - // TODO: find all processors that are endpoint aware and match pattern + // TODO: Use ManagedProducerMBean as those are also toD (dynamic) + Endpoint target = null; - // is the endpoint a pattern or route id boolean scheme = endpoint.contains(":"); boolean pattern = endpoint.endsWith("*"); if (!scheme || pattern) { - if (!scheme) { + if (!scheme && !endpoint.endsWith("*")) { endpoint = endpoint + "*"; } + List<EndpointAware> match = new ArrayList<>(); for (Route route : camelContext.getRoutes()) { - // find last output - Endpoint e = route.getEndpoint(); - if (EndpointHelper.matchEndpoint(camelContext, e.getEndpointUri(), endpoint)) { - target = e; - break; - } + doFilter(endpoint, route.navigate(), match); } - if (target == null) { - // okay it may refer to a route id - for (Route route : camelContext.getRoutes()) { - String id = route.getRouteId(); - Endpoint e = route.getEndpoint(); - if (EndpointHelper.matchEndpoint(camelContext, id, endpoint)) { - target = e; - break; - } - } + // grab last matched processor that sends to an endpoint + if (!match.isEmpty()) { + target = match.get(match.size() - 1).getEndpoint(); } } else { target = camelContext.getEndpoint(endpoint); @@ -302,4 +295,31 @@ public class ReceiveDevConsole extends AbstractDevConsole { super.doStop(); stopConsumers(); } + + @SuppressWarnings("unchecked") + private void doFilter(String pattern, Navigate<Processor> nav, List<EndpointAware> match) { + List<Processor> list = nav.next(); + if (list != null) { + for (Processor proc : list) { + if (proc instanceof Channel channel) { + proc = channel.getNextProcessor(); + } + if (proc instanceof EndpointAware ea) { + String id = null; + if (proc instanceof IdAware idAware) { + id = idAware.getId(); + } + String uri = ea.getEndpoint().getEndpointUri(); + if (PatternHelper.matchPattern(id, pattern) || PatternHelper.matchPattern(uri, pattern)) { + match.add(ea); + } + } + if (proc instanceof Navigate) { + Navigate<Processor> child = (Navigate<Processor>) proc; + doFilter(pattern, child, match); + } + } + } + } + } diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelListenAction.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelListenAction.java index 30b213f9b23..b09bb50809e 100644 --- a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelListenAction.java +++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelListenAction.java @@ -197,11 +197,13 @@ public class CamelListenAction extends ActionBaseCommand { } else { JsonObject root = new JsonObject(); root.put("action", "receive"); - if (endpoint != null) { - root.put("endpoint", endpoint); - } if ("start".equals(action)) { root.put("enabled", "true"); + if (endpoint != null) { + root.put("endpoint", endpoint); + } else { + root.put("endpoint", "*"); + } } else if ("stop".equals(action)) { root.put("enabled", "false"); }
