This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new ecdfbbfe58c CAMEL-20701: added a custom deserializer to be used when
interoperating with JMS
ecdfbbfe58c is described below
commit ecdfbbfe58cd648c2efada77d1536aec6f8d929f
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Fri Apr 19 10:06:01 2024 +0200
CAMEL-20701: added a custom deserializer to be used when interoperating
with JMS
---
.../camel-kafka/src/main/docs/kafka-component.adoc | 30 ++++++++-
.../consumer/support/interop/JMSDeserializer.java | 78 ++++++++++++++++++++++
2 files changed, 107 insertions(+), 1 deletion(-)
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index ae166242489..a00f0e78863 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -656,13 +656,41 @@ public class CustomSubscribeAdapter implements SubscribeAdapter {
Then, it is necessary to add it as named bean instance to the registry:
-
[source,java]
.Add to registry example
----
context.getRegistry().bind(KafkaConstants.KAFKA_SUBSCRIBE_ADAPTER, new CustomSubscribeAdapter());
----
+== Interoperability
+
+=== JMS
+
+When interoperating Kafka and JMS, it may be necessary to coerce the JMS headers into their expected type.
+
+For instance, when consuming messages from Kafka carrying JMS headers and then sending them to a JMS broker, those headers are
+first deserialized into a byte array. Then, the `camel-jms` component tries to coerce this byte array into the
+specific type expected for each header.
+However, both the origin endpoint and the way it was set up in the code itself may affect how the data is serialized and
+deserialized. As such, it is not feasible to naively assume the data type of the byte array.
+
+To address this issue, we provide a custom header deserializer to force Kafka to deserialize the JMS data according to
+the JMS specification. This approach ensures that the headers are properly interpreted and processed by the `camel-jms` component.
+
+Due to the inherent complexity of each possible system and endpoint, it may not be possible for this deserializer to cover all
+possible scenarios. As such, it is provided as a model that can be modified and/or adapted for the specific needs of each application.
+
+To use this deserializer, modify the route URI on the consumer end of the pipeline by including the
+`headerDeserializer` option.
+For example:
+
+[source,java]
+.Route snippet
+----
+from("kafka:topic?headerDeserializer=#class:org.apache.camel.component.kafka.consumer.support.interop.JMSDeserializer")
+ .to("...");
+----
+
include::spring-boot:partial$starter.adoc[]
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/interop/JMSDeserializer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/interop/JMSDeserializer.java
new file mode 100644
index 00000000000..61a489d81af
--- /dev/null
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/interop/JMSDeserializer.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.kafka.consumer.support.interop;
+
+import java.nio.ByteBuffer;
+
+import org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer;
+
+public class JMSDeserializer implements KafkaHeaderDeserializer {
+
+ public boolean isLong(byte[] bytes) {
+ return bytes.length == Long.BYTES;
+ }
+
+ private static long bytesToLong(byte[] bytes) {
+ final ByteBuffer buffer = toByteBuffer(bytes, Long.BYTES);
+ return buffer.getLong();
+ }
+
+ private static int bytesToInt(byte[] bytes) {
+ final ByteBuffer buffer = toByteBuffer(bytes, Integer.BYTES);
+ return buffer.getInt();
+ }
+
+ private static ByteBuffer toByteBuffer(byte[] bytes, int size) {
+ ByteBuffer buffer = ByteBuffer.allocate(size);
+ buffer.put(bytes);
+ buffer.flip();
+ return buffer;
+ }
+
+ @Override
+ public Object deserialize(String key, byte[] value) {
+ if (key.startsWith("JMS")) {
+ switch (key) {
+ case "JMSDestination":
+ return new String(value);
+ case "JMSDeliveryMode":
+ return bytesToInt(value);
+ case "JMSTimestamp":
+ return bytesToLong(value);
+ case "JMSCorrelationID":
+ return value;
+ case "JMSReplyTo":
+ return new String(value);
+ case "JMSRedelivered":
+ return bytesToInt(value);
+ case "JMSType":
+ return new String(value);
+ case "JMSExpiration":
+ return isLong(value) ? bytesToLong(value) : bytesToInt(value);
+ case "JMSPriority":
+ return bytesToInt(value);
+ case "JMSMessageID":
+ return new String(value);
+ default:
+ return value;
+ }
+ }
+
+ return value;
+ }
+}
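
Illustrative note (not part of the commit): the deserializer above assumes numeric JMS headers arrive as fixed-width big-endian byte arrays and text headers as encoded strings. The standalone sketch below shows that round trip without any Kafka or Camel dependency. The class name `JmsHeaderRoundTrip` and the helpers `longToBytes`, `intToBytes`, and `decode` are hypothetical and exist only for this illustration; the sketch covers only a few header names, uses `ByteBuffer.wrap` rather than the commit's allocate/put/flip helper, and passes an explicit UTF-8 charset, which the committed code does not do.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class JmsHeaderRoundTrip {

    // Hypothetical producer-side helper: encodes a long as 8 big-endian bytes.
    static byte[] longToBytes(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    // Hypothetical producer-side helper: encodes an int as 4 big-endian bytes.
    static byte[] intToBytes(int value) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
    }

    // Simplified consumer-side decoding, keyed on the header name.
    // Assumption: only a subset of the JMS header names is handled here.
    static Object decode(String key, byte[] value) {
        switch (key) {
            case "JMSTimestamp":
                // Throws BufferUnderflowException if fewer than 8 bytes are present.
                return ByteBuffer.wrap(value).getLong();
            case "JMSPriority":
            case "JMSDeliveryMode":
                // Throws BufferUnderflowException if fewer than 4 bytes are present.
                return ByteBuffer.wrap(value).getInt();
            case "JMSType":
            case "JMSMessageID":
                // Assumption: the producer encoded the text as UTF-8.
                return new String(value, StandardCharsets.UTF_8);
            default:
                // Unknown headers are passed through untouched.
                return value;
        }
    }

    public static void main(String[] args) {
        Object timestamp = decode("JMSTimestamp", longToBytes(1713513961000L));
        Object priority = decode("JMSPriority", intToBytes(4));
        Object type = decode("JMSType", "order".getBytes(StandardCharsets.UTF_8));

        System.out.println(timestamp.getClass().getSimpleName() + " " + timestamp);
        System.out.println(priority.getClass().getSimpleName() + " " + priority);
        System.out.println(type.getClass().getSimpleName() + " " + type);
    }
}
```

When adapting the committed class, note that its `toByteBuffer` helper allocates a buffer of exactly the target width, so a header value longer than that width raises `BufferOverflowException` on `put`, and a shorter one raises `BufferUnderflowException` on the subsequent `getLong` or `getInt`. A producer that serializes numeric headers as text would therefore need a different decoding branch.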