One more note -- here is the kicker.
If I point the consumers at the producer's IP, comment out the producer,
and restart the program, they get all their messages!
This is driving me nuts!
On 2013-08-14 18:59:55 +0000, Oleg Dulin said:
Dear Distinguished Colleagues:
I have the following configuration.
I have broker1 (7.107) set up as follows:
<transportConnectors>
    <transportConnector name="openwire" uri="tcp://0.0.0.0:3200"/>
</transportConnectors>
My second broker (7.106) is set up like this:
<transportConnectors>
    <transportConnector name="openwire" uri="tcp://0.0.0.0:3200"/>
</transportConnectors>
<networkConnectors>
    <networkConnector name="connector106.107"
                      uri="static:(tcp://192.168.7.107:3200)"
                      duplex="true" />
</networkConnectors>
Every so often, but reproducibly, the federation gets into a funny
state where, if the consumers are on 106 but the producer is on 107,
the consumers on 106 don't get any messages.
The same happens if the producer is on 106 and the consumers are on 107.
If they are all on the same broker, either one, all is well.
Once I restart the brokers, everything is fine for a while, until it is
not. Usually a moderate to heavy volume of messages is enough to get
the system into this state.
I am convinced it is either a misconfiguration on my part, or a bug in
ActiveMQ. Hopefully, this is just a misconfiguration.
Here is the test code. I change the IPs for both consumers and producer
to test it out.
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class FederationTest
{
    public static void main(String[] args) throws JMSException, InterruptedException
    {
        // Consumer c1 connects to broker 107.
        ActiveMQConnectionFactory factory1 =
            new ActiveMQConnectionFactory("failover:(tcp://192.168.7.107:3200)");
        factory1.setUseAsyncSend(true);
        initConsumer("c1", factory1);

        // Consumer c2 connects to broker 106.
        ActiveMQConnectionFactory factory2 =
            new ActiveMQConnectionFactory("failover:(tcp://192.168.7.106:3200)");
        factory2.setUseAsyncSend(true);
        initConsumer("c2", factory2);

        // The producer connects to broker 107.
        ActiveMQConnectionFactory factory3 =
            new ActiveMQConnectionFactory("failover:(tcp://192.168.7.107:3200)");
        factory3.setUseAsyncSend(true);
        initProducer(factory3);

        // Keep the main thread alive while the producer and listeners run.
        while (true)
        {
            Thread.sleep(1000);
        }
    }

    private static void initProducer(ActiveMQConnectionFactory factory) throws JMSException
    {
        QueueConnection queueConnection1 = factory.createQueueConnection();
        queueConnection1.start();
        final QueueSession qs1 =
            queueConnection1.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue q1 = qs1.createQueue("TEST");
        final QueueSender qSender = qs1.createSender(q1);

        // Send one numbered message per second to the TEST queue.
        new Thread(new Runnable() {
            @Override
            public void run()
            {
                int counter = 0;
                while (true)
                {
                    try
                    {
                        TextMessage msg = qs1.createTextMessage();
                        msg.setText("counter=" + counter);
                        System.out.println("p:" + counter);
                        counter++;
                        qSender.send(msg);
                        Thread.sleep(1000);
                    }
                    catch (Exception exp)
                    {
                        exp.printStackTrace();
                    }
                }
            }
        }).start();
    }

    public static void initConsumer(final String cname, ActiveMQConnectionFactory factory)
        throws JMSException
    {
        QueueConnection queueConnection1 = factory.createQueueConnection();
        queueConnection1.start();
        QueueSession qs1 =
            queueConnection1.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue q1 = qs1.createQueue("TEST");
        MessageConsumer c1 = qs1.createConsumer(q1);

        // Print each message, then sleep up to one second to simulate work.
        c1.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message msg)
            {
                try
                {
                    TextMessage txt = (TextMessage) msg;
                    System.out.println(cname + ":" + txt.getText());
                    Thread.sleep((long) (1000 * Math.random()));
                }
                catch (Exception exp)
                {
                    exp.printStackTrace();
                }
            }
        });
    }
}
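Since the test involves changing the IPs for the consumers and the producer, one option is to pass the hosts on the command line rather than editing the source each time. The following is only a sketch under that assumption: BrokerUris, failoverUri, and argOrDefault are hypothetical names, not part of ActiveMQ, and the defaults simply mirror the addresses and port used in the test above.

```java
public class BrokerUris
{
    // Hypothetical helper: wraps a host and port in the same failover URI
    // form that the test code passes to ActiveMQConnectionFactory.
    static String failoverUri(String host, int port)
    {
        return "failover:(tcp://" + host + ":" + port + ")";
    }

    // Hypothetical helper: returns args[index] if present, else the fallback.
    static String argOrDefault(String[] args, int index, String fallback)
    {
        return args.length > index ? args[index] : fallback;
    }

    public static void main(String[] args)
    {
        // Argument order (an assumption): consumer c1 host, consumer c2 host, producer host.
        String c1Host = argOrDefault(args, 0, "192.168.7.107");
        String c2Host = argOrDefault(args, 1, "192.168.7.106");
        String producerHost = argOrDefault(args, 2, "192.168.7.107");

        System.out.println("c1       -> " + failoverUri(c1Host, 3200));
        System.out.println("c2       -> " + failoverUri(c2Host, 3200));
        System.out.println("producer -> " + failoverUri(producerHost, 3200));
    }
}
```

Each resulting string could then be handed to new ActiveMQConnectionFactory(...) in place of the hard-coded URIs, so that moving the producer or a consumer to the other broker only means changing an argument.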
--
Regards,
Oleg Dulin
http://www.olegdulin.com