Please see the FAQ entry:
http://activemq.apache.org/how-do-i-use-jms-efficiently.html

You are not meant to create a consumer or producer for each message;
reuse the same consumer and producer instead.

If you are new to JMS and have not yet grokked how to use it
efficiently, try using the JmsTemplate / MessageListenerContainer
abstractions in Spring along with the JMS pool in ActiveMQ:
http://activemq.apache.org/jmstemplate-gotchas.html

Your problem could well be producer flow control kicking in, BTW, but
first see how things behave when you use JMS correctly.

2008/10/14 Steven Van Loon <[EMAIL PROTECTED]>:
> Hi,
>
> I managed to reproduce the problem in the simple sample below. What I do is
> to create a connection that is shared between a consuming thread and a
> producing thread. The consumer is slower than the producer. When I run this
> sample, either the consumer stops consuming although lots of messages are
> available, or the producer stops producing.
>
> Anybody any ideas what's wrong or what I am doing wrong?
>
>
>
> package test.jms;
>
> import java.util.Hashtable;
>
> import javax.jms.JMSException;
> import javax.jms.Message;
> import javax.jms.MessageConsumer;
> import javax.jms.MessageProducer;
> import javax.jms.Queue;
> import javax.jms.QueueConnection;
> import javax.jms.QueueConnectionFactory;
> import javax.jms.QueueSession;
> import javax.jms.Session;
> import javax.jms.TextMessage;
> import javax.naming.InitialContext;
> import javax.naming.NamingException;
>
> public class ConsumingProducer {
>
>     public final static String INITIAL_CONTEXT_FACTORY =
>             "org.apache.activemq.jndi.ActiveMQInitialContextFactory";
>     public final static String PROVIDER_URL = "tcp://localhost:61616";
>
>     public final static String CONNECTION_FACTORY_NAME = "ConnectionFactory";
>     public final static String DESTINATION_NAME = "TEST.QUEUE";
>
>     public final static long WRITE_DELAY = 10;
>     public final static long READ_DELAY = 1000;
>
>     public static QueueConnectionFactory connectionFactory;
>     public static QueueConnection connection;
>
>     public static void initialize() throws NamingException, JMSException {
>         Hashtable<String, String> env = new Hashtable<String, String>();
>         env.put(InitialContext.INITIAL_CONTEXT_FACTORY, INITIAL_CONTEXT_FACTORY);
>         env.put(InitialContext.PROVIDER_URL, PROVIDER_URL);
>         InitialContext initialContext = new InitialContext(env);
>
>         connectionFactory = (QueueConnectionFactory) initialContext
>                 .lookup(CONNECTION_FACTORY_NAME);
>
>         connection = connectionFactory.createQueueConnection();
>         connection.start();
>     }
>
>     public static void startConsumer() {
>
>         Thread t = new Thread() {
>             public void run() {
>                 try {
>                     QueueSession session = connection.createQueueSession(false,
>                             Session.AUTO_ACKNOWLEDGE);
>                     Queue queue = session.createQueue(DESTINATION_NAME);
>                     while (true) {
>                         MessageConsumer consumer = session.createConsumer(queue);
>                         Message msg = consumer.receive();
>                         if (msg instanceof TextMessage) {
>                             System.out.println(((TextMessage) msg).getText());
>                         } else if (msg != null) {
>                             System.out.println("Message received");
>                         }
>                         consumer.close();
>                         sleep(READ_DELAY);
>                     }
>                 } catch (Exception ex) {
>                     ex.printStackTrace();
>                 }
>             }
>         };
>         t.setName("Consumer");
>         t.start();
>     }
>
>     public static void startProducer() {
>
>         Thread t = new Thread() {
>             public void run() {
>                 try {
>                     QueueSession session = connection.createQueueSession(false,
>                             Session.AUTO_ACKNOWLEDGE);
>                     Queue queue = session.createQueue(DESTINATION_NAME);
>                     int msgctr = 1;
>                     while (true) {
>                         MessageProducer producer = session.createProducer(queue);
>                         Message msg = session.createTextMessage("Message " + msgctr++);
>                         producer.send(msg);
>                         producer.close();
>
>                         sleep(WRITE_DELAY);
>                     }
>                 } catch (Exception ex) {
>                     ex.printStackTrace();
>                 }
>             }
>         };
>         t.setName("Producer");
>         t.start();
>     }
>
>     public static void main(String[] args) {
>         try {
>             initialize();
>             startConsumer();
>             startProducer();
>         } catch (Exception ex) {
>             ex.printStackTrace();
>         }
>     }
> }
>
>
>
>
> -----Original Message-----
> From: Joe Fernandez [mailto:[EMAIL PROTECTED]
> Sent: Monday 13 October 2008 13:06
> To: users@activemq.apache.org
> Subject: Re: Concurrent use of connections
>
>
> Sessions are single-threaded. So could the issue be related more to your use
> of sessions?
>
> Joe
> Get a free ActiveMQ user guide @ http://www.ttmsolutions.com
>
>
> Steven Van Loon-2 wrote:
>>
>> Hi all,
>>
>> Is anybody aware of possible problems when (re)using a same Connection to
>> activeMQ by different threads? According to the JMS specification,
>> implementations should support concurrent use of Connections, so I created
>> one connection to be used by all consumer / producers.
>>
>> However, I experienced the problem that from a certain moment, no more
>> messages were read from the queue although a lot of messages were still
>> waiting. When another program connects to the queue and reads a message,
>> it can still fetch messages. We were able to track down the problem to
>> activeMQ since using another queuing system solved the problem, it kept
>> reading all the messages, no matter how many were waiting on the queue.
>> Eventually, making a connection for each thread solved the problem also
>> for activeMQ. Hence my question.
>>
>> My apologies I can provide you only with a vague description of the
>> problem and no test case to reproduce the problem but I'm wondering
>> whether anybody else experienced similar problems and can provide me with
>> more insight in the problem.
>>
>> Thanks!
>> Steven.
>>
>>
>>
>
> --
> View this message in context:
> http://www.nabble.com/Concurrent-use-of-connections-tp19952062p19952819.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>
>

--
James
-------
http://macstrac.blogspot.com/

Open Source Integration
http://open.iona.com
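
P.S. To make the "reuse the same consumer and producer" advice concrete, here is a minimal sketch of the quoted test case with the consumer and producer created once, outside the loops. It keeps the single shared connection and gives each thread its own session, since sessions are single-threaded. The class name ReusedConsumerProducer, the run() helper, the message count and the 5 second receive timeout are my own illustrative choices, not anything from ActiveMQ; it assumes the ActiveMQ client jar with the javax.jms API on the classpath and a broker at tcp://localhost:61616. I have not measured whether this alone makes the stall go away; it only removes the per-message create/close churn.

```java
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class ReusedConsumerProducer {

    // Same queue name as the quoted sample
    static final String DESTINATION_NAME = "TEST.QUEUE";

    // Hypothetical helper: sends messageCount messages on one thread and
    // consumes them on the calling thread; returns how many were received.
    public static int run(String brokerUrl, int messageCount) throws Exception {
        // One connection shared by both threads (connections are thread-safe)
        Connection connection = new ActiveMQConnectionFactory(brokerUrl).createConnection();
        connection.start();
        try {
            Thread producerThread = new Thread(() -> {
                try {
                    // This session is only ever touched by the producer thread
                    Session producerSession =
                            connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                    Queue queue = producerSession.createQueue(DESTINATION_NAME);
                    // Created once, reused for every send
                    MessageProducer producer = producerSession.createProducer(queue);
                    for (int i = 1; i <= messageCount; i++) {
                        producer.send(producerSession.createTextMessage("Message " + i));
                    }
                    producer.close();
                    producerSession.close();
                } catch (JMSException ex) {
                    ex.printStackTrace();
                }
            }, "Producer");
            producerThread.start();

            // Separate session for the consuming (calling) thread
            Session consumerSession =
                    connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = consumerSession.createQueue(DESTINATION_NAME);
            // Created once, reused for every receive
            MessageConsumer consumer = consumerSession.createConsumer(queue);
            int received = 0;
            while (received < messageCount) {
                // Arbitrary timeout so the sketch terminates if nothing arrives
                Message msg = consumer.receive(5000);
                if (msg == null) {
                    break;
                }
                received++;
            }
            consumer.close();
            consumerSession.close();
            producerThread.join();
            return received;
        } finally {
            connection.close();
        }
    }

    public static void main(String[] args) throws Exception {
        int received = run("tcp://localhost:61616", 100);
        System.out.println("Received " + received + " messages");
    }
}
```

If you would rather not manage sessions, producers and consumers by hand, the Spring JmsTemplate / MessageListenerContainer route mentioned above together with the ActiveMQ JMS pool does this kind of reuse for you.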