This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 0f9f1f0 CAMEL-15639: Add requeue option to camel-rabbitmq consumer to
use to control requeue behaviour. Thanks to Ayyanar for patch.
0f9f1f0 is described below
commit 0f9f1f0bd9deb42c6f2a454a56684e3a8c690090
Author: Claus Ibsen <[email protected]>
AuthorDate: Thu Oct 8 18:54:03 2020 +0200
CAMEL-15639: Add requeue option to camel-rabbitmq consumer to use to
control requeue behaviour. Thanks to Ayyanar for patch.
---
.../camel/catalog/docs/rabbitmq-component.adoc | 3 +-
.../rabbitmq/RabbitMQEndpointConfigurer.java | 5 +++
.../rabbitmq/RabbitMQEndpointUriFactory.java | 3 +-
.../apache/camel/component/rabbitmq/rabbitmq.json | 1 +
.../src/main/docs/rabbitmq-component.adoc | 3 +-
.../camel/component/rabbitmq/RabbitConsumer.java | 7 ++--
.../camel/component/rabbitmq/RabbitMQEndpoint.java | 18 +++++++++-
.../dsl/RabbitMQEndpointBuilderFactory.java | 38 ++++++++++++++++++++++
.../modules/ROOT/pages/rabbitmq-component.adoc | 3 +-
9 files changed, 73 insertions(+), 8 deletions(-)
diff --git
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/rabbitmq-component.adoc
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/rabbitmq-component.adoc
index 48ffda6..6bf4caa 100644
---
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/rabbitmq-component.adoc
+++
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/rabbitmq-component.adoc
@@ -142,7 +142,7 @@ with the following path and query parameters:
|===
-=== Query Parameters (66 parameters):
+=== Query Parameters (67 parameters):
[width="100%",cols="2,5,^1,2",options="header"]
@@ -179,6 +179,7 @@ with the following path and query parameters:
| *prefetchEnabled* (consumer) | Enables the quality of service on the
RabbitMQConsumer side. You need to specify the option of prefetchSize,
prefetchCount, prefetchGlobal at the same time | false | boolean
| *prefetchGlobal* (consumer) | If the settings should be applied to the
entire channel rather than each consumer You need to specify the option of
prefetchSize, prefetchCount, prefetchGlobal at the same time | false | boolean
| *prefetchSize* (consumer) | The maximum amount of content (measured in
octets) that the server will deliver, 0 if unlimited. You need to specify the
option of prefetchSize, prefetchCount, prefetchGlobal at the same time | | int
+| *reQueue* (consumer) | This is used by the consumer to control rejection of
the message. When the consumer is complete processing the exchange, and if the
exchange failed, then the consumer is going to reject the message from the
RabbitMQ broker. If the header CamelRabbitmqRequeue is present then the value
of the header will be used, otherwise this endpoint value is used as fallback.
If the value is false (by default) then the message is discarded/dead-lettered.
If the value is true, t [...]
| *exceptionHandler* (consumer) | To let the consumer use a custom
ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this
option is not in use. By default the consumer will deal with exceptions, that
will be logged at WARN or ERROR level and ignored. | | ExceptionHandler
| *exchangePattern* (consumer) | Sets the exchange pattern when the consumer
creates an exchange. There are 3 enums and the value can be one of: InOnly,
InOut, InOptionalOut | | ExchangePattern
| *threadPoolSize* (consumer) | The consumer uses a Thread Pool Executor with
a fixed number of threads. This setting allows you to set that number of
threads. | 10 | int
diff --git
a/components/camel-rabbitmq/src/generated/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointConfigurer.java
b/components/camel-rabbitmq/src/generated/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointConfigurer.java
index b92cc95..513dee0 100644
---
a/components/camel-rabbitmq/src/generated/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointConfigurer.java
+++
b/components/camel-rabbitmq/src/generated/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointConfigurer.java
@@ -50,6 +50,7 @@ public class RabbitMQEndpointConfigurer extends
PropertyConfigurerSupport implem
map.put("prefetchEnabled", boolean.class);
map.put("prefetchGlobal", boolean.class);
map.put("prefetchSize", int.class);
+ map.put("reQueue", boolean.class);
map.put("exceptionHandler",
org.apache.camel.spi.ExceptionHandler.class);
map.put("exchangePattern", org.apache.camel.ExchangePattern.class);
map.put("threadPoolSize", int.class);
@@ -177,6 +178,8 @@ public class RabbitMQEndpointConfigurer extends
PropertyConfigurerSupport implem
case "publisheracknowledgementstimeout":
case "publisherAcknowledgementsTimeout":
target.setPublisherAcknowledgementsTimeout(property(camelContext, long.class,
value)); return true;
case "queue": target.setQueue(property(camelContext,
java.lang.String.class, value)); return true;
+ case "requeue":
+ case "reQueue": target.setReQueue(property(camelContext,
boolean.class, value)); return true;
case "requesttimeout":
case "requestTimeout": target.setRequestTimeout(property(camelContext,
long.class, value)); return true;
case "requesttimeoutcheckerinterval":
@@ -308,6 +311,8 @@ public class RabbitMQEndpointConfigurer extends
PropertyConfigurerSupport implem
case "publisheracknowledgementstimeout":
case "publisherAcknowledgementsTimeout": return
target.getPublisherAcknowledgementsTimeout();
case "queue": return target.getQueue();
+ case "requeue":
+ case "reQueue": return target.isReQueue();
case "requesttimeout":
case "requestTimeout": return target.getRequestTimeout();
case "requesttimeoutcheckerinterval":
diff --git
a/components/camel-rabbitmq/src/generated/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointUriFactory.java
b/components/camel-rabbitmq/src/generated/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointUriFactory.java
index 6751325..4b90572 100644
---
a/components/camel-rabbitmq/src/generated/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointUriFactory.java
+++
b/components/camel-rabbitmq/src/generated/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointUriFactory.java
@@ -18,7 +18,7 @@ public class RabbitMQEndpointUriFactory extends
org.apache.camel.support.compone
private static final Set<String> PROPERTY_NAMES;
static {
- Set<String> set = new HashSet<>(67);
+ Set<String> set = new HashSet<>(68);
set.add("exchangeName");
set.add("addresses");
set.add("autoDelete");
@@ -51,6 +51,7 @@ public class RabbitMQEndpointUriFactory extends
org.apache.camel.support.compone
set.add("prefetchEnabled");
set.add("prefetchGlobal");
set.add("prefetchSize");
+ set.add("reQueue");
set.add("exceptionHandler");
set.add("exchangePattern");
set.add("threadPoolSize");
diff --git
a/components/camel-rabbitmq/src/generated/resources/org/apache/camel/component/rabbitmq/rabbitmq.json
b/components/camel-rabbitmq/src/generated/resources/org/apache/camel/component/rabbitmq/rabbitmq.json
index 3a028c1..a8fdaa6 100644
---
a/components/camel-rabbitmq/src/generated/resources/org/apache/camel/component/rabbitmq/rabbitmq.json
+++
b/components/camel-rabbitmq/src/generated/resources/org/apache/camel/component/rabbitmq/rabbitmq.json
@@ -111,6 +111,7 @@
"prefetchEnabled": { "kind": "parameter", "displayName": "Prefetch
Enabled", "group": "consumer", "label": "consumer", "required": false, "type":
"boolean", "javaType": "boolean", "deprecated": false, "secret": false,
"defaultValue": false, "description": "Enables the quality of service on the
RabbitMQConsumer side. You need to specify the option of prefetchSize,
prefetchCount, prefetchGlobal at the same time" },
"prefetchGlobal": { "kind": "parameter", "displayName": "Prefetch Global",
"group": "consumer", "label": "consumer", "required": false, "type": "boolean",
"javaType": "boolean", "deprecated": false, "secret": false, "defaultValue":
false, "description": "If the settings should be applied to the entire channel
rather than each consumer You need to specify the option of prefetchSize,
prefetchCount, prefetchGlobal at the same time" },
"prefetchSize": { "kind": "parameter", "displayName": "Prefetch Size",
"group": "consumer", "label": "consumer", "required": false, "type": "integer",
"javaType": "int", "deprecated": false, "secret": false, "description": "The
maximum amount of content (measured in octets) that the server will deliver, 0
if unlimited. You need to specify the option of prefetchSize, prefetchCount,
prefetchGlobal at the same time" },
+ "reQueue": { "kind": "parameter", "displayName": "Re Queue", "group":
"consumer", "label": "consumer", "required": false, "type": "boolean",
"javaType": "boolean", "deprecated": false, "secret": false, "defaultValue":
false, "description": "This is used by the consumer to control rejection of the
message. When the consumer is complete processing the exchange, and if the
exchange failed, then the consumer is going to reject the message from the
RabbitMQ broker. If the header CamelRabb [...]
"exceptionHandler": { "kind": "parameter", "displayName": "Exception
Handler", "group": "consumer (advanced)", "label": "consumer,advanced",
"required": false, "type": "object", "javaType":
"org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.",
"deprecated": false, "secret": false, "description": "To let the consumer use a
custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled
then this option is not in use. By default the consumer will deal with [...]
"exchangePattern": { "kind": "parameter", "displayName": "Exchange
Pattern", "group": "consumer (advanced)", "label": "consumer,advanced",
"required": false, "type": "object", "javaType":
"org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut",
"InOptionalOut" ], "deprecated": false, "secret": false, "description": "Sets
the exchange pattern when the consumer creates an exchange." },
"threadPoolSize": { "kind": "parameter", "displayName": "Thread Pool
Size", "group": "consumer (advanced)", "label": "consumer,advanced",
"required": false, "type": "integer", "javaType": "int", "deprecated": false,
"secret": false, "defaultValue": "10", "description": "The consumer uses a
Thread Pool Executor with a fixed number of threads. This setting allows you to
set that number of threads." },
diff --git a/components/camel-rabbitmq/src/main/docs/rabbitmq-component.adoc
b/components/camel-rabbitmq/src/main/docs/rabbitmq-component.adoc
index 48ffda6..6bf4caa 100644
--- a/components/camel-rabbitmq/src/main/docs/rabbitmq-component.adoc
+++ b/components/camel-rabbitmq/src/main/docs/rabbitmq-component.adoc
@@ -142,7 +142,7 @@ with the following path and query parameters:
|===
-=== Query Parameters (66 parameters):
+=== Query Parameters (67 parameters):
[width="100%",cols="2,5,^1,2",options="header"]
@@ -179,6 +179,7 @@ with the following path and query parameters:
| *prefetchEnabled* (consumer) | Enables the quality of service on the
RabbitMQConsumer side. You need to specify the option of prefetchSize,
prefetchCount, prefetchGlobal at the same time | false | boolean
| *prefetchGlobal* (consumer) | If the settings should be applied to the
entire channel rather than each consumer You need to specify the option of
prefetchSize, prefetchCount, prefetchGlobal at the same time | false | boolean
| *prefetchSize* (consumer) | The maximum amount of content (measured in
octets) that the server will deliver, 0 if unlimited. You need to specify the
option of prefetchSize, prefetchCount, prefetchGlobal at the same time | | int
+| *reQueue* (consumer) | This is used by the consumer to control rejection of
the message. When the consumer is complete processing the exchange, and if the
exchange failed, then the consumer is going to reject the message from the
RabbitMQ broker. If the header CamelRabbitmqRequeue is present then the value
of the header will be used, otherwise this endpoint value is used as fallback.
If the value is false (by default) then the message is discarded/dead-lettered.
If the value is true, t [...]
| *exceptionHandler* (consumer) | To let the consumer use a custom
ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this
option is not in use. By default the consumer will deal with exceptions, that
will be logged at WARN or ERROR level and ignored. | | ExceptionHandler
| *exchangePattern* (consumer) | Sets the exchange pattern when the consumer
creates an exchange. There are 3 enums and the value can be one of: InOnly,
InOut, InOptionalOut | | ExchangePattern
| *threadPoolSize* (consumer) | The consumer uses a Thread Pool Executor with
a fixed number of threads. This setting allows you to set that number of
threads. | 10 | int
diff --git
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
index ccfce12..f1bd416 100644
---
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
+++
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
@@ -42,7 +42,6 @@ class RabbitConsumer extends ServiceSupport implements
com.rabbitmq.client.Consu
private final RabbitMQConsumer consumer;
private Channel channel;
private String tag;
- /** Consumer tag for this consumer. */
private volatile String consumerTag;
private volatile boolean stopping;
@@ -177,9 +176,11 @@ class RabbitConsumer extends ServiceSupport implements
com.rabbitmq.client.Consu
channel.basicAck(deliveryTag, false);
}
} else {
- boolean isRequeueHeaderSet = false;
+ boolean isRequeueHeaderSet =
consumer.getEndpoint().isReQueue();
try {
- isRequeueHeaderSet =
msg.getHeader(RabbitMQConstants.REQUEUE, false, boolean.class);
+ isRequeueHeaderSet =
msg.getHeader(RabbitMQConstants.REQUEUE, isRequeueHeaderSet, boolean.class);
+ LOG.trace("Consumer requeue property is overridden using
the message header requeue property as: {}",
+ isRequeueHeaderSet);
} catch (Exception e) {
// ignore as its an invalid header
}
diff --git
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
index 5ff131d..7f865de 100644
---
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
+++
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
@@ -184,7 +184,8 @@ public class RabbitMQEndpoint extends DefaultEndpoint
implements AsyncEndpoint {
private ExceptionHandler connectionFactoryExceptionHandler;
@UriParam(label = "allowMessageBodySerialization", defaultValue = "false")
private boolean allowMessageBodySerialization;
-
+ @UriParam(label = "consumer")
+ private boolean reQueue;
// camel-jms supports this setting but it is not currently configurable in
// camel-rabbitmq
private boolean useMessageIDAsCorrelationID = true;
@@ -1038,4 +1039,19 @@ public class RabbitMQEndpoint extends DefaultEndpoint
implements AsyncEndpoint {
public void setConnectionFactoryExceptionHandler(ExceptionHandler
connectionFactoryExceptionHandler) {
this.connectionFactoryExceptionHandler =
connectionFactoryExceptionHandler;
}
+
+ /**
+ * This is used by the consumer to control rejection of the message. When
the consumer is complete processing the
+ * exchange, and if the exchange failed, then the consumer is going to
reject the message from the RabbitMQ broker.
+ * If the header CamelRabbitmqRequeue is present then the value of the
header will be used, otherwise this endpoint
+ * value is used as fallback. If the value is false (by default) then the
message is discarded/dead-lettered. If the
+ * value is true, then the message is re-queued.
+ */
+ public boolean isReQueue() {
+ return reQueue;
+ }
+
+ public void setReQueue(boolean reQueue) {
+ this.reQueue = reQueue;
+ }
}
diff --git
a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/RabbitMQEndpointBuilderFactory.java
b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/RabbitMQEndpointBuilderFactory.java
index 088a97c..7c17f4d 100644
---
a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/RabbitMQEndpointBuilderFactory.java
+++
b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/RabbitMQEndpointBuilderFactory.java
@@ -743,6 +743,44 @@ public interface RabbitMQEndpointBuilderFactory {
return this;
}
/**
+ * This is used by the consumer to control rejection of the message.
+ * When the consumer is complete processing the exchange, and if the
+ * exchange failed, then the consumer is going to reject the message
+ * from the RabbitMQ broker. If the header CamelRabbitmqRequeue is
+ * present then the value of the header will be used, otherwise this
+ * endpoint value is used as fallback. If the value is false (by
+ * default) then the message is discarded/dead-lettered. If the value
is
+ * true, then the message is re-queued.
+ *
+ * The option is a: <code>boolean</code> type.
+ *
+ * Default: false
+ * Group: consumer
+ */
+ default RabbitMQEndpointConsumerBuilder reQueue(boolean reQueue) {
+ doSetProperty("reQueue", reQueue);
+ return this;
+ }
+ /**
+ * This is used by the consumer to control rejection of the message.
+ * When the consumer is complete processing the exchange, and if the
+ * exchange failed, then the consumer is going to reject the message
+ * from the RabbitMQ broker. If the header CamelRabbitmqRequeue is
+ * present then the value of the header will be used, otherwise this
+ * endpoint value is used as fallback. If the value is false (by
+ * default) then the message is discarded/dead-lettered. If the value
is
+ * true, then the message is re-queued.
+ *
+ * The option will be converted to a <code>boolean</code> type.
+ *
+ * Default: false
+ * Group: consumer
+ */
+ default RabbitMQEndpointConsumerBuilder reQueue(String reQueue) {
+ doSetProperty("reQueue", reQueue);
+ return this;
+ }
+ /**
* Whether to allow Java serialization of the message body or not. If
* this value is true, the message body will be serialized on the
* producer side using Java serialization, if no type converter can
diff --git a/docs/components/modules/ROOT/pages/rabbitmq-component.adoc
b/docs/components/modules/ROOT/pages/rabbitmq-component.adoc
index f4d9111..1cf378a 100644
--- a/docs/components/modules/ROOT/pages/rabbitmq-component.adoc
+++ b/docs/components/modules/ROOT/pages/rabbitmq-component.adoc
@@ -144,7 +144,7 @@ with the following path and query parameters:
|===
-=== Query Parameters (66 parameters):
+=== Query Parameters (67 parameters):
[width="100%",cols="2,5,^1,2",options="header"]
@@ -181,6 +181,7 @@ with the following path and query parameters:
| *prefetchEnabled* (consumer) | Enables the quality of service on the
RabbitMQConsumer side. You need to specify the option of prefetchSize,
prefetchCount, prefetchGlobal at the same time | false | boolean
| *prefetchGlobal* (consumer) | If the settings should be applied to the
entire channel rather than each consumer You need to specify the option of
prefetchSize, prefetchCount, prefetchGlobal at the same time | false | boolean
| *prefetchSize* (consumer) | The maximum amount of content (measured in
octets) that the server will deliver, 0 if unlimited. You need to specify the
option of prefetchSize, prefetchCount, prefetchGlobal at the same time | | int
+| *reQueue* (consumer) | This is used by the consumer to control rejection of
the message. When the consumer is complete processing the exchange, and if the
exchange failed, then the consumer is going to reject the message from the
RabbitMQ broker. If the header CamelRabbitmqRequeue is present then the value
of the header will be used, otherwise this endpoint value is used as fallback.
If the value is false (by default) then the message is discarded/dead-lettered.
If the value is true, t [...]
| *exceptionHandler* (consumer) | To let the consumer use a custom
ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this
option is not in use. By default the consumer will deal with exceptions, that
will be logged at WARN or ERROR level and ignored. | | ExceptionHandler
| *exchangePattern* (consumer) | Sets the exchange pattern when the consumer
creates an exchange. There are 3 enums and the value can be one of: InOnly,
InOut, InOptionalOut | | ExchangePattern
| *threadPoolSize* (consumer) | The consumer uses a Thread Pool Executor with
a fixed number of threads. This setting allows you to set that number of
threads. | 10 | int