Hi Marco,

Thanks for your response.  I agree it is strange.  I now have some more
information to share.  It now looks like it has something to do with my
message producer, rather than core ActiveMQ behaviour.

If I send messages using the ActiveMQ console on Broker A, they are
transmitted and received correctly by the receiver.  If I send messages
using the attached program, it only works until the failover happens.  What
is super strange is that the producer isn't connecting to the failover pair
(B and C).  It is connected to the satellite (A).  Furthermore, that
process is ephemeral.  It is restarted each time a send is made.

My test is now a bit different than before. I first setup the brokers, and
initiate a "failover"

   1. Start Brokers B and C configured to use the same data directory so
   that they are in a failover configuration.  Note that B is acting as the
   primary, and C is in standby
   2. Start Broker A, configured to point to Brokers B and C via the
   connection string static:failover:(tcp://100.127.41.128:61616,tcp://
   100.127.41.128:61617). Note that it connects to B
   3. Start consumer to consume from Brokers B and C using the same
   failover connection string. Note that it connects to B (in this example)
   4. Start producer to send to broker A.  Note that messages are received
   by the consumer.  Repeat to confirm that this continues to operate.
   5. Send a message using the console, and note that this also works.
   6. Stop Broker B.  Wait for both Broker A and the consumer to reconnect
   to C, once it has obtained its locks and started.

Once failover has occurred, proving that sending from the console still
works:

   1. Connect all processes together, do a failover, and wait for
   reconnection
   2. Send message in Broker A's Console, on the queue page
   3. Message is received by consumer
   4. See Enqueued and Dequeued metric go up (after a browser refresh) -
   This indicates that the message has been dequeued by the network connection
   and forwarded on to the HA pair.
   5. Check the console on Broker C and note that its enqueue and dequeue
   count have incremented.

The above shows everything works as expected, now let us try my producer
program

   1. Run the main program below with argument "send" - This connects to
   the satellite broker (Broker A) and publishes a message
   2. Note that no message is received by the consumer connected to C (once
   the failover has occurred)
   3. Note that Broker A's console queue page again, and see that the
   Enqueued and Dequeued metric have incremented as before.
   4. Check the broker C queue console page, and note that this time the
   Enqueued and Dequeued metric have _not_ been incremented.  It is as if A
   thinks it has sent the message, but the pair C never receives it.

I feel like I must have misconfigured my producer code in some fashion, but
it was simply based on examples I found.  Is there something obvious I have
missed?


package au.com.mechination.integ.test;

import java.util.Date;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Hello world!
*
*/
public class Main {
    Connection conn = null;
    ActiveMQConnectionFactory connectionFactory;
    private Session session;

    static final Logger logger = LoggerFactory.getLogger(Main.class);

    public static void main(String[] args) throws JMSException {
        Main doer = new Main();

        if (args.length > 0 && args[0].equals("send")) {
            doer.send();
        } else {
            doer.receive();
        }
    }

    public void connect(String url, String username, String password) throws
JMSException {
        final ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(url);

        // Pass the sign-in credentials.
        connectionFactory.setUserName(username);
        connectionFactory.setPassword(password);

        // Establish a connection for the producer.
        this.conn = connectionFactory.createConnection();
        conn.start();

        session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
    }

    void send() throws JMSException {
        String url = "tcp://localhost:61616";
        String username = "bruce";
        String password = "sekrit";

        System.out.println("Sender connecting to " + url);
        connect(url, username, password);

        // Create a queue named "MyQueue".
        final Destination producerDestination = session.createQueue(
"TestQueue");

        // Create a producer from the session to the queue.
        final MessageProducer producer = session.createProducer
(producerDestination);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        System.out.println("TTL is " + producer.getTimeToLive()); // Prints
0 - Unlimited

        for (int i = 0; i < 1; i++) {
            String text = "Hello from Amazon MQ!: " + i + " " + new Date().
toInstant().toString();
            System.out.println("Sending message " + text);
            TextMessage producerMessage = session.createTextMessage(text);
            producer.send(producerMessage);
        }

        producer.close();

        System.out.println("Send completed");
        System.exit(0);
    }

    void receive() throws JMSException {
        String url = "failover:(tcp://100.127.41.128:61616,tcp://
100.127.41.128:61617)";
        String username = "bruce";
        String password = "sekrit";

        System.out.println("Receiver connecting to " + url);
        connect(url, username, password);

        final Destination consumerQueue = session.createQueue("TestQueue");
        final MessageConsumer consumer = session.createConsumer
(consumerQueue);

        while (true) {
            final Message msg = consumer.receive(2500);
            if (msg == null) {
                System.out.print(".");
            } else if (msg instanceof TextMessage) {
                System.out.println("Received msg " + ((TextMessage) msg).
getText());
            } else {
                System.err.println("Something unexpected received");
            }
        }
    }
}

Reply via email to