This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new e6f4cafa97e3 CAMEL-22439 Support ConsumerConfiguration for 
NatsConsumer with Jetstream (#19541)
e6f4cafa97e3 is described below

commit e6f4cafa97e3eebe3b5e88eb83e54aaa9bb8c866
Author: Vineet Saurabh <[email protected]>
AuthorDate: Tue Oct 14 09:57:01 2025 +0200

    CAMEL-22439 Support ConsumerConfiguration for NatsConsumer with Jetstream 
(#19541)
    
    Provided the below support for NatsConsumer with Jetstream.:
    
    * Ephemeral and Durable Consumer
    * Pull and Push Subscription
    * ConsumerConfiguration
    
    Co-authored-by: Vineet Saurabh <[email protected]>
---
 .../org/apache/camel/catalog/components/nats.json  |  15 +-
 .../component/nats/NatsEndpointConfigurer.java     |  18 ++
 .../component/nats/NatsEndpointUriFactory.java     |   5 +-
 .../org/apache/camel/component/nats/nats.json      |  15 +-
 .../camel/component/nats/NatsConfiguration.java    |  71 ++++++
 .../apache/camel/component/nats/NatsConsumer.java  |  64 +++--
 .../endpoint/dsl/NatsEndpointBuilderFactory.java   | 270 +++++++++++++++++++++
 7 files changed, 425 insertions(+), 33 deletions(-)

diff --git 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/nats.json
 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/nats.json
index 19658a85f8e3..e6e9c2816d8f 100644
--- 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/nats.json
+++ 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/nats.json
@@ -70,11 +70,14 @@
     "requestTimeout": { "index": 25, "kind": "parameter", "displayName": 
"Request Timeout", "group": "producer", "label": "producer", "required": false, 
"type": "integer", "javaType": "long", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": 20000, "configurationClass": 
"org.apache.camel.component.nats.NatsConfiguration", "configurationField": 
"configuration", "description": "Request timeout in milliseconds" },
     "lazyStartProducer": { "index": 26, "kind": "parameter", "displayName": 
"Lazy Start Producer", "group": "producer (advanced)", "label": 
"producer,advanced", "required": false, "type": "boolean", "javaType": 
"boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "Whether the producer should be started 
lazy (on the first message). By starting lazy you can use this to allow 
CamelContext and routes to startup in situations where a produ [...]
     "connection": { "index": 27, "kind": "parameter", "displayName": 
"Connection", "group": "advanced", "label": "advanced", "required": false, 
"type": "object", "javaType": "io.nats.client.Connection", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.nats.NatsConfiguration", "configurationField": 
"configuration", "description": "Reference an already instantiated connection 
to Nats server" },
-    "headerFilterStrategy": { "index": 28, "kind": "parameter", "displayName": 
"Header Filter Strategy", "group": "advanced", "label": "advanced", "required": 
false, "type": "object", "javaType": 
"org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired": 
false, "secret": false, "configurationClass": 
"org.apache.camel.component.nats.NatsConfiguration", "configurationField": 
"configuration", "description": "To use a custom header filter strategy." },
-    "jetstreamAsync": { "index": 29, "kind": "parameter", "displayName": 
"Jetstream Async", "group": "advanced", "label": "advanced", "required": false, 
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": true, "configurationClass": 
"org.apache.camel.component.nats.NatsConfiguration", "configurationField": 
"configuration", "description": "Sets whether to operate JetStream requests 
asynchronously." },
-    "traceConnection": { "index": 30, "kind": "parameter", "displayName": 
"Trace Connection", "group": "advanced", "label": "advanced", "required": 
false, "type": "boolean", "javaType": "boolean", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": false, 
"configurationClass": "org.apache.camel.component.nats.NatsConfiguration", 
"configurationField": "configuration", "description": "Whether or not 
connection trace messages should be printed to standard out for fine  [...]
-    "credentialsFilePath": { "index": 31, "kind": "parameter", "displayName": 
"Credentials File Path", "group": "security", "label": "security", "required": 
false, "type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.nats.NatsConfiguration", "configurationField": 
"configuration", "description": "If we use useCredentialsFile to true we'll 
need to set the credentialsFilePath option. It  [...]
-    "secure": { "index": 32, "kind": "parameter", "displayName": "Secure", 
"group": "security", "label": "security", "required": false, "type": "boolean", 
"javaType": "boolean", "deprecated": false, "autowired": false, "secret": 
false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.nats.NatsConfiguration", "configurationField": 
"configuration", "description": "Set secure option indicating TLS is required" 
},
-    "sslContextParameters": { "index": 33, "kind": "parameter", "displayName": 
"Ssl Context Parameters", "group": "security", "label": "security", "required": 
false, "type": "object", "javaType": 
"org.apache.camel.support.jsse.SSLContextParameters", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.nats.NatsConfiguration", "configurationField": 
"configuration", "description": "To configure security using 
SSLContextParameters" }
+    "consumerConfiguration": { "index": 28, "kind": "parameter", 
"displayName": "Consumer Configuration", "group": "advanced", "label": 
"advanced", "required": false, "type": "object", "javaType": 
"io.nats.client.api.ConsumerConfiguration", "deprecated": false, "autowired": 
false, "secret": false, "configurationClass": 
"org.apache.camel.component.nats.NatsConfiguration", "configurationField": 
"configuration", "description": "Sets a custom ConsumerConfiguration object for 
the JetStream co [...]
+    "durableName": { "index": 29, "kind": "parameter", "displayName": "Durable 
Name", "group": "advanced", "label": "advanced", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": false, "configurationClass": 
"org.apache.camel.component.nats.NatsConfiguration", "configurationField": 
"configuration", "description": "Sets the name to assign to the JetStream 
durable consumer. Setting this value makes the consumer durable. T [...]
+    "headerFilterStrategy": { "index": 30, "kind": "parameter", "displayName": 
"Header Filter Strategy", "group": "advanced", "label": "advanced", "required": 
false, "type": "object", "javaType": 
"org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired": 
false, "secret": false, "configurationClass": 
"org.apache.camel.component.nats.NatsConfiguration", "configurationField": 
"configuration", "description": "To use a custom header filter strategy." },
+    "jetstreamAsync": { "index": 31, "kind": "parameter", "displayName": 
"Jetstream Async", "group": "advanced", "label": "advanced", "required": false, 
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": true, "configurationClass": 
"org.apache.camel.component.nats.NatsConfiguration", "configurationField": 
"configuration", "description": "Sets whether to operate JetStream requests 
asynchronously." },
+    "pullSubscription": { "index": 32, "kind": "parameter", "displayName": 
"Pull Subscription", "group": "advanced", "label": "advanced", "required": 
false, "type": "boolean", "javaType": "boolean", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": true, 
"configurationClass": "org.apache.camel.component.nats.NatsConfiguration", 
"configurationField": "configuration", "description": "Sets the consumer 
subscription type for JetStream. Set to true to use a Pull Subscr [...]
+    "traceConnection": { "index": 33, "kind": "parameter", "displayName": 
"Trace Connection", "group": "advanced", "label": "advanced", "required": 
false, "type": "boolean", "javaType": "boolean", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": false, 
"configurationClass": "org.apache.camel.component.nats.NatsConfiguration", 
"configurationField": "configuration", "description": "Whether or not 
connection trace messages should be printed to standard out for fine  [...]
+    "credentialsFilePath": { "index": 34, "kind": "parameter", "displayName": 
"Credentials File Path", "group": "security", "label": "security", "required": 
false, "type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.nats.NatsConfiguration", "configurationField": 
"configuration", "description": "If we use useCredentialsFile to true we'll 
need to set the credentialsFilePath option. It  [...]
+    "secure": { "index": 35, "kind": "parameter", "displayName": "Secure", 
"group": "security", "label": "security", "required": false, "type": "boolean", 
"javaType": "boolean", "deprecated": false, "autowired": false, "secret": 
false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.nats.NatsConfiguration", "configurationField": 
"configuration", "description": "Set secure option indicating TLS is required" 
},
+    "sslContextParameters": { "index": 36, "kind": "parameter", "displayName": 
"Ssl Context Parameters", "group": "security", "label": "security", "required": 
false, "type": "object", "javaType": 
"org.apache.camel.support.jsse.SSLContextParameters", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.nats.NatsConfiguration", "configurationField": 
"configuration", "description": "To configure security using 
SSLContextParameters" }
   }
 }
diff --git 
a/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsEndpointConfigurer.java
 
b/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsEndpointConfigurer.java
index aebff95666e7..34022aaccfcd 100644
--- 
a/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsEndpointConfigurer.java
+++ 
b/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsEndpointConfigurer.java
@@ -28,8 +28,12 @@ public class NatsEndpointConfigurer extends 
PropertyConfigurerSupport implements
         case "connection": 
target.getConfiguration().setConnection(property(camelContext, 
io.nats.client.Connection.class, value)); return true;
         case "connectiontimeout":
         case "connectionTimeout": 
target.getConfiguration().setConnectionTimeout(property(camelContext, 
int.class, value)); return true;
+        case "consumerconfiguration":
+        case "consumerConfiguration": 
target.getConfiguration().setConsumerConfiguration(property(camelContext, 
io.nats.client.api.ConsumerConfiguration.class, value)); return true;
         case "credentialsfilepath":
         case "credentialsFilePath": 
target.getConfiguration().setCredentialsFilePath(property(camelContext, 
java.lang.String.class, value)); return true;
+        case "durablename":
+        case "durableName": 
target.getConfiguration().setDurableName(property(camelContext, 
java.lang.String.class, value)); return true;
         case "exceptionhandler":
         case "exceptionHandler": 
target.setExceptionHandler(property(camelContext, 
org.apache.camel.spi.ExceptionHandler.class, value)); return true;
         case "exchangepattern":
@@ -63,6 +67,8 @@ public class NatsEndpointConfigurer extends 
PropertyConfigurerSupport implements
         case "pingInterval": 
target.getConfiguration().setPingInterval(property(camelContext, int.class, 
value)); return true;
         case "poolsize":
         case "poolSize": 
target.getConfiguration().setPoolSize(property(camelContext, int.class, 
value)); return true;
+        case "pullsubscription":
+        case "pullSubscription": 
target.getConfiguration().setPullSubscription(property(camelContext, 
boolean.class, value)); return true;
         case "queuename":
         case "queueName": 
target.getConfiguration().setQueueName(property(camelContext, 
java.lang.String.class, value)); return true;
         case "reconnect": 
target.getConfiguration().setReconnect(property(camelContext, boolean.class, 
value)); return true;
@@ -95,8 +101,12 @@ public class NatsEndpointConfigurer extends 
PropertyConfigurerSupport implements
         case "connection": return io.nats.client.Connection.class;
         case "connectiontimeout":
         case "connectionTimeout": return int.class;
+        case "consumerconfiguration":
+        case "consumerConfiguration": return 
io.nats.client.api.ConsumerConfiguration.class;
         case "credentialsfilepath":
         case "credentialsFilePath": return java.lang.String.class;
+        case "durablename":
+        case "durableName": return java.lang.String.class;
         case "exceptionhandler":
         case "exceptionHandler": return 
org.apache.camel.spi.ExceptionHandler.class;
         case "exchangepattern":
@@ -130,6 +140,8 @@ public class NatsEndpointConfigurer extends 
PropertyConfigurerSupport implements
         case "pingInterval": return int.class;
         case "poolsize":
         case "poolSize": return int.class;
+        case "pullsubscription":
+        case "pullSubscription": return boolean.class;
         case "queuename":
         case "queueName": return java.lang.String.class;
         case "reconnect": return boolean.class;
@@ -163,8 +175,12 @@ public class NatsEndpointConfigurer extends 
PropertyConfigurerSupport implements
         case "connection": return target.getConfiguration().getConnection();
         case "connectiontimeout":
         case "connectionTimeout": return 
target.getConfiguration().getConnectionTimeout();
+        case "consumerconfiguration":
+        case "consumerConfiguration": return 
target.getConfiguration().getConsumerConfiguration();
         case "credentialsfilepath":
         case "credentialsFilePath": return 
target.getConfiguration().getCredentialsFilePath();
+        case "durablename":
+        case "durableName": return target.getConfiguration().getDurableName();
         case "exceptionhandler":
         case "exceptionHandler": return target.getExceptionHandler();
         case "exchangepattern":
@@ -198,6 +214,8 @@ public class NatsEndpointConfigurer extends 
PropertyConfigurerSupport implements
         case "pingInterval": return 
target.getConfiguration().getPingInterval();
         case "poolsize":
         case "poolSize": return target.getConfiguration().getPoolSize();
+        case "pullsubscription":
+        case "pullSubscription": return 
target.getConfiguration().isPullSubscription();
         case "queuename":
         case "queueName": return target.getConfiguration().getQueueName();
         case "reconnect": return target.getConfiguration().isReconnect();
diff --git 
a/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsEndpointUriFactory.java
 
b/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsEndpointUriFactory.java
index 8d55bf606005..9b44559a79da 100644
--- 
a/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsEndpointUriFactory.java
+++ 
b/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsEndpointUriFactory.java
@@ -23,11 +23,13 @@ public class NatsEndpointUriFactory extends 
org.apache.camel.support.component.E
     private static final Set<String> SECRET_PROPERTY_NAMES;
     private static final Map<String, String> MULTI_VALUE_PREFIXES;
     static {
-        Set<String> props = new HashSet<>(34);
+        Set<String> props = new HashSet<>(37);
         props.add("bridgeErrorHandler");
         props.add("connection");
         props.add("connectionTimeout");
+        props.add("consumerConfiguration");
         props.add("credentialsFilePath");
+        props.add("durableName");
         props.add("exceptionHandler");
         props.add("exchangePattern");
         props.add("flushConnection");
@@ -45,6 +47,7 @@ public class NatsEndpointUriFactory extends 
org.apache.camel.support.component.E
         props.add("pedantic");
         props.add("pingInterval");
         props.add("poolSize");
+        props.add("pullSubscription");
         props.add("queueName");
         props.add("reconnect");
         props.add("reconnectTimeWait");
diff --git 
a/components/camel-nats/src/generated/resources/META-INF/org/apache/camel/component/nats/nats.json
 
b/components/camel-nats/src/generated/resources/META-INF/org/apache/camel/component/nats/nats.json
index 19658a85f8e3..e6e9c2816d8f 100644
--- 
a/components/camel-nats/src/generated/resources/META-INF/org/apache/camel/component/nats/nats.json
+++ 
b/components/camel-nats/src/generated/resources/META-INF/org/apache/camel/component/nats/nats.json
@@ -70,11 +70,14 @@
     "requestTimeout": { "index": 25, "kind": "parameter", "displayName": 
"Request Timeout", "group": "producer", "label": "producer", "required": false, 
"type": "integer", "javaType": "long", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": 20000, "configurationClass": 
"org.apache.camel.component.nats.NatsConfiguration", "configurationField": 
"configuration", "description": "Request timeout in milliseconds" },
     "lazyStartProducer": { "index": 26, "kind": "parameter", "displayName": 
"Lazy Start Producer", "group": "producer (advanced)", "label": 
"producer,advanced", "required": false, "type": "boolean", "javaType": 
"boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "Whether the producer should be started 
lazy (on the first message). By starting lazy you can use this to allow 
CamelContext and routes to startup in situations where a produ [...]
     "connection": { "index": 27, "kind": "parameter", "displayName": 
"Connection", "group": "advanced", "label": "advanced", "required": false, 
"type": "object", "javaType": "io.nats.client.Connection", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.nats.NatsConfiguration", "configurationField": 
"configuration", "description": "Reference an already instantiated connection 
to Nats server" },
-    "headerFilterStrategy": { "index": 28, "kind": "parameter", "displayName": 
"Header Filter Strategy", "group": "advanced", "label": "advanced", "required": 
false, "type": "object", "javaType": 
"org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired": 
false, "secret": false, "configurationClass": 
"org.apache.camel.component.nats.NatsConfiguration", "configurationField": 
"configuration", "description": "To use a custom header filter strategy." },
-    "jetstreamAsync": { "index": 29, "kind": "parameter", "displayName": 
"Jetstream Async", "group": "advanced", "label": "advanced", "required": false, 
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": true, "configurationClass": 
"org.apache.camel.component.nats.NatsConfiguration", "configurationField": 
"configuration", "description": "Sets whether to operate JetStream requests 
asynchronously." },
-    "traceConnection": { "index": 30, "kind": "parameter", "displayName": 
"Trace Connection", "group": "advanced", "label": "advanced", "required": 
false, "type": "boolean", "javaType": "boolean", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": false, 
"configurationClass": "org.apache.camel.component.nats.NatsConfiguration", 
"configurationField": "configuration", "description": "Whether or not 
connection trace messages should be printed to standard out for fine  [...]
-    "credentialsFilePath": { "index": 31, "kind": "parameter", "displayName": 
"Credentials File Path", "group": "security", "label": "security", "required": 
false, "type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.nats.NatsConfiguration", "configurationField": 
"configuration", "description": "If we use useCredentialsFile to true we'll 
need to set the credentialsFilePath option. It  [...]
-    "secure": { "index": 32, "kind": "parameter", "displayName": "Secure", 
"group": "security", "label": "security", "required": false, "type": "boolean", 
"javaType": "boolean", "deprecated": false, "autowired": false, "secret": 
false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.nats.NatsConfiguration", "configurationField": 
"configuration", "description": "Set secure option indicating TLS is required" 
},
-    "sslContextParameters": { "index": 33, "kind": "parameter", "displayName": 
"Ssl Context Parameters", "group": "security", "label": "security", "required": 
false, "type": "object", "javaType": 
"org.apache.camel.support.jsse.SSLContextParameters", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.nats.NatsConfiguration", "configurationField": 
"configuration", "description": "To configure security using 
SSLContextParameters" }
+    "consumerConfiguration": { "index": 28, "kind": "parameter", 
"displayName": "Consumer Configuration", "group": "advanced", "label": 
"advanced", "required": false, "type": "object", "javaType": 
"io.nats.client.api.ConsumerConfiguration", "deprecated": false, "autowired": 
false, "secret": false, "configurationClass": 
"org.apache.camel.component.nats.NatsConfiguration", "configurationField": 
"configuration", "description": "Sets a custom ConsumerConfiguration object for 
the JetStream co [...]
+    "durableName": { "index": 29, "kind": "parameter", "displayName": "Durable 
Name", "group": "advanced", "label": "advanced", "required": false, "type": 
"string", "javaType": "java.lang.String", "deprecated": false, "autowired": 
false, "secret": false, "configurationClass": 
"org.apache.camel.component.nats.NatsConfiguration", "configurationField": 
"configuration", "description": "Sets the name to assign to the JetStream 
durable consumer. Setting this value makes the consumer durable. T [...]
+    "headerFilterStrategy": { "index": 30, "kind": "parameter", "displayName": 
"Header Filter Strategy", "group": "advanced", "label": "advanced", "required": 
false, "type": "object", "javaType": 
"org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired": 
false, "secret": false, "configurationClass": 
"org.apache.camel.component.nats.NatsConfiguration", "configurationField": 
"configuration", "description": "To use a custom header filter strategy." },
+    "jetstreamAsync": { "index": 31, "kind": "parameter", "displayName": 
"Jetstream Async", "group": "advanced", "label": "advanced", "required": false, 
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": true, "configurationClass": 
"org.apache.camel.component.nats.NatsConfiguration", "configurationField": 
"configuration", "description": "Sets whether to operate JetStream requests 
asynchronously." },
+    "pullSubscription": { "index": 32, "kind": "parameter", "displayName": 
"Pull Subscription", "group": "advanced", "label": "advanced", "required": 
false, "type": "boolean", "javaType": "boolean", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": true, 
"configurationClass": "org.apache.camel.component.nats.NatsConfiguration", 
"configurationField": "configuration", "description": "Sets the consumer 
subscription type for JetStream. Set to true to use a Pull Subscr [...]
+    "traceConnection": { "index": 33, "kind": "parameter", "displayName": 
"Trace Connection", "group": "advanced", "label": "advanced", "required": 
false, "type": "boolean", "javaType": "boolean", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": false, 
"configurationClass": "org.apache.camel.component.nats.NatsConfiguration", 
"configurationField": "configuration", "description": "Whether or not 
connection trace messages should be printed to standard out for fine  [...]
+    "credentialsFilePath": { "index": 34, "kind": "parameter", "displayName": 
"Credentials File Path", "group": "security", "label": "security", "required": 
false, "type": "string", "javaType": "java.lang.String", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.nats.NatsConfiguration", "configurationField": 
"configuration", "description": "If we use useCredentialsFile to true we'll 
need to set the credentialsFilePath option. It  [...]
+    "secure": { "index": 35, "kind": "parameter", "displayName": "Secure", 
"group": "security", "label": "security", "required": false, "type": "boolean", 
"javaType": "boolean", "deprecated": false, "autowired": false, "secret": 
false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.nats.NatsConfiguration", "configurationField": 
"configuration", "description": "Set secure option indicating TLS is required" 
},
+    "sslContextParameters": { "index": 36, "kind": "parameter", "displayName": 
"Ssl Context Parameters", "group": "security", "label": "security", "required": 
false, "type": "object", "javaType": 
"org.apache.camel.support.jsse.SSLContextParameters", "deprecated": false, 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.nats.NatsConfiguration", "configurationField": 
"configuration", "description": "To configure security using 
SSLContextParameters" }
   }
 }
diff --git 
a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
 
b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
index 1f9d0d3fded8..d5e0e0d5fbf0 100644
--- 
a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
+++ 
b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
@@ -23,6 +23,7 @@ import io.nats.client.Connection;
 import io.nats.client.Nats;
 import io.nats.client.Options;
 import io.nats.client.Options.Builder;
+import io.nats.client.api.ConsumerConfiguration;
 import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.UriParam;
@@ -96,6 +97,12 @@ public class NatsConfiguration {
     private String jetstreamName;
     @UriParam(label = "advanced", defaultValue = "true")
     private boolean jetstreamAsync = true;
+    @UriParam(label = "advanced")
+    private ConsumerConfiguration consumerConfiguration;
+    @UriParam(label = "advanced", defaultValue = "true")
+    private boolean pullSubscription = true;
+    @UriParam(label = "advanced")
+    private String durableName;
 
     /**
      * URLs to one or more NAT servers. Use comma to separate URLs when 
specifying multiple servers.
@@ -503,4 +510,68 @@ public class NatsConfiguration {
     public void setJetstreamAsync(boolean jetstreamAsync) {
         this.jetstreamAsync = jetstreamAsync;
     }
+
+    /**
+     * Allows the entire NATS JetStream Consumer Configuration object to be 
provided directly.
+     * <p>
+     * This provides fine-grained control over all consumer properties (e.g., 
ack policies, max deliver, replay
+     * policies) and overrides any individual consumer-related options set 
separately via the Camel URI parameters.
+     */
+    public ConsumerConfiguration getConsumerConfiguration() {
+        return consumerConfiguration;
+    }
+
+    /**
+     * Sets a custom {@code ConsumerConfiguration} object for the JetStream 
consumer.
+     * <p>
+     * This is an advanced option typically used when you need to configure 
properties not exposed as simple Camel URI
+     * parameters. When set, this object will be used to build the final 
consumer subscription options.
+     */
+    public void setConsumerConfiguration(ConsumerConfiguration 
consumerConfiguration) {
+        this.consumerConfiguration = consumerConfiguration;
+    }
+
+    /**
+     * Whether the consumer subscription type is a **Pull Subscription** 
({@code true}) or a **Push Subscription**
+     * ({@code false}) when using JetStream.
+     * <p>
+     * **Pull Subscriptions** require the consumer to explicitly fetch 
messages. **Push Subscriptions** automatically
+     * deliver messages to the consumer. The default setting is {@code true} 
(Pull Subscription).
+     */
+    public boolean isPullSubscription() {
+        return pullSubscription;
+    }
+
+    /**
+     * Sets the consumer subscription type for JetStream.
+     * <p>
+     * Set to {@code true} to use a **Pull Subscription** (consumer explicitly 
requests messages). Set to {@code false}
+     * to use a **Push Subscription** (messages are automatically delivered).
+     */
+    public void setPullSubscription(boolean pullSubscription) {
+        this.pullSubscription = pullSubscription;
+    }
+
+    /**
+     * The name to assign to the JetStream durable consumer.
+     * <p>
+     * If set, the consumer becomes **durable**, allowing subscriptions to 
bind to it and resume message processing
+     * until the consumer is explicitly deleted. This name is crucial for 
resilient message processing.
+     * <p>
+     * A durable name cannot contain whitespace, '.', '*', '>', path 
separators (forward or backward slash), or
+     * non-printable characters.
+     */
+    public String getDurableName() {
+        return durableName;
+    }
+
+    /**
+     * Sets the name to assign to the JetStream durable consumer.
+     * <p>
+     * Setting this value makes the consumer durable. The value is used to set 
the {@code durable()} field in the
+     * underlying NATS {@code ConsumerConfiguration.Builder}.
+     */
+    public void setDurableName(String durableName) {
+        this.durableName = durableName;
+    }
 }
diff --git 
a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
 
b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
index 13c4bf5f4d29..5febdea9b82b 100644
--- 
a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
+++ 
b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
@@ -28,6 +28,7 @@ import io.nats.client.JetStreamManagement;
 import io.nats.client.JetStreamSubscription;
 import io.nats.client.Message;
 import io.nats.client.MessageHandler;
+import io.nats.client.PullSubscribeOptions;
 import io.nats.client.PushSubscribeOptions;
 import io.nats.client.api.ConsumerConfiguration;
 import io.nats.client.api.StreamConfiguration;
@@ -153,10 +154,15 @@ public class NatsConsumer extends DefaultConsumer {
 
         private void setupJetStreamConsumer(String topic, String queueName) 
throws IOException, JetStreamApiException {
             String streamName = this.configuration.getJetstreamName();
-            String consumerName
-                    = ObjectHelper.isNotEmpty(queueName) ? queueName : 
"consumer-" + System.currentTimeMillis(); // Generate a default consumer name 
if queueName is not provided
-            LOG.debug("Setting up JetStream PUSH consumer for stream: '{}', 
durable: '{}', topic: {} ", streamName,
-                    consumerName, this.configuration.getTopic());
+            String durableName = this.configuration.getDurableName();
+
+            String subscriptionType = this.configuration.isPullSubscription() 
? "PULL" : "PUSH";
+            LOG.debug("Setting up JetStream {}/{} consumer for stream: '{}', 
subject: {}",
+                    subscriptionType,
+                    ObjectHelper.isNotEmpty(durableName)
+                            ? String.format("DURABLE, durableName: '%s'", 
durableName) : "EPHEMERAL",
+                    streamName,
+                    this.configuration.getTopic());
 
             JetStreamManagement jsm = connection.jetStreamManagement();
             StreamConfiguration streamConfig = StreamConfiguration.builder()
@@ -165,30 +171,48 @@ public class NatsConsumer extends DefaultConsumer {
                     .build();
             jsm.addStream(streamConfig);
 
-            ConsumerConfiguration.Builder ccBuilder = 
ConsumerConfiguration.builder()
-                    .durable(consumerName);
-            ccBuilder.deliverSubject(null);
-            ConsumerConfiguration cc = ccBuilder.build();
+            ConsumerConfiguration cc = 
configuration.getConsumerConfiguration();
+            if (cc == null) {
+                ConsumerConfiguration.Builder ccBuilder = 
ConsumerConfiguration.builder();
+                ccBuilder.deliverSubject(null);
+                if (durableName != null) {
+                    ccBuilder.durable(durableName + "-durable");
+                }
+                cc = ccBuilder.build();
+            }
 
-            PushSubscribeOptions pushOptions = PushSubscribeOptions.builder()
-                    .configuration(cc)
-                    .build();
+            CamelNatsMessageHandler messageHandler = new 
CamelNatsMessageHandler();
+            NatsConsumer.this.dispatcher = 
this.connection.createDispatcher(messageHandler);
 
-            NatsConsumer.this.dispatcher = 
this.connection.createDispatcher(new CamelNatsMessageHandler());
+            if (this.configuration.isPullSubscription()) {
+                PullSubscribeOptions pullOptions = 
PullSubscribeOptions.builder()
+                        .configuration(cc)
+                        .build();
 
-            NatsConsumer.this.jetStreamSubscription = 
this.connection.jetStream().subscribe(
-                    
NatsConsumer.this.getEndpoint().getConfiguration().getTopic(),
-                    queueName,
-                    dispatcher,
-                    new CamelNatsMessageHandler(),
-                    true,
-                    pushOptions);
+                NatsConsumer.this.jetStreamSubscription = 
this.connection.jetStream().subscribe(
+                        
NatsConsumer.this.getEndpoint().getConfiguration().getTopic(),
+                        dispatcher,
+                        messageHandler,
+                        pullOptions);
+            } else {
+                PushSubscribeOptions pushOptions = 
PushSubscribeOptions.builder()
+                        .configuration(cc)
+                        .build();
+
+                NatsConsumer.this.jetStreamSubscription = 
this.connection.jetStream().subscribe(
+                        
NatsConsumer.this.getEndpoint().getConfiguration().getTopic(),
+                        queueName,
+                        dispatcher,
+                        messageHandler,
+                        true,
+                        pushOptions);
+            }
 
             NatsConsumer.this.setActive(true);
         }
 
         private void setupStandardNatsConsumer(String topic, String queueName, 
Integer maxMessages) {
-            LOG.debug("Setting up standard NATS consumer for topic: {}", 
topic);
+            LOG.debug("Setting up standard NATS consumer for subject: {}", 
topic);
             NatsConsumer.this.dispatcher = connection.createDispatcher(new 
CamelNatsMessageHandler());
             if (ObjectHelper.isNotEmpty(queueName)) {
                 NatsConsumer.this.dispatcher = 
NatsConsumer.this.dispatcher.subscribe(topic, queueName);
diff --git 
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/NatsEndpointBuilderFactory.java
 
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/NatsEndpointBuilderFactory.java
index 8d49ad500d53..47f966102fae 100644
--- 
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/NatsEndpointBuilderFactory.java
+++ 
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/NatsEndpointBuilderFactory.java
@@ -827,6 +827,60 @@ public interface NatsEndpointBuilderFactory {
             doSetProperty("connection", connection);
             return this;
         }
+        /**
+         * Sets a custom ConsumerConfiguration object for the JetStream
+         * consumer. This is an advanced option typically used when you need to
+         * configure properties not exposed as simple Camel URI parameters. 
When
+         * set, this object will be used to build the final consumer
+         * subscription options.
+         * 
+         * The option is a:
+         * <code>io.nats.client.api.ConsumerConfiguration</code> type.
+         * 
+         * Group: advanced
+         * 
+         * @param consumerConfiguration the value to set
+         * @return the dsl builder
+         */
+        default AdvancedNatsEndpointConsumerBuilder 
consumerConfiguration(io.nats.client.api.ConsumerConfiguration 
consumerConfiguration) {
+            doSetProperty("consumerConfiguration", consumerConfiguration);
+            return this;
+        }
+        /**
+         * Sets a custom ConsumerConfiguration object for the JetStream
+         * consumer. This is an advanced option typically used when you need to
+         * configure properties not exposed as simple Camel URI parameters. 
When
+         * set, this object will be used to build the final consumer
+         * subscription options.
+         * 
+         * The option will be converted to a
+         * <code>io.nats.client.api.ConsumerConfiguration</code> type.
+         * 
+         * Group: advanced
+         * 
+         * @param consumerConfiguration the value to set
+         * @return the dsl builder
+         */
+        default AdvancedNatsEndpointConsumerBuilder 
consumerConfiguration(String consumerConfiguration) {
+            doSetProperty("consumerConfiguration", consumerConfiguration);
+            return this;
+        }
+        /**
+         * Sets the name to assign to the JetStream durable consumer. Setting
+         * this value makes the consumer durable. The value is used to set the
+         * durable() field in the underlying NATS 
ConsumerConfiguration.Builder.
+         * 
+         * The option is a: <code>java.lang.String</code> type.
+         * 
+         * Group: advanced
+         * 
+         * @param durableName the value to set
+         * @return the dsl builder
+         */
+        default AdvancedNatsEndpointConsumerBuilder durableName(String 
durableName) {
+            doSetProperty("durableName", durableName);
+            return this;
+        }
         /**
          * To use a custom header filter strategy.
          * 
@@ -887,6 +941,42 @@ public interface NatsEndpointBuilderFactory {
             doSetProperty("jetstreamAsync", jetstreamAsync);
             return this;
         }
+        /**
+         * Sets the consumer subscription type for JetStream. Set to true to 
use
+         * a Pull Subscription (consumer explicitly requests messages). Set to
+         * false to use a Push Subscription (messages are automatically
+         * delivered).
+         * 
+         * The option is a: <code>boolean</code> type.
+         * 
+         * Default: true
+         * Group: advanced
+         * 
+         * @param pullSubscription the value to set
+         * @return the dsl builder
+         */
+        default AdvancedNatsEndpointConsumerBuilder pullSubscription(boolean 
pullSubscription) {
+            doSetProperty("pullSubscription", pullSubscription);
+            return this;
+        }
+        /**
+         * Sets the consumer subscription type for JetStream. Set to true to 
use
+         * a Pull Subscription (consumer explicitly requests messages). Set to
+         * false to use a Push Subscription (messages are automatically
+         * delivered).
+         * 
+         * The option will be converted to a <code>boolean</code> type.
+         * 
+         * Default: true
+         * Group: advanced
+         * 
+         * @param pullSubscription the value to set
+         * @return the dsl builder
+         */
+        default AdvancedNatsEndpointConsumerBuilder pullSubscription(String 
pullSubscription) {
+            doSetProperty("pullSubscription", pullSubscription);
+            return this;
+        }
         /**
          * Whether or not connection trace messages should be printed to
          * standard out for fine grained debugging of connection issues.
@@ -1598,6 +1688,60 @@ public interface NatsEndpointBuilderFactory {
             doSetProperty("connection", connection);
             return this;
         }
+        /**
+         * Sets a custom ConsumerConfiguration object for the JetStream
+         * consumer. This is an advanced option typically used when you need to
+         * configure properties not exposed as simple Camel URI parameters. 
When
+         * set, this object will be used to build the final consumer
+         * subscription options.
+         * 
+         * The option is a:
+         * <code>io.nats.client.api.ConsumerConfiguration</code> type.
+         * 
+         * Group: advanced
+         * 
+         * @param consumerConfiguration the value to set
+         * @return the dsl builder
+         */
+        default AdvancedNatsEndpointProducerBuilder 
consumerConfiguration(io.nats.client.api.ConsumerConfiguration 
consumerConfiguration) {
+            doSetProperty("consumerConfiguration", consumerConfiguration);
+            return this;
+        }
+        /**
+         * Sets a custom ConsumerConfiguration object for the JetStream
+         * consumer. This is an advanced option typically used when you need to
+         * configure properties not exposed as simple Camel URI parameters. 
When
+         * set, this object will be used to build the final consumer
+         * subscription options.
+         * 
+         * The option will be converted to a
+         * <code>io.nats.client.api.ConsumerConfiguration</code> type.
+         * 
+         * Group: advanced
+         * 
+         * @param consumerConfiguration the value to set
+         * @return the dsl builder
+         */
+        default AdvancedNatsEndpointProducerBuilder 
consumerConfiguration(String consumerConfiguration) {
+            doSetProperty("consumerConfiguration", consumerConfiguration);
+            return this;
+        }
+        /**
+         * Sets the name to assign to the JetStream durable consumer. Setting
+         * this value makes the consumer durable. The value is used to set the
+         * durable() field in the underlying NATS 
ConsumerConfiguration.Builder.
+         * 
+         * The option is a: <code>java.lang.String</code> type.
+         * 
+         * Group: advanced
+         * 
+         * @param durableName the value to set
+         * @return the dsl builder
+         */
+        default AdvancedNatsEndpointProducerBuilder durableName(String 
durableName) {
+            doSetProperty("durableName", durableName);
+            return this;
+        }
         /**
          * To use a custom header filter strategy.
          * 
@@ -1658,6 +1802,42 @@ public interface NatsEndpointBuilderFactory {
             doSetProperty("jetstreamAsync", jetstreamAsync);
             return this;
         }
+        /**
+         * Sets the consumer subscription type for JetStream. Set to true to 
use
+         * a Pull Subscription (consumer explicitly requests messages). Set to
+         * false to use a Push Subscription (messages are automatically
+         * delivered).
+         * 
+         * The option is a: <code>boolean</code> type.
+         * 
+         * Default: true
+         * Group: advanced
+         * 
+         * @param pullSubscription the value to set
+         * @return the dsl builder
+         */
+        default AdvancedNatsEndpointProducerBuilder pullSubscription(boolean 
pullSubscription) {
+            doSetProperty("pullSubscription", pullSubscription);
+            return this;
+        }
+        /**
+         * Sets the consumer subscription type for JetStream. Set to true to 
use
+         * a Pull Subscription (consumer explicitly requests messages). Set to
+         * false to use a Push Subscription (messages are automatically
+         * delivered).
+         * 
+         * The option will be converted to a <code>boolean</code> type.
+         * 
+         * Default: true
+         * Group: advanced
+         * 
+         * @param pullSubscription the value to set
+         * @return the dsl builder
+         */
+        default AdvancedNatsEndpointProducerBuilder pullSubscription(String 
pullSubscription) {
+            doSetProperty("pullSubscription", pullSubscription);
+            return this;
+        }
         /**
          * Whether or not connection trace messages should be printed to
          * standard out for fine grained debugging of connection issues.
@@ -2283,6 +2463,60 @@ public interface NatsEndpointBuilderFactory {
             doSetProperty("connection", connection);
             return this;
         }
+        /**
+         * Sets a custom ConsumerConfiguration object for the JetStream
+         * consumer. This is an advanced option typically used when you need to
+         * configure properties not exposed as simple Camel URI parameters. 
When
+         * set, this object will be used to build the final consumer
+         * subscription options.
+         * 
+         * The option is a:
+         * <code>io.nats.client.api.ConsumerConfiguration</code> type.
+         * 
+         * Group: advanced
+         * 
+         * @param consumerConfiguration the value to set
+         * @return the dsl builder
+         */
+        default AdvancedNatsEndpointBuilder 
consumerConfiguration(io.nats.client.api.ConsumerConfiguration 
consumerConfiguration) {
+            doSetProperty("consumerConfiguration", consumerConfiguration);
+            return this;
+        }
+        /**
+         * Sets a custom ConsumerConfiguration object for the JetStream
+         * consumer. This is an advanced option typically used when you need to
+         * configure properties not exposed as simple Camel URI parameters. 
When
+         * set, this object will be used to build the final consumer
+         * subscription options.
+         * 
+         * The option will be converted to a
+         * <code>io.nats.client.api.ConsumerConfiguration</code> type.
+         * 
+         * Group: advanced
+         * 
+         * @param consumerConfiguration the value to set
+         * @return the dsl builder
+         */
+        default AdvancedNatsEndpointBuilder consumerConfiguration(String 
consumerConfiguration) {
+            doSetProperty("consumerConfiguration", consumerConfiguration);
+            return this;
+        }
+        /**
+         * Sets the name to assign to the JetStream durable consumer. Setting
+         * this value makes the consumer durable. The value is used to set the
+         * durable() field in the underlying NATS 
ConsumerConfiguration.Builder.
+         * 
+         * The option is a: <code>java.lang.String</code> type.
+         * 
+         * Group: advanced
+         * 
+         * @param durableName the value to set
+         * @return the dsl builder
+         */
+        default AdvancedNatsEndpointBuilder durableName(String durableName) {
+            doSetProperty("durableName", durableName);
+            return this;
+        }
         /**
          * To use a custom header filter strategy.
          * 
@@ -2343,6 +2577,42 @@ public interface NatsEndpointBuilderFactory {
             doSetProperty("jetstreamAsync", jetstreamAsync);
             return this;
         }
+        /**
+         * Sets the consumer subscription type for JetStream. Set to true to 
use
+         * a Pull Subscription (consumer explicitly requests messages). Set to
+         * false to use a Push Subscription (messages are automatically
+         * delivered).
+         * 
+         * The option is a: <code>boolean</code> type.
+         * 
+         * Default: true
+         * Group: advanced
+         * 
+         * @param pullSubscription the value to set
+         * @return the dsl builder
+         */
+        default AdvancedNatsEndpointBuilder pullSubscription(boolean 
pullSubscription) {
+            doSetProperty("pullSubscription", pullSubscription);
+            return this;
+        }
+        /**
+         * Sets the consumer subscription type for JetStream. Set to true to 
use
+         * a Pull Subscription (consumer explicitly requests messages). Set to
+         * false to use a Push Subscription (messages are automatically
+         * delivered).
+         * 
+         * The option will be converted to a <code>boolean</code> type.
+         * 
+         * Default: true
+         * Group: advanced
+         * 
+         * @param pullSubscription the value to set
+         * @return the dsl builder
+         */
+        default AdvancedNatsEndpointBuilder pullSubscription(String 
pullSubscription) {
+            doSetProperty("pullSubscription", pullSubscription);
+            return this;
+        }
         /**
          * Whether or not connection trace messages should be printed to
          * standard out for fine grained debugging of connection issues.

Reply via email to