This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new ca8dae067d3 CAMEL-21407: Implemented the case of processing an
unsubscribe request by first looking to see if the message body is a control
message, and processing it, then obtaining the parameters from headers if they
are not in the message body. (#16128)
ca8dae067d3 is described below
commit ca8dae067d3a769061d2ada4494415694eacb9ad
Author: Steve Storck <[email protected]>
AuthorDate: Thu Oct 31 07:54:33 2024 +0000
CAMEL-21407: Implemented the case of processing an unsubscribe request by
first looking to see if the message body is a control message, and processing
it, then obtaining the parameters from headers if they are not in the message
body. (#16128)
---
.../control/DynamicRouterControlProducer.java | 13 +++++++---
.../control/DynamicRouterControlProducerTest.java | 28 ++++++++++++++++------
2 files changed, 31 insertions(+), 10 deletions(-)
diff --git
a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlProducer.java
b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlProducer.java
index 797052f56a1..178cd090339 100644
---
a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlProducer.java
+++
b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlProducer.java
@@ -164,9 +164,16 @@ public class DynamicRouterControlProducer extends
HeaderSelectorProducer {
*/
@InvokeOnHeader(CONTROL_ACTION_UNSUBSCRIBE)
public void performUnsubscribe(final Message message, AsyncCallback
callback) {
- Map<String, Object> headers = message.getHeaders();
- String subscriptionId = (String)
headers.getOrDefault(CONTROL_SUBSCRIPTION_ID,
configuration.getSubscriptionId());
- String subscribeChannel = (String)
headers.getOrDefault(CONTROL_SUBSCRIBE_CHANNEL,
configuration.getSubscribeChannel());
+ String subscriptionId;
+ String subscribeChannel;
+ if (message.getBody() instanceof DynamicRouterControlMessage) {
+ DynamicRouterControlMessage controlMessage =
message.getBody(DynamicRouterControlMessage.class);
+ subscriptionId = controlMessage.getSubscriptionId();
+ subscribeChannel = controlMessage.getSubscribeChannel();
+ } else {
+ subscriptionId = message.getHeader(CONTROL_SUBSCRIPTION_ID,
configuration.getSubscriptionId(), String.class);
+ subscribeChannel = message.getHeader(CONTROL_SUBSCRIBE_CHANNEL,
configuration.getSubscribeChannel(), String.class);
+ }
boolean result =
dynamicRouterControlService.removeSubscription(subscribeChannel,
subscriptionId);
message.setBody(result, boolean.class);
callback.done(false);
diff --git
a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlProducerTest.java
b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlProducerTest.java
index ea65164dfe5..f86a48d5557 100644
---
a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlProducerTest.java
+++
b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlProducerTest.java
@@ -40,7 +40,6 @@ import static
org.apache.camel.component.dynamicrouter.control.DynamicRouterCont
import static
org.apache.camel.component.dynamicrouter.control.DynamicRouterControlConstants.CONTROL_ACTION_LIST;
import static
org.apache.camel.component.dynamicrouter.control.DynamicRouterControlConstants.CONTROL_ACTION_STATS;
import static
org.apache.camel.component.dynamicrouter.control.DynamicRouterControlConstants.CONTROL_ACTION_SUBSCRIBE;
-import static
org.apache.camel.component.dynamicrouter.control.DynamicRouterControlConstants.CONTROL_ACTION_UNSUBSCRIBE;
import static
org.apache.camel.component.dynamicrouter.control.DynamicRouterControlConstants.CONTROL_ACTION_UPDATE;
import static
org.apache.camel.component.dynamicrouter.control.DynamicRouterControlConstants.CONTROL_DESTINATION_URI;
import static
org.apache.camel.component.dynamicrouter.control.DynamicRouterControlConstants.CONTROL_EXPRESSION_LANGUAGE;
@@ -125,14 +124,29 @@ class DynamicRouterControlProducerTest {
}
@Test
- void performUnsubscribeAction() {
+ void performUnsubscribeActionWithControlMessage() {
String subscriptionId = "testId";
String subscribeChannel = "testChannel";
- Map<String, Object> headers = Map.of(
- CONTROL_ACTION_HEADER, CONTROL_ACTION_UNSUBSCRIBE,
- CONTROL_SUBSCRIBE_CHANNEL, subscribeChannel,
- CONTROL_SUBSCRIPTION_ID, subscriptionId);
- when(message.getHeaders()).thenReturn(headers);
+ DynamicRouterControlMessage unsubscribeMsg =
DynamicRouterControlMessage.Builder.newBuilder()
+ .subscriptionId(subscriptionId)
+ .subscribeChannel(subscribeChannel)
+ .build();
+ when(message.getBody()).thenReturn(unsubscribeMsg);
+
when(message.getBody(DynamicRouterControlMessage.class)).thenReturn(unsubscribeMsg);
+ Mockito.doNothing().when(callback).done(false);
+ producer.performUnsubscribe(message, callback);
+ Mockito.verify(controlService, Mockito.times(1))
+ .removeSubscription(subscribeChannel, subscriptionId);
+ }
+
+ @Test
+ void performUnsubscribeActionWithHeaders() {
+ String subscriptionId = "testId";
+ String subscribeChannel = "testChannel";
+ when(message.getHeader(CONTROL_SUBSCRIPTION_ID,
configuration.getSubscriptionId(), String.class))
+ .thenReturn(subscriptionId);
+ when(message.getHeader(CONTROL_SUBSCRIBE_CHANNEL,
configuration.getSubscribeChannel(), String.class))
+ .thenReturn(subscribeChannel);
Mockito.doNothing().when(callback).done(false);
producer.performUnsubscribe(message, callback);
Mockito.verify(controlService, Mockito.times(1))