This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch var-headers in repository https://gitbox.apache.org/repos/asf/camel.git
commit 0731a28208698444e717edf72414fbaa5faee63a Author: Claus Ibsen <[email protected]> AuthorDate: Mon Jan 29 10:59:38 2024 +0100 CAMEL-19749: variables - Should also copy message headers into variable when using EIP variables --- .../java/org/apache/camel/processor/Enricher.java | 19 ++++- .../org/apache/camel/processor/PollEnricher.java | 17 +++- .../camel/processor/EnrichVariableHeadersTest.java | 94 ++++++++++++++++++++++ .../processor/PollEnrichVariableHeadersTest.java | 62 ++++++++++++++ .../org/apache/camel/support/ExchangeHelper.java | 6 +- 5 files changed, 187 insertions(+), 11 deletions(-) diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java index bb8350971ec..9d641b01a05 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Enricher.java @@ -16,6 +16,8 @@ */ package org.apache.camel.processor; +import java.util.Map; + import org.apache.camel.AggregationStrategy; import org.apache.camel.AsyncCallback; import org.apache.camel.CamelContext; @@ -26,6 +28,7 @@ import org.apache.camel.ExchangePattern; import org.apache.camel.ExchangePropertyKey; import org.apache.camel.Expression; import org.apache.camel.spi.EndpointUtilizationStatistics; +import org.apache.camel.spi.HeadersMapFactory; import org.apache.camel.spi.IdAware; import org.apache.camel.spi.ProcessorExchangeFactory; import org.apache.camel.spi.RouteIdAware; @@ -63,6 +66,7 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA private boolean ignoreInvalidEndpoint; private boolean allowOptimisedComponents = true; private boolean autoStartupComponents = true; + private HeadersMapFactory headersMapFactory; private ProcessorExchangeFactory processorExchangeFactory; private SendDynamicProcessor sendDynamicProcessor; @@ -188,9 +192,12 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA // if we should store the received message body in a variable, // then we need to preserve the original message body Object body = null; + Map<String, Object> headers = null; if (variableReceive != null) { try { body = exchange.getMessage().getBody(); + // do a defensive copy of the headers + headers = headersMapFactory.newMap(exchange.getMessage().getHeaders()); } catch (Exception throwable) { exchange.setException(throwable); callback.done(true); @@ -198,6 +205,7 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA } } final Object originalBody = body; + final Map<String, Object> originalHeaders = headers; return sendDynamicProcessor.process(resourceExchange, new AsyncCallback() { @Override @@ -216,9 +224,9 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA if (aggregatedExchange != null) { if (variableReceive != null) { // result should be stored in variable instead of message body - Object value = aggregatedExchange.getMessage().getBody(); - ExchangeHelper.setVariable(exchange, variableReceive, value); - aggregatedExchange.getMessage().setBody(originalBody); + ExchangeHelper.setVariableFromMessageBodyAndHeaders(exchange, variableReceive); + exchange.getMessage().setBody(originalBody); + exchange.getMessage().setHeaders(originalHeaders); } // copy aggregation result onto original exchange (preserving pattern) copyResultsWithoutCorrelationId(exchange, aggregatedExchange); @@ -302,6 +310,11 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA ServiceHelper.buildService(processorExchangeFactory, sendDynamicProcessor); } + @Override + protected void doInit() throws Exception { + headersMapFactory = camelContext.getCamelContextExtension().getHeadersMapFactory(); + } + @Override protected void doStart() throws Exception { ServiceHelper.startService(processorExchangeFactory, aggregationStrategy, sendDynamicProcessor); diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java index 21759c73f24..4005d83834c 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PollEnricher.java @@ -16,6 +16,8 @@ */ package org.apache.camel.processor; +import java.util.Map; + import org.apache.camel.AggregationStrategy; import org.apache.camel.AsyncCallback; import org.apache.camel.CamelContext; @@ -32,6 +34,7 @@ import org.apache.camel.PollingConsumer; import org.apache.camel.spi.ConsumerCache; import org.apache.camel.spi.EndpointUtilizationStatistics; import org.apache.camel.spi.ExceptionHandler; +import org.apache.camel.spi.HeadersMapFactory; import org.apache.camel.spi.IdAware; import org.apache.camel.spi.NormalizedEndpointUri; import org.apache.camel.spi.RouteIdAware; @@ -64,7 +67,8 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout private CamelContext camelContext; private ConsumerCache consumerCache; - protected volatile String scheme; + private HeadersMapFactory headersMapFactory; + private volatile String scheme; private String id; private String routeId; private String variableReceive; @@ -320,9 +324,12 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout // if we should store the received message body in a variable, // then we need to preserve the original message body Object originalBody = null; + Map<String, Object> originalHeaders = null; if (variableReceive != null) { try { originalBody = exchange.getMessage().getBody(); + // do a defensive copy of the headers + originalHeaders = headersMapFactory.newMap(exchange.getMessage().getHeaders()); } catch (Exception throwable) { exchange.setException(throwable); callback.done(true); @@ -345,9 +352,9 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout if (aggregatedExchange != null) { if (variableReceive != null) { // result should be stored in variable instead of message body - Object value = aggregatedExchange.getMessage().getBody(); - ExchangeHelper.setVariable(exchange, variableReceive, value); - aggregatedExchange.getMessage().setBody(originalBody); + ExchangeHelper.setVariableFromMessageBodyAndHeaders(exchange, variableReceive); + exchange.getMessage().setBody(originalBody); + exchange.getMessage().setHeaders(originalHeaders); } // copy aggregation result onto original exchange (preserving pattern) copyResultsPreservePattern(exchange, aggregatedExchange); @@ -485,6 +492,8 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Rout scheme = ExchangeHelper.resolveScheme(u); } + headersMapFactory = camelContext.getCamelContextExtension().getHeadersMapFactory(); + ServiceHelper.initService(consumerCache, aggregationStrategy); } diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/EnrichVariableHeadersTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/EnrichVariableHeadersTest.java new file mode 100644 index 00000000000..229e5c86d4e --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/processor/EnrichVariableHeadersTest.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.junit.jupiter.api.Test; + +public class EnrichVariableHeadersTest extends ContextTestSupport { + + @Test + public void testSend() throws Exception { + getMockEndpoint("mock:before").expectedBodiesReceived("World"); + getMockEndpoint("mock:before").expectedVariableReceived("hello", "Camel"); + getMockEndpoint("mock:result").expectedBodiesReceived("Bye Camel"); + getMockEndpoint("mock:result").expectedVariableReceived("hello", "Camel"); + getMockEndpoint("mock:result").message(0).header("echo").isEqualTo("CamelCamel"); + + template.sendBody("direct:send", "World"); + + assertMockEndpointsSatisfied(); + } + + @Test + public void testReceive() throws Exception { + getMockEndpoint("mock:after").expectedBodiesReceived("World"); + getMockEndpoint("mock:after").expectedVariableReceived("bye", "Bye World"); + getMockEndpoint("mock:after").message(0).header("echo").isNull(); + getMockEndpoint("mock:result").expectedBodiesReceived("Bye World"); + getMockEndpoint("mock:result").expectedVariableReceived("bye", "Bye World"); + getMockEndpoint("mock:result").message(0).header("echo").isNull(); + + template.sendBody("direct:receive", "World"); + + assertMockEndpointsSatisfied(); + } + + @Test + public void testSendAndReceive() throws Exception { + getMockEndpoint("mock:before").expectedBodiesReceived("World"); + getMockEndpoint("mock:before").expectedVariableReceived("hello", "Camel"); + getMockEndpoint("mock:result").expectedBodiesReceived("World"); + getMockEndpoint("mock:result").expectedVariableReceived("bye", "Bye Camel"); + getMockEndpoint("mock:result").message(0).header("echo").isNull(); + + template.sendBody("direct:sendAndReceive", "World"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:send") + .setVariable("hello", simple("Camel")) + .to("mock:before") + .enrich().constant("direct:foo").variableSend("hello") + .to("mock:result"); + + from("direct:receive") + .enrich().constant("direct:foo").variableReceive("bye") + .to("mock:after") + .setBody(simple("${variable:bye}")) + .to("mock:result"); + + from("direct:sendAndReceive") + .setVariable("hello", simple("Camel")) + .to("mock:before") + .enrich().constant("direct:foo").variableSend("hello").variableReceive("bye") + .to("mock:result"); + + from("direct:foo") + .setHeader("echo", simple("${body}${body}")) + .transform().simple("Bye ${body}"); + } + }; + } +} diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/PollEnrichVariableHeadersTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/PollEnrichVariableHeadersTest.java new file mode 100644 index 00000000000..852e3b0cd13 --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/processor/PollEnrichVariableHeadersTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import java.util.Map; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class PollEnrichVariableHeadersTest extends ContextTestSupport { + + @Test + public void testReceive() throws Exception { + template.sendBodyAndHeader("seda:foo", "Bye World", "echo", "CamelCamel"); + + getMockEndpoint("mock:after").expectedBodiesReceived("World"); + getMockEndpoint("mock:after").expectedVariableReceived("bye", "Bye World"); + getMockEndpoint("mock:result").expectedBodiesReceived("Bye World"); + getMockEndpoint("mock:result").expectedVariableReceived("bye", "Bye World"); + getMockEndpoint("mock:result").message(0).header("echo").isNull(); + getMockEndpoint("mock:result").whenAnyExchangeReceived(e -> { + Map m = e.getVariable("bye.headers", Map.class); + Assertions.assertNotNull(m); + Assertions.assertEquals(1, m.size()); + Assertions.assertEquals("CamelCamel", m.get("echo")); + }); + + template.sendBody("direct:receive", "World"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:receive") + .pollEnrich().constant("seda:foo").timeout(1000).variableReceive("bye") + .to("mock:after") + .setBody(simple("${variable:bye}")) + .to("mock:result"); + } + }; + } +} diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java index 85c34ecf86a..02df6ccce7b 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java @@ -1094,8 +1094,7 @@ public final class ExchangeHelper { name = StringHelper.after(name, ":"); repo.setVariable(name, value); } else { - exchange.setException( - new IllegalArgumentException("VariableRepository with id: " + id + " does not exist")); + throw new IllegalArgumentException("VariableRepository with id: " + id + " does not exist"); } } else { exchange.setVariable(name, value); @@ -1110,8 +1109,7 @@ public final class ExchangeHelper { = exchange.getContext().getCamelContextExtension().getContextPlugin(VariableRepositoryFactory.class); repo = factory.getVariableRepository(id); if (repo == null) { - exchange.setException( - new IllegalArgumentException("VariableRepository with id: " + id + " does not exist")); + throw new IllegalArgumentException("VariableRepository with id: " + id + " does not exist"); } name = StringHelper.after(name, ":"); }
