On Tue, Sep 16, 2008 at 10:33 AM, Badri <[EMAIL PROTECTED]> wrote: > > Hi > > I am trying to simulate slow consumer using the classes > (TopicPublisher/TopicListener) provided in examples. > > I wanted to see if Web Console prints the Advisory topic for Slow Consumer. > But I am facing Out Of Memory Error & the consumer stops abruptly. > > Following are changes I did: > > activemq.xml: > > <transportConnectors> > <transportConnector name="openwire" > uri="tcp://localhost:61616?maxInactivityDuration=-1"/> > </transportConnectors> > > > TopicListener: I added a sleep of 1000 to induce some slowness. > if (++count % 1000 == 0) { > System.out.println("Received " + count + " messages."); > try > { > Thread.sleep(1000); > } > catch (Exception e) > { > > } > } > > I give java -Xmx256m -Xms128m when using TopicListener. > After maybe 5 minutes it crashes. After receiving anywhere between 15-20 > batches of 1000 messages, it crashes. > > Exception: > Received 9000 messages. > Received 10000 messages. > javax.jms.JMSException: Unexpected error occured > at > org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSuppo > rt.java:62) > at > org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnect > ion.java:1255) > at > org.apache.activemq.ActiveMQSession.send(ActiveMQSession.java:1659) > at > org.apache.activemq.ActiveMQMessageProducer.send(ActiveMQMessageProdu > cer.java:227) > at > org.apache.activemq.ActiveMQMessageProducerSupport.send(ActiveMQMessa > geProducerSupport.java:241) > at TopicListener.onMessage(TopicListener.java:96) > at > org.apache.activemq.ActiveMQMessageConsumer.dispatch(ActiveMQMessageC > onsumer.java:983) > at > org.apache.activemq.ActiveMQSessionExecutor.dispatch(ActiveMQSessionE > xecutor.java:122) > at > org.apache.activemq.ActiveMQSessionExecutor.iterate(ActiveMQSessionEx > ecutor.java:192) > at > org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner. > java:122) > at > org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.ja > va:43) > at > java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExec > utor.java:885) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor > .java:907) > at java.lang.Thread.run(Thread.java:619) > Caused by: java.io.IOException: Unexpected error occured > at > org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java: > 191) > ... 1 more > Caused by: java.lang.OutOfMemoryError: Java heap space > at > org.apache.activemq.openwire.v3.BaseDataStreamMarshaller.tightUnmarsh > alByteSequence(BaseDataStreamMarshaller.java:440) > at > org.apache.activemq.openwire.v3.MessageMarshaller.tightUnmarshal(Mess > ageMarshaller.java:68) > at > org.apache.activemq.openwire.v3.ActiveMQMessageMarshaller.tightUnmars > hal(ActiveMQMessageMarshaller.java:67) > at > org.apache.activemq.openwire.v3.ActiveMQBytesMessageMarshaller.tightU > nmarshal(ActiveMQBytesMessageMarshaller.java:67) > at > org.apache.activemq.openwire.OpenWireFormat.tightUnmarshalNestedObjec > t(OpenWireFormat.java:453) > at > org.apache.activemq.openwire.v3.BaseDataStreamMarshaller.tightUnmarsa > lNestedObject(BaseDataStreamMarshaller.java:126) > at > org.apache.activemq.openwire.v3.MessageDispatchMarshaller.tightUnmars > hal(MessageDispatchMarshaller.java:72) > at > org.apache.activemq.openwire.OpenWireFormat.doUnmarshal(OpenWireForma > t.java:362) > at > org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat. > java:276) > at > org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTranspo > rt.java:209) > at > org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.jav > a:201) > at > org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java: > 184) > ... 1 more > Received 1000 messages. > Received 2000 messages. > > TopicPublisher: > private long batch(int msgCount) throws Exception { > // Commented out call to waitForCompletion > // waitForCompletion(); > } > > QN 1) I just want to know if I am configuring something wrong. Or should I > increase -Xmx512m when running TopicListener to get rid of OOME?
You shouldn't need to raise the memory at all as there should be nothing causing memory to grow. I just added the sleep above to the TopicListener using ActiveMQ 5.2.0 and I've produced and consumed over 300,000 messages in a matter of moments without any issue whatsoever. I'm on MacOS X using Java 1.5.0_13. What OS and Java version are you using? > QN 2) Will the advisory topic: > ActiveMQ.Advisory.SlowConsumer.Topic.topictest.messages appear only if the > consumer starts discarding messages? What is the basis for this topic to > appear in Web console? I'm not able to see the *SlowConsumer* topic appear at all. I don't know if that's because I'm using a dual core Intel processor or what. What type of processor does your machine have? Bruce -- perl -e 'print unpack("u30","D0G)[EMAIL PROTECTED]&5R\"F)R=6-E+G-N>61E<D\!G;6%I;\"YC;VT*" );' Apache ActiveMQ - http://activemq.org/ Apache Camel - http://activemq.org/camel/ Apache ServiceMix - http://servicemix.org/ Blog: http://bruceblog.org/