Figured out the problem. I have to do connector.setNetworkTTL(2);
On Fri, Jun 19, 2015 at 1:52 PM, pubudu gunawardena <pubud...@gmail.com> wrote: > Hi All, > > I have the following setup. There is a producer which sends messages > to an embedded broker. There is a consumer that consumes messages from > another embedded broker. I have created a network of brokers with the > two embedded brokers connected to the standalone broker. But the > messages don't get passed from the producer to the consumer. I have > created an example which shows this behavior. Can someone point out to > me what I am doing wrong or if this is not possible? Following is the > code to reproduce what I have mentioned. > > > import java.net.URI; > > import javax.jms.Connection; > import javax.jms.JMSException; > import javax.jms.Message; > import javax.jms.MessageConsumer; > import javax.jms.MessageListener; > import javax.jms.MessageProducer; > import javax.jms.Queue; > import javax.jms.Session; > import javax.jms.TextMessage; > > import org.apache.activemq.ActiveMQConnectionFactory; > import org.apache.activemq.broker.BrokerService; > import org.apache.activemq.network.DiscoveryNetworkConnector; > import org.apache.activemq.network.NetworkConnector; > > public class Test { > > public static void main(String[] args) { > try { > startBroker1(); > startBroker2(); > runProducer(); > runConsumer(); > > } catch (Exception e) { > // TODO Auto-generated catch block > e.printStackTrace(); > } > } > > private static void runConsumer() { > new Thread(new Consumer()).start(); > > } > > private static final class Consumer implements Runnable, MessageListener { > @Override > public void run() { > try { > ActiveMQConnectionFactory factory = new > ActiveMQConnectionFactory("tcp://localhost:61615"); > Connection connection = factory.createConnection(); > connection.start(); > Session session = connection.createSession(false, > Session.AUTO_ACKNOWLEDGE); > Queue destination = session.createQueue("topic"); > MessageConsumer consumer = > session.createConsumer(destination); > consumer.setMessageListener(this); > } catch (JMSException e) { > e.printStackTrace(); > } > } > > @Override > public void onMessage(Message message) { > try { > TextMessage text = (TextMessage) message; > System.out.println("Message is : " + text.getText()); > } catch (JMSException e) { > e.printStackTrace(); > } > } > } > > private static void runProducer() { > new Thread(new Runnable() { > @Override > public void run() { > try { > ActiveMQConnectionFactory factory = new > ActiveMQConnectionFactory("tcp://localhost:61617"); > // ActiveMQConnection.DEFAULT_BROKER_URL = > // failover://tcp://localhost:61616 > Connection connection; > connection = factory.createConnection(); > connection.start(); > Session session = connection.createSession(false, > Session.AUTO_ACKNOWLEDGE); > Queue destination = session.createQueue("topic"); > MessageProducer producer = > session.createProducer(destination); > TextMessage message = session.createTextMessage(); > message.setText("This is the message"); > producer.send(message); > System.out.println("Sent: " + message.getText()); > } catch (JMSException e) { > // TODO Auto-generated catch block > e.printStackTrace(); > } > } > }).start(); > > } > > private static void startBroker1() throws Exception { > new Thread(new Runnable() { > @Override > public void run() { > startBroker("broker1", "tcp://localhost:61617"); > } > }).start(); > > } > > private static void startBroker2() throws Exception { > new Thread(new Runnable() { > @Override > public void run() { > startBroker("broker2", "tcp://localhost:61615"); > } > }).start(); > } > > private static void startBroker(String name, String uri) { > try { > BrokerService broker = new BrokerService(); > broker.setBrokerName(name); > broker.addConnector(uri); > NetworkConnector connector = new > DiscoveryNetworkConnector(new URI("static://" + > "tcp://localhost:61616")); > connector.setDuplex(true); > broker.addNetworkConnector(connector); > broker.start(); > } catch (Exception e) { > e.printStackTrace(); > } > > } > > } > > > -- > Thanks, > Pubudu -- Thanks, Pubudu