[ 
https://issues.apache.org/jira/browse/CXF-2002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12703198#action_12703198
 ] 

Sergey Beryozkin commented on CXF-2002:
---------------------------------------

Hmm... This is something that I don't understand...

Setting maxSuspendedContinuations to 1 is like saying "Start a new continuation 
and block the consumption of other messages untill this continuation 
completes". 

The bogus selector is set without using any synchronization blocks. The 
messageSelector property is volatile in DLMC, but I'm wondering, may be 
ConcurrentLinkedQueue.size() is what causes the problem is that it does not 
reflect the actual size in time ?

Even if it were the case then I'd still be surprised. Though one possible 
explanation is here : By the time we set a bogus message selector from a given 
continuation instance, we already have say 100 messages in the queue, and then 
while this continuation is being processed, no more messages are accepted; also 
while this continuation is being processed, other continuations corresponding 
to some of those 100 messages are also being handled and eventually completed; 
now, finally, the continuation which set the bogus selector completes and 
resets the selector back to normal. 

Now the same story happens again but by the time we set the bogus selector 
again, we already have say  180 messages in the queue, etc. So eventually, even 
though we try to throttle the consumption, the queue gets overflown. 

And I'm not sure if adding some additional synchronization will fix it - but we 
can give it a try.

Freeman - can you please give me another favor and update

protected void updateContinuations(boolean remove) {

        modifyList(remove);
        
        if (jmsConfig.getMaxSuspendedContinuations() < 0
            || jmsListener.getCacheLevel() >= 
DefaultMessageListenerContainer.CACHE_CONSUMER) {
            return;
        }
        
        // throttle the flow if there're too many continuation instances in 
memory
        if (remove && !BOGUS_MESSAGE_SELECTOR.equals(currentMessageSelector)) {
            jmsListener.setMessageSelector(currentMessageSelector);
            currentMessageSelector = BOGUS_MESSAGE_SELECTOR;
        } else if (!remove && continuations.size() >= 
jmsConfig.getMaxSuspendedContinuations()) {
            currentMessageSelector = jmsListener.getMessageSelector();
            if (!BOGUS_MESSAGE_SELECTOR.equals(currentMessageSelector)) {
                jmsListener.setMessageSelector(BOGUS_MESSAGE_SELECTOR);
            }
        }

    }


to 

protected void updateContinuations(boolean remove) {
        
        if (jmsConfig.getMaxSuspendedContinuations() < 0
            || jmsListener.getCacheLevel() >= 
DefaultMessageListenerContainer.CACHE_CONSUMER) {
            modifyList(remove);
            return;
        }
        
        // throttle the flow if there're too many continuation instances in 
memory
       synchronized(continuations) {
          modifyList(remove);
          if (remove && !BOGUS_MESSAGE_SELECTOR.equals(currentMessageSelector)) 
{
            jmsListener.setMessageSelector(currentMessageSelector);
            currentMessageSelector = BOGUS_MESSAGE_SELECTOR;
          } else if (!remove && continuations.size() >= 
jmsConfig.getMaxSuspendedContinuations()) {
            currentMessageSelector = jmsListener.getMessageSelector();
            if (!BOGUS_MESSAGE_SELECTOR.equals(currentMessageSelector)) {
                jmsListener.setMessageSelector(BOGUS_MESSAGE_SELECTOR);
            }
          }
     }


and see if it will slow down at least the occurence of the lost messages 
problem ?

Ron : what do you think may be causing the problem despite the fact we try to 
set the bogus selector ? If the above explanation seems plausible then I'm 
actually not sure restarting DMLC will fix the issue either ? Do we also need 
to do some work in SMX-BC for this fix in CXF to take effect ?

Also, can anyone spot any flaw in JMSContinuation#updateContinuations ?



many thanks, Sergey


[1] 
http://svn.apache.org/repos/asf/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuation.java

> Server async jms transport needs dynamic mechanism to throttle message 
> consumption
> ----------------------------------------------------------------------------------
>
>                 Key: CXF-2002
>                 URL: https://issues.apache.org/jira/browse/CXF-2002
>             Project: CXF
>          Issue Type: Improvement
>          Components: Transports
>    Affects Versions: 2.0.9, 2.1.3, 2.0.10
>            Reporter: Ron Gavlin
>            Assignee: Sergey Beryozkin
>
> Currently, the server-side async jms transport has no mechanism to throttle 
> consumption of incoming messages. This becomes problematic in scenarios where 
> a large backlog of messages exists on the input queue. In this case, it is 
> likely that the cxf server will overload its internal work item queues 
> resulting in problems. A dynamic throttling mechanism on the async jms server 
> is required to avoid this problem.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.

Reply via email to