Let's say there is a route from which a stateful bean is invoked:

<camel:route id="Concurrently-called-route">
    <camel:from uri="direct:concurrentlyCalledRoute"/>
    <camel:bean ref="statefullBean" method="setSomeState"/>
    <camel:bean ref="statefullBean" method="getSomeDataDependingOnState"/>
</camel:route>

Messages can be sent along this route concurrently, i.e. the requestBody method
of ProducerTemplate is called from several threads at once. A problem arises
when two exchanges are in flight and one exchange calls setSomeState between
the calls to setSomeState and getSomeDataDependingOnState made by the other
exchange: the other exchange then reads data derived from the wrong state. I
see two ways to solve this problem, each of which has a drawback.
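
The interleaving described above can be reproduced outside Camel. The following
is only a sketch: StatefulBeanRace and its nested StatefulBean are hypothetical
stand-ins for the real bean, and two latches force the unlucky ordering that
concurrent exchanges would hit only occasionally.

```java
import java.util.concurrent.CountDownLatch;

public class StatefulBeanRace {

    // Hypothetical stand-in for statefullBean: one shared instance with mutable state.
    static class StatefulBean {
        private String state;
        void setSomeState(String state) { this.state = state; }
        String getSomeDataDependingOnState() { return "data for " + state; }
    }

    // Forces the problematic ordering: exchange A sets its state, exchange B
    // overwrites it, then exchange A reads. Returns what exchange A observes.
    static String runInterleaved() {
        StatefulBean bean = new StatefulBean();
        CountDownLatch aHasSet = new CountDownLatch(1);
        CountDownLatch bHasSet = new CountDownLatch(1);
        String[] seenByA = new String[1];

        Thread exchangeA = new Thread(() -> {
            bean.setSomeState("A");
            aHasSet.countDown();
            try {
                bHasSet.await(); // exchange B overwrites the state while A waits
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            seenByA[0] = bean.getSomeDataDependingOnState();
        });
        Thread exchangeB = new Thread(() -> {
            try {
                aHasSet.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            bean.setSomeState("B");
            bHasSet.countDown();
        });

        exchangeA.start();
        exchangeB.start();
        try {
            exchangeA.join();
            exchangeB.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seenByA[0];
    }

    public static void main(String[] args) {
        System.out.println(runInterleaved()); // prints "data for B"
    }
}
```

Exchange A set the state to "A" but gets back data computed from "B", which is
exactly the interference the route above is exposed to.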


Using SEDA

<camel:route id="Concurrently-called-route">
    <camel:from uri="direct:concurrentlyCalledRoute"/>
    <camel:to uri="seda:sedaRoute"/>
</camel:route>

<camel:route id="SEDA-route">
    <camel:from uri="seda:sedaRoute"/>
    <camel:bean ref="statefullBean" method="setSomeState"/>
    <camel:bean ref="statefullBean" method="getSomeDataDependingOnState"/>
</camel:route>

In this case messages sent from different threads gather in the queue of the
SEDA endpoint. Messages from this queue are handled by a single thread as they
travel along SEDA-route, so the processing of one message cannot interfere with
the processing of another. However, if many threads send messages to
concurrentlyCalledRoute, SEDA-route becomes a bottleneck. And if more than one
thread is used to consume from the seda queue (for example via the
concurrentConsumers option), the problem with concurrent calls to stateful
beans arises again.
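
The trade-off can be illustrated in plain Java. This is an analogy, not Camel's
implementation: a single-threaded executor plays the role of the one SEDA
consumer, and SingleConsumerSketch, StatefulBean and countMismatches are
hypothetical names. Senders block for the reply, much as requestBody does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class SingleConsumerSketch {

    // Hypothetical stand-in for statefullBean.
    static class StatefulBean {
        private String state;
        void setSomeState(String state) { this.state = state; }
        String getSomeDataDependingOnState() { return "data for " + state; }
    }

    // Several sender threads hand their work to ONE consumer thread.
    // Returns how many replies did not match the state their sender set.
    static int countMismatches(int senderCount, int messagesPerSender) {
        StatefulBean bean = new StatefulBean();
        ExecutorService consumer = Executors.newSingleThreadExecutor();
        ExecutorService senders = Executors.newFixedThreadPool(senderCount);
        AtomicInteger mismatches = new AtomicInteger();
        try {
            List<Future<?>> pending = new ArrayList<>();
            for (int s = 0; s < senderCount; s++) {
                final int senderId = s;
                pending.add(senders.submit(() -> {
                    for (int m = 0; m < messagesPerSender; m++) {
                        String state = senderId + "-" + m;
                        // Both bean calls run back to back on the consumer thread.
                        String reply = consumer.submit(() -> {
                            bean.setSomeState(state);
                            return bean.getSomeDataDependingOnState();
                        }).get();
                        if (!reply.equals("data for " + state)) {
                            mismatches.incrementAndGet();
                        }
                    }
                    return null;
                }));
            }
            for (Future<?> f : pending) {
                f.get(); // wait for every sender to finish
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            senders.shutdown();
            consumer.shutdown();
        }
        return mismatches.get();
    }

    public static void main(String[] args) {
        System.out.println(countMismatches(4, 100)); // prints "0"
    }
}
```

No reply is ever mixed up, but every sender waits on the same consumer thread,
which is the bottleneck mentioned above.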

Another way is to use a custom scope.


Custom scope

Spring Framework allows custom scopes to be implemented, so we can implement a
scope that stores a separate instance of a bean for each exchange.

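import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.beans.factory.ObjectFactory;
import org.springframework.beans.factory.config.Scope;

public class ExchangeScope implements Scope {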

    private Map<String, Map<String,Object>> instances = new ConcurrentHashMap<>();

    private Map<String,Runnable> destructionCallbacks = new ConcurrentHashMap<>();

    private final ThreadLocal<String> currentExchangeId = new ThreadLocal<>();

    public void activate(String exchangeId) {
        if (!this.instances.containsKey(exchangeId)) {
            Map<String, Object> instancesInCurrentExchangeScope = new ConcurrentHashMap<>();
            this.instances.put(exchangeId, instancesInCurrentExchangeScope);
        }
        this.currentExchangeId.set(exchangeId);
    }

    public void destroy() {
        String currentExchangeId = this.currentExchangeId.get();
        Map<String,Object> instancesInCurrentExchangeScope = instances.get(currentExchangeId);
        if (instancesInCurrentExchangeScope == null)
            throw new RuntimeException("ExchangeScope with id = " + currentExchangeId + " doesn't exist");
        for (String name : instancesInCurrentExchangeScope.keySet()) {
            this.remove(name);
        }
        instances.remove(currentExchangeId);
        this.currentExchangeId.set(null);
    }

    @Override
    public Object get(String name, ObjectFactory<?> objectFactory) {
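        // Selects by name a bean instance from the map storing instances for the
        // current exchange; creates a new bean instance if necessary.
        // (The body below is one possible implementation.)
        String exchangeId = this.currentExchangeId.get();
        Map<String,Object> instancesInCurrentExchangeScope =
                exchangeId == null ? null : this.instances.get(exchangeId);
        if (instancesInCurrentExchangeScope == null)
            throw new IllegalStateException("No ExchangeScope is active on the current thread");
        Object bean = instancesInCurrentExchangeScope.get(name);
        if (bean == null) {
            bean = objectFactory.getObject();
            instancesInCurrentExchangeScope.put(name, bean);
        }
        return bean;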
    }

    @Override
    public Object remove(String name) {
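        // Removes a bean instance from the scope of the current exchange.
        // (One possible implementation; the destruction callback is dropped, not run.)
        String exchangeId = this.currentExchangeId.get();
        Map<String,Object> instancesInCurrentExchangeScope =
                exchangeId == null ? null : this.instances.get(exchangeId);
        this.destructionCallbacks.remove(name);
        return instancesInCurrentExchangeScope == null
                ? null : instancesInCurrentExchangeScope.remove(name);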
    }

    @Override
    public void registerDestructionCallback(String name, Runnable callback) {
        this.destructionCallbacks.put(name, callback);
    }

    @Override
    public Object resolveContextualObject(String name) {
        String currentExchangeId = this.currentExchangeId.get();
        if (currentExchangeId == null)
            return null;

        Map<String,Object> instancesInCurrentExchangeScope = this.instances.get(currentExchangeId);
        if (instancesInCurrentExchangeScope == null)
            return null;

        return instancesInCurrentExchangeScope.get(name);
    }

    @Override
    public String getConversationId() {
        return this.currentExchangeId.get();
    }
}

Now we can register this custom scope and declare statefullBean as exchange 
scoped:

<bean id="exchangeScope" class="org.my.ExchangeScope"/>

<bean class="org.springframework.beans.factory.config.CustomScopeConfigurer">
    <property name="scopes">
        <map>
            <entry key="ExchangeScope" value-ref="exchangeScope"/>
        </map>
    </property>
</bean>

<bean id="statefullBean" class="org.my.StatefullBean" scope="ExchangeScope"/>

To use the exchange scope we call the activate method of ExchangeScope before
sending a message and call destroy afterwards:

exchangeScope.activate(exchangeId);
producerTemplate.requestBody(request);
exchangeScope.destroy(); // takes no argument; uses the exchange id activated on this thread

With this implementation an exchange scope is actually a thread scope, and this
is its drawback. If, for example, a multithreaded splitter is used in a route,
exchange-scoped beans cannot be called from the threads created by the
splitter, because those calls are made in threads other than the one in which
the exchange was started.
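
The root cause is that a ThreadLocal value is visible only on the thread that
set it. A minimal sketch, with ThreadBoundScopeSketch and idSeenByWorker as
hypothetical names and a plain executor standing in for the splitter's thread
pool:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadBoundScopeSketch {

    // Mirrors the currentExchangeId field of ExchangeScope.
    private static final ThreadLocal<String> CURRENT_EXCHANGE_ID = new ThreadLocal<>();

    // Activates an id on the calling thread, then reads it from a worker thread,
    // as a sub-exchange of a parallel splitter would. Returns what the worker sees.
    static String idSeenByWorker(String exchangeId) {
        CURRENT_EXCHANGE_ID.set(exchangeId);
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            Callable<String> readId = CURRENT_EXCHANGE_ID::get;
            return worker.submit(readId).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
            CURRENT_EXCHANGE_ID.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println(idSeenByWorker("exchange-1")); // prints "null"
    }
}
```

The worker thread sees no active exchange id, so a scope lookup made from it
has no instance map to resolve the bean from.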

Any ideas on how to work around these drawbacks? Are there completely different
ways to isolate stateful beans during concurrent exchanges?
