This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch var-headers in repository https://gitbox.apache.org/repos/asf/camel.git
commit 591b96914769ae82cf8c9dc09897bc395bbd3597 Author: Claus Ibsen <[email protected]> AuthorDate: Tue Jan 30 10:22:07 2024 +0100 CAMEL-19749: variables - Should also copy message headers into variable when using EIP variables --- .../apache/camel/spi/StreamCachingStrategy.java | 8 +++ .../impl/engine/DefaultStreamCachingStrategy.java | 18 ++++- .../camel/language/tokenizer/TokenizeLanguage.java | 3 +- ...sitory.java => AbstractVariableRepository.java} | 83 +++++++++++++--------- .../camel/support/ExchangeVariableRepository.java | 74 ++----------------- .../camel/support/GlobalVariableRepository.java | 60 +--------------- .../camel/support/builder/ExpressionBuilder.java | 8 +-- 7 files changed, 85 insertions(+), 169 deletions(-) diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java b/core/camel-api/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java index 68de83dd718..267949c9ba6 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/StreamCachingStrategy.java @@ -282,4 +282,12 @@ public interface StreamCachingStrategy extends StaticService { */ StreamCache cache(Message message); + /** + * Caches the value as a {@link StreamCache}. 
+ * + * @param value the value + * @return the value cached as a {@link StreamCache}, or <tt>null</tt> if not possible or no need to cache + */ + StreamCache cache(Object value); + } diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultStreamCachingStrategy.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultStreamCachingStrategy.java index 4c0919f7b7e..92df0417bd8 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultStreamCachingStrategy.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultStreamCachingStrategy.java @@ -249,21 +249,33 @@ public class DefaultStreamCachingStrategy extends ServiceSupport implements Came @Override public StreamCache cache(Exchange exchange) { - return cache(exchange.getMessage()); + return doCache(exchange.getMessage().getBody(), exchange); } @Override public StreamCache cache(Message message) { + return doCache(message.getBody(), message.getExchange()); + } + + @Override + public StreamCache cache(Object body) { + return doCache(body, null); + } + + private StreamCache doCache(Object body, Exchange exchange) { StreamCache cache = null; // try convert to stream cache - Object body = message.getBody(); if (body != null) { boolean allowed = allowClasses == null && denyClasses == null; if (!allowed) { allowed = checkAllowDenyList(body); } if (allowed) { - cache = camelContext.getTypeConverter().convertTo(StreamCache.class, message.getExchange(), body); + if (exchange != null) { + cache = camelContext.getTypeConverter().convertTo(StreamCache.class, exchange, body); + } else { + cache = camelContext.getTypeConverter().convertTo(StreamCache.class, body); + } } } if (cache != null) { diff --git a/core/camel-core-languages/src/main/java/org/apache/camel/language/tokenizer/TokenizeLanguage.java b/core/camel-core-languages/src/main/java/org/apache/camel/language/tokenizer/TokenizeLanguage.java index 
62b63391ba1..3662e1b0284 100644 --- a/core/camel-core-languages/src/main/java/org/apache/camel/language/tokenizer/TokenizeLanguage.java +++ b/core/camel-core-languages/src/main/java/org/apache/camel/language/tokenizer/TokenizeLanguage.java @@ -133,7 +133,8 @@ public class TokenizeLanguage extends SingleInputLanguageSupport implements Prop if (answer == null) { // use the regular tokenizer - final Expression exp = ExpressionBuilder.singleInputExpression(getVariableName(), getHeaderName(), getPropertyName()); + final Expression exp + = ExpressionBuilder.singleInputExpression(getVariableName(), getHeaderName(), getPropertyName()); if (regex) { answer = ExpressionBuilder.regexTokenizeExpression(exp, token); } else { diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeVariableRepository.java b/core/camel-support/src/main/java/org/apache/camel/support/AbstractVariableRepository.java similarity index 58% copy from core/camel-support/src/main/java/org/apache/camel/support/ExchangeVariableRepository.java copy to core/camel-support/src/main/java/org/apache/camel/support/AbstractVariableRepository.java index 2101982bf4a..ba6655b0541 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeVariableRepository.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/AbstractVariableRepository.java @@ -22,49 +22,35 @@ import java.util.stream.Stream; import org.apache.camel.CamelContext; import org.apache.camel.CamelContextAware; -import org.apache.camel.Exchange; -import org.apache.camel.NonManagedService; import org.apache.camel.StreamCache; -import org.apache.camel.converter.stream.CachedOutputStream; +import org.apache.camel.StreamCacheException; import org.apache.camel.spi.BrowsableVariableRepository; -import org.apache.camel.spi.VariableRepository; +import org.apache.camel.spi.StreamCachingStrategy; import org.apache.camel.support.service.ServiceSupport; -import org.apache.camel.util.CaseInsensitiveMap; -import 
org.apache.camel.util.StringHelper; /** - * {@link VariableRepository} which is local per {@link Exchange} to hold request-scoped variables. + * Base class for {@link org.apache.camel.spi.VariableRepository} implementations that store variables in memory. */ -class ExchangeVariableRepository extends ServiceSupport implements BrowsableVariableRepository, NonManagedService { +public abstract class AbstractVariableRepository extends ServiceSupport + implements BrowsableVariableRepository, CamelContextAware { private final Map<String, Object> variables = new ConcurrentHashMap<>(8); - private final CamelContext camelContext; + private CamelContext camelContext; + private StreamCachingStrategy strategy; - public ExchangeVariableRepository(CamelContext camelContext) { - this.camelContext = camelContext; + @Override + public CamelContext getCamelContext() { + return camelContext; } @Override - public String getId() { - return "exchange"; + public void setCamelContext(CamelContext camelContext) { + this.camelContext = camelContext; } @Override public Object getVariable(String name) { Object answer = variables.get(name); - if (answer == null && name.endsWith(".headers")) { - String prefix = name.substring(0, name.length() - 1) + "."; // xxx.headers -> xxx.header. 
- // we want all headers for a given variable - Map<String, Object> map = new CaseInsensitiveMap(); - for (Map.Entry<String, Object> entry : variables.entrySet()) { - String key = entry.getKey(); - if (key.startsWith(prefix)) { - key = StringHelper.after(key, prefix); - map.put(key, entry.getValue()); - } - } - return map; - } if (answer instanceof StreamCache sc) { // reset so the cache is ready to be used as a variable sc.reset(); @@ -74,15 +60,10 @@ class ExchangeVariableRepository extends ServiceSupport implements BrowsableVari @Override public void setVariable(String name, Object value) { - // special for some values that are CachedOutputStream which we want to be re-readable and therefore - // convert this to StreamCache - // TODO: Do something like StreamCachingHelper - // TODO: support base class that has stream caching stuff for set/getVariable - if (camelContext.isStreamCaching()) - return StreamCachingHelper.convertToStreamCache(strategy, exchange, exchange.getIn()); - Object cache = camelContext.getTypeConverter().tryConvertTo(StreamCache.class, value); - if (cache != null) { - value = cache; + if (value != null && strategy != null) { + StreamCache sc = convertToStreamCache(value); + if (sc != null) { + value = sc; } } if (value != null) { @@ -125,4 +106,36 @@ class ExchangeVariableRepository extends ServiceSupport implements BrowsableVari } return variables.remove(name); } + + @Override + protected void doInit() throws Exception { + super.doInit(); + + if (camelContext != null && camelContext.isStreamCaching()) { + strategy = camelContext.getStreamCachingStrategy(); + } + } + + protected StreamCache convertToStreamCache(Object body) { + // check if body is already cached + if (body == null) { + return null; + } else if (body instanceof StreamCache) { + StreamCache sc = (StreamCache) body; + // reset so the cache is ready to be used before processing + sc.reset(); + return sc; + } + return tryStreamCache(body); + } + + protected StreamCache 
tryStreamCache(Object body) { + try { + // cache the body and if we could do that replace it as the new body + return strategy.cache(body); + } catch (Exception e) { + throw new StreamCacheException(body, e); + } + } + } diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeVariableRepository.java b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeVariableRepository.java index 2101982bf4a..1d75855542f 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeVariableRepository.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeVariableRepository.java @@ -17,31 +17,24 @@ package org.apache.camel.support; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.Stream; import org.apache.camel.CamelContext; -import org.apache.camel.CamelContextAware; import org.apache.camel.Exchange; -import org.apache.camel.NonManagedService; import org.apache.camel.StreamCache; -import org.apache.camel.converter.stream.CachedOutputStream; -import org.apache.camel.spi.BrowsableVariableRepository; import org.apache.camel.spi.VariableRepository; -import org.apache.camel.support.service.ServiceSupport; +import org.apache.camel.support.service.ServiceHelper; import org.apache.camel.util.CaseInsensitiveMap; import org.apache.camel.util.StringHelper; /** * {@link VariableRepository} which is local per {@link Exchange} to hold request-scoped variables. 
*/ -class ExchangeVariableRepository extends ServiceSupport implements BrowsableVariableRepository, NonManagedService { - - private final Map<String, Object> variables = new ConcurrentHashMap<>(8); - private final CamelContext camelContext; +final class ExchangeVariableRepository extends AbstractVariableRepository { public ExchangeVariableRepository(CamelContext camelContext) { - this.camelContext = camelContext; + setCamelContext(camelContext); + // ensure it's started + ServiceHelper.startService(this); } @Override @@ -51,12 +44,12 @@ class ExchangeVariableRepository extends ServiceSupport implements BrowsableVari @Override public Object getVariable(String name) { - Object answer = variables.get(name); + Object answer = super.getVariable(name); if (answer == null && name.endsWith(".headers")) { String prefix = name.substring(0, name.length() - 1) + "."; // xxx.headers -> xxx.header. // we want all headers for a given variable Map<String, Object> map = new CaseInsensitiveMap(); - for (Map.Entry<String, Object> entry : variables.entrySet()) { + for (Map.Entry<String, Object> entry : getVariables().entrySet()) { String key = entry.getKey(); if (key.startsWith(prefix)) { key = StringHelper.after(key, prefix); @@ -72,57 +65,4 @@ class ExchangeVariableRepository extends ServiceSupport implements BrowsableVari return answer; } - @Override - public void setVariable(String name, Object value) { - // special for some values that are CachedOutputStream which we want to be re-readable and therefore - // convert this to StreamCache - // TODO: Do something like StreamCachingHelper - // TODO: support base class that has stream caching stuff for set/getVariable - if (camelContext.isStreamCaching()) - return StreamCachingHelper.convertToStreamCache(strategy, exchange, exchange.getIn()); - Object cache = camelContext.getTypeConverter().tryConvertTo(StreamCache.class, value); - if (cache != null) { - value = cache; - } - } - if (value != null) { - // avoid the NullPointException - 
variables.put(name, value); - } else { - // if the value is null, we just remove the key from the map - variables.remove(name); - } - } - - public boolean hasVariables() { - return !variables.isEmpty(); - } - - public int size() { - return variables.size(); - } - - public Stream<String> names() { - return variables.keySet().stream(); - } - - public Map<String, Object> getVariables() { - return variables; - } - - public void setVariables(Map<String, Object> map) { - variables.putAll(map); - } - - public void clear() { - variables.clear(); - } - - @Override - public Object removeVariable(String name) { - if (!hasVariables()) { - return null; - } - return variables.remove(name); - } } diff --git a/core/camel-support/src/main/java/org/apache/camel/support/GlobalVariableRepository.java b/core/camel-support/src/main/java/org/apache/camel/support/GlobalVariableRepository.java index 09f1e7d7270..cf7abcd1fd8 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/GlobalVariableRepository.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/GlobalVariableRepository.java @@ -17,75 +17,17 @@ package org.apache.camel.support; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.stream.Stream; -import org.apache.camel.StreamCache; -import org.apache.camel.spi.BrowsableVariableRepository; import org.apache.camel.spi.VariableRepository; -import org.apache.camel.support.service.ServiceSupport; /** * Global {@link VariableRepository} which stores variables in-memory in a {@link Map}. 
*/ -public final class GlobalVariableRepository extends ServiceSupport implements BrowsableVariableRepository { - - private final ConcurrentMap<String, Object> variables = new ConcurrentHashMap<>(); +public final class GlobalVariableRepository extends AbstractVariableRepository { @Override public String getId() { return "global"; } - @Override - public Object getVariable(String name) { - Object answer = variables.get(name); - if (answer instanceof StreamCache sc) { - // reset so the cache is ready to be used as a variable - sc.reset(); - } - return answer; - } - - @Override - public void setVariable(String name, Object value) { - if (value != null) { - // avoid the NullPointException - variables.put(name, value); - } else { - // if the value is null, we just remove the key from the map - variables.remove(name); - } - } - - @Override - public Object removeVariable(String name) { - return variables.remove(name); - } - - @Override - public boolean hasVariables() { - return !variables.isEmpty(); - } - - @Override - public int size() { - return variables.size(); - } - - @Override - public Stream<String> names() { - return variables.keySet().stream(); - } - - @Override - public Map<String, Object> getVariables() { - return variables; - } - - @Override - public void clear() { - variables.clear(); - } } diff --git a/core/camel-support/src/main/java/org/apache/camel/support/builder/ExpressionBuilder.java b/core/camel-support/src/main/java/org/apache/camel/support/builder/ExpressionBuilder.java index 4853d3eafe7..6ea8d094b0b 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/builder/ExpressionBuilder.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/builder/ExpressionBuilder.java @@ -1142,13 +1142,13 @@ public class ExpressionBuilder { } /** - * @param variableName the name of the variable from which the input data must be extracted if not empty. 
+ * @param variableName the name of the variable from which the input data must be extracted if not empty. * @param headerName the name of the header from which the input data must be extracted if not empty. * @param propertyName the name of the property from which the input data must be extracted if not empty and * {@code headerName} is empty. - * @return a variable expression if {@code variableName} is not empty, - * a header expression if {@code headerName} is not empty, otherwise a property expression if - * {@code propertyName} is not empty or finally a body expression. + * @return a variable expression if {@code variableName} is not empty, a header expression if + * {@code headerName} is not empty, otherwise a property expression if {@code propertyName} is + * not empty or finally a body expression. */ public static Expression singleInputExpression(String variableName, String headerName, String propertyName) { final Expression exp;
