This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new eedb8c2f821 CAMEL-19417: camel-spring-rabbitmq - convert all message properties by default (#10325)
eedb8c2f821 is described below

commit eedb8c2f82171b5c67f95630856e7fd43ae59c3c
Author: Steven Dürrenmatt <[email protected]>
AuthorDate: Mon Jun 12 06:42:56 2023 +0200

    CAMEL-19417: camel-spring-rabbitmq - convert all message properties by default (#10325)
    
    * CAMEL-19417: camel-spring-rabbitmq - convert all message properties by default
    
    * CAMEL-19417: PR review
    - replace exchange.getMessage() with local variable
    - use labels in metadata
    - use Map.of instead of ImmutableMap.Builder
    
    * CAMEL-19417: PR review
    - do not remove headers (header filter strategy already applies)
    
    * CAMEL-19417: PR review
    - use getHeader(String name, Class<T> type)
    
    * CAMEL-19417: Reformat code
---
 .../DefaultMessagePropertiesConverter.java         | 121 +++++++++++++++++++--
 .../springrabbit/SpringRabbitMQConstants.java      |  44 +++++++-
 .../integration/RabbitMQConsumerQueuesIT.java      |  35 +++++-
 .../integration/RabbitMQProducerIT.java            |  17 ++-
 4 files changed, 199 insertions(+), 18 deletions(-)

diff --git 
a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/DefaultMessagePropertiesConverter.java
 
b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/DefaultMessagePropertiesConverter.java
index 8f72aa5d912..87343d20820 100644
--- 
a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/DefaultMessagePropertiesConverter.java
+++ 
b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/DefaultMessagePropertiesConverter.java
@@ -16,14 +16,16 @@
  */
 package org.apache.camel.component.springrabbit;
 
+import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
+import org.apache.camel.Message;
 import org.apache.camel.spi.HeaderFilterStrategy;
-import org.apache.camel.support.ExchangeHelper;
+import org.springframework.amqp.core.MessageDeliveryMode;
 import org.springframework.amqp.core.MessageProperties;
 
 public class DefaultMessagePropertiesConverter implements 
MessagePropertiesConverter {
@@ -39,12 +41,66 @@ public class DefaultMessagePropertiesConverter implements 
MessagePropertiesConve
     @Override
     public MessageProperties toMessageProperties(Exchange exchange) {
         MessageProperties answer = new MessageProperties();
-        String contentType = ExchangeHelper.getContentType(exchange);
+        Message message = exchange.getMessage();
+
+        MessageDeliveryMode deliveryMode = 
message.getHeader(SpringRabbitMQConstants.DELIVERY_MODE, 
MessageDeliveryMode.class);
+        if (deliveryMode != null) {
+            answer.setDeliveryMode(deliveryMode);
+        }
+        String type = message.getHeader(SpringRabbitMQConstants.TYPE, 
String.class);
+        if (type != null) {
+            answer.setType(type);
+        }
+        String contentType = 
message.getHeader(SpringRabbitMQConstants.CONTENT_TYPE, String.class);
         if (contentType != null) {
             answer.setContentType(contentType);
         }
+        Long contentLength = 
message.getHeader(SpringRabbitMQConstants.CONTENT_LENGTH, Long.class);
+        if (contentLength != null) {
+            answer.setContentLength(contentLength);
+        }
+        String contentEncoding = 
message.getHeader(SpringRabbitMQConstants.CONTENT_ENCODING, String.class);
+        if (contentEncoding != null) {
+            answer.setContentEncoding(contentEncoding);
+        }
+        String messageId = 
message.getHeader(SpringRabbitMQConstants.MESSAGE_ID, String.class);
+        if (messageId != null) {
+            answer.setMessageId(messageId);
+        }
+        String correlationId = 
message.getHeader(SpringRabbitMQConstants.CORRELATION_ID, String.class);
+        if (correlationId != null) {
+            answer.setCorrelationId(correlationId);
+        }
+        String replyTo = message.getHeader(SpringRabbitMQConstants.REPLY_TO, 
String.class);
+        if (replyTo != null) {
+            answer.setReplyTo(replyTo);
+        }
+        String expiration = 
message.getHeader(SpringRabbitMQConstants.EXPIRATION, String.class);
+        if (expiration != null) {
+            answer.setExpiration(expiration);
+        }
+        Date timestamp = message.getHeader(SpringRabbitMQConstants.TIMESTAMP, 
Date.class);
+        if (timestamp != null) {
+            answer.setTimestamp(timestamp);
+        }
+        String userId = message.getHeader(SpringRabbitMQConstants.USER_ID, 
String.class);
+        if (userId != null) {
+            answer.setUserId(userId);
+        }
+        String appId = message.getHeader(SpringRabbitMQConstants.APP_ID, 
String.class);
+        if (appId != null) {
+            answer.setAppId(appId);
+        }
+        Integer priority = message.getHeader(SpringRabbitMQConstants.PRIORITY, 
Integer.class);
+        if (priority != null) {
+            answer.setPriority(priority);
+        }
+        String clusterId = 
message.getHeader(SpringRabbitMQConstants.CLUSTER_ID, String.class);
+        if (clusterId != null) {
+            answer.setClusterId(clusterId);
+        }
 
-        Set<Map.Entry<String, Object>> entries = 
exchange.getMessage().getHeaders().entrySet();
+        Set<Map.Entry<String, Object>> entries = 
message.getHeaders().entrySet();
         for (Map.Entry<String, Object> entry : entries) {
             String headerName = entry.getKey();
             Object headerValue = entry.getValue();
@@ -65,15 +121,61 @@ public class DefaultMessagePropertiesConverter implements 
MessagePropertiesConve
                 Object headerValue = entry.getValue();
                 appendInputHeader(answer, headerName, headerValue, exchange);
             }
+
+            if (messageProperties.getRedelivered() != null) {
+                answer.put(SpringRabbitMQConstants.REDELIVERED, 
messageProperties.getRedelivered());
+            }
+            if (messageProperties.getDeliveryTag() > 0) {
+                answer.put(SpringRabbitMQConstants.DELIVERY_TAG, 
messageProperties.getDeliveryTag());
+            }
+            if (messageProperties.getReceivedExchange() != null) {
+                answer.put(SpringRabbitMQConstants.EXCHANGE_NAME, 
messageProperties.getReceivedExchange());
+            }
+            if (messageProperties.getReceivedRoutingKey() != null) {
+                answer.put(SpringRabbitMQConstants.ROUTING_KEY, 
messageProperties.getReceivedRoutingKey());
+            }
+            if (messageProperties.getReceivedDeliveryMode() != null) {
+                answer.put(SpringRabbitMQConstants.DELIVERY_MODE, 
messageProperties.getReceivedDeliveryMode());
+            }
+            if (messageProperties.getType() != null) {
+                answer.put(SpringRabbitMQConstants.TYPE, 
messageProperties.getType());
+            }
             if (messageProperties.getContentType() != null) {
-                answer.put(Exchange.CONTENT_TYPE, 
messageProperties.getContentType());
+                answer.put(SpringRabbitMQConstants.CONTENT_TYPE, 
messageProperties.getContentType());
+            }
+            if (messageProperties.getContentLength() > 0) {
+                answer.put(SpringRabbitMQConstants.CONTENT_LENGTH, 
messageProperties.getContentLength());
+            }
+            if (messageProperties.getContentEncoding() != null) {
+                answer.put(SpringRabbitMQConstants.CONTENT_ENCODING, 
messageProperties.getContentEncoding());
+            }
+            if (messageProperties.getMessageId() != null) {
+                answer.put(SpringRabbitMQConstants.MESSAGE_ID, 
messageProperties.getMessageId());
+            }
+            if (messageProperties.getCorrelationId() != null) {
+                answer.put(SpringRabbitMQConstants.CORRELATION_ID, 
messageProperties.getCorrelationId());
+            }
+            if (messageProperties.getReplyTo() != null) {
+                answer.put(SpringRabbitMQConstants.REPLY_TO, 
messageProperties.getReplyTo());
+            }
+            if (messageProperties.getExpiration() != null) {
+                answer.put(SpringRabbitMQConstants.EXPIRATION, 
messageProperties.getExpiration());
             }
             if (messageProperties.getTimestamp() != null) {
-                answer.put(Exchange.MESSAGE_TIMESTAMP, 
messageProperties.getTimestamp().getTime());
+                answer.put(SpringRabbitMQConstants.TIMESTAMP, 
messageProperties.getTimestamp());
+            }
+            if (messageProperties.getReceivedUserId() != null) {
+                answer.put(SpringRabbitMQConstants.USER_ID, 
messageProperties.getReceivedUserId());
+            }
+            if (messageProperties.getAppId() != null) {
+                answer.put(SpringRabbitMQConstants.APP_ID, 
messageProperties.getAppId());
+            }
+            if (messageProperties.getPriority() != null) {
+                answer.put(SpringRabbitMQConstants.PRIORITY, 
messageProperties.getPriority());
+            }
+            if (messageProperties.getClusterId() != null) {
+                answer.put(SpringRabbitMQConstants.CLUSTER_ID, 
messageProperties.getClusterId());
             }
-
-            // Helps in getting to acknowledge manually
-            answer.put(SpringRabbitMQConstants.DELIVERY_TAG, 
messageProperties.getDeliveryTag());
         }
 
         return answer;
@@ -86,7 +188,7 @@ public class DefaultMessagePropertiesConverter implements 
MessagePropertiesConve
     }
 
     private void appendInputHeader(Map<String, Object> answer, String 
headerName, Object headerValue, Exchange ex) {
-        if (shouldOutputHeader(headerName, headerValue, ex)) {
+        if (shouldInputHeader(headerName, headerValue, ex)) {
             answer.put(headerName, headerValue);
         }
     }
@@ -106,5 +208,4 @@ public class DefaultMessagePropertiesConverter implements 
MessagePropertiesConve
         return headerFilterStrategy == null
                 || 
!headerFilterStrategy.applyFilterToExternalHeaders(headerName, headerValue, 
exchange);
     }
-
 }
diff --git 
a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQConstants.java
 
b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQConstants.java
index 4518c39c3e8..865951c3dad 100644
--- 
a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQConstants.java
+++ 
b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQConstants.java
@@ -23,12 +23,50 @@ public final class SpringRabbitMQConstants {
     public static final String DEFAULT_EXCHANGE_NAME = "default";
 
     public static final String CHANNEL = "CamelSpringRabbitmqChannel";
-    @Metadata(description = "The exchange key.", javaType = "String")
+    @Metadata(description = "To override the endpoint configuration's routing 
key.", javaType = "String", label = "producer")
     public static final String ROUTING_OVERRIDE_KEY = 
"CamelSpringRabbitmqRoutingOverrideKey";
-    @Metadata(description = "The exchange name.", javaType = "String")
+    @Metadata(description = "To override the endpoint configuration's exchange 
name.", javaType = "String", label = "producer")
     public static final String EXCHANGE_OVERRIDE_NAME = 
"CamelSpringRabbitmqExchangeOverrideName";
-    @Metadata(description = "Delivery tag for manual acknowledge mode", 
javaType = "long")
+    @Metadata(description = "Whether the message was previously delivered and 
requeued.", javaType = "Boolean",
+              label = "consumer")
+    public static final String REDELIVERED = "CamelSpringRabbitmqRedelivered";
+    @Metadata(description = "Delivery tag for manual acknowledge mode.", 
javaType = "long", label = "consumer")
     public static final String DELIVERY_TAG = "CamelSpringRabbitmqDeliveryTag";
+    @Metadata(description = "The exchange name that was used when publishing 
the message.", javaType = "String",
+              label = "consumer")
+    public static final String EXCHANGE_NAME = 
"CamelSpringRabbitmqExchangeName";
+    @Metadata(description = "The routing key that was used when publishing the 
message.", javaType = "String",
+              label = "consumer")
+    public static final String ROUTING_KEY = "CamelSpringRabbitmqRoutingKey";
+    @Metadata(description = "The message delivery mode.", javaType = 
"MessageDeliveryMode", enums = "NON_PERSISTENT,PERSISTENT")
+    public static final String DELIVERY_MODE = 
"CamelSpringRabbitmqDeliveryMode";
+    @Metadata(description = "Application-specific message type.", javaType = 
"String")
+    public static final String TYPE = "CamelSpringRabbitmqType";
+    @Metadata(description = "The message content type.", javaType = "String")
+    public static final String CONTENT_TYPE = "CamelSpringRabbitmqContentType";
+    @Metadata(description = "The message content length.", javaType = "long")
+    public static final String CONTENT_LENGTH = 
"CamelSpringRabbitmqContentLength";
+    @Metadata(description = "Content encoding used by applications.", javaType 
= "String")
+    public static final String CONTENT_ENCODING = 
"CamelSpringRabbitmqContentEncoding";
+    @Metadata(description = "Arbitrary message id.", javaType = "String")
+    public static final String MESSAGE_ID = "CamelSpringRabbitmqMessageId";
+    @Metadata(description = "Identifier to correlate RPC responses with 
requests.", javaType = "String")
+    public static final String CORRELATION_ID = 
"CamelSpringRabbitmqCorrelationId";
+    @Metadata(description = "Commonly used to name a callback queue.", 
javaType = "String")
+    public static final String REPLY_TO = "CamelSpringRabbitmqReplyTo";
+    @Metadata(description = "Per-message TTL.", javaType = "String")
+    public static final String EXPIRATION = "CamelSpringRabbitmqExpiration";
+    @Metadata(description = "Application-provided timestamp.", javaType = 
"Date")
+    public static final String TIMESTAMP = "CamelSpringRabbitmqTimestamp";
+    @Metadata(description = "Validated user id.", javaType = "String")
+    public static final String USER_ID = "CamelSpringRabbitmqUserId";
+    @Metadata(description = "The application name.", javaType = "String")
+    public static final String APP_ID = "CamelSpringRabbitmqAppId";
+    @Metadata(description = "The message priority.", javaType = "Integer")
+    public static final String PRIORITY = "CamelSpringRabbitmqPriority";
+    @Metadata(description = "The cluster id.", javaType = "String")
+    public static final String CLUSTER_ID = "CamelSpringRabbitmqClusterId";
+
     public static final String DIRECT_MESSAGE_LISTENER_CONTAINER = "DMLC";
     public static final String SIMPLE_MESSAGE_LISTENER_CONTAINER = "SMLC";
 
diff --git 
a/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQConsumerQueuesIT.java
 
b/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQConsumerQueuesIT.java
index 21c5f718cd3..4cfa47e1095 100644
--- 
a/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQConsumerQueuesIT.java
+++ 
b/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQConsumerQueuesIT.java
@@ -18,13 +18,14 @@ package org.apache.camel.component.springrabbit.integration;
 
 import java.util.concurrent.TimeUnit;
 
-import org.apache.camel.Exchange;
 import org.apache.camel.RoutesBuilder;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.springrabbit.SpringRabbitMQConstants;
 import org.junit.jupiter.api.Test;
 import org.springframework.amqp.core.Message;
 import org.springframework.amqp.core.MessageBuilder;
+import org.springframework.amqp.core.MessageDeliveryMode;
 import org.springframework.amqp.core.MessageProperties;
 import org.springframework.amqp.core.MessagePropertiesBuilder;
 
@@ -62,7 +63,37 @@ public class RabbitMQConsumerQueuesIT extends 
RabbitMQITSupport {
 
         getMockEndpoint("mock:result").expectedBodiesReceived("foo");
         getMockEndpoint("mock:result").expectedHeaderReceived("bar", "baz");
-        
getMockEndpoint("mock:result").expectedHeaderReceived(Exchange.CONTENT_TYPE, 
MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
+        
getMockEndpoint("mock:result").expectedHeaderReceived(SpringRabbitMQConstants.CONTENT_TYPE,
+                MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
+
+        template.sendBody("direct:start", body);
+
+        MockEndpoint.assertIsSatisfied(context, 30, TimeUnit.SECONDS);
+    }
+
+    @Test
+    public void testConsumerWithMessageProperties() throws Exception {
+        MessageProperties props = MessagePropertiesBuilder.newInstance()
+                .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
+                .setType("price")
+                .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
+                .setMessageId("123")
+                .setPriority(1)
+                .setHeader("bar", "baz")
+                .build();
+        Message body = MessageBuilder.withBody("foo".getBytes())
+                .andProperties(props)
+                .build();
+
+        getMockEndpoint("mock:result").expectedBodiesReceived("foo");
+        getMockEndpoint("mock:result").expectedHeaderReceived("bar", "baz");
+        
getMockEndpoint("mock:result").expectedHeaderReceived(SpringRabbitMQConstants.DELIVERY_MODE,
+                MessageDeliveryMode.PERSISTENT);
+        
getMockEndpoint("mock:result").expectedHeaderReceived(SpringRabbitMQConstants.TYPE,
 "price");
+        
getMockEndpoint("mock:result").expectedHeaderReceived(SpringRabbitMQConstants.CONTENT_TYPE,
+                MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
+        
getMockEndpoint("mock:result").expectedHeaderReceived(SpringRabbitMQConstants.MESSAGE_ID,
 "123");
+        
getMockEndpoint("mock:result").expectedHeaderReceived(SpringRabbitMQConstants.PRIORITY,
 1);
 
         template.sendBody("direct:start", body);
 
diff --git 
a/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQProducerIT.java
 
b/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQProducerIT.java
index e8b3f41fdf8..878e95d20e9 100644
--- 
a/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQProducerIT.java
+++ 
b/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQProducerIT.java
@@ -17,10 +17,11 @@
 package org.apache.camel.component.springrabbit.integration;
 
 import java.nio.charset.Charset;
+import java.util.Map;
 
-import org.apache.camel.Exchange;
 import org.apache.camel.RoutesBuilder;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.springrabbit.SpringRabbitMQConstants;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.springframework.amqp.core.AmqpAdmin;
@@ -28,6 +29,7 @@ import org.springframework.amqp.core.AmqpTemplate;
 import org.springframework.amqp.core.BindingBuilder;
 import org.springframework.amqp.core.Message;
 import org.springframework.amqp.core.MessageBuilder;
+import org.springframework.amqp.core.MessageDeliveryMode;
 import org.springframework.amqp.core.MessageProperties;
 import org.springframework.amqp.core.MessagePropertiesBuilder;
 import org.springframework.amqp.core.Queue;
@@ -107,7 +109,7 @@ public class RabbitMQProducerIT extends RabbitMQITSupport {
     }
 
     @Test
-    public void testProducerContentType() throws Exception {
+    public void testProducerWithMessageProperties() throws Exception {
         ConnectionFactory cf = 
context.getRegistry().lookupByNameAndType("myCF", ConnectionFactory.class);
 
         Queue q = new Queue("myqueue");
@@ -118,7 +120,12 @@ public class RabbitMQProducerIT extends RabbitMQITSupport {
         admin.declareExchange(t);
         admin.declareBinding(BindingBuilder.bind(q).to(t).with("foo.bar.#"));
 
-        template.sendBodyAndHeader("direct:start", "<price>123</price>", 
Exchange.CONTENT_TYPE, "application/xml");
+        template.sendBodyAndHeaders("direct:start", "<price>123</price>",
+                Map.of(SpringRabbitMQConstants.DELIVERY_MODE, 
MessageDeliveryMode.PERSISTENT,
+                        SpringRabbitMQConstants.TYPE, "price",
+                        SpringRabbitMQConstants.CONTENT_TYPE, 
"application/xml",
+                        SpringRabbitMQConstants.MESSAGE_ID, 
"0fe9c142-f9c1-426f-9237-f5a4c988a8ae",
+                        SpringRabbitMQConstants.PRIORITY, 1));
 
         AmqpTemplate template = new RabbitTemplate(cf);
         Message out = template.receive("myqueue");
@@ -126,7 +133,11 @@ public class RabbitMQProducerIT extends RabbitMQITSupport {
         String encoding = out.getMessageProperties().getContentEncoding();
         Assertions.assertEquals(Charset.defaultCharset().name(), encoding);
         Assertions.assertEquals("<price>123</price>", new 
String(out.getBody(), encoding));
+        Assertions.assertEquals(MessageDeliveryMode.PERSISTENT, 
out.getMessageProperties().getReceivedDeliveryMode());
+        Assertions.assertEquals("price", out.getMessageProperties().getType());
         Assertions.assertEquals("application/xml", 
out.getMessageProperties().getContentType());
+        Assertions.assertEquals("0fe9c142-f9c1-426f-9237-f5a4c988a8ae", 
out.getMessageProperties().getMessageId());
+        Assertions.assertEquals(1, out.getMessageProperties().getPriority());
         Assertions.assertEquals(0, 
out.getMessageProperties().getHeaders().size());
     }
 

Reply via email to