Hi all,


I'm just trying to evaluate the JMSXGroupID functionality in ActiveMQ 4.0.2.

But it does not seem to work as described on the ActiveMQ page (
http://activemq.apache.org/message-groups.html).
This page says:

When a message is being dispatched to a consumer, the JMSXGroupID is
checked. If one is present then the broker checks to see if a consumer owns
that message group. (Since there could be a massive number of individual
message groups we use hash buckets rather than the actual JMSXGroupID
string).

If no consumer is associated with a message group a consumer is chosen.
That JMS MessageConsumer will receive all further messages with the same
JMSXGroupID value until

   - the consumer closes (or the client which created the consumer dies
   etc)
   - someone closes the message group by sending a message with a
   JMSXGroupSeq value of zero (see below for more details)

So I assume (*correct me if I'm wrong*):

  - if I have two consumers on the same topic, only one consumer should
  receive messages published on that topic at any time
     - This does not seem to work. In the following example, when 2
     consumers are started, both receive all messages on the topic.
  - I don't need to define a selector on my consumers to make one of them
  focus on a particular group.
     - This seems to be broken also. If I don't define a selector on the
     consumers, they all receive all messages from the topic.

*Any help appreciated.*



TIA



Harry,



*package* com.test;



*import* javax.jms.Connection;

*import* javax.jms.DeliveryMode;

*import* javax.jms.Destination;

*import* javax.jms.JMSException;

*import* javax.jms.MessageProducer;

*import* javax.jms.Session;

*import* javax.jms.Topic;



*import* org.apache.activemq.ActiveMQConnectionFactory;

*import* org.apache.activemq.command.ActiveMQTextMessage;



*public* *class* GreetingCardFromHolidays {



      *private* String where;



      *private* *int* holidaysDelay;



      Connection connection;



      *private* Session session;



      *private* Topic destination;



      *private* MessageProducer producer;



      *private* *static* *class* CardSender *implements* Runnable {

            *private* GreetingCardFromHolidays from;



            CardSender(GreetingCardFromHolidays from) {

                   *this*.from = from;

            }



            *public* *void* run() {

                   *for* (; *this*.from.holidaysDelay > 0; *this*.from.
holidaysDelay--) {

                          *this*.from.send();

                          *try* {

                                 Thread.*sleep*(2000);

                          } *catch* (InterruptedException e) {

                                 e.printStackTrace();

                          } *finally* {

                                 // Thread.currentThread().notifyAll();

                          }

                   }



            }

      }



      GreetingCardFromHolidays(String where, *int* holidaysDelay) {

            *this*.where = where;

            *this*.holidaysDelay = holidaysDelay;

            *this*.init();

      }


      *public* *static* *void* main(String[] args) {

            *final* GreetingCardFromHolidays tanganika =
*new*GreetingCardFromHolidays(

                          "Tanganika", 2 * 7);

            *final* GreetingCardFromHolidays victoria =
*new*GreetingCardFromHolidays(

                          "Victoria", 4 * 7);

            *new* Thread(*new* CardSender(tanganika), tanganika.where
).start();

            *new* Thread(*new* CardSender(victoria), victoria.where
).start();

      }



      *private* *void* send() {

            *try* {

                   // Create a Session

                   Session session = connection.createSession(*false*,

                                 Session.*AUTO_ACKNOWLEDGE*);



                   // Create the destination (Topic or Queue)

                   Destination destination = session.createTopic("
GONE.FISHING");



                   // Create a MessageProducer from the Session to the
Topic or Queue

                   MessageProducer producer = session.createProducer
(destination);

                   // producer.setTimeToLive(10000);

                   producer.setDeliveryMode(DeliveryMode.*PERSISTENT*);



                   // Create a message

                   String text = "Hello my friend! From [" + where

                                 + "]. I've gone fishing since [" + *this*.
holidaysDelay

                                 + "] days.";

                   ActiveMQTextMessage message = (ActiveMQTextMessage) *
this*.session

                                 .createTextMessage(text);

                   message.setStringProperty("JMSXGroupID", *this*.where);

                   // message.setStringProperty("where",where);



                   // Tell the producer to send the message

                   *this*.producer.send(message);

                   System.*out*.println(*this* + "Sending card :[" + text +
"]. GroupID ["

                                 + message.getGroupID() + "]");



            } *catch* (Exception e) {

                   System.*out*.println("Caught: " + e);

                   e.printStackTrace();

            }

      }



      *private* *void* init() {

            // Create a ConnectionFactory

            ActiveMQConnectionFactory connectionFactory =
*new*ActiveMQConnectionFactory(

                          "tcp://localhost:61616");



            // Create a Connection

            *try* {

                   *this*.connection = connectionFactory.createConnection(*
this*.where,

                                 *this*.where);

                   connection.setClientID(*this*.where);

                   connection.start();

                   // Create a Session

                   *this*.session = connection.createSession(*false*,

                                 Session.*AUTO_ACKNOWLEDGE*);



                   // Create the destination (Topic or Queue)

                   *this*.destination = session.createTopic("GONE.FISHING"
);



                   // Create a MessageProducer from the Session to the
Topic or Queue

                   *this*.producer = session.createProducer(destination);

                   // producer.setTimeToLive(10000);

                   *this*.producer.setDeliveryMode(DeliveryMode.*PERSISTENT
*);



            } *catch* (JMSException e) {

                   e.printStackTrace();

            }



      }



      *protected* *void* finalize() *throws* Throwable {

            System.*out*.println("Finalizing [" + *this* + "]");

            // Clean up

            *if* (*this*.session != *null*)

                   session.close();

            *if* (*this*.connection != *null*)

                   *this*.connection.close();

            *super*.finalize();

      }



      *public* String toString() {

            *return* "On holiday :: " + *this*.where;

      }

}





*package* com.test;



*import* javax.jms.Connection;

*import* javax.jms.Destination;

*import* javax.jms.JMSException;

*import* javax.jms.Message;

*import* javax.jms.MessageConsumer;

*import* javax.jms.MessageListener;

*import* javax.jms.Session;

*import* javax.jms.TextMessage;



*import* org.apache.activemq.ActiveMQConnectionFactory;



*public* *class* GreetingsCardWaiter {

      *private* String where;

      Connection connection;

      Session session;

      *private* *static* *class* CardReceiver *implements* Runnable{

            *private* GreetingsCardWaiter from;

            CardReceiver(GreetingsCardWaiter from){

                   *this*.from=from;

            }

            *public* *void* run(){

                   *this*.from.waitForAcard();

            }

      }

      GreetingsCardWaiter(String where){

            *this*.where = where;

            *this*.init();

      }

      *public* *static* *void* main(String[] args) {

            *new* Thread(*new*
CardReceiver(*new*GreetingsCardWaiter(args[0])), args[0]).start();

      }

      *private* *void* waitForAcard(){

            *try* {

                   // Create the destination (Topic or Queue)

                   Destination destination = session.createTopic("
GONE.FISHING");



                   // Create a MessageProducer from the Session to the
Topic or Queue

                   MessageConsumer consumer = session
.createConsumer(destination);

                   *final* String receiver = *this*.toString();

                   consumer.setMessageListener(*new* MessageListener(){

                          *public* *void* onMessage(Message arg0) {

                                 *try* {

                                       System.*out*.println(receiver + " >>
COOL!!! I've received a card from:["+arg0.getStringProperty("JMSXGroupID")+
"]."+System.*getProperty*("line.separator")+"       Card message is "
+((TextMessage)arg0).getText());

                                 } *catch* (JMSException e) {

                                       // *TODO* Auto-generated catch block

                                       e.printStackTrace();

                                 }

                          }



                   });



                   *this*.connection.start();

                   System.*out*.println(receiver + " starts waiting for new
Cards. Selector is ["+consumer.getMessageSelector()+"]");

                   // Clean up

            } *catch* (Exception e) {

                   System.*out*.println("Caught: " + e);

                   e.printStackTrace();

            }



      }

      *private* *void* init() {

            // Create a ConnectionFactory

            ActiveMQConnectionFactory connectionFactory =
*new*ActiveMQConnectionFactory(

                          "tcp://dmc17525:61616");



            // Create a Connection

            *try* {

                   *this*.connection = connectionFactory.createConnection(*
this*.where, *this*.where);

                   connection.setClientID("Waiting on -" + *this*.where + "
"+ System.*currentTimeMillis*());

                   // Create a Session

                   session = connection.createSession(*false*,

                                 Session.*AUTO_ACKNOWLEDGE*);



            } *catch* (JMSException e) {

                   e.printStackTrace();

            }



}



      *protected* *void* finalize() *throws* Throwable {

            System.*out*.println("Finalizing [" + *this* +"]");

            *if*(*this*.session!=*null*)session.close();

            *if*(*this*.connection!=*null*)         *this*.connection
.close();

            *super*.finalize();

      }

      *public* String toString(){

            *return* "Card receiver :: " + *this*.where;

      }

}

Reply via email to