Another factoid -- if I don't use "duplex" then all seems to be well.

On 2013-08-14 19:03:36 +0000, Oleg Dulin said:

One more note -- here is a kicker.

If I change the consumers to producer's IP, comment out hte producer, and restart the program -- they get all their messages!

This is driving me nuts!

On 2013-08-14 18:59:55 +0000, Oleg Dulin said:

----------------17595334911828795876

Content-Type: text/plain; charset=iso-8859-1; format=flowed

Content-Transfer-Encoding: 8bit



Dear Distinguished Colleagues:



I have the following configuration.



I have broker1 (7.107) set up as follows:



<transportConnectors>

<transportConnector name="openwire" uri="tcp://0.0.0.0:3200"/>

</transportConnectors>



My second broker (7.106) is setup like this:



<transportConnectors>

<transportConnector name="openwire" uri="tcp://0.0.0.0:3200"/>

</transportConnectors>

<networkConnectors>

<networkConnector name="connector106.107"

uri="static:(tcp://192.168.7.107:3200)"
duplex="true" />

</networkConnectors>



Once in awhile, but consistently, the federation gets into a funny
state where if consumers are on 106, but producer is on 107, 106
doesn't get any messages.



Same happens if roducer is on 106, and consumers are on 107.



If they are all on the same broker, either one, all is well.



Once I restart the brokers, everything's fine for awhile, until it is
not. Usually a moderate to heavy volume of messages is enough to get
the system into this state.



I am convinced it is either a misconfiguration on my part, or a bug in
ActiveMQ. Hopefully, this is just a misconfiguration.



Here is the test code. I change the IPs for both consumers and producer
to test it out.



public class FederationTest

{

        public static void main(String args[]) throws JMSException,
InterruptedException

        {

                ActiveMQConnectionFactory factory1 = new
ActiveMQConnectionFactory("failover:(tcp://192.168.7.107:3200)");

                factory1.setUseAsyncSend(true);

                initConsumer("c1",factory1);

                

                

                ActiveMQConnectionFactory factory2 = new
ActiveMQConnectionFactory("failover:(tcp://192.168.7.106:3200)");

                factory2.setUseAsyncSend(true);

                initConsumer("c2",factory2);

                

                

                ActiveMQConnectionFactory factory3 = new
ActiveMQConnectionFactory("failover:(tcp://192.168.7.107:3200)");

                factory3.setUseAsyncSend(true);

                initProducer(factory3);

                while(true)

                {

                        Thread.sleep(1000);

                }

                

        }



        private static void initProducer(ActiveMQConnectionFactory factory)
throws JMSException

        {

                QueueConnection 
queueConnection1=factory.createQueueConnection();

                queueConnection1.start();

                final QueueSession 
qs1=queueConnection1.createQueueSession(false,
Session.AUTO_ACKNOWLEDGE);

                Queue q1=qs1.createQueue("TEST");

                final QueueSender qSender=qs1.createSender(q1);

                new Thread(new Runnable() {



                        @Override

                        public void run()

                        {

                                int counter=0;

                                while(true)

                                {

                                        try

                                        {

                                                TextMessage 
msg=qs1.createTextMessage();

                                                msg.setText("counter="+counter);

                                                
System.out.println("p:"+counter);

                                                counter++;

                                                qSender.send(msg);

                                                

                                                Thread.sleep(1000);

                                        }

                                        catch(Exception exp)

                                        {

                                                exp.printStackTrace();

                                        }

                                        

                                }

                        }

                        

                }).start();

                

                

        }



        public static void initConsumer(final String
cname,ActiveMQConnectionFactory factory) throws JMSException

        {

                QueueConnection 
queueConnection1=factory.createQueueConnection();

                queueConnection1.start();

                QueueSession qs1=queueConnection1.createQueueSession(false,
Session.AUTO_ACKNOWLEDGE);

                Queue q1=qs1.createQueue("TEST");

                MessageConsumer c1=qs1.createConsumer(q1);

                c1.setMessageListener(new MessageListener() {

                        @Override

                        public void onMessage(Message msg)

                        {

                                try

                                {

                                        TextMessage txt=(TextMessage) msg;

                                        
System.out.println(cname+":"+txt.getText());

                                        Thread.sleep((long) 
(1000*Math.random()));

                                }

                                catch(Exception exp)

                                {

                                        exp.printStackTrace();

                                }

                        }

                        

                });

        }

}









--

Regards,

Oleg Dulin

http://www.olegdulin.com

----------------17595334911828795876--


--
Regards,
Oleg Dulin
http://www.olegdulin.com


Reply via email to