I was using ActiveMQ 5.3.0 . I faced a memory leak issue which is very
similar to https://issues.apache.org/jira/browse/AMQ-4222. As per the JIRA,
I updated ActiveMQ to 5.8.0 and the memory leak didn't occur. However
upgrading to 5.8.0 broke some of my existing code.

I would like to stick to 5.3.0. Is there any workaround I can do to prevent
the memory leak , so that I can avoid the upgrade?


This is my code, that's sending the message with 'timeoutInMillis' as
timeout.

public Document sendMessage(final String message, final int timeoutInMillis)
    {
        if (!isConnected)
        {
            logger.error("can't sent message without connection...");
            throw new ConnectionUnavailableException();
        }

        try
        {
            final Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
            final MessageProducer producer =
session.createProducer(prodQueue);

            final TextMessage textMessage = session.createTextMessage();
            final TemporaryQueue replyToQueue =
session.createTemporaryQueue();

            textMessage.setText(message);
            textMessage.setJMSReplyTo(replyToQueue);

            producer.send(textMessage);

            final MessageConsumer consumer =
session.createConsumer(replyToQueue);

            // Wait the specified time for receipt.
            final ObjectMessage receivedMessage =
(ObjectMessage)consumer.receive(timeoutInMillis);

            try
            {
                session.close();
            }
            catch (final JMSException jmse)
            {
                logger.warn("Closing JMS session failed!", jmse);
            }

            // If the message is null, we timed out.
            MessageAcknowledgement ack = null;
            if (receivedMessage == null)
            {
                logger.warn("Timed out waiting for response.");
                replyToQueue.delete();
                throw new MessagingTimeoutException("Timed out waiting for
response to message [message=" + message + "]");
            }
            else
            {
                final String response = new
String(((MyMessage)receivedMessage.getObject()).getBody());
                ack = MessageAcknowledgementImpl.createInstance(response);

                if (ack.getStatusCode() != 0)
                {
                    replyToQueue.delete();
                    throw new InvalidStatusException(ack);
                }
            }

            replyToQueue.delete();

            return ack.getXmlMessage();
        }
        catch (final JMSException jmse)
        {
            logger.error("JMSException: " + jmse.getMessage());

            if (jmse.getMessage().indexOf(NO_CONNECTION) != -1)
            {
                throw new ConnectionUnavailableException();
            }

            throw new MessagingException("Error sending message to My
queue.", jmse);
        }
}


This is the JMS handler code:
    
public void handle(final MyMessage myMessage)
    {
        if (System.currentTimeMillis() - requestTime >
DEFAULT_TIME_ELAPSE_UNTIL_HANDLER_TIMEOUT)
        {
            final Long elapseTimeInMsec = System.currentTimeMillis() -
requestTime;
            logger.debug("Message Handler should have been removed due to
timeout. Elapse time in msec:" + elapseTimeInMsec);
        }

        try
        {
            final Message message = createJmsMessage(myMessage);

            Queue forwardQueue;

            if (originatingMessage != null)
            {
                forwardQueue = (Queue) originatingMessage.getJMSReplyTo();
            }
            else
            {
                forwardQueue = getQueueByName(myMessage.getDestination());
            }

            final MessageProducer producer =
jmsSession.createProducer(forwardQueue);
            try
            {
                logger.debug("Forwarding message to JMS queue: " +
forwardQueue.getQueueName());
                producer.send(message);
            }
            finally
            {
                producer.close();
            }
        }
        catch (final NamingException ne)
        {
            logger.error("Queue destination specified by MY message not
found.", ne);
        }
        catch (final InvalidDestinationException ide)
        {
            logger.warn("Could not forward MY message to queue destination:
" + ide.getMessage());
        }
        catch (final JMSException jmse)
        {
            throw new RuntimeException("Error forwarding MY message to
queue.", jmse);
        }
    }



--
View this message in context: 
http://activemq.2283324.n4.nabble.com/Looking-for-an-alternative-of-AMQ-4222-tp4714918.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Reply via email to