Hi all,
I'm trying to evaluate the JMSXGroupID functionality in ActiveMQ 4.0.2, but it does not seem to be working as described on the ActiveMQ message groups page (http://activemq.apache.org/message-groups.html). This page says: When a message is being dispatched to a consumer, the JMSXGroupID is
checked. If one is present then the broker checks to see if a consumer owns that message group. (Since there could be a massive number of individual message groups we use hash buckets rather than the actual JMSXGroupID string). If no consumer is associated with a message group a consumer is chosen. That JMS MessageConsumer will receive all further messages with the same JMSXGroupID value until: - the consumer closes (or the client which created the consumer dies, etc.); - someone closes the message group by sending a message with a JMSXGroupSeq value of zero (see below for more details). So I assume (*correct me if I'm wrong*):
- if I have two consumers on the same topic, only one consumer should receive messages published on that topic at any time - This does not seem to work. In the following example, when 2 consumers are started, both receive all messages on the topic. - I don't need to define a selector on my consumers to make each one focus on a particular group. - This seems to be broken also. If I don't define a selector on the consumers, they all receive all messages from the topic. *Any help appreciated.* TIA Harry, *package* com.test; *import* javax.jms.Connection; *import* javax.jms.DeliveryMode; *import* javax.jms.Destination; *import* javax.jms.JMSException; *import* javax.jms.MessageProducer; *import* javax.jms.Session; *import* javax.jms.Topic; *import* org.apache.activemq.ActiveMQConnectionFactory; *import* org.apache.activemq.command.ActiveMQTextMessage; *public* *class* GreetingCardFromHolidays { *private* String where; *private* *int* holidaysDelay; Connection connection; *private* Session session; *private* Topic destination; *private* MessageProducer producer; *private* *static* *class* CardSender *implements* Runnable { *private* GreetingCardFromHolidays from; CardSender(GreetingCardFromHolidays from) { *this*.from = from; } *public* *void* run() { *for* (; *this*.from.holidaysDelay > 0; *this*.from. 
holidaysDelay--) { *this*.from.send(); *try* { Thread.*sleep*(2000); } *catch* (InterruptedException e) { e.printStackTrace(); } *finally* { // Thread.currentThread().notifyAll(); } } } } GreetingCardFromHolidays(String where, *int* holidaysDelay) { *this*.where = where; *this*.holidaysDelay = holidaysDelay; *this*.init(); } *public* *static* *void* main(String[] args) { *final* GreetingCardFromHolidays tanganika = *new*GreetingCardFromHolidays( "Tanganika", 2 * 7); *final* GreetingCardFromHolidays victoria = *new*GreetingCardFromHolidays( "Victoria", 4 * 7); *new* Thread(*new* CardSender(tanganika), tanganika.where ).start(); *new* Thread(*new* CardSender(victoria), victoria.where ).start(); } *private* *void* send() { *try* { // Create a Session Session session = connection.createSession(*false*, Session.*AUTO_ACKNOWLEDGE*); // Create the destination (Topic or Queue) Destination destination = session.createTopic(" GONE.FISHING"); // Create a MessageProducer from the Session to the Topic or Queue MessageProducer producer = session.createProducer (destination); // producer.setTimeToLive(10000); producer.setDeliveryMode(DeliveryMode.*PERSISTENT*); // Create a message String text = "Hello my friend! From [" + where + "]. I've gone fishing since [" + *this*. holidaysDelay + "] days."; ActiveMQTextMessage message = (ActiveMQTextMessage) * this*.session .createTextMessage(text); message.setStringProperty("JMSXGroupID", *this*.where); // message.setStringProperty("where",where); // Tell the producer to send the message *this*.producer.send(message); System.*out*.println(*this* + "Sending card :[" + text + "]. 
GroupID [" + message.getGroupID() + "]"); } *catch* (Exception e) { System.*out*.println("Caught: " + e); e.printStackTrace(); } } *private* *void* init() { // Create a ConnectionFactory ActiveMQConnectionFactory connectionFactory = *new*ActiveMQConnectionFactory( "tcp://localhost:61616"); // Create a Connection *try* { *this*.connection = connectionFactory.createConnection(* this*.where, *this*.where); connection.setClientID(*this*.where); connection.start(); // Create a Session *this*.session = connection.createSession(*false*, Session.*AUTO_ACKNOWLEDGE*); // Create the destination (Topic or Queue) *this*.destination = session.createTopic("GONE.FISHING" ); // Create a MessageProducer from the Session to the Topic or Queue *this*.producer = session.createProducer(destination); // producer.setTimeToLive(10000); *this*.producer.setDeliveryMode(DeliveryMode.*PERSISTENT *); } *catch* (JMSException e) { e.printStackTrace(); } } *protected* *void* finalize() *throws* Throwable { System.*out*.println("Finalizing [" + *this* + "]"); // Clean up *if* (*this*.session != *null*) session.close(); *if* (*this*.connection != *null*) *this*.connection.close(); *super*.finalize(); } *public* String toString() { *return* "On holiday :: " + *this*.where; } } *package* com.test; *import* javax.jms.Connection; *import* javax.jms.Destination; *import* javax.jms.JMSException; *import* javax.jms.Message; *import* javax.jms.MessageConsumer; *import* javax.jms.MessageListener; *import* javax.jms.Session; *import* javax.jms.TextMessage; *import* org.apache.activemq.ActiveMQConnectionFactory; *public* *class* GreetingsCardWaiter { *private* String where; Connection connection; Session session; *private* *static* *class* CardReceiver *implements* Runnable{ *private* GreetingsCardWaiter from; CardReceiver(GreetingsCardWaiter from){ *this*.from=from; } *public* *void* run(){ *this*.from.waitForAcard(); } } GreetingsCardWaiter(String where){ *this*.where = where; *this*.init(); } *public* 
*static* *void* main(String[] args) { *new* Thread(*new* CardReceiver(*new*GreetingsCardWaiter(args[0])), args[0]).start(); } *private* *void* waitForAcard(){ *try* { // Create the destination (Topic or Queue) Destination destination = session.createTopic(" GONE.FISHING"); // Create a MessageProducer from the Session to the Topic or Queue MessageConsumer consumer = session .createConsumer(destination); *final* String receiver = *this*.toString(); consumer.setMessageListener(*new* MessageListener(){ *public* *void* onMessage(Message arg0) { *try* { System.*out*.println(receiver + " >> COOL!!! I've received a card from:["+arg0.getStringProperty("JMSXGroupID")+ "]."+System.*getProperty*("line.separator")+" Card message is " +((TextMessage)arg0).getText()); } *catch* (JMSException e) { // *TODO* Auto-generated catch block e.printStackTrace(); } } }); *this*.connection.start(); System.*out*.println(receiver + " starts waiting for new Cards. Selector is ["+consumer.getMessageSelector()+"]"); // Clean up } *catch* (Exception e) { System.*out*.println("Caught: " + e); e.printStackTrace(); } } *private* *void* init() { // Create a ConnectionFactory ActiveMQConnectionFactory connectionFactory = *new*ActiveMQConnectionFactory( "tcp://dmc17525:61616"); // Create a Connection *try* { *this*.connection = connectionFactory.createConnection(* this*.where, *this*.where); connection.setClientID("Waiting on -" + *this*.where + " "+ System.*currentTimeMillis*()); // Create a Session session = connection.createSession(*false*, Session.*AUTO_ACKNOWLEDGE*); } *catch* (JMSException e) { e.printStackTrace(); } } *protected* *void* finalize() *throws* Throwable { System.*out*.println("Finalizing [" + *this* +"]"); *if*(*this*.session!=*null*)session.close(); *if*(*this*.connection!=*null*) *this*.connection .close(); *super*.finalize(); } *public* String toString(){ *return* "Card receiver :: " + *this*.where; } }