Hi Marco, Thanks for checking back in.
from pom.xml <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> <version>5.15.8</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.15.8</version> </dependency> The server is 5.17.6 When you asked this, I realised my dependencies were kind of old, so I updated it to <dependency> <groupId>jakarta.jms</groupId> <artifactId>jakarta.jms-api</artifactId> <version>3.1.0</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> <version>6.0.1</version> </dependency> ported my imports to jakarta.jms, and re-ran. I still get the same result. It works for NON_PERSISTENT, it does not for PERSISTENT messages. On Mon, 26 Feb 2024 at 16:54, Marco Garavello <marco.garave...@carel.com.invalid> wrote: > Hello Bruce, > > thanks for sharing precise info > That sounds really very strange and sounds like a bug of AMQ (or a > configuration issue). > A question, maybe trivial: which version of AMQ jar library are you using > on your Java producer? > > > *Distinti Saluti / *Kind Regards > M.G. > > > > Il giorno lun 26 feb 2024 alle ore 05:08 Bruce Cooper < > br...@mechination.com.au> ha scritto: > > > Now I have to rescind my previous comments. It runs out NON_PERSISTENT > > messages from clients or the browser console are forwarded and consumed > > correctly, but PERSISTENT messages are not transmitted correctly. This > is > > true whether I send from the console, from JAVA using the JMS/OpenWire > > client or from Javascript rhea/AMQP. I have turned on TRACE logging on > the > > servers, but do not see any hints. > > > > Any suggestions? I saw that Herbert suggested I had only set up two > > parallel brokers, but I followed the instructions at > > > > > https://urldefense.com/v3/__https://activemq.apache.org/components/classic/documentation/shared-file-system-master-slave__;!!Ck4v2Rc!kCJ7o6_l-zIBD-UBlJOFj6X_R52z-t95TS3L_Cd175LwuDgk0_1ZAOfTLPv7spvhXcll_c6yLN8WY1oHFIZc52E$ > > > > > > > > > > On Mon, 26 Feb 2024 at 10:00, Bruce Cooper <br...@mechination.com.au> > > wrote: > > > > > As a follow up, I can now confirm that using a different client (an > AMQP > > > client written in javascript using the rhea library) works as expected. > > It > > > is just this Java process that is broken. Any assistance you can > provide > > > would be greatly appreciated. > > > > > > On Mon, 26 Feb 2024 at 09:21, Bruce Cooper <br...@mechination.com.au> > > > wrote: > > > > > >> Hi Marco, > > >> > > >> Thanks for your response. I agree it is strange. I now have some > more > > >> information to share. It now looks like it has something to do with > my > > >> message producer, rather than core ActiveMQ behaviour. > > >> > > >> If I send messages using the ActiveMQ console on Broker A, they are > > >> transmitted and received correctly by the receiver. If I send > messages > > >> using the attached program, it only works until the failover happens. > > What > > >> is super strange is that the producer isn't connecting to the failover > > pair > > >> (B and C). It is connected to the satellite (A). Furthermore, that > > >> process is ephemeral. It is restarted each time a send is made. > > >> > > >> My test is now a bit different than before. I first setup the brokers, > > >> and initiate a "failover" > > >> > > >> 1. Start Brokers B and C configured to use the same data directory > so > > >> that they are in a failover configuration. Note that B is acting > as > > the > > >> primary, and C is in standby > > >> 2. Start Broker A, configured to point to Brokers B and C via the > > >> connection string static:failover:(tcp://100.127.41.128:61616 > ,tcp:// > > >> 100.127.41.128:61617). Note that it connects to B > > >> 3. Start consumer to consume from Brokers B and C using the same > > >> failover connection string. Note that it connects to B (in this > > example) > > >> 4. Start producer to send to broker A. Note that messages are > > >> received by the consumer. Repeat to confirm that this continues to > > operate. > > >> 5. Send a message using the console, and note that this also works. > > >> 6. Stop Broker B. Wait for both Broker A and the consumer to > > >> reconnect to C, once it has obtained its locks and started. > > >> > > >> Once failover has occurred, proving that sending from the console > still > > >> works: > > >> > > >> 1. Connect all processes together, do a failover, and wait for > > >> reconnection > > >> 2. Send message in Broker A's Console, on the queue page > > >> 3. Message is received by consumer > > >> 4. See Enqueued and Dequeued metric go up (after a browser > refresh) - > > >> This indicates that the message has been dequeued by the network > > connection > > >> and forwarded on to the HA pair. > > >> 5. Check the console on Broker C and note that its enqueue and > > >> dequeue count have incremented. > > >> > > >> The above shows everything works as expected, now let us try my > producer > > >> program > > >> > > >> 1. Run the main program below with argument "send" - This connects > to > > >> the satellite broker (Broker A) and publishes a message > > >> 2. Note that no message is received by the consumer connected to C > > >> (once the failover has occurred) > > >> 3. Note that Broker A's console queue page again, and see that the > > >> Enqueued and Dequeued metric have incremented as before. > > >> 4. Check the broker C queue console page, and note that this time > the > > >> Enqueued and Dequeued metric have _not_ been incremented. It is as > > if A > > >> thinks it has sent the message, but the pair C never receives it. > > >> > > >> I feel like I must have misconfigured my producer code in some > fashion, > > >> but it was simply based on examples I found. Is there something > > obvious I > > >> have missed? > > >> > > >> > > >> package au.com.mechination.integ.test; > > >> > > >> import java.util.Date; > > >> > > >> import javax.jms.Connection; > > >> import javax.jms.DeliveryMode; > > >> import javax.jms.Destination; > > >> import javax.jms.JMSException; > > >> import javax.jms.Message; > > >> import javax.jms.MessageConsumer; > > >> import javax.jms.MessageProducer; > > >> import javax.jms.Session; > > >> import javax.jms.TextMessage; > > >> > > >> import org.apache.activemq.ActiveMQConnectionFactory; > > >> import org.slf4j.Logger; > > >> import org.slf4j.LoggerFactory; > > >> > > >> /** > > >> * Hello world! > > >> * > > >> */ > > >> public class Main { > > >> Connection conn = null; > > >> ActiveMQConnectionFactory connectionFactory; > > >> private Session session; > > >> > > >> static final Logger logger = LoggerFactory.getLogger(Main.class); > > >> > > >> public static void main(String[] args) throws JMSException { > > >> Main doer = new Main(); > > >> > > >> if (args.length > 0 && args[0].equals("send")) { > > >> doer.send(); > > >> } else { > > >> doer.receive(); > > >> } > > >> } > > >> > > >> public void connect(String url, String username, String password) > > >> throws JMSException { > > >> final ActiveMQConnectionFactory connectionFactory = new > > >> ActiveMQConnectionFactory(url); > > >> > > >> // Pass the sign-in credentials. > > >> connectionFactory.setUserName(username); > > >> connectionFactory.setPassword(password); > > >> > > >> // Establish a connection for the producer. > > >> this.conn = connectionFactory.createConnection(); > > >> conn.start(); > > >> > > >> session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); > > >> } > > >> > > >> void send() throws JMSException { > > >> String url = "tcp://localhost:61616"; > > >> String username = "bruce"; > > >> String password = "sekrit"; > > >> > > >> System.out.println("Sender connecting to " + url); > > >> connect(url, username, password); > > >> > > >> // Create a queue named "MyQueue". > > >> final Destination producerDestination = session.createQueue( > > >> "TestQueue"); > > >> > > >> // Create a producer from the session to the queue. > > >> final MessageProducer producer = session.createProducer > > >> (producerDestination); > > >> producer.setDeliveryMode(DeliveryMode.PERSISTENT); > > >> System.out.println("TTL is " + producer.getTimeToLive()); // > > >> Prints 0 - Unlimited > > >> > > >> for (int i = 0; i < 1; i++) { > > >> String text = "Hello from Amazon MQ!: " + i + " " + new > Date > > >> ().toInstant().toString(); > > >> System.out.println("Sending message " + text); > > >> TextMessage producerMessage = session.createTextMessage > > >> (text); > > >> producer.send(producerMessage); > > >> } > > >> > > >> producer.close(); > > >> > > >> System.out.println("Send completed"); > > >> System.exit(0); > > >> } > > >> > > >> void receive() throws JMSException { > > >> String url = "failover:(tcp://100.127.41.128:61616,tcp:// > > >> 100.127.41.128:61617)"; > > >> String username = "bruce"; > > >> String password = "sekrit"; > > >> > > >> System.out.println("Receiver connecting to " + url); > > >> connect(url, username, password); > > >> > > >> final Destination consumerQueue = > > session.createQueue("TestQueue" > > >> ); > > >> final MessageConsumer consumer = session.createConsumer > > >> (consumerQueue); > > >> > > >> while (true) { > > >> final Message msg = consumer.receive(2500); > > >> if (msg == null) { > > >> System.out.print("."); > > >> } else if (msg instanceof TextMessage) { > > >> System.out.println("Received msg " + ((TextMessage) > > msg). > > >> getText()); > > >> } else { > > >> System.err.println("Something unexpected received"); > > >> } > > >> } > > >> } > > >> } > > >> > > >> > > >> > > >> > > >> > > > > > > -- > > > > > > https://urldefense.com/v3/__https://mechination.com.au/__;!!Ck4v2Rc!kCJ7o6_l-zIBD-UBlJOFj6X_R52z-t95TS3L_Cd175LwuDgk0_1ZAOfTLPv7spvhXcll_c6yLN8WY1oHayvR7KQ$ > > > Ph: 0448 341 729 > > > > > > > > > -- > > > > > https://urldefense.com/v3/__https://mechination.com.au/__;!!Ck4v2Rc!kCJ7o6_l-zIBD-UBlJOFj6X_R52z-t95TS3L_Cd175LwuDgk0_1ZAOfTLPv7spvhXcll_c6yLN8WY1oHayvR7KQ$ > > Ph: 0448 341 729 > > > -- https://mechination.com.au/ Ph: 0448 341 729