The example here may be an indicator of my problem:

I started building a standalone example based on this:
http://activemq.apache.org/hello-world.html

I'm using ActiveMQ 5.7.0 with the stock config on Windows XP64, JDK/JRE 1.7.0_21.

I had to fix the example, otherwise the message numbers don't match (the code
was using the wrong hashCode):
                // Tell the producer to send the message
                System.out.println("Sent message: " + this.hashCode() + " : "
                        + Thread.currentThread().getName());

Also, is it proper to NOT close the session and consumer explicitly when an
exception occurs?
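
A minimal sketch of the cleanup pattern this question is about. In the listing further down, the close() calls sit at the end of the try block, so they are skipped whenever an earlier call throws. The usual Java alternative is to move the cleanup into a finally block. FakeConnection and CleanupSketch are hypothetical stand-ins invented for illustration, not JMS or ActiveMQ types. As far as I know, the JMS javadoc for Connection.close() says that closing a connection also closes its sessions, producers and consumers, which is why only one close() is modelled here.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-in sketch: NOT the JMS API, only the try/catch/finally shape. */
final class CleanupSketch {

    /** Hypothetical stand-in for a JMS Connection; records what happened. */
    static final class FakeConnection {
        final List<String> events = new ArrayList<>();

        /** Always fails, to simulate an exception in the middle of the work. */
        void send(String text) throws Exception {
            events.add("send");
            throw new Exception("simulated failure while sending " + text);
        }

        void close() {
            events.add("close");
        }
    }

    /** Does the work and closes the connection on success and on failure. */
    static List<String> runOnce(FakeConnection connection) {
        try {
            connection.send("Hello world!");
        } catch (Exception e) {
            connection.events.add("caught");
        } finally {
            // Reached whether or not send() threw.
            connection.close();
        }
        return connection.events;
    }

    public static void main(String[] args) {
        // Prints [send, caught, close]
        System.out.println(runOnce(new FakeConnection()));
    }
}
```

With real JMS objects the connection variable would have to be declared before the try and null-checked in the finally block, since createConnection() itself can throw.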

The problem: the last receive gets a null. The message from Thread-0 never
gets received (that explains the last null).
If I remove the sleeps from the code, everything gets received correctly,
so the sleeps may be emulating my WAN condition.
I'm going to go on and write a version that uses topics and emulates my WAN
latency, but if this one doesn't work I'm concerned that I may be seeing bad
behavior.

Sent message: 138587397 : Thread-0
Sent message: 1687846520 : Thread-8
Sent message: 1524968417 : Thread-1
Sent message: 2074341221 : Thread-10
Received: Hello world! From: Thread-10 : 2074341221
Received: Hello world! From: Thread-8 : 1687846520
Received: Hello world! From: Thread-1 : 1524968417
Sent message: 1819559760 : Thread-13
Sent message: 1520240894 : Thread-16
Sent message: 482561938 : Thread-17
Received: Hello world! From: Thread-17 : 482561938
Received: Hello world! From: Thread-13 : 1819559760
Received: Hello world! From: Thread-16 : 1520240894
Sent message: 1168316156 : Thread-27
Sent message: 1446649371 : Thread-25
Sent message: 166853375 : Thread-32
Sent message: 1088540816 : Thread-29
Sent message: 692305719 : Thread-22
Received: Hello world! From: Thread-25 : 1446649371
Received: Hello world! From: Thread-32 : 166853375
Received: Hello world! From: Thread-22 : 692305719
Received: Hello world! From: Thread-27 : 1168316156
Received: Hello world! From: Thread-29 : 1088540816
Received: null
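
For context on that last line: in the listing further down, "Received: null" is printed by the else branch when consumer.receive(1000) returns null, and a timed JMS receive returns null when no message arrives before the timeout expires. The sketch below shows the same timed-wait behavior with a plain java.util.concurrent queue. It is only an analogy under that assumption. TimedReceiveSketch and its receive() helper are made-up names, and no broker is involved.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Analogy only: a timed poll on an in-memory queue, not a JMS consumer. */
final class TimedReceiveSketch {

    /** Waits up to timeoutMillis for an element; returns null on timeout. */
    static String receive(BlockingQueue<String> queue, long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("Hello world!");

        // One message is waiting, so the first call returns it immediately.
        System.out.println("Received: " + receive(queue, 100));
        // The queue is now empty, so the second call times out.
        System.out.println("Received: " + receive(queue, 100));
    }
}
```

The second call prints "Received: null", the same shape as the last line of the output above: a consumer that finds nothing to read within its timeout.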

Then I started testing different sleep times.
Sleep(1) works fine. It shows all the sends followed by all the receives.
Sleep(300) works fine.
Sleep(400) produces the output below. This is repeatable: 300 works, changing
to 400 gives the error, going back to 300 works, and so on.
Sent message: 1282948747 : Thread-14
Sent message: 134376562 : Thread-11
Sent message: 913691711 : Thread-0
Sent message: 1461564537 : Thread-1
Sent message: 1033855723 : Thread-15
Sent message: 2066777693 : Thread-7
Sent message: 1783774949 : Thread-5
1 Received: Hello world! From: Thread-14 : 1282948747
2 Received: Hello world! From: Thread-7 : 2066777693
5 Received: Hello world! From: Thread-1 : 1461564537
6 Received: Hello world! From: Thread-0 : 913691711
3 Received: Hello world! From: Thread-11 : 134376562
4 Received: Hello world! From: Thread-5 : 1783774949
Caught: javax.jms.JMSException: Could not create Transport. Reason: javax.management.InstanceAlreadyExistsException: org.apache.activemq:BrokerName=localhost,Type=Broker
javax.jms.JMSException: Could not create Transport. Reason: javax.management.InstanceAlreadyExistsException: org.apache.activemq:BrokerName=localhost,Type=Broker
        at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:35)
        at org.apache.activemq.ActiveMQConnectionFactory.createTransport(ActiveMQConnectionFactory.java:252)
        at org.apache.activemq.ActiveMQConnectionFactory.createActiveMQConnection(ActiveMQConnectionFactory.java:265)
        at org.apache.activemq.ActiveMQConnectionFactory.createActiveMQConnection(ActiveMQConnectionFactory.java:238)
        at org.apache.activemq.ActiveMQConnectionFactory.createConnection(ActiveMQConnectionFactory.java:184)
        at App$HelloWorldProducer.run(App.java:71)
        at java.lang.Thread.run(Unknown Source)
Caused by: javax.management.InstanceAlreadyExistsException: org.apache.activemq:BrokerName=localhost,Type=Broker
        at com.sun.jmx.mbeanserver.Repository.addMBean(Unknown Source)
        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(Unknown Source)
        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(Unknown Source)
        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(Unknown Source)
        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(Unknown Source)
        at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(Unknown Source)
        at org.apache.activemq.broker.jmx.ManagementContext.registerMBean(ManagementContext.java:366)
        at org.apache.activemq.broker.jmx.AnnotatedMBean.registerMBean(AnnotatedMBean.java:72)
        at org.apache.activemq.broker.BrokerService.startManagementContext(BrokerService.java:2370)
        at org.apache.activemq.broker.BrokerService.start(BrokerService.java:567)
        at org.apache.activemq.transport.vm.VMTransportFactory.doCompositeConnect(VMTransportFactory.java:124)
        at org.apache.activemq.transport.vm.VMTransportFactory.doConnect(VMTransportFactory.java:54)
        at org.apache.activemq.transport.TransportFactory.connect(TransportFactory.java:67)
        at org.apache.activemq.ActiveMQConnectionFactory.createTransport(ActiveMQConnectionFactory.java:250)
        ... 5 more
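
Reading the trace bottom-up: creating the vm://localhost connection went through VMTransportFactory into BrokerService.start, which tried to register a broker MBean under a name the MBean server already held. The sketch below reproduces only that last step with the JDK's platform MBean server, to show what InstanceAlreadyExistsException means. It does not explain why ActiveMQ ended up registering twice. DuplicateMBeanSketch, Demo, DemoMBean and the object name "sketch.demo:type=Demo" are all invented for this example.

```java
import java.lang.management.ManagementFactory;
import javax.management.InstanceAlreadyExistsException;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

/** Registers two MBeans under one ObjectName to trigger the JMX exception. */
final class DuplicateMBeanSketch {

    /** Standard MBean convention: interface name is the class name + "MBean". */
    public interface DemoMBean {
        String getName();
    }

    public static final class Demo implements DemoMBean {
        @Override
        public String getName() {
            return "demo";
        }
    }

    /** Returns true if the second registration under the same name is rejected. */
    static boolean secondRegistrationRejected(String name) {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        try {
            ObjectName objectName = new ObjectName(name);
            server.registerMBean(new Demo(), objectName);
            try {
                // Same name, different instance: the server refuses this one.
                server.registerMBean(new Demo(), objectName);
                return false;
            } catch (InstanceAlreadyExistsException e) {
                return true;
            } finally {
                // Leave the MBean server clean for any later run.
                server.unregisterMBean(objectName);
            }
        } catch (JMException e) {
            throw new IllegalStateException("unexpected JMX failure", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(secondRegistrationRejected("sketch.demo:type=Demo"));
    }
}
```

The first registration succeeds and the second one under the same name is rejected, so the method returns true. That is the JMX-level condition reported in the "Caused by" section of the trace.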

So here's my code:
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

/**
 * Hello world!
 */
public class App {
    // Shared counter of received messages across all consumer threads
    static AtomicInteger n;
    public static void main(String[] args) {
        int sleep=400;
        n = new AtomicInteger(0);
        try {
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldConsumer(), false);
        Thread.sleep(sleep);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
        Thread.sleep(sleep);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldProducer(), false);
        Thread.sleep(sleep);
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
        }
        catch (Exception e) {
                e.printStackTrace();
        }
    }

    public static void thread(Runnable runnable, boolean daemon) {
        Thread brokerThread = new Thread(runnable);
        brokerThread.setDaemon(daemon);
        brokerThread.start();
    }

    public static class HelloWorldProducer implements Runnable {
        public void run() {
            try {
                // Create a ConnectionFactory
                ActiveMQConnectionFactory connectionFactory =
                        new ActiveMQConnectionFactory("vm://localhost");

                // Create a Connection
                Connection connection = connectionFactory.createConnection();
                connection.start();

                // Create a Session
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

                // Create the destination (Topic or Queue)
                Destination destination = session.createQueue("TEST.FOO");

                // Create a MessageProducer from the Session to the Topic or Queue
                MessageProducer producer = session.createProducer(destination);
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

                // Create a message
                String text = "Hello world! From: "
                        + Thread.currentThread().getName() + " : " + this.hashCode();
                TextMessage message = session.createTextMessage(text);

                // Tell the producer to send the message
                System.out.println("Sent message: " + this.hashCode() + " : "
                        + Thread.currentThread().getName());
                producer.send(message);

                // Clean up
                session.close();
                connection.close();
            }
            catch (Exception e) {
                System.out.println("Caught: " + e);
                e.printStackTrace();
            }
        }
    }

    public static class HelloWorldConsumer implements Runnable, ExceptionListener {
        public void run() {
            try {

                // Create a ConnectionFactory
                ActiveMQConnectionFactory connectionFactory =
                        new ActiveMQConnectionFactory("vm://localhost");

                // Create a Connection
                Connection connection = connectionFactory.createConnection();
                connection.start();

                connection.setExceptionListener(this);

                // Create a Session
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

                // Create the destination (Topic or Queue)
                Destination destination = session.createQueue("TEST.FOO");

                // Create a MessageConsumer from the Session to the Topic or Queue
                MessageConsumer consumer = session.createConsumer(destination);

                // Wait for a message
                Message message = consumer.receive(1000);

                if (message instanceof TextMessage) {
                    int count = n.incrementAndGet();
                    TextMessage textMessage = (TextMessage) message;
                    String text = textMessage.getText();
                    System.out.println(count+" Received: " + text);
                } else {
                    System.out.println("Received: " + message);
                }

                consumer.close();
                session.close();
                connection.close();
            } catch (Exception e) {
                System.out.println("Caught: " + e);
                e.printStackTrace();
            }
        }

        public synchronized void onException(JMSException ex) {
            System.out.println("JMS Exception occurred.  Shutting down client.");
        }
    }
}

-----Original Message-----
From: Christian Posta [mailto:christian.po...@gmail.com] 
Sent: Friday, April 26, 2013 5:54 PM
To: users@activemq.apache.org
Subject: Re: Dispatched Queue Size

Let us know what happens if you run on a LAN. If you can post your
code/configs, or better yet, create a unit test that shows this, it'll be
much easier to tell you what's going on.

