On Tue, Sep 16, 2008 at 10:33 AM, Badri
<[EMAIL PROTECTED]> wrote:
>
> Hi
>
> I am trying to simulate slow consumer using the classes
> (TopicPublisher/TopicListener) provided in examples.
>
> I wanted to see if Web Console prints the Advisory topic for Slow Consumer.
> But I am facing Out Of Memory Error & the consumer stops abruptly.
>
> Following are changes I did:
>
> activemq.xml:
>
>        <transportConnectors>
>            <transportConnector name="openwire"
> uri="tcp://localhost:61616?maxInactivityDuration=-1"/>
>        </transportConnectors>
>
>
> TopicListener: I added a sleep of 1000 to induce some slowness.
>            if (++count % 1000 == 0) {
>                System.out.println("Received " + count + " messages.");
>                                try
>                                {
>                                        Thread.sleep(1000);
>                                }
>                                catch (Exception e)
>                                {
>
>                                }
>            }
>
> I give java -Xmx256m -Xms128m when using TopicListener.
> After maybe 5 minutes it crashes. After receiving anywhere between 15-20
> batches of 1000 messages, it crashes.
>
> Exception:
> Received 9000 messages.
> Received 10000 messages.
> javax.jms.JMSException: Unexpected error occured
>        at
> org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSuppo
> rt.java:62)
>        at
> org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnect
> ion.java:1255)
>        at
> org.apache.activemq.ActiveMQSession.send(ActiveMQSession.java:1659)
>        at
> org.apache.activemq.ActiveMQMessageProducer.send(ActiveMQMessageProdu
> cer.java:227)
>        at
> org.apache.activemq.ActiveMQMessageProducerSupport.send(ActiveMQMessa
> geProducerSupport.java:241)
>        at TopicListener.onMessage(TopicListener.java:96)
>        at
> org.apache.activemq.ActiveMQMessageConsumer.dispatch(ActiveMQMessageC
> onsumer.java:983)
>        at
> org.apache.activemq.ActiveMQSessionExecutor.dispatch(ActiveMQSessionE
> xecutor.java:122)
>        at
> org.apache.activemq.ActiveMQSessionExecutor.iterate(ActiveMQSessionEx
> ecutor.java:192)
>        at
> org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.
> java:122)
>        at
> org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.ja
> va:43)
>        at
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExec
> utor.java:885)
>        at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor
> .java:907)
>        at java.lang.Thread.run(Thread.java:619)
> Caused by: java.io.IOException: Unexpected error occured
>        at
> org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:
> 191)
>        ... 1 more
> Caused by: java.lang.OutOfMemoryError: Java heap space
>        at
> org.apache.activemq.openwire.v3.BaseDataStreamMarshaller.tightUnmarsh
> alByteSequence(BaseDataStreamMarshaller.java:440)
>        at
> org.apache.activemq.openwire.v3.MessageMarshaller.tightUnmarshal(Mess
> ageMarshaller.java:68)
>        at
> org.apache.activemq.openwire.v3.ActiveMQMessageMarshaller.tightUnmars
> hal(ActiveMQMessageMarshaller.java:67)
>        at
> org.apache.activemq.openwire.v3.ActiveMQBytesMessageMarshaller.tightU
> nmarshal(ActiveMQBytesMessageMarshaller.java:67)
>        at
> org.apache.activemq.openwire.OpenWireFormat.tightUnmarshalNestedObjec
> t(OpenWireFormat.java:453)
>        at
> org.apache.activemq.openwire.v3.BaseDataStreamMarshaller.tightUnmarsa
> lNestedObject(BaseDataStreamMarshaller.java:126)
>        at
> org.apache.activemq.openwire.v3.MessageDispatchMarshaller.tightUnmars
> hal(MessageDispatchMarshaller.java:72)
>        at
> org.apache.activemq.openwire.OpenWireFormat.doUnmarshal(OpenWireForma
> t.java:362)
>        at
> org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.
> java:276)
>        at
> org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTranspo
> rt.java:209)
>        at
> org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.jav
> a:201)
>        at
> org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:
> 184)
>        ... 1 more
> Received 1000 messages.
> Received 2000 messages.
>
> TopicPublisher:
>  private long batch(int msgCount) throws Exception {
>  // Commented out call to waitForCompletion
>  // waitForCompletion();
> }
>
> QN 1) I just want to know if I am configuring something wrong. Or should I
> increase -Xmx512m when running TopicListener to get rid of OOME?

You shouldn't need to raise the memory at all as there should be
nothing causing memory to grow. I just added the sleep above to the
TopicListener using ActiveMQ 5.2.0 and I've produced and consumed over
300,000 messages in a matter of moments without any issue whatsoever.
I'm on MacOS X using Java 1.5.0_13. What OS and Java version are you
using?

> QN 2) Will the advisory topic:
> ActiveMQ.Advisory.SlowConsumer.Topic.topictest.messages appear only if the
> consumer starts discarding messages? What is the basis for this topic to
> appear in Web console?

I'm not able to see the *SlowConsumer* topic appear at all. I don't
know if that's because I'm using a dual core Intel processor or what.
What type of processor does your machine have?

Bruce
-- 
perl -e 'print unpack("u30","D0G)[EMAIL 
PROTECTED]&5R\"F)R=6-E+G-N>61E<D\!G;6%I;\"YC;VT*"
);'

Apache ActiveMQ - http://activemq.org/
Apache Camel - http://activemq.org/camel/
Apache ServiceMix - http://servicemix.org/

Blog: http://bruceblog.org/

Reply via email to