This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new e6f4cafa97e3 CAMEL-22439 Support ConsumerConfiguration for
NatsConsumer with Jetstream (#19541)
e6f4cafa97e3 is described below
commit e6f4cafa97e3eebe3b5e88eb83e54aaa9bb8c866
Author: Vineet Saurabh <[email protected]>
AuthorDate: Tue Oct 14 09:57:01 2025 +0200
CAMEL-22439 Support ConsumerConfiguration for NatsConsumer with Jetstream
(#19541)
Provided the below support for NatsConsumer with Jetstream.:
* Ephemeral and Durable Consumer
* Pull and Push Subscription
* ConsumerConfiguration
Co-authored-by: Vineet Saurabh <[email protected]>
---
.../org/apache/camel/catalog/components/nats.json | 15 +-
.../component/nats/NatsEndpointConfigurer.java | 18 ++
.../component/nats/NatsEndpointUriFactory.java | 5 +-
.../org/apache/camel/component/nats/nats.json | 15 +-
.../camel/component/nats/NatsConfiguration.java | 71 ++++++
.../apache/camel/component/nats/NatsConsumer.java | 64 +++--
.../endpoint/dsl/NatsEndpointBuilderFactory.java | 270 +++++++++++++++++++++
7 files changed, 425 insertions(+), 33 deletions(-)
diff --git
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/nats.json
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/nats.json
index 19658a85f8e3..e6e9c2816d8f 100644
---
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/nats.json
+++
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/nats.json
@@ -70,11 +70,14 @@
"requestTimeout": { "index": 25, "kind": "parameter", "displayName":
"Request Timeout", "group": "producer", "label": "producer", "required": false,
"type": "integer", "javaType": "long", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": 20000, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Request timeout in milliseconds" },
"lazyStartProducer": { "index": 26, "kind": "parameter", "displayName":
"Lazy Start Producer", "group": "producer (advanced)", "label":
"producer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Whether the producer should be started
lazy (on the first message). By starting lazy you can use this to allow
CamelContext and routes to startup in situations where a produ [...]
"connection": { "index": 27, "kind": "parameter", "displayName":
"Connection", "group": "advanced", "label": "advanced", "required": false,
"type": "object", "javaType": "io.nats.client.Connection", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Reference an already instantiated connection
to Nats server" },
- "headerFilterStrategy": { "index": 28, "kind": "parameter", "displayName":
"Header Filter Strategy", "group": "advanced", "label": "advanced", "required":
false, "type": "object", "javaType":
"org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "To use a custom header filter strategy." },
- "jetstreamAsync": { "index": 29, "kind": "parameter", "displayName":
"Jetstream Async", "group": "advanced", "label": "advanced", "required": false,
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": true, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Sets whether to operate JetStream requests
asynchronously." },
- "traceConnection": { "index": 30, "kind": "parameter", "displayName":
"Trace Connection", "group": "advanced", "label": "advanced", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false,
"configurationClass": "org.apache.camel.component.nats.NatsConfiguration",
"configurationField": "configuration", "description": "Whether or not
connection trace messages should be printed to standard out for fine [...]
- "credentialsFilePath": { "index": 31, "kind": "parameter", "displayName":
"Credentials File Path", "group": "security", "label": "security", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "If we use useCredentialsFile to true we'll
need to set the credentialsFilePath option. It [...]
- "secure": { "index": 32, "kind": "parameter", "displayName": "Secure",
"group": "security", "label": "security", "required": false, "type": "boolean",
"javaType": "boolean", "deprecated": false, "autowired": false, "secret":
false, "defaultValue": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Set secure option indicating TLS is required"
},
- "sslContextParameters": { "index": 33, "kind": "parameter", "displayName":
"Ssl Context Parameters", "group": "security", "label": "security", "required":
false, "type": "object", "javaType":
"org.apache.camel.support.jsse.SSLContextParameters", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "To configure security using
SSLContextParameters" }
+ "consumerConfiguration": { "index": 28, "kind": "parameter",
"displayName": "Consumer Configuration", "group": "advanced", "label":
"advanced", "required": false, "type": "object", "javaType":
"io.nats.client.api.ConsumerConfiguration", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Sets a custom ConsumerConfiguration object for
the JetStream co [...]
+ "durableName": { "index": 29, "kind": "parameter", "displayName": "Durable
Name", "group": "advanced", "label": "advanced", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Sets the name to assign to the JetStream
durable consumer. Setting this value makes the consumer durable. T [...]
+ "headerFilterStrategy": { "index": 30, "kind": "parameter", "displayName":
"Header Filter Strategy", "group": "advanced", "label": "advanced", "required":
false, "type": "object", "javaType":
"org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "To use a custom header filter strategy." },
+ "jetstreamAsync": { "index": 31, "kind": "parameter", "displayName":
"Jetstream Async", "group": "advanced", "label": "advanced", "required": false,
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": true, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Sets whether to operate JetStream requests
asynchronously." },
+ "pullSubscription": { "index": 32, "kind": "parameter", "displayName":
"Pull Subscription", "group": "advanced", "label": "advanced", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": true,
"configurationClass": "org.apache.camel.component.nats.NatsConfiguration",
"configurationField": "configuration", "description": "Sets the consumer
subscription type for JetStream. Set to true to use a Pull Subscr [...]
+ "traceConnection": { "index": 33, "kind": "parameter", "displayName":
"Trace Connection", "group": "advanced", "label": "advanced", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false,
"configurationClass": "org.apache.camel.component.nats.NatsConfiguration",
"configurationField": "configuration", "description": "Whether or not
connection trace messages should be printed to standard out for fine [...]
+ "credentialsFilePath": { "index": 34, "kind": "parameter", "displayName":
"Credentials File Path", "group": "security", "label": "security", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "If we use useCredentialsFile to true we'll
need to set the credentialsFilePath option. It [...]
+ "secure": { "index": 35, "kind": "parameter", "displayName": "Secure",
"group": "security", "label": "security", "required": false, "type": "boolean",
"javaType": "boolean", "deprecated": false, "autowired": false, "secret":
false, "defaultValue": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Set secure option indicating TLS is required"
},
+ "sslContextParameters": { "index": 36, "kind": "parameter", "displayName":
"Ssl Context Parameters", "group": "security", "label": "security", "required":
false, "type": "object", "javaType":
"org.apache.camel.support.jsse.SSLContextParameters", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "To configure security using
SSLContextParameters" }
}
}
diff --git
a/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsEndpointConfigurer.java
b/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsEndpointConfigurer.java
index aebff95666e7..34022aaccfcd 100644
---
a/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsEndpointConfigurer.java
+++
b/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsEndpointConfigurer.java
@@ -28,8 +28,12 @@ public class NatsEndpointConfigurer extends
PropertyConfigurerSupport implements
case "connection":
target.getConfiguration().setConnection(property(camelContext,
io.nats.client.Connection.class, value)); return true;
case "connectiontimeout":
case "connectionTimeout":
target.getConfiguration().setConnectionTimeout(property(camelContext,
int.class, value)); return true;
+ case "consumerconfiguration":
+ case "consumerConfiguration":
target.getConfiguration().setConsumerConfiguration(property(camelContext,
io.nats.client.api.ConsumerConfiguration.class, value)); return true;
case "credentialsfilepath":
case "credentialsFilePath":
target.getConfiguration().setCredentialsFilePath(property(camelContext,
java.lang.String.class, value)); return true;
+ case "durablename":
+ case "durableName":
target.getConfiguration().setDurableName(property(camelContext,
java.lang.String.class, value)); return true;
case "exceptionhandler":
case "exceptionHandler":
target.setExceptionHandler(property(camelContext,
org.apache.camel.spi.ExceptionHandler.class, value)); return true;
case "exchangepattern":
@@ -63,6 +67,8 @@ public class NatsEndpointConfigurer extends
PropertyConfigurerSupport implements
case "pingInterval":
target.getConfiguration().setPingInterval(property(camelContext, int.class,
value)); return true;
case "poolsize":
case "poolSize":
target.getConfiguration().setPoolSize(property(camelContext, int.class,
value)); return true;
+ case "pullsubscription":
+ case "pullSubscription":
target.getConfiguration().setPullSubscription(property(camelContext,
boolean.class, value)); return true;
case "queuename":
case "queueName":
target.getConfiguration().setQueueName(property(camelContext,
java.lang.String.class, value)); return true;
case "reconnect":
target.getConfiguration().setReconnect(property(camelContext, boolean.class,
value)); return true;
@@ -95,8 +101,12 @@ public class NatsEndpointConfigurer extends
PropertyConfigurerSupport implements
case "connection": return io.nats.client.Connection.class;
case "connectiontimeout":
case "connectionTimeout": return int.class;
+ case "consumerconfiguration":
+ case "consumerConfiguration": return
io.nats.client.api.ConsumerConfiguration.class;
case "credentialsfilepath":
case "credentialsFilePath": return java.lang.String.class;
+ case "durablename":
+ case "durableName": return java.lang.String.class;
case "exceptionhandler":
case "exceptionHandler": return
org.apache.camel.spi.ExceptionHandler.class;
case "exchangepattern":
@@ -130,6 +140,8 @@ public class NatsEndpointConfigurer extends
PropertyConfigurerSupport implements
case "pingInterval": return int.class;
case "poolsize":
case "poolSize": return int.class;
+ case "pullsubscription":
+ case "pullSubscription": return boolean.class;
case "queuename":
case "queueName": return java.lang.String.class;
case "reconnect": return boolean.class;
@@ -163,8 +175,12 @@ public class NatsEndpointConfigurer extends
PropertyConfigurerSupport implements
case "connection": return target.getConfiguration().getConnection();
case "connectiontimeout":
case "connectionTimeout": return
target.getConfiguration().getConnectionTimeout();
+ case "consumerconfiguration":
+ case "consumerConfiguration": return
target.getConfiguration().getConsumerConfiguration();
case "credentialsfilepath":
case "credentialsFilePath": return
target.getConfiguration().getCredentialsFilePath();
+ case "durablename":
+ case "durableName": return target.getConfiguration().getDurableName();
case "exceptionhandler":
case "exceptionHandler": return target.getExceptionHandler();
case "exchangepattern":
@@ -198,6 +214,8 @@ public class NatsEndpointConfigurer extends
PropertyConfigurerSupport implements
case "pingInterval": return
target.getConfiguration().getPingInterval();
case "poolsize":
case "poolSize": return target.getConfiguration().getPoolSize();
+ case "pullsubscription":
+ case "pullSubscription": return
target.getConfiguration().isPullSubscription();
case "queuename":
case "queueName": return target.getConfiguration().getQueueName();
case "reconnect": return target.getConfiguration().isReconnect();
diff --git
a/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsEndpointUriFactory.java
b/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsEndpointUriFactory.java
index 8d55bf606005..9b44559a79da 100644
---
a/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsEndpointUriFactory.java
+++
b/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsEndpointUriFactory.java
@@ -23,11 +23,13 @@ public class NatsEndpointUriFactory extends
org.apache.camel.support.component.E
private static final Set<String> SECRET_PROPERTY_NAMES;
private static final Map<String, String> MULTI_VALUE_PREFIXES;
static {
- Set<String> props = new HashSet<>(34);
+ Set<String> props = new HashSet<>(37);
props.add("bridgeErrorHandler");
props.add("connection");
props.add("connectionTimeout");
+ props.add("consumerConfiguration");
props.add("credentialsFilePath");
+ props.add("durableName");
props.add("exceptionHandler");
props.add("exchangePattern");
props.add("flushConnection");
@@ -45,6 +47,7 @@ public class NatsEndpointUriFactory extends
org.apache.camel.support.component.E
props.add("pedantic");
props.add("pingInterval");
props.add("poolSize");
+ props.add("pullSubscription");
props.add("queueName");
props.add("reconnect");
props.add("reconnectTimeWait");
diff --git
a/components/camel-nats/src/generated/resources/META-INF/org/apache/camel/component/nats/nats.json
b/components/camel-nats/src/generated/resources/META-INF/org/apache/camel/component/nats/nats.json
index 19658a85f8e3..e6e9c2816d8f 100644
---
a/components/camel-nats/src/generated/resources/META-INF/org/apache/camel/component/nats/nats.json
+++
b/components/camel-nats/src/generated/resources/META-INF/org/apache/camel/component/nats/nats.json
@@ -70,11 +70,14 @@
"requestTimeout": { "index": 25, "kind": "parameter", "displayName":
"Request Timeout", "group": "producer", "label": "producer", "required": false,
"type": "integer", "javaType": "long", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": 20000, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Request timeout in milliseconds" },
"lazyStartProducer": { "index": 26, "kind": "parameter", "displayName":
"Lazy Start Producer", "group": "producer (advanced)", "label":
"producer,advanced", "required": false, "type": "boolean", "javaType":
"boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Whether the producer should be started
lazy (on the first message). By starting lazy you can use this to allow
CamelContext and routes to startup in situations where a produ [...]
"connection": { "index": 27, "kind": "parameter", "displayName":
"Connection", "group": "advanced", "label": "advanced", "required": false,
"type": "object", "javaType": "io.nats.client.Connection", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Reference an already instantiated connection
to Nats server" },
- "headerFilterStrategy": { "index": 28, "kind": "parameter", "displayName":
"Header Filter Strategy", "group": "advanced", "label": "advanced", "required":
false, "type": "object", "javaType":
"org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "To use a custom header filter strategy." },
- "jetstreamAsync": { "index": 29, "kind": "parameter", "displayName":
"Jetstream Async", "group": "advanced", "label": "advanced", "required": false,
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": true, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Sets whether to operate JetStream requests
asynchronously." },
- "traceConnection": { "index": 30, "kind": "parameter", "displayName":
"Trace Connection", "group": "advanced", "label": "advanced", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false,
"configurationClass": "org.apache.camel.component.nats.NatsConfiguration",
"configurationField": "configuration", "description": "Whether or not
connection trace messages should be printed to standard out for fine [...]
- "credentialsFilePath": { "index": 31, "kind": "parameter", "displayName":
"Credentials File Path", "group": "security", "label": "security", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "If we use useCredentialsFile to true we'll
need to set the credentialsFilePath option. It [...]
- "secure": { "index": 32, "kind": "parameter", "displayName": "Secure",
"group": "security", "label": "security", "required": false, "type": "boolean",
"javaType": "boolean", "deprecated": false, "autowired": false, "secret":
false, "defaultValue": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Set secure option indicating TLS is required"
},
- "sslContextParameters": { "index": 33, "kind": "parameter", "displayName":
"Ssl Context Parameters", "group": "security", "label": "security", "required":
false, "type": "object", "javaType":
"org.apache.camel.support.jsse.SSLContextParameters", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "To configure security using
SSLContextParameters" }
+ "consumerConfiguration": { "index": 28, "kind": "parameter",
"displayName": "Consumer Configuration", "group": "advanced", "label":
"advanced", "required": false, "type": "object", "javaType":
"io.nats.client.api.ConsumerConfiguration", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Sets a custom ConsumerConfiguration object for
the JetStream co [...]
+ "durableName": { "index": 29, "kind": "parameter", "displayName": "Durable
Name", "group": "advanced", "label": "advanced", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Sets the name to assign to the JetStream
durable consumer. Setting this value makes the consumer durable. T [...]
+ "headerFilterStrategy": { "index": 30, "kind": "parameter", "displayName":
"Header Filter Strategy", "group": "advanced", "label": "advanced", "required":
false, "type": "object", "javaType":
"org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired":
false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "To use a custom header filter strategy." },
+ "jetstreamAsync": { "index": 31, "kind": "parameter", "displayName":
"Jetstream Async", "group": "advanced", "label": "advanced", "required": false,
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": true, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Sets whether to operate JetStream requests
asynchronously." },
+ "pullSubscription": { "index": 32, "kind": "parameter", "displayName":
"Pull Subscription", "group": "advanced", "label": "advanced", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": true,
"configurationClass": "org.apache.camel.component.nats.NatsConfiguration",
"configurationField": "configuration", "description": "Sets the consumer
subscription type for JetStream. Set to true to use a Pull Subscr [...]
+ "traceConnection": { "index": 33, "kind": "parameter", "displayName":
"Trace Connection", "group": "advanced", "label": "advanced", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false,
"configurationClass": "org.apache.camel.component.nats.NatsConfiguration",
"configurationField": "configuration", "description": "Whether or not
connection trace messages should be printed to standard out for fine [...]
+ "credentialsFilePath": { "index": 34, "kind": "parameter", "displayName":
"Credentials File Path", "group": "security", "label": "security", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "If we use useCredentialsFile to true we'll
need to set the credentialsFilePath option. It [...]
+ "secure": { "index": 35, "kind": "parameter", "displayName": "Secure",
"group": "security", "label": "security", "required": false, "type": "boolean",
"javaType": "boolean", "deprecated": false, "autowired": false, "secret":
false, "defaultValue": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "Set secure option indicating TLS is required"
},
+ "sslContextParameters": { "index": 36, "kind": "parameter", "displayName":
"Ssl Context Parameters", "group": "security", "label": "security", "required":
false, "type": "object", "javaType":
"org.apache.camel.support.jsse.SSLContextParameters", "deprecated": false,
"autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.nats.NatsConfiguration", "configurationField":
"configuration", "description": "To configure security using
SSLContextParameters" }
}
}
diff --git
a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
index 1f9d0d3fded8..d5e0e0d5fbf0 100644
---
a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
+++
b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
@@ -23,6 +23,7 @@ import io.nats.client.Connection;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.nats.client.Options.Builder;
+import io.nats.client.api.ConsumerConfiguration;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.UriParam;
@@ -96,6 +97,12 @@ public class NatsConfiguration {
private String jetstreamName;
@UriParam(label = "advanced", defaultValue = "true")
private boolean jetstreamAsync = true;
+ @UriParam(label = "advanced")
+ private ConsumerConfiguration consumerConfiguration;
+ @UriParam(label = "advanced", defaultValue = "true")
+ private boolean pullSubscription = true;
+ @UriParam(label = "advanced")
+ private String durableName;
/**
* URLs to one or more NAT servers. Use comma to separate URLs when
specifying multiple servers.
@@ -503,4 +510,68 @@ public class NatsConfiguration {
public void setJetstreamAsync(boolean jetstreamAsync) {
this.jetstreamAsync = jetstreamAsync;
}
+
+ /**
+ * Allows the entire NATS JetStream Consumer Configuration object to be
provided directly.
+ * <p>
+ * This provides fine-grained control over all consumer properties (e.g.,
ack policies, max deliver, replay
+ * policies) and overrides any individual consumer-related options set
separately via the Camel URI parameters.
+ */
+ public ConsumerConfiguration getConsumerConfiguration() {
+ return consumerConfiguration;
+ }
+
+ /**
+ * Sets a custom {@code ConsumerConfiguration} object for the JetStream
consumer.
+ * <p>
+ * This is an advanced option typically used when you need to configure
properties not exposed as simple Camel URI
+ * parameters. When set, this object will be used to build the final
consumer subscription options.
+ */
+ public void setConsumerConfiguration(ConsumerConfiguration
consumerConfiguration) {
+ this.consumerConfiguration = consumerConfiguration;
+ }
+
+ /**
+ * Whether the consumer subscription type is a **Pull Subscription**
({@code true}) or a **Push Subscription**
+ * ({@code false}) when using JetStream.
+ * <p>
+ * **Pull Subscriptions** require the consumer to explicitly fetch
messages. **Push Subscriptions** automatically
+ * deliver messages to the consumer. The default setting is {@code true}
(Pull Subscription).
+ */
+ public boolean isPullSubscription() {
+ return pullSubscription;
+ }
+
+ /**
+ * Sets the consumer subscription type for JetStream.
+ * <p>
+ * Set to {@code true} to use a **Pull Subscription** (consumer explicitly
requests messages). Set to {@code false}
+ * to use a **Push Subscription** (messages are automatically delivered).
+ */
+ public void setPullSubscription(boolean pullSubscription) {
+ this.pullSubscription = pullSubscription;
+ }
+
+ /**
+ * The name to assign to the JetStream durable consumer.
+ * <p>
+ * If set, the consumer becomes **durable**, allowing subscriptions to
bind to it and resume message processing
+ * until the consumer is explicitly deleted. This name is crucial for
resilient message processing.
+ * <p>
+ * A durable name cannot contain whitespace, '.', '*', '>', path
separators (forward or backward slash), or
+ * non-printable characters.
+ */
+ public String getDurableName() {
+ return durableName;
+ }
+
+ /**
+ * Sets the name to assign to the JetStream durable consumer.
+ * <p>
+ * Setting this value makes the consumer durable. The value is used to set
the {@code durable()} field in the
+ * underlying NATS {@code ConsumerConfiguration.Builder}.
+ */
+ public void setDurableName(String durableName) {
+ this.durableName = durableName;
+ }
}
diff --git
a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
index 13c4bf5f4d29..5febdea9b82b 100644
---
a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
+++
b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
@@ -28,6 +28,7 @@ import io.nats.client.JetStreamManagement;
import io.nats.client.JetStreamSubscription;
import io.nats.client.Message;
import io.nats.client.MessageHandler;
+import io.nats.client.PullSubscribeOptions;
import io.nats.client.PushSubscribeOptions;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.client.api.StreamConfiguration;
@@ -153,10 +154,15 @@ public class NatsConsumer extends DefaultConsumer {
private void setupJetStreamConsumer(String topic, String queueName)
throws IOException, JetStreamApiException {
String streamName = this.configuration.getJetstreamName();
- String consumerName
- = ObjectHelper.isNotEmpty(queueName) ? queueName :
"consumer-" + System.currentTimeMillis(); // Generate a default consumer name
if queueName is not provided
- LOG.debug("Setting up JetStream PUSH consumer for stream: '{}',
durable: '{}', topic: {} ", streamName,
- consumerName, this.configuration.getTopic());
+ String durableName = this.configuration.getDurableName();
+
+ String subscriptionType = this.configuration.isPullSubscription()
? "PULL" : "PUSH";
+ LOG.debug("Setting up JetStream {}/{} consumer for stream: '{}',
subject: {}",
+ subscriptionType,
+ ObjectHelper.isNotEmpty(durableName)
+ ? String.format("DURABLE, durableName: '%s'",
durableName) : "EPHEMERAL",
+ streamName,
+ this.configuration.getTopic());
JetStreamManagement jsm = connection.jetStreamManagement();
StreamConfiguration streamConfig = StreamConfiguration.builder()
@@ -165,30 +171,48 @@ public class NatsConsumer extends DefaultConsumer {
.build();
jsm.addStream(streamConfig);
- ConsumerConfiguration.Builder ccBuilder =
ConsumerConfiguration.builder()
- .durable(consumerName);
- ccBuilder.deliverSubject(null);
- ConsumerConfiguration cc = ccBuilder.build();
+ ConsumerConfiguration cc =
configuration.getConsumerConfiguration();
+ if (cc == null) {
+ ConsumerConfiguration.Builder ccBuilder =
ConsumerConfiguration.builder();
+ ccBuilder.deliverSubject(null);
+ if (durableName != null) {
+ ccBuilder.durable(durableName + "-durable");
+ }
+ cc = ccBuilder.build();
+ }
- PushSubscribeOptions pushOptions = PushSubscribeOptions.builder()
- .configuration(cc)
- .build();
+ CamelNatsMessageHandler messageHandler = new
CamelNatsMessageHandler();
+ NatsConsumer.this.dispatcher =
this.connection.createDispatcher(messageHandler);
- NatsConsumer.this.dispatcher =
this.connection.createDispatcher(new CamelNatsMessageHandler());
+ if (this.configuration.isPullSubscription()) {
+ PullSubscribeOptions pullOptions =
PullSubscribeOptions.builder()
+ .configuration(cc)
+ .build();
- NatsConsumer.this.jetStreamSubscription =
this.connection.jetStream().subscribe(
-
NatsConsumer.this.getEndpoint().getConfiguration().getTopic(),
- queueName,
- dispatcher,
- new CamelNatsMessageHandler(),
- true,
- pushOptions);
+ NatsConsumer.this.jetStreamSubscription =
this.connection.jetStream().subscribe(
+
NatsConsumer.this.getEndpoint().getConfiguration().getTopic(),
+ dispatcher,
+ messageHandler,
+ pullOptions);
+ } else {
+ PushSubscribeOptions pushOptions =
PushSubscribeOptions.builder()
+ .configuration(cc)
+ .build();
+
+ NatsConsumer.this.jetStreamSubscription =
this.connection.jetStream().subscribe(
+
NatsConsumer.this.getEndpoint().getConfiguration().getTopic(),
+ queueName,
+ dispatcher,
+ messageHandler,
+ true,
+ pushOptions);
+ }
NatsConsumer.this.setActive(true);
}
private void setupStandardNatsConsumer(String topic, String queueName,
Integer maxMessages) {
- LOG.debug("Setting up standard NATS consumer for topic: {}",
topic);
+ LOG.debug("Setting up standard NATS consumer for subject: {}",
topic);
NatsConsumer.this.dispatcher = connection.createDispatcher(new
CamelNatsMessageHandler());
if (ObjectHelper.isNotEmpty(queueName)) {
NatsConsumer.this.dispatcher =
NatsConsumer.this.dispatcher.subscribe(topic, queueName);
diff --git
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/NatsEndpointBuilderFactory.java
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/NatsEndpointBuilderFactory.java
index 8d49ad500d53..47f966102fae 100644
---
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/NatsEndpointBuilderFactory.java
+++
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/NatsEndpointBuilderFactory.java
@@ -827,6 +827,60 @@ public interface NatsEndpointBuilderFactory {
doSetProperty("connection", connection);
return this;
}
+ /**
+ * Sets a custom ConsumerConfiguration object for the JetStream
+ * consumer. This is an advanced option typically used when you need to
+ * configure properties not exposed as simple Camel URI parameters.
When
+ * set, this object will be used to build the final consumer
+ * subscription options.
+ *
+ * The option is a:
+ * <code>io.nats.client.api.ConsumerConfiguration</code> type.
+ *
+ * Group: advanced
+ *
+ * @param consumerConfiguration the value to set
+ * @return the dsl builder
+ */
+ default AdvancedNatsEndpointConsumerBuilder
consumerConfiguration(io.nats.client.api.ConsumerConfiguration
consumerConfiguration) {
+ doSetProperty("consumerConfiguration", consumerConfiguration);
+ return this;
+ }
+ /**
+ * Sets a custom ConsumerConfiguration object for the JetStream
+ * consumer. This is an advanced option typically used when you need to
+ * configure properties not exposed as simple Camel URI parameters.
When
+ * set, this object will be used to build the final consumer
+ * subscription options.
+ *
+ * The option will be converted to a
+ * <code>io.nats.client.api.ConsumerConfiguration</code> type.
+ *
+ * Group: advanced
+ *
+ * @param consumerConfiguration the value to set
+ * @return the dsl builder
+ */
+ default AdvancedNatsEndpointConsumerBuilder
consumerConfiguration(String consumerConfiguration) {
+ doSetProperty("consumerConfiguration", consumerConfiguration);
+ return this;
+ }
+ /**
+ * Sets the name to assign to the JetStream durable consumer. Setting
+ * this value makes the consumer durable. The value is used to set the
+ * durable() field in the underlying NATS
ConsumerConfiguration.Builder.
+ *
+ * The option is a: <code>java.lang.String</code> type.
+ *
+ * Group: advanced
+ *
+ * @param durableName the value to set
+ * @return the dsl builder
+ */
+ default AdvancedNatsEndpointConsumerBuilder durableName(String
durableName) {
+ doSetProperty("durableName", durableName);
+ return this;
+ }
/**
* To use a custom header filter strategy.
*
@@ -887,6 +941,42 @@ public interface NatsEndpointBuilderFactory {
doSetProperty("jetstreamAsync", jetstreamAsync);
return this;
}
+ /**
+ * Sets the consumer subscription type for JetStream. Set to true to
use
+ * a Pull Subscription (consumer explicitly requests messages). Set to
+ * false to use a Push Subscription (messages are automatically
+ * delivered).
+ *
+ * The option is a: <code>boolean</code> type.
+ *
+ * Default: true
+ * Group: advanced
+ *
+ * @param pullSubscription the value to set
+ * @return the dsl builder
+ */
+ default AdvancedNatsEndpointConsumerBuilder pullSubscription(boolean
pullSubscription) {
+ doSetProperty("pullSubscription", pullSubscription);
+ return this;
+ }
+ /**
+ * Sets the consumer subscription type for JetStream. Set to true to
use
+ * a Pull Subscription (consumer explicitly requests messages). Set to
+ * false to use a Push Subscription (messages are automatically
+ * delivered).
+ *
+ * The option will be converted to a <code>boolean</code> type.
+ *
+ * Default: true
+ * Group: advanced
+ *
+ * @param pullSubscription the value to set
+ * @return the dsl builder
+ */
+ default AdvancedNatsEndpointConsumerBuilder pullSubscription(String
pullSubscription) {
+ doSetProperty("pullSubscription", pullSubscription);
+ return this;
+ }
/**
* Whether or not connection trace messages should be printed to
* standard out for fine grained debugging of connection issues.
@@ -1598,6 +1688,60 @@ public interface NatsEndpointBuilderFactory {
doSetProperty("connection", connection);
return this;
}
+ /**
+ * Sets a custom ConsumerConfiguration object for the JetStream
+ * consumer. This is an advanced option typically used when you need to
+ * configure properties not exposed as simple Camel URI parameters.
When
+ * set, this object will be used to build the final consumer
+ * subscription options.
+ *
+ * The option is a:
+ * <code>io.nats.client.api.ConsumerConfiguration</code> type.
+ *
+ * Group: advanced
+ *
+ * @param consumerConfiguration the value to set
+ * @return the dsl builder
+ */
+ default AdvancedNatsEndpointProducerBuilder
consumerConfiguration(io.nats.client.api.ConsumerConfiguration
consumerConfiguration) {
+ doSetProperty("consumerConfiguration", consumerConfiguration);
+ return this;
+ }
+ /**
+ * Sets a custom ConsumerConfiguration object for the JetStream
+ * consumer. This is an advanced option typically used when you need to
+ * configure properties not exposed as simple Camel URI parameters.
When
+ * set, this object will be used to build the final consumer
+ * subscription options.
+ *
+ * The option will be converted to a
+ * <code>io.nats.client.api.ConsumerConfiguration</code> type.
+ *
+ * Group: advanced
+ *
+ * @param consumerConfiguration the value to set
+ * @return the dsl builder
+ */
+ default AdvancedNatsEndpointProducerBuilder
consumerConfiguration(String consumerConfiguration) {
+ doSetProperty("consumerConfiguration", consumerConfiguration);
+ return this;
+ }
+ /**
+ * Sets the name to assign to the JetStream durable consumer. Setting
+ * this value makes the consumer durable. The value is used to set the
+ * durable() field in the underlying NATS
ConsumerConfiguration.Builder.
+ *
+ * The option is a: <code>java.lang.String</code> type.
+ *
+ * Group: advanced
+ *
+ * @param durableName the value to set
+ * @return the dsl builder
+ */
+ default AdvancedNatsEndpointProducerBuilder durableName(String
durableName) {
+ doSetProperty("durableName", durableName);
+ return this;
+ }
/**
* To use a custom header filter strategy.
*
@@ -1658,6 +1802,42 @@ public interface NatsEndpointBuilderFactory {
doSetProperty("jetstreamAsync", jetstreamAsync);
return this;
}
+ /**
+ * Sets the consumer subscription type for JetStream. Set to true to
use
+ * a Pull Subscription (consumer explicitly requests messages). Set to
+ * false to use a Push Subscription (messages are automatically
+ * delivered).
+ *
+ * The option is a: <code>boolean</code> type.
+ *
+ * Default: true
+ * Group: advanced
+ *
+ * @param pullSubscription the value to set
+ * @return the dsl builder
+ */
+ default AdvancedNatsEndpointProducerBuilder pullSubscription(boolean
pullSubscription) {
+ doSetProperty("pullSubscription", pullSubscription);
+ return this;
+ }
+ /**
+ * Sets the consumer subscription type for JetStream. Set to true to
use
+ * a Pull Subscription (consumer explicitly requests messages). Set to
+ * false to use a Push Subscription (messages are automatically
+ * delivered).
+ *
+ * The option will be converted to a <code>boolean</code> type.
+ *
+ * Default: true
+ * Group: advanced
+ *
+ * @param pullSubscription the value to set
+ * @return the dsl builder
+ */
+ default AdvancedNatsEndpointProducerBuilder pullSubscription(String
pullSubscription) {
+ doSetProperty("pullSubscription", pullSubscription);
+ return this;
+ }
/**
* Whether or not connection trace messages should be printed to
* standard out for fine grained debugging of connection issues.
@@ -2283,6 +2463,60 @@ public interface NatsEndpointBuilderFactory {
doSetProperty("connection", connection);
return this;
}
+ /**
+ * Sets a custom ConsumerConfiguration object for the JetStream
+ * consumer. This is an advanced option typically used when you need to
+ * configure properties not exposed as simple Camel URI parameters.
When
+ * set, this object will be used to build the final consumer
+ * subscription options.
+ *
+ * The option is a:
+ * <code>io.nats.client.api.ConsumerConfiguration</code> type.
+ *
+ * Group: advanced
+ *
+ * @param consumerConfiguration the value to set
+ * @return the dsl builder
+ */
+ default AdvancedNatsEndpointBuilder
consumerConfiguration(io.nats.client.api.ConsumerConfiguration
consumerConfiguration) {
+ doSetProperty("consumerConfiguration", consumerConfiguration);
+ return this;
+ }
+ /**
+ * Sets a custom ConsumerConfiguration object for the JetStream
+ * consumer. This is an advanced option typically used when you need to
+ * configure properties not exposed as simple Camel URI parameters.
When
+ * set, this object will be used to build the final consumer
+ * subscription options.
+ *
+ * The option will be converted to a
+ * <code>io.nats.client.api.ConsumerConfiguration</code> type.
+ *
+ * Group: advanced
+ *
+ * @param consumerConfiguration the value to set
+ * @return the dsl builder
+ */
+ default AdvancedNatsEndpointBuilder consumerConfiguration(String
consumerConfiguration) {
+ doSetProperty("consumerConfiguration", consumerConfiguration);
+ return this;
+ }
+ /**
+ * Sets the name to assign to the JetStream durable consumer. Setting
+ * this value makes the consumer durable. The value is used to set the
+ * durable() field in the underlying NATS
ConsumerConfiguration.Builder.
+ *
+ * The option is a: <code>java.lang.String</code> type.
+ *
+ * Group: advanced
+ *
+ * @param durableName the value to set
+ * @return the dsl builder
+ */
+ default AdvancedNatsEndpointBuilder durableName(String durableName) {
+ doSetProperty("durableName", durableName);
+ return this;
+ }
/**
* To use a custom header filter strategy.
*
@@ -2343,6 +2577,42 @@ public interface NatsEndpointBuilderFactory {
doSetProperty("jetstreamAsync", jetstreamAsync);
return this;
}
+ /**
+ * Sets the consumer subscription type for JetStream. Set to true to
use
+ * a Pull Subscription (consumer explicitly requests messages). Set to
+ * false to use a Push Subscription (messages are automatically
+ * delivered).
+ *
+ * The option is a: <code>boolean</code> type.
+ *
+ * Default: true
+ * Group: advanced
+ *
+ * @param pullSubscription the value to set
+ * @return the dsl builder
+ */
+ default AdvancedNatsEndpointBuilder pullSubscription(boolean
pullSubscription) {
+ doSetProperty("pullSubscription", pullSubscription);
+ return this;
+ }
+ /**
+ * Sets the consumer subscription type for JetStream. Set to true to
use
+ * a Pull Subscription (consumer explicitly requests messages). Set to
+ * false to use a Push Subscription (messages are automatically
+ * delivered).
+ *
+ * The option will be converted to a <code>boolean</code> type.
+ *
+ * Default: true
+ * Group: advanced
+ *
+ * @param pullSubscription the value to set
+ * @return the dsl builder
+ */
+ default AdvancedNatsEndpointBuilder pullSubscription(String
pullSubscription) {
+ doSetProperty("pullSubscription", pullSubscription);
+ return this;
+ }
/**
* Whether or not connection trace messages should be printed to
* standard out for fine grained debugging of connection issues.