Below is the source for  MessageConsumer.java

package ss.integration.util; 

import javax.jms.TopicConnectionFactory; 
import javax.jms.TopicConnection; 
import javax.jms.Topic; 
import javax.jms.TopicSession; 
import javax.jms.TopicSubscriber; 
import javax.jms.MessageListener; 
import javax.jms.JMSException; 

import ss.util.logging.Logger; 
import ss.util.SystemException; 
import ss.util.JNDILookup; 

import ss.integration.util.ExListener; 

public class MessageConsumer 
{ 

    private TopicConnectionFactory _topicConnectionFactory; 
    private TopicConnection _topicConnection; 
    private Topic _topic; 
    private TopicSession _session; 
    private TopicSubscriber _subscriber; 

    private String _strClientID; 
    private String _strTopicConnectionFactory; 
    private String _strTopic; 
    private String _strSelector; 
    private String _strContext; 
    private String _strURL; 

    private int _reconnectRetry; 
    private long _reconnectTime; 
    private boolean _firstTimeConnection; 

    // flag to indicated that a shutdown has been requested 
    private boolean _shutdownRequested; 

    private static ExListener _exListener; 
    private static MessageListener _msgListener; 


    public MessageConsumer(String clientID_, 
                           String topicConnectionFactory_, 
                           String topic_, 
                           String selector_, 
                           String context_, 
                           String url_) throws SystemException 
    { 
        _strClientID = clientID_; 
        _strTopicConnectionFactory = topicConnectionFactory_; 
        _strTopic = topic_; 
        _strSelector = selector_; 
        _strContext = context_; 
        _strURL = url_; 

        _exListener = null; 
        _msgListener = null; 

        _reconnectRetry = 0;        // reconnect is currently not used. 
        _reconnectTime = 1 * 1000; 
        _firstTimeConnection = true; 

        _shutdownRequested = false; 
    } 

    public void init() throws SystemException 
    { 
        try 
        { 
            Logger.trace("MessageConsumer, setting the topicConnection,
client: " + _strClientID); 

            _topicConnectionFactory = 
               
(TopicConnectionFactory)JNDILookup.lookup(_strTopicConnectionFactory,
_strContext, _strURL); 

            _topicConnection =
_topicConnectionFactory.createTopicConnection(); 

            _topicConnection.setClientID(_strClientID); 

            _topic = (Topic)JNDILookup.lookup(_strTopic, _strContext,
_strURL); 

            _session = _topicConnection.createTopicSession(true, 0); 

            _subscriber = _session.createDurableSubscriber(_topic, 
                                                           _strClientID, 
                                                           _strSelector, 
                                                           true); 
Logger.trace("_topic=" + _topic + " _strClientID=" + _strClientID + 
"  _strSelector = " + _strSelector); 
Logger.trace("_subscriber=" + _subscriber); 

            if (_exListener == null) 
            { 
                _exListener = new ExListener(this); 
            } 
        } 
        catch (JMSException je) 
        { 
            Logger.trace("MessageConsumer init() failed: JMSException"); 
            throw new SystemException(je); 
        } 
        catch (SystemException se) 
        { 
            Logger.trace("MessageConsumer init() failed: SystemException"); 
            throw new SystemException(se); 
        } 
        catch (Exception e) 
        { 
            Logger.trace("MessageConsumer init() failed: Exception"); 
            throw new SystemException(e); 
        } 
    } 


    public void start() throws SystemException 
    { 
        synchronized (this) 
        { 
            while (!_shutdownRequested) 
            { 
                try 
                { 
                    startConnection(); 

                    Logger.trace("Wait()..." ); 
                    wait(); 
                    Logger.trace("Waiting thread awaken" ); 
                } 
                catch (Exception e) 
                { 
                    Logger.error("Error in MessageConsumer.start: " +
e.getMessage()); 
                    throw new SystemException(e); 
                } 
                finally 
                { 
                    close(); 
                } 
            } 
        } 
    } 



    private void signalShutdown() 
    { 
        Logger.trace("Shutting down..."); 

        _shutdownRequested = true; 

        notifyWaitingThread(); 
    } 



    // to signal a waiting thread (waiting inside start() method) 
    public synchronized void notifyWaitingThread() 
    { 
        unregisterListener(); 

        Logger.trace("Notifying/signaling a waiting thread..."); 
        notify(); 
    } 


    public void setListener (MessageListener listener_) 
    { 
        _msgListener = listener_; 
    } 


    private void registerMsgListener() throws SystemException 
    { 
        if (_msgListener != null) 
        { 
            try 
            { 
                Logger.trace("Registering Message Listener..."); 

                // install asynchronous listener 
                _subscriber.setMessageListener(_msgListener); 
            } 
            catch (JMSException je) 
            { 
                throw new SystemException(je); 

            } 
            catch (Exception e) 
            { 
                throw new SystemException(e); 
            } 
        } 
        else 
        { 
            Logger.warning("MessageListener is null"); 
        } 
    } 


    private void unregisterListener() 
    { 
        try 
        { 
            Logger.trace("Unregistering Message Listener..."); 
            _subscriber.setMessageListener(null); 
        } 
        catch (Exception e) 
        { 
            Logger.error(e); 
        } 
    } 


    private void registerExceptionListener() throws SystemException 
    { 
        if (_exListener != null) 
        { 
            try 
            { 
                Logger.trace("Registering JMS Exception Listener..."); 

                // install asynchronous listener 
                _topicConnection.setExceptionListener(_exListener); 
            } 
            catch (JMSException je) 
            { 
                throw new SystemException(je); 
            } 
            catch (Exception e) 
            { 
                throw new SystemException(e); 
            } 
        } 
        else 
        { 
            Logger.warning("ExceptionListener is null"); 
        } 
    } 


    public void doCommit () throws SystemException 
    { 
        try 
        { 
            _session.commit(); 
        } 
        catch (JMSException je) 
        { 
            throw new SystemException(je); 
        } 
        catch (Exception e) 
        { 
            throw new SystemException(e); 
        } 
    } 


    public void doRollback () throws SystemException 
    { 
        try 
        { 
            Logger.trace("doRollback()  Rolling-back message..."); 
            _session.rollback();                            // Be careful
when using this. Sometimes the rollback doesn't work. 
            Logger.trace("doRollback()  Message rolled-back"); 

            signalShutdown(); 
        } 
        catch (JMSException je) 
        { 
            Logger.trace("doRollback() JMS Exception: " + je.getMessage()); 
            throw new SystemException(je); 
        } 
        catch (Exception e) 
        { 
            Logger.trace("doRollback() Exception :" + e.getMessage()); 
            throw new SystemException(e); 
        } 
    } 



    public void close() throws SystemException 
    { 
        try 
        { 
            if (_subscriber != null) 
            { 
                Logger.trace("closing subscriber..."); 
                _subscriber.close(); 
                _subscriber = null; 
                Logger.trace("subscriber closed"); 
            } 

            if (_session != null) 
            { 
                Logger.trace("closing session..."); 
                _session.close(); 
                _session = null; 
                Logger.trace("session closed"); 
            } 

            if (_topicConnection != null) 
            { 
                Logger.trace("closing topic connection..."); 
                _topicConnection.close(); 
                _topicConnection = null; 
                Logger.trace("topicConnection closed"); 
            } 

            _topicConnectionFactory = null; 
            _topic = null; 

        } 
        catch (Exception e) 
        { 
            Logger.trace("Exception in close()"); 
        } 
    } 


    private void startConnection() throws SystemException 
    { 
        boolean success = false; 

        for (int i=0; i<_reconnectRetry || _firstTimeConnection; i++) 
        { 
            _firstTimeConnection = false; 

            try 
            { 
                Logger.trace("Closing all connections... "); 
                close(); 

                if (_exListener != null) 
                { 
                    wait(_reconnectTime); 
                } 

                Logger.trace("Starting all connections... (" + (i+1) + "/" +
_reconnectRetry + ")"); 
                init(); 
                Logger.trace("Init successfull"); 

                registerMsgListener(); 
                Logger.trace("RegisterMsgListener successfull"); 

                registerExceptionListener(); 
                Logger.trace("RegisterExceptionListener successfull"); 

                Logger.trace("Starting TopicConnection..."); 
                _topicConnection.start(); 
                Logger.trace("TopicConnection started..."); 

                success = true; 

                break; 
            } 
            catch (Exception e) 
            { 
                Logger.error("startConnection() failed"); 
                Logger.error(e); 
            } 
        } 

        if (!success) 
        { 
            close(); 
            Logger.error("Unable to connect to the JMS Server"); 
            throw new SystemException("Unable to connect to the JMS
Server"); 
        } 
    } 
} 




activemqnewbie wrote:
> 
> Subscriber closes with an exception(below)after consuming certain
> messages.This happens at different intervals. 
> Appreciate if someone could let me know the reason and solution for this. 
> Am using activemq 5.0,java 5. 
> Is there any property/setting that i need to do ? 
> Below is the stack trace. 
> <ERROR><2008/01/10 10:43:23.127><21621663><> ExceptionListener receives a
> JMS exception <TRACE><2008/01/10 10:43:23.127><21621663><> Unregistering
> Message Listener... <TRACE><2008/01/10 10:43:23.127><21621663><>
> Notifying/signaling a waiting thread... <ERROR><2008/01/10
> 10:43:23.144><21621663><> ExceptionListener receives a JMS exception
> <TRACE><2008/01/10 10:43:23.144><21621663><> Unregistering Message
> Listener... <ERROR><2008/01/10 10:43:23.153><21621663><>
> javax.jms.IllegalStateException: The Consumer is closed at
> org.apache.activemq.ActiveMQMessageConsumer.checkClosed(ActiveMQMessageConsumer.java:679)
> at
> org.apache.activemq.ActiveMQMessageConsumer.setMessageListener(ActiveMQMessageConsumer.java:352)
>  
> 
> 
> Thanks 
> 
> 

-- 
View this message in context: 
http://www.nabble.com/Consumer-is-closed-Exception-tp15022223s2354p15023988.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Reply via email to