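Figured out the problem. I have to set the network TTL on the network
connector of each embedded broker:
            connector.setNetworkTTL(2);
The default networkTTL is 1, which lets messages and consumer
subscriptions cross only one broker hop. In this setup they have to cross
two hops (embedded broker -> standalone broker -> embedded broker).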

On Fri, Jun 19, 2015 at 1:52 PM, pubudu gunawardena <pubud...@gmail.com> wrote:
> Hi All,
>
> I have the following setup. A producer sends messages to one embedded
> broker, and a consumer consumes messages from another embedded broker.
> I have created a network of brokers in which both embedded brokers
> connect to a standalone broker (tcp://localhost:61616), but the
> messages don't get passed from the producer to the consumer. I have
> created an example which shows this behavior. Can someone point out
> what I am doing wrong, or tell me whether this is not possible? The
> code below reproduces the problem.
>
>
> import java.net.URI;
>
> import javax.jms.Connection;
> import javax.jms.JMSException;
> import javax.jms.Message;
> import javax.jms.MessageConsumer;
> import javax.jms.MessageListener;
> import javax.jms.MessageProducer;
> import javax.jms.Queue;
> import javax.jms.Session;
> import javax.jms.TextMessage;
>
> import org.apache.activemq.ActiveMQConnectionFactory;
> import org.apache.activemq.broker.BrokerService;
> import org.apache.activemq.network.DiscoveryNetworkConnector;
> import org.apache.activemq.network.NetworkConnector;
>
> public class Test {
>
>     public static void main(String[] args) {
>         try {
>             startBroker1();
>             startBroker2();
>             runProducer();
>             runConsumer();
>
>         } catch (Exception e) {
>             // TODO Auto-generated catch block
>             e.printStackTrace();
>         }
>     }
>
>     private static void runConsumer() {
>         new Thread(new Consumer()).start();
>
>     }
>
>     private static final class Consumer implements Runnable, MessageListener {
>         @Override
>         public void run() {
>             try {
>                 ActiveMQConnectionFactory factory = new
> ActiveMQConnectionFactory("tcp://localhost:61615");
>                 Connection connection = factory.createConnection();
>                 connection.start();
>                 Session session = connection.createSession(false,
> Session.AUTO_ACKNOWLEDGE);
>                 Queue destination = session.createQueue("topic");
>                 MessageConsumer consumer = 
> session.createConsumer(destination);
>                 consumer.setMessageListener(this);
>             } catch (JMSException e) {
>                 e.printStackTrace();
>             }
>         }
>
>         @Override
>         public void onMessage(Message message) {
>             try {
>                 TextMessage text = (TextMessage) message;
>                 System.out.println("Message is : " + text.getText());
>             } catch (JMSException e) {
>                 e.printStackTrace();
>             }
>         }
>     }
>
>     private static void runProducer() {
>         new Thread(new Runnable() {
>             @Override
>             public void run() {
>                 try {
>                     ActiveMQConnectionFactory factory = new
> ActiveMQConnectionFactory("tcp://localhost:61617");
>                     // ActiveMQConnection.DEFAULT_BROKER_URL =
>                     // failover://tcp://localhost:61616
>                     Connection connection;
>                     connection = factory.createConnection();
>                     connection.start();
>                     Session session = connection.createSession(false,
> Session.AUTO_ACKNOWLEDGE);
>                     Queue destination = session.createQueue("topic");
>                     MessageProducer producer =
> session.createProducer(destination);
>                     TextMessage message = session.createTextMessage();
>                     message.setText("This is the message");
>                     producer.send(message);
>                     System.out.println("Sent: " + message.getText());
>                 } catch (JMSException e) {
>                     // TODO Auto-generated catch block
>                     e.printStackTrace();
>                 }
>             }
>         }).start();
>
>     }
>
>     private static void startBroker1() throws Exception {
>         new Thread(new Runnable() {
>             @Override
>             public void run() {
>                 startBroker("broker1", "tcp://localhost:61617");
>             }
>         }).start();
>
>     }
>
>     private static void startBroker2() throws Exception {
>         new Thread(new Runnable() {
>             @Override
>             public void run() {
>                 startBroker("broker2", "tcp://localhost:61615");
>             }
>         }).start();
>     }
>
>     private static void startBroker(String name, String uri) {
>         try {
>             BrokerService broker = new BrokerService();
>             broker.setBrokerName(name);
>             broker.addConnector(uri);
>             NetworkConnector connector = new
> DiscoveryNetworkConnector(new URI("static://" +
> "tcp://localhost:61616"));
>             connector.setDuplex(true);
>             broker.addNetworkConnector(connector);
>             broker.start();
>         } catch (Exception e) {
>             e.printStackTrace();
>         }
>
>     }
>
> }
>
>
> --
> Thanks,
> Pubudu



-- 
Thanks,
Pubudu

Reply via email to