This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new ddd2e40921a CAMEL-20754: Support for MQTT 5 Publish Properties (#14175)
ddd2e40921a is described below
commit ddd2e40921afb570a1dd78c34bd495bfc2b83ac2
Author: Henning Sudbrock <[email protected]>
AuthorDate: Fri May 17 12:34:34 2024 +0200
CAMEL-20754: Support for MQTT 5 Publish Properties (#14175)
Adds a message header for MQTT 5 Publish Properties to the Paho MQTT 5
component. This enables support for the MQTT message headers introduced with
MQTT 5 (e.g., content-type, response-topic, message-expiry-interval, or
user properties; cf. section 3.3.2.3 of the MQTT 5 spec,
https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html).
When consuming a message, the contents of the Camel message header are
taken from the properties of the incoming MQTT message; when producing an MQTT
message, the properties are taken from the Camel message header if available.
---
.../camel/catalog/components/paho-mqtt5.json | 3 +-
.../camel/component/paho/mqtt5/paho-mqtt5.json | 3 +-
.../component/paho/mqtt5/PahoMqtt5Constants.java | 5 ++++
.../component/paho/mqtt5/PahoMqtt5Consumer.java | 1 +
.../component/paho/mqtt5/PahoMqtt5Producer.java | 4 +++
.../integration/PahoMqtt5ComponentMqtt5IT.java | 34 ++++++++++++++++++++++
.../dsl/PahoMqtt5EndpointBuilderFactory.java | 14 +++++++++
7 files changed, 62 insertions(+), 2 deletions(-)
diff --git
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/paho-mqtt5.json
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/paho-mqtt5.json
index 54161f7f611..1038353b6a3 100644
---
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/paho-mqtt5.json
+++
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/paho-mqtt5.json
@@ -61,7 +61,8 @@
"CamelMqttQoS": { "index": 1, "kind": "header", "displayName": "",
"group": "consumer", "label": "consumer", "required": false, "javaType":
"Integer", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "description": "The quality of service of the incoming
message.", "constantName":
"org.apache.camel.component.paho.mqtt5.PahoMqtt5Constants#MQTT_QOS" },
"CamelPahoMqtt5Qos": { "index": 2, "kind": "header", "displayName": "",
"group": "producer", "label": "producer", "required": false, "javaType":
"Integer", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "description": "The client quality of service level (0-2).",
"constantName":
"org.apache.camel.component.paho.mqtt5.PahoMqtt5Constants#CAMEL_PAHO_MSG_QOS" },
"CamelPahoMqtt5Retained": { "index": 3, "kind": "header", "displayName":
"", "group": "producer", "label": "producer", "required": false, "javaType":
"Boolean", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "description": "Retain option.", "constantName":
"org.apache.camel.component.paho.mqtt5.PahoMqtt5Constants#CAMEL_PAHO_MSG_RETAINED"
},
- "CamelPahoMqtt5OverrideTopic": { "index": 4, "kind": "header",
"displayName": "", "group": "producer", "label": "producer", "required": false,
"javaType": "String", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The name of topic to override and send
to instead of topic specified on endpoint.", "constantName":
"org.apache.camel.component.paho.mqtt5.PahoMqtt5Constants#CAMEL_PAHO_OVERRIDE_TOPIC"
}
+ "CamelPahoMqtt5OverrideTopic": { "index": 4, "kind": "header",
"displayName": "", "group": "producer", "label": "producer", "required": false,
"javaType": "String", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The name of topic to override and send
to instead of topic specified on endpoint.", "constantName":
"org.apache.camel.component.paho.mqtt5.PahoMqtt5Constants#CAMEL_PAHO_OVERRIDE_TOPIC"
},
+ "CamelPahoMqtt5MsgProperties": { "index": 5, "kind": "header",
"displayName": "", "group": "producer", "label": "consumer,producer",
"required": false, "javaType":
"org.eclipse.paho.mqttv5.common.packet.MqttProperties", "deprecated": false,
"deprecationNote": "", "autowired": false, "secret": false, "description":
"Consumer: The properties set on the incoming message. Producer: The properties
to be set on the outgoing message.", "constantName":
"org.apache.camel.component.paho.mqtt5. [...]
},
"properties": {
"topic": { "index": 0, "kind": "path", "displayName": "Topic", "group":
"common", "label": "", "required": true, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "Name of the topic" },
diff --git
a/components/camel-paho-mqtt5/src/generated/resources/META-INF/org/apache/camel/component/paho/mqtt5/paho-mqtt5.json
b/components/camel-paho-mqtt5/src/generated/resources/META-INF/org/apache/camel/component/paho/mqtt5/paho-mqtt5.json
index 54161f7f611..1038353b6a3 100644
---
a/components/camel-paho-mqtt5/src/generated/resources/META-INF/org/apache/camel/component/paho/mqtt5/paho-mqtt5.json
+++
b/components/camel-paho-mqtt5/src/generated/resources/META-INF/org/apache/camel/component/paho/mqtt5/paho-mqtt5.json
@@ -61,7 +61,8 @@
"CamelMqttQoS": { "index": 1, "kind": "header", "displayName": "",
"group": "consumer", "label": "consumer", "required": false, "javaType":
"Integer", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "description": "The quality of service of the incoming
message.", "constantName":
"org.apache.camel.component.paho.mqtt5.PahoMqtt5Constants#MQTT_QOS" },
"CamelPahoMqtt5Qos": { "index": 2, "kind": "header", "displayName": "",
"group": "producer", "label": "producer", "required": false, "javaType":
"Integer", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "description": "The client quality of service level (0-2).",
"constantName":
"org.apache.camel.component.paho.mqtt5.PahoMqtt5Constants#CAMEL_PAHO_MSG_QOS" },
"CamelPahoMqtt5Retained": { "index": 3, "kind": "header", "displayName":
"", "group": "producer", "label": "producer", "required": false, "javaType":
"Boolean", "deprecated": false, "deprecationNote": "", "autowired": false,
"secret": false, "description": "Retain option.", "constantName":
"org.apache.camel.component.paho.mqtt5.PahoMqtt5Constants#CAMEL_PAHO_MSG_RETAINED"
},
- "CamelPahoMqtt5OverrideTopic": { "index": 4, "kind": "header",
"displayName": "", "group": "producer", "label": "producer", "required": false,
"javaType": "String", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The name of topic to override and send
to instead of topic specified on endpoint.", "constantName":
"org.apache.camel.component.paho.mqtt5.PahoMqtt5Constants#CAMEL_PAHO_OVERRIDE_TOPIC"
}
+ "CamelPahoMqtt5OverrideTopic": { "index": 4, "kind": "header",
"displayName": "", "group": "producer", "label": "producer", "required": false,
"javaType": "String", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "The name of topic to override and send
to instead of topic specified on endpoint.", "constantName":
"org.apache.camel.component.paho.mqtt5.PahoMqtt5Constants#CAMEL_PAHO_OVERRIDE_TOPIC"
},
+ "CamelPahoMqtt5MsgProperties": { "index": 5, "kind": "header",
"displayName": "", "group": "producer", "label": "consumer,producer",
"required": false, "javaType":
"org.eclipse.paho.mqttv5.common.packet.MqttProperties", "deprecated": false,
"deprecationNote": "", "autowired": false, "secret": false, "description":
"Consumer: The properties set on the incoming message. Producer: The properties
to be set on the outgoing message.", "constantName":
"org.apache.camel.component.paho.mqtt5. [...]
},
"properties": {
"topic": { "index": 0, "kind": "path", "displayName": "Topic", "group":
"common", "label": "", "required": true, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "description": "Name of the topic" },
diff --git
a/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Constants.java
b/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Constants.java
index 3317fa364b1..4e95fe474de 100644
---
a/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Constants.java
+++
b/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Constants.java
@@ -47,6 +47,11 @@ public final class PahoMqtt5Constants {
javaType = "String")
public static final String CAMEL_PAHO_OVERRIDE_TOPIC = CAMEL_PAHO +
"OverrideTopic";
+ @Metadata(label = "consumer,producer",
+ description = "Consumer: The properties set on the incoming
message. Producer: The properties to be set on the outgoing message.",
+ javaType =
"org.eclipse.paho.mqttv5.common.packet.MqttProperties")
+ public static final String CAMEL_PAHO_MSG_PROPERTIES = CAMEL_PAHO +
"MsgProperties";
+
private PahoMqtt5Constants() {
}
}
diff --git
a/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Consumer.java
b/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Consumer.java
index 224c070134d..140736a7c25 100644
---
a/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Consumer.java
+++
b/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Consumer.java
@@ -152,6 +152,7 @@ public class PahoMqtt5Consumer extends DefaultConsumer {
paho.setBody(mqttMessage.getPayload());
paho.setHeader(PahoMqtt5Constants.MQTT_TOPIC, topic);
paho.setHeader(PahoMqtt5Constants.MQTT_QOS, mqttMessage.getQos());
+ paho.setHeader(PahoMqtt5Constants.CAMEL_PAHO_MSG_PROPERTIES,
mqttMessage.getProperties());
exchange.setIn(paho);
return exchange;
diff --git
a/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Producer.java
b/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Producer.java
index 6b26d8ce612..9a407742893 100644
---
a/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Producer.java
+++
b/components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Producer.java
@@ -21,6 +21,7 @@ import org.apache.camel.support.DefaultProducer;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.common.MqttMessage;
+import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,11 +46,14 @@ public class PahoMqtt5Producer extends DefaultProducer {
getEndpoint().getConfiguration().getQos(), Integer.class);
boolean retained =
exchange.getIn().getHeader(PahoMqtt5Constants.CAMEL_PAHO_MSG_RETAINED,
getEndpoint().getConfiguration().isRetained(), Boolean.class);
+ MqttProperties properties
+ =
exchange.getIn().getHeader(PahoMqtt5Constants.CAMEL_PAHO_MSG_PROPERTIES,
MqttProperties.class);
byte[] payload = exchange.getIn().getBody(byte[].class);
MqttMessage message = new MqttMessage(payload);
message.setQos(qos);
message.setRetained(retained);
+ message.setProperties(properties);
LOG.debug("Publishing to topic: {}, qos: {}, retrained: {}", topic,
qos, retained);
client.publish(topic, message);
diff --git
a/components/camel-paho-mqtt5/src/test/java/org/apache/camel/component/paho/mqtt5/integration/PahoMqtt5ComponentMqtt5IT.java
b/components/camel-paho-mqtt5/src/test/java/org/apache/camel/component/paho/mqtt5/integration/PahoMqtt5ComponentMqtt5IT.java
index 3940d785951..bb8e87a1afd 100644
---
a/components/camel-paho-mqtt5/src/test/java/org/apache/camel/component/paho/mqtt5/integration/PahoMqtt5ComponentMqtt5IT.java
+++
b/components/camel-paho-mqtt5/src/test/java/org/apache/camel/component/paho/mqtt5/integration/PahoMqtt5ComponentMqtt5IT.java
@@ -17,6 +17,7 @@
package org.apache.camel.component.paho.mqtt5.integration;
import java.nio.charset.StandardCharsets;
+import java.util.List;
import org.apache.camel.EndpointInject;
import org.apache.camel.Exchange;
@@ -28,9 +29,12 @@ import
org.apache.camel.component.paho.mqtt5.PahoMqtt5Endpoint;
import org.apache.camel.component.paho.mqtt5.PahoMqtt5Message;
import org.apache.camel.component.paho.mqtt5.PahoMqtt5Persistence;
import org.eclipse.paho.mqttv5.common.MqttMessage;
+import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
+import org.eclipse.paho.mqttv5.common.packet.UserProperty;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import static
org.apache.camel.component.paho.mqtt5.PahoMqtt5Constants.CAMEL_PAHO_MSG_PROPERTIES;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -114,6 +118,28 @@ public class PahoMqtt5ComponentMqtt5IT extends
PahoMqtt5ITSupport {
mock.assertIsSatisfied();
}
+ @Test
+ public void shouldSendAndReadMessagePropertiesFromMqtt() throws
InterruptedException {
+ // Given
+ String msg = "msg";
+ MqttProperties publishedMqttProperties = createMqttProperties(
+ "text/plain",
+ "some-response-topic",
+ List.of(new UserProperty("key1", "value1"), new
UserProperty("key2", "value2")));
+ mock.expectedBodiesReceived(msg);
+
+ // When
+ template.sendBodyAndHeader("direct:test", msg,
CAMEL_PAHO_MSG_PROPERTIES, publishedMqttProperties);
+
+ // Then
+ mock.assertIsSatisfied();
+ MqttProperties receivedMqttProperties
+ =
mock.getExchanges().get(0).getIn().getHeader(CAMEL_PAHO_MSG_PROPERTIES,
MqttProperties.class);
+ assertEquals(receivedMqttProperties.getContentType(),
publishedMqttProperties.getContentType());
+ assertEquals(receivedMqttProperties.getResponseTopic(),
publishedMqttProperties.getResponseTopic());
+ assertEquals(receivedMqttProperties.getUserProperties(),
publishedMqttProperties.getUserProperties());
+ }
+
@Test
public void shouldNotReadMessageFromUnregisteredTopic() throws
InterruptedException {
// Given
@@ -189,4 +215,12 @@ public class PahoMqtt5ComponentMqtt5IT extends
PahoMqtt5ITSupport {
mock.assertIsSatisfied();
}
+ private MqttProperties createMqttProperties(String contentType, String
responseTopic, List<UserProperty> userProperties) {
+ MqttProperties properties = new MqttProperties();
+ properties.setContentType(contentType);
+ properties.setResponseTopic(responseTopic);
+ properties.setUserProperties(userProperties);
+ return properties;
+ }
+
}
diff --git
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PahoMqtt5EndpointBuilderFactory.java
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PahoMqtt5EndpointBuilderFactory.java
index 5fc5855ddbf..a8c6feaae6b 100644
---
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PahoMqtt5EndpointBuilderFactory.java
+++
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PahoMqtt5EndpointBuilderFactory.java
@@ -3241,6 +3241,20 @@ public interface PahoMqtt5EndpointBuilderFactory {
public String pahoMqtt5OverrideTopic() {
return "CamelPahoMqtt5OverrideTopic";
}
+ /**
+ * Consumer: The properties set on the incoming message. Producer: The
+ * properties to be set on the outgoing message.
+ *
+ * The option is a: {@code
+ * org.eclipse.paho.mqttv5.common.packet.MqttProperties} type.
+ *
+ * Group: producer
+ *
+ * @return the name of the header {@code PahoMqtt5MsgProperties}.
+ */
+ public String pahoMqtt5MsgProperties() {
+ return "CamelPahoMqtt5MsgProperties";
+ }
}
static PahoMqtt5EndpointBuilder endpointBuilder(String componentName,
String path) {
class PahoMqtt5EndpointBuilderImpl extends AbstractEndpointBuilder
implements PahoMqtt5EndpointBuilder, AdvancedPahoMqtt5EndpointBuilder {