This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch nats in repository https://gitbox.apache.org/repos/asf/camel.git
commit c470acc40da70181143e0f35016e59740d32d1e6 Author: Claus Ibsen <[email protected]> AuthorDate: Thu Feb 26 10:29:35 2026 +0100 CAMEL-23032: camel-nats - Add AckPolicy so messages can be retried when NACK due to routing failure. Fix infra-nats to run nats broker with jetstream enabled. --- .../org/apache/camel/catalog/components/nats.json | 46 ++++---- .../camel/catalog/main/important-headers.json | 1 + .../component/nats/NatsEndpointConfigurer.java | 18 +++ .../component/nats/NatsEndpointUriFactory.java | 5 +- .../org/apache/camel/component/nats/nats.json | 46 ++++---- .../camel/component/nats/NatsConfiguration.java | 50 +++++++- .../apache/camel/component/nats/NatsConstants.java | 4 + .../apache/camel/component/nats/NatsConsumer.java | 57 ++++++++- .../nats/jetstream/NatsJetstreamConsumerIT.java | 62 ++++++++++ .../NatsJetstreamConsumerRedeliveryIT.java | 89 ++++++++++++++ .../apache/camel/util/ImportantHeaderUtils.java | 1 + .../endpoint/dsl/NatsEndpointBuilderFactory.java | 128 +++++++++++++++++++++ .../services/NatsLocalContainerAuthService.java | 2 +- .../NatsLocalContainerAuthTokenService.java | 2 +- .../services/NatsLocalContainerInfraService.java | 3 +- .../services/NatsLocalContainerTLSAuthService.java | 3 +- 16 files changed, 468 insertions(+), 49 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/nats.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/nats.json index 23791824fbe9..446eeba966b6 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/nats.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/nats.json @@ -39,7 +39,8 @@ "CamelNatsSubject": { "index": 3, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The 
Subject of a consumed message.", "constantName": "org.apache.camel.component.nats.NatsConstants#NATS_SUBJECT" }, "CamelNatsQueueName": { "index": 4, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The Queue name of a consumed message (may be null).", "constantName": "org.apache.camel.component.nats.NatsConstants#NATS_QUEUE_NAME" }, "CamelNatsStatusCode": { "index": 5, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "int", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "important": true, "description": "Status message code", "constantName": "org.apache.camel.component.nats.NatsConstants#NATS_STATUS_CODE" }, - "CamelNatsStatusError": { "index": 6, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "important": true, "description": "Status message error message", "constantName": "org.apache.camel.component.nats.NatsConstants#NATS_STATUS_ERROR" } + "CamelNatsStatusError": { "index": 6, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "important": true, "description": "Status message error message", "constantName": "org.apache.camel.component.nats.NatsConstants#NATS_STATUS_ERROR" }, + "CamelNatsDeliveryCounter": { "index": 7, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "important": true, "description": "Number of times this message has been delivered (1 = first, 1 then 
message has been redelivered)", "constantName": "org.apache.camel.component.nats.NatsConstants#NATS_DELIVERY_COUNTER" } }, "properties": { "topic": { "index": 0, "kind": "path", "displayName": "Topic", "group": "common", "label": "", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "The name of topic we want to use" }, @@ -59,25 +60,28 @@ "requestCleanupInterval": { "index": 14, "kind": "parameter", "displayName": "Request Cleanup Interval", "group": "common", "label": "common", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 5000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Interval to clean up cancelled\/timed out requests." }, "servers": { "index": 15, "kind": "parameter", "displayName": "Servers", "group": "common", "label": "common", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "URLs to one or more NAT servers. Use comma to separate URLs when specifying multiple servers." 
}, "verbose": { "index": 16, "kind": "parameter", "displayName": "Verbose", "group": "common", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Whether or not running in verbose mode" }, - "maxMessages": { "index": 17, "kind": "parameter", "displayName": "Max Messages", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Stop receiving messages from a topic we are subscribing to after maxMessages" }, - "poolSize": { "index": 18, "kind": "parameter", "displayName": "Pool Size", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 10, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Consumer thread pool size (default is 10)" }, - "queueName": { "index": 19, "kind": "parameter", "displayName": "Queue Name", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "The Queue name if we are using nats for a queue configuration" }, - "replyToDisabled": { "index": 20, "kind": "parameter", "displayName": "Reply To Disabled", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Can be used to turn off sending back reply message in the consumer." }, - "bridgeErrorHandler": { "index": 21, "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions (if possible) occurred while the Camel consumer is trying to pickup incoming [...] - "exceptionHandler": { "index": 22, "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "autowired": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By de [...] - "exchangePattern": { "index": 23, "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "enum", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." 
}, - "replySubject": { "index": 24, "kind": "parameter", "displayName": "Reply Subject", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "the subject to which subscribers should send response" }, - "requestTimeout": { "index": 25, "kind": "parameter", "displayName": "Request Timeout", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 20000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Request timeout in milliseconds" }, - "lazyStartProducer": { "index": 26, "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a produ [...] 
- "connection": { "index": 27, "kind": "parameter", "displayName": "Connection", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "io.nats.client.Connection", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Reference an already instantiated connection to Nats server" }, - "consumerConfiguration": { "index": 28, "kind": "parameter", "displayName": "Consumer Configuration", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "io.nats.client.api.ConsumerConfiguration", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Sets a custom ConsumerConfiguration object for the JetStream co [...] - "durableName": { "index": 29, "kind": "parameter", "displayName": "Durable Name", "group": "advanced", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Sets the name to assign to the JetStream durable consumer. Setting this value makes the consumer durable. T [...] - "headerFilterStrategy": { "index": 30, "kind": "parameter", "displayName": "Header Filter Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "To use a custom header filter strategy." 
}, - "jetstreamAsync": { "index": 31, "kind": "parameter", "displayName": "Jetstream Async", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Sets whether to operate JetStream requests asynchronously." }, - "pullSubscription": { "index": 32, "kind": "parameter", "displayName": "Pull Subscription", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Sets the consumer subscription type for JetStream. Set to true to use a Pull Subscr [...] - "traceConnection": { "index": 33, "kind": "parameter", "displayName": "Trace Connection", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Whether or not connection trace messages should be printed to standard out for fine [...] - "credentialsFilePath": { "index": 34, "kind": "parameter", "displayName": "Credentials File Path", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "If we use useCredentialsFile to true we'll need to set the credentialsFilePath option. It [...] 
- "secure": { "index": 35, "kind": "parameter", "displayName": "Secure", "group": "security", "label": "security", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Set secure option indicating TLS is required" }, - "sslContextParameters": { "index": 36, "kind": "parameter", "displayName": "Ssl Context Parameters", "group": "security", "label": "security", "required": false, "type": "object", "javaType": "org.apache.camel.support.jsse.SSLContextParameters", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "To configure security using SSLContextParameters" } + "ackPolicy": { "index": 17, "kind": "parameter", "displayName": "Ack Policy", "group": "consumer", "label": "consumer", "required": false, "type": "enum", "javaType": "io.nats.client.api.AckPolicy", "enum": [ "none", "all", "explicit" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "none", "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Acknowledgement mode. none = Messages [...] + "ackWait": { "index": 18, "kind": "parameter", "displayName": "Ack Wait", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 30000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "After a message is delivered to a consumer, the server waits 30 seconds (default) for an acknowledgemen [...] 
+ "maxMessages": { "index": 19, "kind": "parameter", "displayName": "Max Messages", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Stop receiving messages from a topic we are subscribing to after maxMessages" }, + "nackWait": { "index": 20, "kind": "parameter", "displayName": "Nack Wait", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 5000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "For negative acknowledgements (NAK), redelivery is delayed by 5 seconds (default). Setting this to 0 o [...] + "poolSize": { "index": 21, "kind": "parameter", "displayName": "Pool Size", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 10, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Consumer thread pool size (default is 10)" }, + "queueName": { "index": 22, "kind": "parameter", "displayName": "Queue Name", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "The Queue name if we are using nats for a queue configuration" }, + "replyToDisabled": { "index": 23, "kind": "parameter", "displayName": "Reply To Disabled", "group": "consumer", "label": "consumer", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Can be used to turn off sending back reply message in the consumer." }, + "bridgeErrorHandler": { "index": 24, "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions (if possible) occurred while the Camel consumer is trying to pickup incoming [...] + "exceptionHandler": { "index": 25, "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "autowired": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By de [...] + "exchangePattern": { "index": 26, "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "enum", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." 
}, + "replySubject": { "index": 27, "kind": "parameter", "displayName": "Reply Subject", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "the subject to which subscribers should send response" }, + "requestTimeout": { "index": 28, "kind": "parameter", "displayName": "Request Timeout", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 20000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Request timeout in milliseconds" }, + "lazyStartProducer": { "index": 29, "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a produ [...] 
+ "connection": { "index": 30, "kind": "parameter", "displayName": "Connection", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "io.nats.client.Connection", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Reference an already instantiated connection to Nats server" }, + "consumerConfiguration": { "index": 31, "kind": "parameter", "displayName": "Consumer Configuration", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "io.nats.client.api.ConsumerConfiguration", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Sets a custom ConsumerConfiguration object for the JetStream co [...] + "durableName": { "index": 32, "kind": "parameter", "displayName": "Durable Name", "group": "advanced", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Sets the name to assign to the JetStream durable consumer. Setting this value makes the consumer durable. T [...] + "headerFilterStrategy": { "index": 33, "kind": "parameter", "displayName": "Header Filter Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "To use a custom header filter strategy." 
}, + "jetstreamAsync": { "index": 34, "kind": "parameter", "displayName": "Jetstream Async", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Sets whether to operate JetStream requests asynchronously." }, + "pullSubscription": { "index": 35, "kind": "parameter", "displayName": "Pull Subscription", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Sets the consumer subscription type for JetStream. Set to true to use a Pull Subscr [...] + "traceConnection": { "index": 36, "kind": "parameter", "displayName": "Trace Connection", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Whether or not connection trace messages should be printed to standard out for fine [...] + "credentialsFilePath": { "index": 37, "kind": "parameter", "displayName": "Credentials File Path", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "If we use useCredentialsFile to true we'll need to set the credentialsFilePath option. It [...] 
+ "secure": { "index": 38, "kind": "parameter", "displayName": "Secure", "group": "security", "label": "security", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Set secure option indicating TLS is required" }, + "sslContextParameters": { "index": 39, "kind": "parameter", "displayName": "Ssl Context Parameters", "group": "security", "label": "security", "required": false, "type": "object", "javaType": "org.apache.camel.support.jsse.SSLContextParameters", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "To configure security using SSLContextParameters" } } } diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/main/important-headers.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/main/important-headers.json index 0de189334617..e88ef5eb65d7 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/main/important-headers.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/main/important-headers.json @@ -10,6 +10,7 @@ "CamelHttpResponseCode", "CamelHttpResponseText", "CamelMqttTopic", + "CamelNatsDeliveryCounter", "CamelNatsSID", "CamelNatsStatusCode", "CamelNatsStatusError", diff --git a/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsEndpointConfigurer.java b/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsEndpointConfigurer.java index 34022aaccfcd..38fb727f20db 100644 --- a/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsEndpointConfigurer.java +++ 
b/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsEndpointConfigurer.java @@ -23,6 +23,10 @@ public class NatsEndpointConfigurer extends PropertyConfigurerSupport implements public boolean configure(CamelContext camelContext, Object obj, String name, Object value, boolean ignoreCase) { NatsEndpoint target = (NatsEndpoint) obj; switch (ignoreCase ? name.toLowerCase() : name) { + case "ackpolicy": + case "ackPolicy": target.getConfiguration().setAckPolicy(property(camelContext, io.nats.client.api.AckPolicy.class, value)); return true; + case "ackwait": + case "ackWait": target.getConfiguration().setAckWait(property(camelContext, long.class, value)); return true; case "bridgeerrorhandler": case "bridgeErrorHandler": target.setBridgeErrorHandler(property(camelContext, boolean.class, value)); return true; case "connection": target.getConfiguration().setConnection(property(camelContext, io.nats.client.Connection.class, value)); return true; @@ -58,6 +62,8 @@ public class NatsEndpointConfigurer extends PropertyConfigurerSupport implements case "maxPingsOut": target.getConfiguration().setMaxPingsOut(property(camelContext, int.class, value)); return true; case "maxreconnectattempts": case "maxReconnectAttempts": target.getConfiguration().setMaxReconnectAttempts(property(camelContext, int.class, value)); return true; + case "nackwait": + case "nackWait": target.getConfiguration().setNackWait(property(camelContext, long.class, value)); return true; case "noecho": case "noEcho": target.getConfiguration().setNoEcho(property(camelContext, boolean.class, value)); return true; case "norandomizeservers": @@ -96,6 +102,10 @@ public class NatsEndpointConfigurer extends PropertyConfigurerSupport implements @Override public Class<?> getOptionType(String name, boolean ignoreCase) { switch (ignoreCase ? 
name.toLowerCase() : name) { + case "ackpolicy": + case "ackPolicy": return io.nats.client.api.AckPolicy.class; + case "ackwait": + case "ackWait": return long.class; case "bridgeerrorhandler": case "bridgeErrorHandler": return boolean.class; case "connection": return io.nats.client.Connection.class; @@ -131,6 +141,8 @@ public class NatsEndpointConfigurer extends PropertyConfigurerSupport implements case "maxPingsOut": return int.class; case "maxreconnectattempts": case "maxReconnectAttempts": return int.class; + case "nackwait": + case "nackWait": return long.class; case "noecho": case "noEcho": return boolean.class; case "norandomizeservers": @@ -170,6 +182,10 @@ public class NatsEndpointConfigurer extends PropertyConfigurerSupport implements public Object getOptionValue(Object obj, String name, boolean ignoreCase) { NatsEndpoint target = (NatsEndpoint) obj; switch (ignoreCase ? name.toLowerCase() : name) { + case "ackpolicy": + case "ackPolicy": return target.getConfiguration().getAckPolicy(); + case "ackwait": + case "ackWait": return target.getConfiguration().getAckWait(); case "bridgeerrorhandler": case "bridgeErrorHandler": return target.isBridgeErrorHandler(); case "connection": return target.getConfiguration().getConnection(); @@ -205,6 +221,8 @@ public class NatsEndpointConfigurer extends PropertyConfigurerSupport implements case "maxPingsOut": return target.getConfiguration().getMaxPingsOut(); case "maxreconnectattempts": case "maxReconnectAttempts": return target.getConfiguration().getMaxReconnectAttempts(); + case "nackwait": + case "nackWait": return target.getConfiguration().getNackWait(); case "noecho": case "noEcho": return target.getConfiguration().isNoEcho(); case "norandomizeservers": diff --git a/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsEndpointUriFactory.java b/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsEndpointUriFactory.java index 9b44559a79da..1b0f20aad58f 100644 --- 
a/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsEndpointUriFactory.java +++ b/components/camel-nats/src/generated/java/org/apache/camel/component/nats/NatsEndpointUriFactory.java @@ -23,7 +23,9 @@ public class NatsEndpointUriFactory extends org.apache.camel.support.component.E private static final Set<String> SECRET_PROPERTY_NAMES; private static final Map<String, String> MULTI_VALUE_PREFIXES; static { - Set<String> props = new HashSet<>(37); + Set<String> props = new HashSet<>(40); + props.add("ackPolicy"); + props.add("ackWait"); props.add("bridgeErrorHandler"); props.add("connection"); props.add("connectionTimeout"); @@ -42,6 +44,7 @@ public class NatsEndpointUriFactory extends org.apache.camel.support.component.E props.add("maxMessages"); props.add("maxPingsOut"); props.add("maxReconnectAttempts"); + props.add("nackWait"); props.add("noEcho"); props.add("noRandomizeServers"); props.add("pedantic"); diff --git a/components/camel-nats/src/generated/resources/META-INF/org/apache/camel/component/nats/nats.json b/components/camel-nats/src/generated/resources/META-INF/org/apache/camel/component/nats/nats.json index 23791824fbe9..446eeba966b6 100644 --- a/components/camel-nats/src/generated/resources/META-INF/org/apache/camel/component/nats/nats.json +++ b/components/camel-nats/src/generated/resources/META-INF/org/apache/camel/component/nats/nats.json @@ -39,7 +39,8 @@ "CamelNatsSubject": { "index": 3, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The Subject of a consumed message.", "constantName": "org.apache.camel.component.nats.NatsConstants#NATS_SUBJECT" }, "CamelNatsQueueName": { "index": 4, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": 
false, "description": "The Queue name of a consumed message (may be null).", "constantName": "org.apache.camel.component.nats.NatsConstants#NATS_QUEUE_NAME" }, "CamelNatsStatusCode": { "index": 5, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "int", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "important": true, "description": "Status message code", "constantName": "org.apache.camel.component.nats.NatsConstants#NATS_STATUS_CODE" }, - "CamelNatsStatusError": { "index": 6, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "important": true, "description": "Status message error message", "constantName": "org.apache.camel.component.nats.NatsConstants#NATS_STATUS_ERROR" } + "CamelNatsStatusError": { "index": 6, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "important": true, "description": "Status message error message", "constantName": "org.apache.camel.component.nats.NatsConstants#NATS_STATUS_ERROR" }, + "CamelNatsDeliveryCounter": { "index": 7, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "important": true, "description": "Number of times this message has been delivered (1 = first, > 1 then message has been redelivered)", "constantName": "org.apache.camel.component.nats.NatsConstants#NATS_DELIVERY_COUNTER" } }, "properties": { "topic": { "index": 0, "kind": "path", "displayName": "Topic", "group": "common", "label": "", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"deprecationNote": "", "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "The name of topic we want to use" }, @@ -59,25 +60,28 @@ "requestCleanupInterval": { "index": 14, "kind": "parameter", "displayName": "Request Cleanup Interval", "group": "common", "label": "common", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 5000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Interval to clean up cancelled\/timed out requests." }, "servers": { "index": 15, "kind": "parameter", "displayName": "Servers", "group": "common", "label": "common", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "URLs to one or more NAT servers. Use comma to separate URLs when specifying multiple servers." 
}, "verbose": { "index": 16, "kind": "parameter", "displayName": "Verbose", "group": "common", "label": "", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Whether or not running in verbose mode" }, - "maxMessages": { "index": 17, "kind": "parameter", "displayName": "Max Messages", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Stop receiving messages from a topic we are subscribing to after maxMessages" }, - "poolSize": { "index": 18, "kind": "parameter", "displayName": "Pool Size", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 10, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Consumer thread pool size (default is 10)" }, - "queueName": { "index": 19, "kind": "parameter", "displayName": "Queue Name", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "The Queue name if we are using nats for a queue configuration" }, - "replyToDisabled": { "index": 20, "kind": "parameter", "displayName": "Reply To Disabled", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Can be used to turn off sending back reply message in the consumer." }, - "bridgeErrorHandler": { "index": 21, "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions (if possible) occurred while the Camel consumer is trying to pickup incoming [...] - "exceptionHandler": { "index": 22, "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "autowired": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By de [...] - "exchangePattern": { "index": 23, "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "enum", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." 
}, - "replySubject": { "index": 24, "kind": "parameter", "displayName": "Reply Subject", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "the subject to which subscribers should send response" }, - "requestTimeout": { "index": 25, "kind": "parameter", "displayName": "Request Timeout", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 20000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Request timeout in milliseconds" }, - "lazyStartProducer": { "index": 26, "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a produ [...] 
- "connection": { "index": 27, "kind": "parameter", "displayName": "Connection", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "io.nats.client.Connection", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Reference an already instantiated connection to Nats server" }, - "consumerConfiguration": { "index": 28, "kind": "parameter", "displayName": "Consumer Configuration", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "io.nats.client.api.ConsumerConfiguration", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Sets a custom ConsumerConfiguration object for the JetStream co [...] - "durableName": { "index": 29, "kind": "parameter", "displayName": "Durable Name", "group": "advanced", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Sets the name to assign to the JetStream durable consumer. Setting this value makes the consumer durable. T [...] - "headerFilterStrategy": { "index": 30, "kind": "parameter", "displayName": "Header Filter Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "To use a custom header filter strategy." 
}, - "jetstreamAsync": { "index": 31, "kind": "parameter", "displayName": "Jetstream Async", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Sets whether to operate JetStream requests asynchronously." }, - "pullSubscription": { "index": 32, "kind": "parameter", "displayName": "Pull Subscription", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Sets the consumer subscription type for JetStream. Set to true to use a Pull Subscr [...] - "traceConnection": { "index": 33, "kind": "parameter", "displayName": "Trace Connection", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Whether or not connection trace messages should be printed to standard out for fine [...] - "credentialsFilePath": { "index": 34, "kind": "parameter", "displayName": "Credentials File Path", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "If we use useCredentialsFile to true we'll need to set the credentialsFilePath option. It [...] 
- "secure": { "index": 35, "kind": "parameter", "displayName": "Secure", "group": "security", "label": "security", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Set secure option indicating TLS is required" }, - "sslContextParameters": { "index": 36, "kind": "parameter", "displayName": "Ssl Context Parameters", "group": "security", "label": "security", "required": false, "type": "object", "javaType": "org.apache.camel.support.jsse.SSLContextParameters", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "To configure security using SSLContextParameters" } + "ackPolicy": { "index": 17, "kind": "parameter", "displayName": "Ack Policy", "group": "consumer", "label": "consumer", "required": false, "type": "enum", "javaType": "io.nats.client.api.AckPolicy", "enum": [ "none", "all", "explicit" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "none", "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Acknowledgement mode. none = Messages [...] + "ackWait": { "index": 18, "kind": "parameter", "displayName": "Ack Wait", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 30000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "After a message is delivered to a consumer, the server waits 30 seconds (default) for an acknowledgemen [...] 
+ "maxMessages": { "index": 19, "kind": "parameter", "displayName": "Max Messages", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Stop receiving messages from a topic we are subscribing to after maxMessages" }, + "nackWait": { "index": 20, "kind": "parameter", "displayName": "Nack Wait", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 5000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "For negative acknowledgements (NAK), redelivery is delayed by 5 seconds (default). Setting this to 0 o [...] + "poolSize": { "index": 21, "kind": "parameter", "displayName": "Pool Size", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 10, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Consumer thread pool size (default is 10)" }, + "queueName": { "index": 22, "kind": "parameter", "displayName": "Queue Name", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "The Queue name if we are using nats for a queue configuration" }, + "replyToDisabled": { "index": 23, "kind": "parameter", "displayName": "Reply To Disabled", "group": "consumer", "label": "consumer", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Can be used to turn off sending back reply message in the consumer." }, + "bridgeErrorHandler": { "index": 24, "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions (if possible) occurred while the Camel consumer is trying to pickup incoming [...] + "exceptionHandler": { "index": 25, "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "autowired": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By de [...] + "exchangePattern": { "index": 26, "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "enum", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." 
}, + "replySubject": { "index": 27, "kind": "parameter", "displayName": "Reply Subject", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "the subject to which subscribers should send response" }, + "requestTimeout": { "index": 28, "kind": "parameter", "displayName": "Request Timeout", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 20000, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Request timeout in milliseconds" }, + "lazyStartProducer": { "index": 29, "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a produ [...] 
+ "connection": { "index": 30, "kind": "parameter", "displayName": "Connection", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "io.nats.client.Connection", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Reference an already instantiated connection to Nats server" }, + "consumerConfiguration": { "index": 31, "kind": "parameter", "displayName": "Consumer Configuration", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "io.nats.client.api.ConsumerConfiguration", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Sets a custom ConsumerConfiguration object for the JetStream co [...] + "durableName": { "index": 32, "kind": "parameter", "displayName": "Durable Name", "group": "advanced", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Sets the name to assign to the JetStream durable consumer. Setting this value makes the consumer durable. T [...] + "headerFilterStrategy": { "index": 33, "kind": "parameter", "displayName": "Header Filter Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "To use a custom header filter strategy." 
}, + "jetstreamAsync": { "index": 34, "kind": "parameter", "displayName": "Jetstream Async", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Sets whether to operate JetStream requests asynchronously." }, + "pullSubscription": { "index": 35, "kind": "parameter", "displayName": "Pull Subscription", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Sets the consumer subscription type for JetStream. Set to true to use a Pull Subscr [...] + "traceConnection": { "index": 36, "kind": "parameter", "displayName": "Trace Connection", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Whether or not connection trace messages should be printed to standard out for fine [...] + "credentialsFilePath": { "index": 37, "kind": "parameter", "displayName": "Credentials File Path", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "If we use useCredentialsFile to true we'll need to set the credentialsFilePath option. It [...] 
+ "secure": { "index": 38, "kind": "parameter", "displayName": "Secure", "group": "security", "label": "security", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "Set secure option indicating TLS is required" }, + "sslContextParameters": { "index": 39, "kind": "parameter", "displayName": "Ssl Context Parameters", "group": "security", "label": "security", "required": false, "type": "object", "javaType": "org.apache.camel.support.jsse.SSLContextParameters", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.nats.NatsConfiguration", "configurationField": "configuration", "description": "To configure security using SSLContextParameters" } } } diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java index d5e0e0d5fbf0..e4aa0f5a64c2 100644 --- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java +++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java @@ -20,9 +20,9 @@ import java.security.NoSuchAlgorithmException; import java.time.Duration; import io.nats.client.Connection; -import io.nats.client.Nats; import io.nats.client.Options; import io.nats.client.Options.Builder; +import io.nats.client.api.AckPolicy; import io.nats.client.api.ConsumerConfiguration; import org.apache.camel.spi.HeaderFilterStrategy; import org.apache.camel.spi.Metadata; @@ -75,6 +75,12 @@ public class NatsConfiguration { private boolean replyToDisabled; @UriParam(label = "consumer") private String maxMessages; + @UriParam(label = "consumer", defaultValue = "none", enums = "none,all,explicit") + private 
AckPolicy ackPolicy; + @UriParam(label = "consumer", defaultValue = "30000") + private long ackWait; + @UriParam(label = "consumer", defaultValue = "5000") + private long nackWait = 5000; @UriParam(label = "consumer", defaultValue = "10") private int poolSize = 10; @UriParam(label = "common", defaultValue = "true") @@ -574,4 +580,46 @@ public class NatsConfiguration { public void setDurableName(String durableName) { this.durableName = durableName; } + + public AckPolicy getAckPolicy() { + return ackPolicy; + } + + /** + * Acknowledgement mode. + * + * none = Messages are acknowledged as soon as the server sends them. Clients do not need to ack. all = All messages + * with a sequence number less than the message acked are also acknowledged. E.g. reading a batch of messages + * 1..100. Ack on message 100 will acknowledge 1..99 as well. explicit = Each message must be acknowledged + * individually. Message can be acked out of sequence and create gaps of unacknowledged messages in the consumer. + */ + public void setAckPolicy(AckPolicy ackPolicy) { + this.ackPolicy = ackPolicy; + } + + public long getAckWait() { + return ackWait; + } + + /** + * After a message is delivered to a consumer, the server waits 30 seconds (default) for an acknowledgement. If none + * arrives (timeout), the message becomes eligible for redelivery. + */ + public void setAckWait(long ackWait) { + this.ackWait = ackWait; + } + + public long getNackWait() { + return nackWait; + } + + /** + * For negative acknowledgements (NAK), redelivery is delayed by 5 seconds (default). + * + * Setting this to 0 or negative makes the redelivery immediate. Be careful as this can cause the consumer to keep + * re-processing the same message over and over again due to an intermittent error that lasts a while.
+ */ + public void setNackWait(long nackWait) { + this.nackWait = nackWait; + } } diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConstants.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConstants.java index 5d2575a13488..a897652ca6d8 100644 --- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConstants.java +++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConstants.java @@ -34,6 +34,10 @@ public interface NatsConstants { String NATS_STATUS_CODE = "CamelNatsStatusCode"; @Metadata(label = "consumer", description = "Status message error message", javaType = "String", important = true) String NATS_STATUS_ERROR = "CamelNatsStatusError"; + @Metadata(label = "consumer", + description = "Number of times this message has been delivered (1 = first, > 1 then message has been redelivered)", + javaType = "long", important = true) + String NATS_DELIVERY_COUNTER = "CamelNatsDeliveryCounter"; String NATS_REQUEST_TIMEOUT_THREAD_PROFILE_NAME = "CamelNatsRequestTimeoutExecutor"; } diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java index 4035868c103a..75bb40bb3162 100644 --- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java +++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java @@ -30,12 +30,16 @@ import io.nats.client.Message; import io.nats.client.MessageHandler; import io.nats.client.PullSubscribeOptions; import io.nats.client.PushSubscribeOptions; +import io.nats.client.api.AckPolicy; import io.nats.client.api.ConsumerConfiguration; import io.nats.client.api.StreamConfiguration; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.spi.HeaderFilterStrategy; import org.apache.camel.support.DefaultConsumer; +import 
org.apache.camel.support.SynchronizationAdapter; +import org.apache.camel.trait.message.MessageTrait; +import org.apache.camel.trait.message.RedeliveryTraitPayload; import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -184,6 +188,12 @@ public class NatsConsumer extends DefaultConsumer { if (durableName != null) { ccBuilder.durable(durableName + "-durable"); } + if (configuration.getAckPolicy() != null) { + ccBuilder.ackPolicy(configuration.getAckPolicy()); + } + if (configuration.getAckWait() > 0) { + ccBuilder.ackWait(configuration.getAckWait()); + } cc = ccBuilder.build(); } @@ -206,12 +216,14 @@ public class NatsConsumer extends DefaultConsumer { .deliverGroup(queueName) .build(); + boolean autoAck = configuration.getAckPolicy() == null || configuration.getAckPolicy() == AckPolicy.None; + NatsConsumer.this.jetStreamSubscription = this.connection.jetStream().subscribe( NatsConsumer.this.getEndpoint().getConfiguration().getTopic(), queueName, dispatcher, messageHandler, - true, + autoAck, pushOptions); } @@ -240,6 +252,31 @@ public class NatsConsumer extends DefaultConsumer { public void onMessage(Message msg) throws InterruptedException { LOG.debug("Received Message: {}", msg); final Exchange exchange = NatsConsumer.this.createExchange(false); + // ensure we either ACK or NACK the message + boolean autoAck = configuration.getAckPolicy() == null || configuration.getAckPolicy() == AckPolicy.None; + + if (!autoAck) { + long wait = configuration.getAckWait() == 0 ? 
30000 : configuration.getAckWait(); + LOG.info("Consuming from topic: {} using AckPolicy: {} (ackWait:{} nackWait:{})", + configuration.getTopic(), configuration.getAckPolicy(), wait, configuration.getNackWait()); + exchange.getExchangeExtension().addOnCompletion(new SynchronizationAdapter() { + @Override + public void onComplete(Exchange exchange) { + LOG.debug("ACK (delay:{})", configuration.getAckWait()); + msg.ack(); + } + + @Override + public void onFailure(Exchange exchange) { + LOG.debug("NACK (delay:{})", configuration.getNackWait()); + if (configuration.getNackWait() <= 0) { + msg.nak(); + } else { + msg.nakWithDelay(configuration.getNackWait()); + } + } + }); + } try { exchange.getIn().setBody(msg.getData()); exchange.getIn().setHeader(NatsConstants.NATS_REPLY_TO, msg.getReplyTo()); @@ -269,6 +306,11 @@ public class NatsConsumer extends DefaultConsumer { } }); } + + // redelivery information + exchange.getMessage().setPayloadForTrait(MessageTrait.REDELIVERY, + evalRedeliveryMessageTrait(msg, exchange)); + NatsConsumer.this.processor.process(exchange); // is there a reply? 
@@ -302,4 +344,17 @@ public class NatsConsumer extends DefaultConsumer { } } + private static RedeliveryTraitPayload evalRedeliveryMessageTrait(Message msg, Exchange exchange) { + long counter = 1; + if (msg.isJetStream()) { + counter = msg.metaData().deliveredCount(); + } + exchange.getIn().setHeader(NatsConstants.NATS_DELIVERY_COUNTER, counter); + + if (counter > 1) { + return RedeliveryTraitPayload.IS_REDELIVERY; + } + return RedeliveryTraitPayload.NON_REDELIVERY; + } + } diff --git a/components/camel-nats/src/test/java/org/apache/camel/component/nats/jetstream/NatsJetstreamConsumerIT.java b/components/camel-nats/src/test/java/org/apache/camel/component/nats/jetstream/NatsJetstreamConsumerIT.java new file mode 100644 index 000000000000..fb8fac836abc --- /dev/null +++ b/components/camel-nats/src/test/java/org/apache/camel/component/nats/jetstream/NatsJetstreamConsumerIT.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.nats.jetstream; + +import org.apache.camel.EndpointInject; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.component.nats.NatsConstants; +import org.apache.camel.component.nats.integration.NatsITSupport; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledIfSystemProperty; +import org.junit.jupiter.api.parallel.Isolated; + +@DisabledIfSystemProperty(named = "ci.env.name", matches = ".*", disabledReason = "Flaky on GitHub Actions") +@Isolated +public class NatsJetstreamConsumerIT extends NatsITSupport { + + @EndpointInject("mock:result") + protected MockEndpoint mockResultEndpoint; + + @Test + public void testConsumer() throws Exception { + mockResultEndpoint.expectedBodiesReceived("Hello World"); + mockResultEndpoint.expectedHeaderReceived(NatsConstants.NATS_SUBJECT, "mytopic"); + + template.sendBody("direct:send", "Hello World"); + + mockResultEndpoint.assertIsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + String uri + = "nats:mytopic?jetstreamEnabled=true&jetstreamName=mystream&jetstreamAsync=false&durableName=camel&pullSubscription=false"; + + from("direct:send") + // when running full test suite then send can fail due to nats server setup/teardown + .errorHandler(defaultErrorHandler().maximumRedeliveries(5)) + .to(uri); + + from(uri).to(mockResultEndpoint); + } + }; + } +} diff --git a/components/camel-nats/src/test/java/org/apache/camel/component/nats/jetstream/NatsJetstreamConsumerRedeliveryIT.java b/components/camel-nats/src/test/java/org/apache/camel/component/nats/jetstream/NatsJetstreamConsumerRedeliveryIT.java new file mode 100644 index 000000000000..48d0d1f64f23 --- /dev/null +++ b/components/camel-nats/src/test/java/org/apache/camel/component/nats/jetstream/NatsJetstreamConsumerRedeliveryIT.java @@ 
-0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.nats.jetstream; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.camel.EndpointInject; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.component.nats.NatsConstants; +import org.apache.camel.component.nats.integration.NatsITSupport; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledIfSystemProperty; +import org.junit.jupiter.api.parallel.Isolated; + +@DisabledIfSystemProperty(named = "ci.env.name", matches = ".*", disabledReason = "Flaky on GitHub Actions") +@Isolated +public class NatsJetstreamConsumerRedeliveryIT extends NatsITSupport { + + @EndpointInject("mock:result") + protected MockEndpoint mockResultEndpoint; + + @EndpointInject("mock:input") + protected MockEndpoint mockInputEndpoint; + + @Test + public void testConsumer() throws Exception { + mockResultEndpoint.expectedBodiesReceived("Hello World"); + mockResultEndpoint.expectedHeaderReceived(NatsConstants.NATS_SUBJECT, "mytopic2"); + 
mockResultEndpoint.expectedHeaderReceived("counter", 3); + mockResultEndpoint.expectedHeaderReceived(NatsConstants.NATS_DELIVERY_COUNTER, 3); + + mockInputEndpoint.expectedMessageCount(3); + mockInputEndpoint.expectedHeaderReceived(NatsConstants.NATS_SUBJECT, "mytopic2"); + mockInputEndpoint.message(0).header(NatsConstants.NATS_DELIVERY_COUNTER).isEqualTo(1); + mockInputEndpoint.message(1).header(NatsConstants.NATS_DELIVERY_COUNTER).isEqualTo(2); + mockInputEndpoint.message(2).header(NatsConstants.NATS_DELIVERY_COUNTER).isEqualTo(3); + + template.sendBody("direct:send", "Hello World"); + + mockResultEndpoint.assertIsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + String uri + = "nats:mytopic2?jetstreamEnabled=true&jetstreamName=mystream2&jetstreamAsync=false&durableName=camel2&pullSubscription=false&ackPolicy=explicit&nackWait=10"; + + from("direct:send") + // when running full test suite then send can fail due to nats server setup/teardown + .errorHandler(defaultErrorHandler().maximumRedeliveries(5)) + .to(uri); + + final AtomicInteger counter = new AtomicInteger(); + from(uri) + .to("mock:input") + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + if (counter.incrementAndGet() < 3) { + throw new IllegalArgumentException("Forced"); + } + exchange.getMessage().setHeader("counter", counter.intValue()); + } + }) + .to(mockResultEndpoint); + } + }; + } +} diff --git a/core/camel-util/src/main/java/org/apache/camel/util/ImportantHeaderUtils.java b/core/camel-util/src/main/java/org/apache/camel/util/ImportantHeaderUtils.java index 815558277eb2..e3c243efd5a1 100644 --- a/core/camel-util/src/main/java/org/apache/camel/util/ImportantHeaderUtils.java +++ b/core/camel-util/src/main/java/org/apache/camel/util/ImportantHeaderUtils.java @@ -38,6 +38,7 @@ public final class ImportantHeaderUtils { "CamelHttpResponseCode", 
"CamelHttpResponseText", "CamelMqttTopic", + "CamelNatsDeliveryCounter", "CamelNatsSID", "CamelNatsStatusCode", "CamelNatsStatusError", diff --git a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/NatsEndpointBuilderFactory.java b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/NatsEndpointBuilderFactory.java index 47f966102fae..bbebfccdb51a 100644 --- a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/NatsEndpointBuilderFactory.java +++ b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/NatsEndpointBuilderFactory.java @@ -504,6 +504,83 @@ public interface NatsEndpointBuilderFactory { doSetProperty("verbose", verbose); return this; } + /** + * Acknowledgement mode. none = Messages are acknowledged as soon as the + * server sends them. Clients do not need to ack. all = All messages + * with a sequence number less than the message acked are also + * acknowledged. E.g. reading a batch of messages 1..100. Ack on message + * 100 will acknowledge 1..99 as well. explicit = Each message must be + * acknowledged individually. Message can be acked out of sequence and + * create gaps of unacknowledged messages in the consumer. + * + * The option is a: <code>io.nats.client.api.AckPolicy</code> type. + * + * Default: none + * Group: consumer + * + * @param ackPolicy the value to set + * @return the dsl builder + */ + default NatsEndpointConsumerBuilder ackPolicy(io.nats.client.api.AckPolicy ackPolicy) { + doSetProperty("ackPolicy", ackPolicy); + return this; + } + /** + * Acknowledgement mode. none = Messages are acknowledged as soon as the + * server sends them. Clients do not need to ack. all = All messages + * with a sequence number less than the message acked are also + * acknowledged. E.g. reading a batch of messages 1..100. Ack on message + * 100 will acknowledge 1..99 as well. explicit = Each message must be + * acknowledged individually. 
Message can be acked out of sequence and + * create gaps of unacknowledged messages in the consumer. + * + * The option will be converted to a + * <code>io.nats.client.api.AckPolicy</code> type. + * + * Default: none + * Group: consumer + * + * @param ackPolicy the value to set + * @return the dsl builder + */ + default NatsEndpointConsumerBuilder ackPolicy(String ackPolicy) { + doSetProperty("ackPolicy", ackPolicy); + return this; + } + /** + * After a message is delivered to a consumer, the server waits 30 + * seconds (default) for an acknowledgement. If none arrives (timeout), + * the message becomes eligible for redelivery. + * + * The option is a: <code>long</code> type. + * + * Default: 30000 + * Group: consumer + * + * @param ackWait the value to set + * @return the dsl builder + */ + default NatsEndpointConsumerBuilder ackWait(long ackWait) { + doSetProperty("ackWait", ackWait); + return this; + } + /** + * After a message is delivered to a consumer, the server waits 30 + * seconds (default) for an acknowledgement. If none arrives (timeout), + * the message becomes eligible for redelivery. + * + * The option will be converted to a <code>long</code> type. + * + * Default: 30000 + * Group: consumer + * + * @param ackWait the value to set + * @return the dsl builder + */ + default NatsEndpointConsumerBuilder ackWait(String ackWait) { + doSetProperty("ackWait", ackWait); + return this; + } /** * Stop receiving messages from a topic we are subscribing to after * maxMessages. @@ -519,6 +596,44 @@ public interface NatsEndpointBuilderFactory { doSetProperty("maxMessages", maxMessages); return this; } + /** + * For negative acknowledgements (NAK), redelivery is delayed by 5 + * seconds (default). Setting this to 0 or negative makes the redelivery + * immediate. Be careful as this can cause the consumer to keep + * re-processing the same message over and over again due to + * an intermittent error that lasts a while. + * + * The option is a: <code>long</code> type. 
+ * + * Default: 5000 + * Group: consumer + * + * @param nackWait the value to set + * @return the dsl builder + */ + default NatsEndpointConsumerBuilder nackWait(long nackWait) { + doSetProperty("nackWait", nackWait); + return this; + } + /** + * For negative acknowledgements (NAK), redelivery is delayed by 5 + * seconds (default). Setting this to 0 or negative makes the redelivery + * immediate. Be careful as this can cause the consumer to keep + * re-processing the same message over and over again due to + * an intermittent error that lasts a while. + * + * The option will be converted to a <code>long</code> type. + * + * Default: 5000 + * Group: consumer + * + * @param nackWait the value to set + * @return the dsl builder + */ + default NatsEndpointConsumerBuilder nackWait(String nackWait) { + doSetProperty("nackWait", nackWait); + return this; + } /** * Consumer thread pool size (default is 10). * @@ -2797,6 +2912,19 @@ public interface NatsEndpointBuilderFactory { public String natsStatusError() { return "CamelNatsStatusError"; } + /** + * Number of times this message has been delivered (1 = first, greater + * than 1 means the message has been redelivered). + * + * The option is a: {@code long} type. + * + * Group: consumer + * + * @return the name of the header {@code NatsDeliveryCounter}. 
+ */ + public String natsDeliveryCounter() { + return "CamelNatsDeliveryCounter"; + } } static NatsEndpointBuilder endpointBuilder(String componentName, String path) { class NatsEndpointBuilderImpl extends AbstractEndpointBuilder implements NatsEndpointBuilder, AdvancedNatsEndpointBuilder { diff --git a/test-infra/camel-test-infra-nats/src/main/java/org/apache/camel/test/infra/nats/services/NatsLocalContainerAuthService.java b/test-infra/camel-test-infra-nats/src/main/java/org/apache/camel/test/infra/nats/services/NatsLocalContainerAuthService.java index fd384fd7ff64..1c8cd672dd48 100644 --- a/test-infra/camel-test-infra-nats/src/main/java/org/apache/camel/test/infra/nats/services/NatsLocalContainerAuthService.java +++ b/test-infra/camel-test-infra-nats/src/main/java/org/apache/camel/test/infra/nats/services/NatsLocalContainerAuthService.java @@ -29,7 +29,7 @@ public class NatsLocalContainerAuthService extends NatsLocalContainerService imp container .waitingFor(Wait.forLogMessage(".*Server.*is.*ready.*", 1)) - .withCommand("-DV", "--user", USERNAME, "--pass", PASSWORD); + .withCommand("--jetstream", "-DV", "--user", USERNAME, "--pass", PASSWORD); return container; } diff --git a/test-infra/camel-test-infra-nats/src/main/java/org/apache/camel/test/infra/nats/services/NatsLocalContainerAuthTokenService.java b/test-infra/camel-test-infra-nats/src/main/java/org/apache/camel/test/infra/nats/services/NatsLocalContainerAuthTokenService.java index 4a2c18f8e30d..e1d8a3bc45d0 100644 --- a/test-infra/camel-test-infra-nats/src/main/java/org/apache/camel/test/infra/nats/services/NatsLocalContainerAuthTokenService.java +++ b/test-infra/camel-test-infra-nats/src/main/java/org/apache/camel/test/infra/nats/services/NatsLocalContainerAuthTokenService.java @@ -28,7 +28,7 @@ public class NatsLocalContainerAuthTokenService extends NatsLocalContainerServic container .waitingFor(Wait.forLogMessage(".*Server.*is.*ready.*", 1)) - .withCommand("-DV", "-auth", TOKEN); + 
.withCommand("--jetstream", "-DV", "-auth", TOKEN); return container; } diff --git a/test-infra/camel-test-infra-nats/src/main/java/org/apache/camel/test/infra/nats/services/NatsLocalContainerInfraService.java b/test-infra/camel-test-infra-nats/src/main/java/org/apache/camel/test/infra/nats/services/NatsLocalContainerInfraService.java index 2158a5aa1759..43dde14c2fe8 100644 --- a/test-infra/camel-test-infra-nats/src/main/java/org/apache/camel/test/infra/nats/services/NatsLocalContainerInfraService.java +++ b/test-infra/camel-test-infra-nats/src/main/java/org/apache/camel/test/infra/nats/services/NatsLocalContainerInfraService.java @@ -54,7 +54,8 @@ public class NatsLocalContainerInfraService implements NatsInfraService, Contain super(imageName); withNetworkAliases(containerName) - .waitingFor(Wait.forLogMessage(".*Listening.*for.*route.*connections.*", 1)); + .waitingFor(Wait.forLogMessage(".*Server.*is.*ready.*", 1)) + .withCommand("--jetstream"); ContainerEnvironmentUtil.configurePort(this, fixedPort, PORT); } diff --git a/test-infra/camel-test-infra-nats/src/main/java/org/apache/camel/test/infra/nats/services/NatsLocalContainerTLSAuthService.java b/test-infra/camel-test-infra-nats/src/main/java/org/apache/camel/test/infra/nats/services/NatsLocalContainerTLSAuthService.java index e6c4bad5da3b..4e165c874db4 100644 --- a/test-infra/camel-test-infra-nats/src/main/java/org/apache/camel/test/infra/nats/services/NatsLocalContainerTLSAuthService.java +++ b/test-infra/camel-test-infra-nats/src/main/java/org/apache/camel/test/infra/nats/services/NatsLocalContainerTLSAuthService.java @@ -29,7 +29,8 @@ public class NatsLocalContainerTLSAuthService extends NatsLocalContainerService container .waitingFor(Wait.forLogMessage(".*Server.*is.*ready.*", 1)) .withClasspathResourceMapping("org/apache/camel/test/infra/nats/services", "/nats", BindMode.READ_ONLY) - .withCommand("--tls", + .withCommand("--jetstream", + "--tls", "--tlscert=/nats/server.pem", "--tlskey=/nats/key.pem", 
"--tlsverify",
