Let's say there is a route from which a stateful bean is invoked:
<camel:route id="Concurrently-called-route">
    <camel:from uri="direct:concurrentlyCalledRoute"/>
    <camel:bean ref="statefullBean" method="setSomeState"/>
    <camel:bean ref="statefullBean" method="getSomeDataDependingOnState"/>
</camel:route>
Messages may be sent along this route concurrently, i.e. the requestBody method
of ProducerTemplate is called from several threads at once. A problem arises
when two exchanges are in flight and one exchange calls setSomeState between
the calls to setSomeState and getSomeDataDependingOnState made by the other
exchange. I see two ways to solve this problem, each of which has a drawback.
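To make the interleaving concrete, here is a minimal plain-Java sketch with no Camel involved. StatefulBean and RaceDemo are hypothetical stand-ins, not the real bean, and the latches are there only to force the unlucky ordering so that it is repeatable:

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in for the singleton bean registered as "statefullBean".
class StatefulBean {
    private String state;

    void setSomeState(String state) { this.state = state; }

    String getSomeDataDependingOnState() { return "data-for-" + state; }
}

class RaceDemo {
    // Waits on a latch and converts the checked exception so lambdas stay short.
    static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Returns what the first "exchange" reads after the second one has interfered.
    static String resultSeenByFirstExchange() {
        StatefulBean bean = new StatefulBean(); // one instance shared by both threads
        CountDownLatch firstHasSet = new CountDownLatch(1);
        CountDownLatch secondHasSet = new CountDownLatch(1);
        String[] result = new String[1];

        Thread first = new Thread(() -> {
            bean.setSomeState("A");
            firstHasSet.countDown();
            awaitQuietly(secondHasSet); // the second exchange runs in this gap
            result[0] = bean.getSomeDataDependingOnState();
        });
        Thread second = new Thread(() -> {
            awaitQuietly(firstHasSet);
            bean.setSomeState("B");
            secondHasSet.countDown();
        });
        first.start();
        second.start();
        try {
            first.join();
            second.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result[0];
    }

    public static void main(String[] args) {
        System.out.println(resultSeenByFirstExchange()); // prints data-for-B
    }
}
```

The first thread set "A" but gets data computed from "B", which is exactly the corruption described above.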
Using SEDA
<camel:route id="Concurrently-called-route">
    <camel:from uri="direct:concurrentlyCalledRoute"/>
    <camel:to uri="seda:sedaRoute"/>
</camel:route>
<camel:route id="SEDA-route">
    <camel:from uri="seda:sedaRoute"/>
    <camel:bean ref="statefullBean" method="setSomeState"/>
    <camel:bean ref="statefullBean" method="getSomeDataDependingOnState"/>
</camel:route>
In this case, messages sent from different threads are collected in the queue
of the SEDA endpoint. Messages from this queue are handled by a single thread
as they travel along SEDA-route, so processing one message won't interfere with
processing another. However, if many threads were sending messages to
concurrentlyCalledRoute, SEDA-route would become a bottleneck. And if more than
one thread were used to consume items from the SEDA queue, the problem with
concurrent calls to stateful beans would arise again.
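The effect of the single consumer can be sketched in plain Java. This is only a model under assumptions: a single-thread executor plays the role of the one SEDA consumer thread, a small pool plays the concurrent callers, and SharedBean and SingleConsumerDemo are hypothetical names:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for the shared stateful bean.
class SharedBean {
    private String state;

    void setSomeState(String state) { this.state = state; }

    String getSomeDataDependingOnState() { return "data-for-" + state; }
}

class SingleConsumerDemo {
    static List<String> process(int messages) {
        SharedBean bean = new SharedBean();
        // Models the single thread that drains the SEDA queue.
        ExecutorService consumer = Executors.newSingleThreadExecutor();
        // Models the threads calling requestBody concurrently.
        ExecutorService producers = Executors.newFixedThreadPool(4);
        try {
            List<Future<String>> replies = new ArrayList<>();
            for (int i = 0; i < messages; i++) {
                String body = "msg-" + i;
                // Both bean calls run as one unit on the consumer thread.
                Callable<String> handle = () -> {
                    bean.setSomeState(body);
                    return bean.getSomeDataDependingOnState();
                };
                // Each caller queues its message and blocks until the reply arrives.
                Callable<String> request = () -> consumer.submit(handle).get();
                replies.add(producers.submit(request));
            }
            List<String> results = new ArrayList<>();
            for (Future<String> reply : replies) {
                results.add(reply.get());
            }
            return results;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            producers.shutdown();
            consumer.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(process(3)); // prints [data-for-msg-0, data-for-msg-1, data-for-msg-2]
    }
}
```

Every caller gets data matching its own message, but all of the work funnels through one thread, which is the bottleneck mentioned above.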
Another way is to use a custom scope.
Custom scope
Spring Framework allows custom scopes to be implemented, so we can implement a
scope that stores a separate instance of a bean for each exchange.
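import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.beans.factory.ObjectFactory;
import org.springframework.beans.factory.config.Scope;

public class ExchangeScope implements Scope {
    // Bean instances per exchange: exchange id -> (bean name -> instance).
    private final Map<String, Map<String, Object>> instances = new ConcurrentHashMap<>();
    // Keyed by bean name only, so concurrent exchanges overwrite each other's callbacks.
    private final Map<String, Runnable> destructionCallbacks = new ConcurrentHashMap<>();
    // Id of the exchange bound to the calling thread.
    private final ThreadLocal<String> currentExchangeId = new ThreadLocal<>();

    public void activate(String exchangeId) {
        // Create the per-exchange map atomically if it does not exist yet.
        this.instances.computeIfAbsent(exchangeId, id -> new ConcurrentHashMap<>());
        this.currentExchangeId.set(exchangeId);
    }

    public void destroy() {
        String currentExchangeId = this.currentExchangeId.get();
        // ConcurrentHashMap rejects null keys, so guard against an inactive scope.
        Map<String, Object> instancesInCurrentExchangeScope =
                currentExchangeId == null ? null : instances.get(currentExchangeId);
        if (instancesInCurrentExchangeScope == null)
            throw new RuntimeException("ExchangeScope with id = "
                    + currentExchangeId + " doesn't exist");
        for (String name : instancesInCurrentExchangeScope.keySet()) {
            this.remove(name);
        }
        instances.remove(currentExchangeId);
        this.currentExchangeId.remove();
    }

    @Override
    public Object get(String name, ObjectFactory<?> objectFactory) {
        // Selects by name a bean instance from the map storing instances for the
        // current exchange; creates a new bean instance if necessary.
        // (The body below is one possible way to fill in that description.)
        String exchangeId = this.currentExchangeId.get();
        Map<String, Object> scoped = exchangeId == null ? null : this.instances.get(exchangeId);
        if (scoped == null)
            throw new IllegalStateException("No exchange scope is active on this thread");
        Object bean = scoped.get(name);
        if (bean == null) {
            bean = objectFactory.getObject();
            scoped.put(name, bean);
        }
        return bean;
    }

    @Override
    public Object remove(String name) {
        // Removes a bean instance from the current exchange, along with its callback.
        // (Again, one possible way to fill in the description.)
        String exchangeId = this.currentExchangeId.get();
        Map<String, Object> scoped = exchangeId == null ? null : this.instances.get(exchangeId);
        this.destructionCallbacks.remove(name);
        return scoped == null ? null : scoped.remove(name);
    }

    @Override
    public void registerDestructionCallback(String name, Runnable callback) {
        this.destructionCallbacks.put(name, callback);
    }

    @Override
    public Object resolveContextualObject(String name) {
        String currentExchangeId = this.currentExchangeId.get();
        if (currentExchangeId == null)
            return null;
        Map<String, Object> instancesInCurrentExchangeScope =
                this.instances.get(currentExchangeId);
        if (instancesInCurrentExchangeScope == null)
            return null;
        return instancesInCurrentExchangeScope.get(name);
    }

    @Override
    public String getConversationId() {
        return this.currentExchangeId.get();
    }
}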
Now we can register this custom scope and declare statefullBean as exchange
scoped:
<bean id="exchangeScope" class="org.my.ExchangeScope"/>
<bean class="org.springframework.beans.factory.config.CustomScopeConfigurer">
    <property name="scopes">
        <map>
            <entry key="ExchangeScope" value-ref="exchangeScope"/>
        </map>
    </property>
</bean>
<bean id="statefullBean" class="org.my.StatefullBean" scope="ExchangeScope"/>
To use the exchange scope, we should call the activate method of ExchangeScope
before sending a message and call destroy afterwards:
exchangeScope.activate(exchangeId);
try {
    producerTemplate.requestBody(request);
} finally {
    // destroy() takes no argument; it uses the id bound to the current thread.
    exchangeScope.destroy();
}
With this implementation, the exchange scope is effectively a thread scope, and
that is the drawback. If, for example, a multithreaded splitter were used in a
route, exchange-scoped beans could not be called from the threads created by
the splitter, because those calls would run in threads other than the one in
which the exchange was started.
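The thread-bound behaviour is easy to reproduce without Spring or Camel. In this minimal sketch, ThreadBoundScopeDemo is a hypothetical class whose ThreadLocal mirrors the currentExchangeId field of ExchangeScope, and a plain Thread stands in for a splitter worker thread:

```java
class ThreadBoundScopeDemo {
    // Same idea as the currentExchangeId field in ExchangeScope.
    static final ThreadLocal<String> currentExchangeId = new ThreadLocal<>();

    // Activates the "scope" on the calling thread, then reads the id from a worker thread.
    static String idSeenByWorkerThread() {
        currentExchangeId.set("exchange-1");
        String[] seen = new String[1];
        Thread worker = new Thread(() -> seen[0] = currentExchangeId.get());
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            currentExchangeId.remove();
        }
        return seen[0];
    }

    // Activates the "scope" and reads the id on the same thread.
    static String idSeenByCallingThread() {
        currentExchangeId.set("exchange-1");
        try {
            return currentExchangeId.get();
        } finally {
            currentExchangeId.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println(idSeenByCallingThread()); // prints exchange-1
        System.out.println(idSeenByWorkerThread());  // prints null
    }
}
```

The worker thread sees no exchange id at all, so a scope lookup made from it has no per-exchange map to resolve beans from.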
Any ideas on how to work around these drawbacks? Are there completely different
ways to isolate stateful beans during concurrent exchanges?