This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new e5f15cb6f15 CAMEL-21136: Added --poll to poll messages via jbang and
/q/send HTTP server
e5f15cb6f15 is described below
commit e5f15cb6f150bc30de73eb46443c21a7f3790e74
Author: Claus Ibsen <[email protected]>
AuthorDate: Thu Sep 5 14:36:01 2024 +0200
CAMEL-21136: Added --poll to poll messages via jbang and /q/send HTTP server
---
.../platform/http/main/MainHttpServer.java | 47 ++++++++++++++++------
.../modules/ROOT/pages/camel-jbang.adoc | 18 +++++++++
.../camel/cli/connector/LocalCliConnector.java | 42 +++++++++++++------
.../core/commands/action/CamelSendAction.java | 38 ++++++++++++++---
4 files changed, 115 insertions(+), 30 deletions(-)
diff --git
a/components/camel-platform-http-main/src/main/java/org/apache/camel/component/platform/http/main/MainHttpServer.java
b/components/camel-platform-http-main/src/main/java/org/apache/camel/component/platform/http/main/MainHttpServer.java
index fcf9ac76969..5a57c98a440 100644
---
a/components/camel-platform-http-main/src/main/java/org/apache/camel/component/platform/http/main/MainHttpServer.java
+++
b/components/camel-platform-http-main/src/main/java/org/apache/camel/component/platform/http/main/MainHttpServer.java
@@ -48,6 +48,7 @@ import io.vertx.ext.web.handler.BodyHandler;
import io.vertx.ext.web.impl.BlockingHandlerDecorator;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
+import org.apache.camel.ConsumerTemplate;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
@@ -105,9 +106,11 @@ public class MainHttpServer extends ServiceSupport
implements CamelContextAware,
private static final Logger LOG =
LoggerFactory.getLogger(MainHttpServer.class);
private static final int BODY_MAX_CHARS = 128 * 1024;
+ private static final int DEFAULT_POLL_TIMEOUT = 20000;
private final HeaderFilterStrategy filter = new
HttpProtocolHeaderFilterStrategy();
private ProducerTemplate producer;
+ private ConsumerTemplate consumer;
private VertxPlatformHttpServer server;
private VertxPlatformHttpRouter router;
@@ -357,6 +360,9 @@ public class MainHttpServer extends ServiceSupport
implements CamelContextAware,
if (sendEnabled && producer == null) {
producer = camelContext.createProducerTemplate();
}
+ if (sendEnabled && consumer == null) {
+ consumer = camelContext.createConsumerTemplate();
+ }
server = new VertxPlatformHttpServer(configuration);
// adding server to camel-context which will manage shutdown the
server, so we should not do this here
@@ -368,14 +374,14 @@ public class MainHttpServer extends ServiceSupport
implements CamelContextAware,
pluginRegistry.setCamelContext(getCamelContext());
getCamelContext().getCamelContextExtension().addContextPlugin(PlatformHttpPluginRegistry.class,
pluginRegistry);
}
- ServiceHelper.initService(pluginRegistry, producer);
+ ServiceHelper.initService(pluginRegistry, producer, consumer);
}
@Override
protected void doStart() throws Exception {
ObjectHelper.notNull(camelContext, "CamelContext");
- ServiceHelper.startService(server, pluginRegistry, producer);
+ ServiceHelper.startService(server, pluginRegistry, producer, consumer);
router = VertxPlatformHttpRouter.lookup(camelContext);
platformHttpComponent = camelContext.getComponent("platform-http",
PlatformHttpComponent.class);
@@ -385,7 +391,7 @@ public class MainHttpServer extends ServiceSupport
implements CamelContextAware,
@Override
protected void doShutdown() throws Exception {
- ServiceHelper.stopAndShutdownServices(pluginRegistry, producer);
+ ServiceHelper.stopAndShutdownServices(pluginRegistry, producer,
consumer);
}
private boolean pluginsEnabled() {
@@ -1235,11 +1241,14 @@ public class MainHttpServer extends ServiceSupport
implements CamelContextAware,
String endpoint = ctx.request().getHeader("endpoint");
String exchangePattern = ctx.request().getHeader("exchangePattern");
String resultType = ctx.request().getHeader("resultType");
+ String poll = ctx.request().getHeader("poll");
+ String pollTimeout = ctx.request().getHeader("pollTimeout");
final Map<String, Object> headers = new LinkedHashMap<>();
for (var entry : ctx.request().headers()) {
String k = entry.getKey();
boolean exclude
- = "endpoint".equals(k) || "exchangePattern".equals(k) ||
"resultType".equals(k) || "Accept".equals(k)
+ = "endpoint".equals(k) || "exchangePattern".equals(k) ||
"poll".equals(k)
+ || "pollTimeout".equals(k) ||
"resultType".equals(k) || "Accept".equals(k)
||
filter.applyFilterToExternalHeaders(entry.getKey(), entry.getValue(), null);
if (!exclude) {
headers.put(entry.getKey(), entry.getValue());
@@ -1301,18 +1310,24 @@ public class MainHttpServer extends ServiceSupport
implements CamelContextAware,
exchangePattern = "InOnly"; // use in-only by default
}
final ExchangePattern mep =
ExchangePattern.valueOf(exchangePattern);
- out = producer.send(target, exchange -> {
- exchange.setPattern(mep);
- exchange.getMessage().setBody(body);
- if (!headers.isEmpty()) {
- exchange.getMessage().setHeaders(headers);
- }
- });
- if (clazz != null) {
+ long timeout = pollTimeout != null ?
Long.parseLong(pollTimeout) : DEFAULT_POLL_TIMEOUT;
+ if ("true".equals(poll)) {
+ exchangePattern = "InOut"; // we want to receive the data
so enable out mode
+ out = consumer.receive(target, timeout);
+ } else {
+ out = producer.send(target, exchange -> {
+ exchange.setPattern(mep);
+ exchange.getMessage().setBody(body);
+ if (!headers.isEmpty()) {
+ exchange.getMessage().setHeaders(headers);
+ }
+ });
+ }
+ if (clazz != null && out != null) {
Object b = out.getMessage().getBody(clazz);
out.getMessage().setBody(b);
}
- } catch (ClassNotFoundException e) {
+ } catch (Exception e) {
jo.put("endpoint", target.getEndpointUri());
jo.put("exchangePattern", exchangePattern);
jo.put("timestamp", timestamp);
@@ -1353,6 +1368,12 @@ public class MainHttpServer extends ServiceSupport
implements CamelContextAware,
jo.put("timestamp", timestamp);
jo.put("elapsed", watch.taken());
jo.put("status", "success");
+ } else {
+ // timeout as there is no data
+ jo.put("endpoint", target.getEndpointUri());
+ jo.put("timestamp", timestamp);
+ jo.put("elapsed", watch.taken());
+ jo.put("status", "timeout");
}
} else {
// there is no valid endpoint
diff --git a/docs/user-manual/modules/ROOT/pages/camel-jbang.adoc
b/docs/user-manual/modules/ROOT/pages/camel-jbang.adoc
index 1a201c620f6..90f8e0fb1f8 100644
--- a/docs/user-manual/modules/ROOT/pages/camel-jbang.adoc
+++ b/docs/user-manual/modules/ROOT/pages/camel-jbang.adoc
@@ -1382,6 +1382,24 @@ TIP: See more options with `camel cmd send --help`.
The source for this example is provided on GitHub at
https://github.com/apache/camel-kamelets-examples/tree/main/jbang/mqtt[camel-jbang
MQTT example].
+==== Poll messages via Camel
+
+*Available since Camel 4.8*
+
+The `camel cmd send` command has been improved to also _poll_ messages from
Camel. This is needed
+if you want to poll the latest messages from a Kafka topic, JMS queue, or
download a file from FTP etc.
+
+Polling uses a Camel consumer (instead of a producer) to receive the message,
and times out if no message is received.
+
+For example, to poll a message from an ActiveMQ queue named `cheese`, you can do:
+
+[source,bash]
+----
+$ camel cmd send --poll --endpoint=activemq:cheese
+----
+
+When you poll, you do not send any payload (body or headers).
+
=== Controlling local Camel integrations
To list the currently running Camel integrations you use the `ps` command:
diff --git
a/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java
b/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java
index 203520a614d..7ad6ecff054 100644
---
a/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java
+++
b/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java
@@ -43,12 +43,12 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
+import org.apache.camel.ConsumerTemplate;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Expression;
import org.apache.camel.NoSuchEndpointException;
-import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.Route;
import org.apache.camel.RoutesBuilder;
@@ -108,6 +108,7 @@ public class LocalCliConnector extends ServiceSupport
implements CliConnector, C
private ScheduledExecutorService executor;
private volatile ExecutorService terminateExecutor;
private ProducerTemplate producer;
+ private ConsumerTemplate consumer;
private File lockFile;
private File statusFile;
private File actionFile;
@@ -163,6 +164,7 @@ public class LocalCliConnector extends ServiceSupport
implements CliConnector, C
}
platformVersion = cliConnectorFactory.getRuntimeVersion();
producer = camelContext.createProducerTemplate();
+ consumer = camelContext.createConsumerTemplate();
// create thread from JDK so it is not managed by Camel because we
want the pool to be independent when
// camel is being stopped which otherwise can lead to stopping the
thread pool while the task is running
@@ -500,8 +502,12 @@ public class LocalCliConnector extends ServiceSupport
implements CliConnector, C
StopWatch watch = new StopWatch();
long timestamp = System.currentTimeMillis();
String endpoint = root.getString("endpoint");
- String body = root.getString("body");
+ String body = root.getStringOrDefault("body", "");
String exchangePattern = root.getString("exchangePattern");
+ boolean poll = root.getBooleanOrDefault("poll", false);
+ long pollTimeout = root.getLongOrDefault("pollTimeout", 20000L);
+ // give extra time as jbang need to have response
+ pollTimeout += 5000;
Collection<JsonObject> headers = root.getCollection("headers");
if (body != null) {
InputStream is = null;
@@ -560,20 +566,23 @@ public class LocalCliConnector extends ServiceSupport
implements CliConnector, C
}
if (target != null) {
- out = producer.send(target, new Processor() {
- @Override
- public void process(Exchange exchange) throws Exception {
+ if (poll) {
+ exchangePattern = "InOut";
+ out = consumer.receive(target, pollTimeout);
+ } else {
+ final String mep = exchangePattern;
+ out = producer.send(target, exchange -> {
exchange.getMessage().setBody(inputBody);
if (inputHeaders != null) {
exchange.getMessage().setHeaders(inputHeaders);
}
exchange.setPattern(
- "InOut".equals(exchangePattern) ?
ExchangePattern.InOut : ExchangePattern.InOnly);
- }
- });
+ "InOut".equals(mep) ? ExchangePattern.InOut :
ExchangePattern.InOnly);
+ });
+ }
IOHelper.close(is);
LOG.trace("Updating output file: {}", outputFile);
- if (out.getException() != null) {
+ if (out != null && out.getException() != null) {
JsonObject jo = new JsonObject();
jo.put("endpoint", target.getEndpointUri());
jo.put("exchangeId", out.getExchangeId());
@@ -585,7 +594,7 @@ public class LocalCliConnector extends ServiceSupport
implements CliConnector, C
jo.put("exception",
MessageHelper.dumpExceptionAsJSonObject(out.getException()).getMap("exception"));
IOHelper.writeText(jo.toJson(), outputFile);
- } else if ("InOut".equals(exchangePattern)) {
+ } else if (out != null && "InOut".equals(exchangePattern)) {
JsonObject jo = new JsonObject();
jo.put("endpoint", target.getEndpointUri());
jo.put("exchangeId", out.getExchangeId());
@@ -597,7 +606,7 @@ public class LocalCliConnector extends ServiceSupport
implements CliConnector, C
jo.put("message",
MessageHelper.dumpAsJSonObject(out.getMessage(), true, true, true, true, true,
true,
BODY_MAX_CHARS).getMap("message"));
IOHelper.writeText(jo.toJson(), outputFile);
- } else {
+ } else if (out != null) {
JsonObject jo = new JsonObject();
jo.put("endpoint", target.getEndpointUri());
jo.put("exchangeId", out.getExchangeId());
@@ -606,6 +615,15 @@ public class LocalCliConnector extends ServiceSupport
implements CliConnector, C
jo.put("elapsed", watch.taken());
jo.put("status", "success");
IOHelper.writeText(jo.toJson(), outputFile);
+ } else {
+ JsonObject jo = new JsonObject();
+ jo.put("endpoint", target.getEndpointUri());
+ jo.put("exchangeId", "");
+ jo.put("exchangePattern", exchangePattern);
+ jo.put("timestamp", timestamp);
+ jo.put("elapsed", watch.taken());
+ jo.put("status", "timeout");
+ IOHelper.writeText(jo.toJson(), outputFile);
}
} else {
// there is no valid endpoint
@@ -1267,7 +1285,7 @@ public class LocalCliConnector extends ServiceSupport
implements CliConnector, C
camelContext.getExecutorServiceManager().shutdown(executor);
executor = null;
}
- ServiceHelper.stopService(producer);
+ ServiceHelper.stopService(producer, consumer);
}
private static String getPid() {
diff --git
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelSendAction.java
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelSendAction.java
index dfdb67dfc79..e9578d82661 100644
---
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelSendAction.java
+++
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/action/CamelSendAction.java
@@ -46,6 +46,10 @@ public class CamelSendAction extends ActionBaseCommand {
description = "Endpoint where to send the message (can
be uri, pattern, or refer to a route id)")
String endpoint;
+ @CommandLine.Option(names = { "--poll" },
+ description = "Poll instead of sending a message. This
can be used to receive latest message from a Kafka topic or JMS queue.")
+ boolean poll;
+
@CommandLine.Option(names = { "--reply" },
description = "Whether to expect a reply message
(InOut vs InOut messaging style)")
boolean reply;
@@ -54,7 +58,7 @@ public class CamelSendAction extends ActionBaseCommand {
description = "Saves reply message to the file with
the given name (override if exists)")
String replyFile;
- @CommandLine.Option(names = { "--body" }, required = true,
+ @CommandLine.Option(names = { "--body" },
description = "Message body to send (prefix with file:
to refer to loading message body from file)")
String body;
@@ -117,9 +121,17 @@ public class CamelSendAction extends ActionBaseCommand {
JsonObject root = new JsonObject();
root.put("action", "send");
root.put("endpoint", endpoint);
+ root.put("poll", poll);
+ // timeout cannot be too low
+ if (timeout < 5000) {
+ timeout = 5000;
+ }
+ root.put("pollTimeout", Math.min(1000, timeout - 1000)); // poll
timeout should be shorter than jbang timeout
String mep = (reply || replyFile != null) ? "InOut" : "InOnly";
root.put("exchangePattern", mep);
- root.put("body", body);
+ if (body != null) {
+ root.put("body", body);
+ }
if (headers != null) {
JsonArray arr = new JsonArray();
for (String h : headers) {
@@ -230,19 +242,35 @@ public class CamelSendAction extends ActionBaseCommand {
private String getStatus(JsonObject r) {
boolean failed = "failed".equals(r.getString("status"));
+ boolean timeout = "timeout".equals(r.getString("status"));
boolean reply = r.containsKey("message");
String status;
+ Ansi.Color c = Ansi.Color.GREEN;
if (failed) {
status = "Failed (exception)";
+ c = Ansi.Color.RED;
} else if (replyFile != null) {
- status = "Reply saved to file (success)";
+ if (poll) {
+ status = "Poll save to fill (success)";
+ } else {
+ status = "Reply save to file (success)";
+ }
} else if (reply) {
- status = "Reply received (success)";
+ if (poll) {
+ status = "Poll received (success)";
+ } else {
+ status = "Reply received (success)";
+ }
+ } else if (timeout) {
+ status = "Timeout";
+ c = Ansi.Color.YELLOW;
+ } else if (poll) {
+ status = "Poll (success)";
} else {
status = "Sent (success)";
}
if (loggingColor) {
- return Ansi.ansi().fg(failed ? Ansi.Color.RED :
Ansi.Color.GREEN).a(status).reset().toString();
+ return Ansi.ansi().fg(c).a(status).reset().toString();
} else {
return status;
}