This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new e15819a7ffa CAMEL-20699, CAMEL-20691: Fix Azure ServiceBus header
propagation (#13872)
e15819a7ffa is described below
commit e15819a7ffa0ff0cbd36eef15b4978cd0ac563ce
Author: Dylan Piergies <[email protected]>
AuthorDate: Wed Apr 24 20:16:18 2024 +0100
CAMEL-20699, CAMEL-20691: Fix Azure ServiceBus header propagation (#13872)
* CAMEL-20699: Fix Azure ServiceBus consumer broker property propagation
* CAMEL-20691: Fix propagation of application properties
---
.../azure/servicebus/ServiceBusConfiguration.java | 3 +--
.../azure/servicebus/ServiceBusConsumer.java | 2 +-
.../servicebus/ServiceBusHeaderFilterStrategy.java | 31 ++++++++++++++++++++++
3 files changed, 33 insertions(+), 3 deletions(-)
diff --git a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusConfiguration.java b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusConfiguration.java
index 4bc10e16f2f..55f2840134b 100644
--- a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusConfiguration.java
+++ b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusConfiguration.java
@@ -31,7 +31,6 @@ import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import com.azure.messaging.servicebus.models.SubQueue;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.spi.*;
-import org.apache.camel.support.DefaultHeaderFilterStrategy;
import static org.apache.camel.component.azure.servicebus.CredentialType.CONNECTION_STRING;
@@ -59,7 +58,7 @@ public class ServiceBusConfiguration implements Cloneable,
HeaderFilterStrategyA
private AmqpTransportType amqpTransportType = AmqpTransportType.AMQP;
@UriParam(label = "common",
description = "To use a custom HeaderFilterStrategy to filter Service Bus application properties to and from Camel message headers.")
- private HeaderFilterStrategy headerFilterStrategy = new DefaultHeaderFilterStrategy();
+ private HeaderFilterStrategy headerFilterStrategy = new ServiceBusHeaderFilterStrategy();
@UriParam(label = "consumer", defaultValue = "receiveMessages")
private ServiceBusConsumerOperationDefinition consumerOperation = ServiceBusConsumerOperationDefinition.receiveMessages;
@UriParam(label = "consumer")
diff --git a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusConsumer.java b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusConsumer.java
index 14f8297ff25..17551243b18 100644
--- a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusConsumer.java
+++ b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusConsumer.java
@@ -189,7 +189,7 @@ public class ServiceBusConsumer extends DefaultConsumer {
// propagate headers
final HeaderFilterStrategy headerFilterStrategy = getConfiguration().getHeaderFilterStrategy();
- message.setHeaders(receivedMessage.getApplicationProperties().entrySet().stream()
+ message.getHeaders().putAll(receivedMessage.getApplicationProperties().entrySet().stream()
.filter(entry -> !headerFilterStrategy.applyFilterToExternalHeaders(entry.getKey(), entry.getValue(), exchange))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
diff --git a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusHeaderFilterStrategy.java b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusHeaderFilterStrategy.java
new file mode 100644
index 00000000000..5ab2a97e08a
--- /dev/null
+++ b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusHeaderFilterStrategy.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.azure.servicebus;
+
+import org.apache.camel.support.DefaultHeaderFilterStrategy;
+
+public class ServiceBusHeaderFilterStrategy extends DefaultHeaderFilterStrategy {
+ public ServiceBusHeaderFilterStrategy() {
+ super();
+ initialise();
+ }
+
+ private void initialise() {
+ setOutFilterStartsWith("Camel", "camel", "org.apache.camel.");
+ setInFilterStartsWith("Camel", "camel", "org.apache.camel.");
+ }
+}