Dear Distinguished Colleagues:

I have the following configuration.

I have broker1 (7.107) set up as follows:

<transportConnectors>
          <transportConnector name="openwire" uri="tcp://0.0.0.0:3200"/>
</transportConnectors>

My second broker (7.106) is setup like this:

<transportConnectors>
            <transportConnector name="openwire" uri="tcp://0.0.0.0:3200"/>
</transportConnectors>
<networkConnectors>
           <networkConnector name="connector106.107"
uri="static:(tcp://192.168.7.107:3200)" duplex="true" />
</networkConnectors>

Once in awhile, but consistently, the federation gets into a funny state where if consumers are on 106, but producer is on 107, 106 doesn't get any messages.

Same happens if roducer is on 106, and consumers are on 107.

If they are all on the same broker, either one, all is well.

Once I restart the brokers, everything's fine for awhile, until it is not. Usually a moderate to heavy volume of messages is enough to get the system into this state.

I am convinced it is either a misconfiguration on my part, or a bug in ActiveMQ. Hopefully, this is just a misconfiguration.

Here is the test code. I change the IPs for both consumers and producer to test it out.

public class FederationTest
{
public static void main(String args[]) throws JMSException, InterruptedException
        {
ActiveMQConnectionFactory factory1 = new ActiveMQConnectionFactory("failover:(tcp://192.168.7.107:3200)");
                factory1.setUseAsyncSend(true);
                initConsumer("c1",factory1);
                
                
ActiveMQConnectionFactory factory2 = new ActiveMQConnectionFactory("failover:(tcp://192.168.7.106:3200)");
                factory2.setUseAsyncSend(true);
                initConsumer("c2",factory2);
                
                
ActiveMQConnectionFactory factory3 = new ActiveMQConnectionFactory("failover:(tcp://192.168.7.107:3200)");
                factory3.setUseAsyncSend(true);
                initProducer(factory3);
                while(true)
                {
                        Thread.sleep(1000);
                }
                
        }

private static void initProducer(ActiveMQConnectionFactory factory) throws JMSException
        {
                QueueConnection 
queueConnection1=factory.createQueueConnection();
                queueConnection1.start();
final QueueSession qs1=queueConnection1.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
                Queue q1=qs1.createQueue("TEST");
                final QueueSender qSender=qs1.createSender(q1);
                new Thread(new Runnable() {

                        @Override
                        public void run()
                        {
                                int counter=0;
                                while(true)
                                {
                                        try
                                        {
                                                TextMessage 
msg=qs1.createTextMessage();
                                                msg.setText("counter="+counter);
                                                
System.out.println("p:"+counter);
                                                counter++;
                                                qSender.send(msg);
                                                
                                                Thread.sleep(1000);
                                        }
                                        catch(Exception exp)
                                        {
                                                exp.printStackTrace();
                                        }
                                        
                                }
                        }
                        
                }).start();
                
                
        }

public static void initConsumer(final String cname,ActiveMQConnectionFactory factory) throws JMSException
        {
                QueueConnection 
queueConnection1=factory.createQueueConnection();
                queueConnection1.start();
QueueSession qs1=queueConnection1.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
                Queue q1=qs1.createQueue("TEST");
                MessageConsumer c1=qs1.createConsumer(q1);
                c1.setMessageListener(new MessageListener() {
                        @Override
                        public void onMessage(Message msg)
                        {
                                try
                                {
                                        TextMessage txt=(TextMessage) msg;
                                        
System.out.println(cname+":"+txt.getText());
                                        Thread.sleep((long) 
(1000*Math.random()));
                                }
                                catch(Exception exp)
                                {
                                        exp.printStackTrace();
                                }
                        }
                        
                });
        }
}




--
Regards,
Oleg Dulin
http://www.olegdulin.com

Reply via email to