As a follow up, I can now confirm that using a different client (an AMQP client written in javascript using the rhea library) works as expected. It is just this Java process that is broken. Any assistance you can provide would be greatly appreciated.
On Mon, 26 Feb 2024 at 09:21, Bruce Cooper <[email protected]> wrote: > Hi Marco, > > Thanks for your response. I agree it is strange. I now have some more > information to share. It now looks like it has something to do with my > message producer, rather than core ActiveMQ behaviour. > > If I send messages using the ActiveMQ console on Broker A, they are > transmitted and received correctly by the receiver. If I send messages > using the attached program, it only works until the failover happens. What > is super strange is that the producer isn't connecting to the failover pair > (B and C). It is connected to the satellite (A). Furthermore, that > process is ephemeral. It is restarted each time a send is made. > > My test is now a bit different than before. I first setup the brokers, and > initiate a "failover" > > 1. Start Brokers B and C configured to use the same data directory so > that they are in a failover configuration. Note that B is acting as the > primary, and C is in standby > 2. Start Broker A, configured to point to Brokers B and C via the > connection string static:failover:(tcp://100.127.41.128:61616,tcp:// > 100.127.41.128:61617). Note that it connects to B > 3. Start consumer to consume from Brokers B and C using the same > failover connection string. Note that it connects to B (in this example) > 4. Start producer to send to broker A. Note that messages are > received by the consumer. Repeat to confirm that this continues to > operate. > 5. Send a message using the console, and note that this also works. > 6. Stop Broker B. Wait for both Broker A and the consumer to > reconnect to C, once it has obtained its locks and started. > > Once failover has occurred, proving that sending from the console still > works: > > 1. Connect all processes together, do a failover, and wait for > reconnection > 2. Send message in Broker A's Console, on the queue page > 3. Message is received by consumer > 4. See Enqueued and Dequeued metric go up (after a browser refresh) - > This indicates that the message has been dequeued by the network connection > and forwarded on to the HA pair. > 5. Check the console on Broker C and note that its enqueue and dequeue > count have incremented. > > The above shows everything works as expected, now let us try my producer > program > > 1. Run the main program below with argument "send" - This connects to > the satellite broker (Broker A) and publishes a message > 2. Note that no message is received by the consumer connected to C > (once the failover has occurred) > 3. Note that Broker A's console queue page again, and see that the > Enqueued and Dequeued metric have incremented as before. > 4. Check the broker C queue console page, and note that this time the > Enqueued and Dequeued metric have _not_ been incremented. It is as if A > thinks it has sent the message, but the pair C never receives it. > > I feel like I must have misconfigured my producer code in some fashion, > but it was simply based on examples I found. Is there something obvious I > have missed? > > > package au.com.mechination.integ.test; > > import java.util.Date; > > import javax.jms.Connection; > import javax.jms.DeliveryMode; > import javax.jms.Destination; > import javax.jms.JMSException; > import javax.jms.Message; > import javax.jms.MessageConsumer; > import javax.jms.MessageProducer; > import javax.jms.Session; > import javax.jms.TextMessage; > > import org.apache.activemq.ActiveMQConnectionFactory; > import org.slf4j.Logger; > import org.slf4j.LoggerFactory; > > /** > * Hello world! > * > */ > public class Main { > Connection conn = null; > ActiveMQConnectionFactory connectionFactory; > private Session session; > > static final Logger logger = LoggerFactory.getLogger(Main.class); > > public static void main(String[] args) throws JMSException { > Main doer = new Main(); > > if (args.length > 0 && args[0].equals("send")) { > doer.send(); > } else { > doer.receive(); > } > } > > public void connect(String url, String username, String password) > throws JMSException { > final ActiveMQConnectionFactory connectionFactory = new > ActiveMQConnectionFactory(url); > > // Pass the sign-in credentials. > connectionFactory.setUserName(username); > connectionFactory.setPassword(password); > > // Establish a connection for the producer. > this.conn = connectionFactory.createConnection(); > conn.start(); > > session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); > } > > void send() throws JMSException { > String url = "tcp://localhost:61616"; > String username = "bruce"; > String password = "sekrit"; > > System.out.println("Sender connecting to " + url); > connect(url, username, password); > > // Create a queue named "MyQueue". > final Destination producerDestination = session.createQueue( > "TestQueue"); > > // Create a producer from the session to the queue. > final MessageProducer producer = session.createProducer > (producerDestination); > producer.setDeliveryMode(DeliveryMode.PERSISTENT); > System.out.println("TTL is " + producer.getTimeToLive()); // > Prints 0 - Unlimited > > for (int i = 0; i < 1; i++) { > String text = "Hello from Amazon MQ!: " + i + " " + new Date > ().toInstant().toString(); > System.out.println("Sending message " + text); > TextMessage producerMessage = session.createTextMessage(text); > producer.send(producerMessage); > } > > producer.close(); > > System.out.println("Send completed"); > System.exit(0); > } > > void receive() throws JMSException { > String url = "failover:(tcp://100.127.41.128:61616,tcp:// > 100.127.41.128:61617)"; > String username = "bruce"; > String password = "sekrit"; > > System.out.println("Receiver connecting to " + url); > connect(url, username, password); > > final Destination consumerQueue = session.createQueue("TestQueue" > ); > final MessageConsumer consumer = session.createConsumer > (consumerQueue); > > while (true) { > final Message msg = consumer.receive(2500); > if (msg == null) { > System.out.print("."); > } else if (msg instanceof TextMessage) { > System.out.println("Received msg " + ((TextMessage) msg). > getText()); > } else { > System.err.println("Something unexpected received"); > } > } > } > } > > > > > -- https://mechination.com.au/ Ph: 0448 341 729
