Unsubscribe durable subscribers with ActiveMQ

up vote
0
down vote
favorite
I'm trying to UNSUBSCRIBE durable subscribers from TOPICS.

My app is a kind of social network : each user is a topic for other users.
So, each time a user is doing something, his friends are notified. Of
course, a subscriber may unsubscribe from a topic, wanting to receive
notifications about a user no more.

Each time I'm trying to unsubscribe a subscriber from a topic, I've got an
error telling me that : "javax.jms.JMSException: Durable consumer is in use"

Here are my 2 classes, the SENDER one and the RECEIVER one. Can someone
tell me what I'm doing wrong ??

SENDER Class :

package com.citizenweb.classes;

import java.util.Date;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageFormatException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.ObjectMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;

import com.citizenweb.interfaces.PostIF;
import com.citizenweb.interfaces.UserIF;

public class Sender {

    private ActiveMQConnectionFactory factory = null;
    private ActiveMQConnection connection = null;
    private ActiveMQSession session = null;
    private Destination destination = null;
    private MessageProducer producer = null;

    public Sender() {
    }

    public void connect(){
        try{
            factory = new
ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
            // TODO Mécanisme de sécurité d'ActiveMQ à rétablir en
production
            factory.setTrustAllPackages(true);
            connection = (ActiveMQConnection) factory.createConnection();
            connection.start();
            session = (ActiveMQSession) connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
        } catch (JMSException e){
            e.printStackTrace();
        }
    }

    public void sendPost(UserIF user,PostIF post) {
        if(session==null){connect();}
        try {
            destination = session.createTopic(user.toString());
            producer = session.createProducer(destination);
            ObjectMessage postMessage = session.createObjectMessage();
            postMessage.setObject(post);
            producer.send(postMessage);
            System.out.println("\n SENDER Object message sent");

        } catch (MessageFormatException e) {
            e.printStackTrace();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    public void sendInformation(UserIF user,String info){
        if(session==null){connect();}
        try {
            destination = session.createTopic(user.toString());
            producer = session.createProducer(destination);
            TextMessage infoMessage = session.createTextMessage();
            infoMessage.setText(info);
            producer.send(infoMessage);
            System.out.println("\n SENDER Information message sent");
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {

        UserIF u1, u2, u3;
        String[] nom = new String[5];
        String[] prenom = new String[5];
        String[] login = new String[5];
        String[] password = new String[5];
        Date[] naiss = new Date[5];
        String[] mail = new String[5];
        for (int i = 0; i < 5; i++) {
            nom[i] = "nom_" + i;
            prenom[i] = "prenom_" + i;
            login[i] = "login_" + i;
            password[i] = "password_" + i;
            naiss[i] = new Date();
            mail[i] = "mail_" + i;
        }

        System.out.println("\n SENDER AFFECTATION DES NOMS");
        u1 = new User(nom[0], prenom[0], login[0], password[0], naiss[0],
mail[0]);
        u2 = new User(nom[1], prenom[1], login[1], password[1], naiss[1],
mail[1]);
        u3 = new User(nom[2], prenom[2], login[2], password[2], naiss[2],
mail[2]);

        Sender sender = new Sender();

        sender.sendInformation(u1, "U1 notification");
        sender.sendInformation(u2, "U2 notification");
        sender.sendInformation(u3, "U3 notification");
        //PostIF post = new Post("Mon Post","Ceci est mon
message",u1,u1,"Classe Sender",((User) u1).getIdUser(),0);
        //sender.sendPost(user, post);
        sender.session.close();
        sender.connection.close();

    }

}
RECEIVER Class :

package com.citizenweb.classes;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.broker.region.Destination;
import com.citizenweb.interfaces.PostIF;
import com.citizenweb.interfaces.UserIF;
import com.citizenweb.classes.Post;

public class Receiver implements MessageListener, Serializable {

    private static final long serialVersionUID = 1L;
    private ActiveMQConnectionFactory factory = null;
    private ActiveMQConnection connection = null;
    private ActiveMQSession session = null;
    private Topic destination = null;
    private MessageConsumer consumer = null;

    UserIF userTopic = new User();
    UserIF userSubscriber = new User();
    List<Message> listeMsg = new ArrayList<Message>();

    public Receiver(UserIF subscriber) {
        this.userSubscriber = subscriber;
    }

    public void connect() {
        try {
            factory = new
ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
            // TODO Mécanisme de sécurité d'ActiveMQ à rétablir en
production
            factory.setTrustAllPackages(true);
            connection = (ActiveMQConnection) factory.createConnection();
            // ClientID :
            //
https://qnalist.com/questions/2068823/create-durable-topic-subscriber
            connection.setClientID(userSubscriber.toString());
            connection.start();
            session = (ActiveMQSession) connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    public void receiveMessage(UserIF topic) {
        try {
            if (session == null) {
                connect();
            }
            destination = session.createTopic(topic.toString());
            String nomAbonnement = topic.toString() + "->" +
userSubscriber.toString();
            //String nomAbonnement = userSubscriber.toString();
            consumer = session.createDurableSubscriber(destination,
nomAbonnement);
            consumer.setMessageListener(this);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    public void unsubscribe(UserIF topic) {
        try {
            if (session == null) {
                connect();
            }
            System.out.println("\n RECEIVER Désinscription du topic " +
topic.toString());
            //consumer.close();
            String nomAbonnement = topic.toString() + "->" +
userSubscriber.toString();
            //String nomAbonnement = userSubscriber.toString();
            System.out.println("\n RECEIVER Abonnement à clore = " +
nomAbonnement);
            session.unsubscribe(nomAbonnement);
            System.out.println("\n RECEIVER " + userSubscriber.toString() +
" s'est désinscrit de " + nomAbonnement);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void onMessage(Message message) {
        System.out.println("\n RECEIVER OnMessage triggered for " +
userSubscriber.toString());
        listeMsg.add(message);
        System.out.println("\n RECEIVER Nombre de messages reçus par " +
userSubscriber + " = " + listeMsg.size());
        String classe = message.getClass().getSimpleName();
        System.out.println("\n RECEIVER Classe de message : " + classe);
        try {
            if (message instanceof TextMessage) {
                TextMessage text = (TextMessage) message;
                System.out.println("\n RECEIVER Information : " +
text.getText());
            }
            if (message instanceof ObjectMessage) {
                System.out.println("\n RECEIVER ObjectMessage");
                ObjectMessage oMessage = (ObjectMessage) message;
                if (oMessage.getObject() instanceof PostIF) {
                    PostIF post = (PostIF) oMessage.getObject();
                    String s = ((Post) post).getCorpsMessage();
                    System.out.println("\n RECEIVER Post : " + s);
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws JMSException {

        /*
         * EACH USER IS A TOPIC FOR OTHER USERS
         * WHATEVER A USER DOES RESULTS IN A NOTIFICATION TO SUBSCRIBERS
        */

        //CREATE USER
        UserIF u1, u2, u3;
        String[] nom = new String[5];
        String[] prenom = new String[5];
        String[] login = new String[5];
        String[] password = new String[5];
        Date[] naiss = new Date[5];
        String[] mail = new String[5];
        for (int i = 0; i < 5; i++) {
            nom[i] = "nom_" + i;
            prenom[i] = "prenom_" + i;
            login[i] = "login_" + i;
            password[i] = "password_" + i;
            naiss[i] = new Date();
            mail[i] = "mail_" + i;
        }

        u1 = new User(nom[0], prenom[0], login[0], password[0], naiss[0],
mail[0]);
        u2 = new User(nom[1], prenom[1], login[1], password[1], naiss[1],
mail[1]);
        u3 = new User(nom[2], prenom[2], login[2], password[2], naiss[2],
mail[2]);

        /*
         * MAKE EACH USER A SUBSCRIBER
         */
        Receiver receiver1 = new Receiver(u1);
        Receiver receiver2 = new Receiver(u2);
        Receiver receiver3 = new Receiver(u3);

        /*
         * PUT A MESSAGE LISTENER FOR EACH USER
         */
        receiver1.receiveMessage(u2);
        receiver1.receiveMessage(u3);
        receiver2.receiveMessage(u1);
        receiver2.receiveMessage(u3);
        receiver3.receiveMessage(u1);
        receiver3.receiveMessage(u2);

        /*
         * CALL THE SENDER CLASS TO SEND MESSAGES
         */
        try {
            Sender.main(args);
        } catch (Exception e1) {
            e1.printStackTrace();
        }

        /*
         * A SLEEP TO HAVE ENOUGH TIME TO LOOK AT THE ACTIVEMQ CONSOLE
         * CAN BE REMOVE
         */
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
            return;
        }

        /*
         * UNSUBSCRIBE SUBSCRIBERS FROM TOPICS
         */
        receiver1.unsubscribe(u2);
        receiver1.unsubscribe(u3);
        receiver2.unsubscribe(u1);
        receiver2.unsubscribe(u3);
        receiver3.unsubscribe(u1);
        receiver3.unsubscribe(u2);
    }

}
java jms activemq jms-topic durable-subscription
shareeditflag
asked 7 hours ago

Lovegiver
36
add a comment
2 Answers
active oldest votes
up vote
1
down vote
You can only unsubscribe a durable subscription if there is no active
subscriber currently consuming from it. It looks like your code creates
several subscriptions and does not stop the consumers so of course the
unsubscribe will fail, if you close down the consumers and then do the
unsubscribe you should get the result you are looking for.

An example of durable subscription unsubscribe is here.

shareeditflag
edited 12 mins ago
answered 7 hours ago

Tim Bish
8,73441731


Hi Tim and thanx for your help. When you say "You can only unsubscribe a
durable subscription if there are no active subscriber currently consuming
from it", you seem to talk about a topic. My problem is that one topic may
have many durable subscribers. One of these subscribers may want to
unsuscribe this topic, and there's no reason to stop the subscription of
the other subscribers for the same topic I want to stop a subscription for
a topic without killing the topic itself That's what I wanna do with the
unsubscribe() method of my Receiver class Isn't it possible ? – Lovegiver 5
hours ago


That's not what I said, read again carefully. A Topic subscription that is
active cannot be unsubscribed, you need to close the consumer that is using
the subscription you want to unsubscribe. – Tim Bish 4 hours ago


OK Tim so if I do a session.unsubscribe(durableID) as shown in my code,
this should stop the subscription for the given subscriber but only if I
have done a consumer.close() before ? I've tried this a lot, but always got
the same error message : durable consumer is in use What should I do more
than this : 'consumer.close();' 'session.unsubscribe(nomAbonnement);' –
Lovegiver 3 hours ago


If I have 2 topics named "Topic A" & "TopicB", and 3 durable subscribers
for each of these topics, "U1", "U2 and "U3", what should I do to stop the
"U1" subscription for TopicA ? The session.unsubscribe(durableID) statement
must specify the "durableID" which is a unique identifier for a subscriber
and a subscription. If "U1" suscribed to 3 topics, the DurableID should be
different for each. In my case, my DurableID is created this way :
"Topic1->U1". User2 would have "Topic1->U2" fort the same topic and
"Topic2->U2" for topic #2. What's wrong in my code ? – Lovegiver 2 hours ago


Afraid I can't debug your code, the best thing to do is use the broker
tooling such as the Web Console or JMX to validate that you have no active
subscriber when you are doing the unsubscribe – Tim Bish 2 hours ago
add a comment

up vote
-1
down vote
public void unsubscribe(UserIF topic) {
    try {
        if (session == null) {
            connect();
        }
        System.out.println("\n RECEIVER Désinscription du topic " +
topic.toString());
        //consumer.close();
        String nomAbonnement = topic.toString() + "->" +
userSubscriber.toString();
        //String nomAbonnement = userSubscriber.toString();
        System.out.println("\n RECEIVER Abonnement à clore = " +
nomAbonnement);
        consumer.close();
        session.unsubscribe(nomAbonnement);
        System.out.println("\n RECEIVER " + userSubscriber.toString() + "
s'est désinscrit de " + nomAbonnement);
    } catch (JMSException e) {
        e.printStackTrace();
    }
}
Here is the problem I think and why close method not resolve your error :

receiver1.receiveMessage(u2);
receiver1.receiveMessage(u3);
Calling receiveMessage many times on the same object as you designed will
overwrite your consumer reference and the old consumer is in progress but
you don't reference it

consumer = session.createDurableSubscriber(destination, nomAbonnement);

for this reason when you try to unsubscribe you close another consumer and
you try to unsubscribe the good subscription.

If you call only these methods and their correspondent unsubscribe this
will works fine with close method.
receiver1.receiveMessage(u2); receiver2.receiveMessage(u1);
receiver3.receiveMessage(u1);
Le 16 sept. 2016 22:05, "Lovegiver" <frederic.courc...@gmail.com> a écrit :

> ​​
> Good evening Tim,
>
> thanx a lot for responding me.
>
> The response is YES.
>
> I've got 3 topics living and each has got 2 subscribers (it's my test
> case).
>
> Topics have been created.
> DurableSubscribers have registered.
> In case they disconnect, a listener is ON for each topic.
>
> Each subscriber has got a ClientID -> his name
> Each subscription has got a unique DurableID generated like this (as
> written in code sample) : "topicName+ClientID"
>
> In the code I shared, I'm using only the "session.unsuscribe(durableID)"
> because when I prior use the "consumer.close()" statement, I got the same
> error message.
>
> I think you're telling me to do so :
> consumer.close();
> session.unsubscribe(durableID);
>
> Am I right ?
>
> It doesn't work, I don't know why...
>
>
>
> Frédéric COURCIER
> citizen-web <http://blog.citizen-web.com/>
>
> <http://goog_1602287852>
>
>
>
>
> 2016-09-16 20:47 GMT+02:00 Tim Bain [via ActiveMQ] <
> ml-node+s2283324n471660...@n4.nabble.com>:
>
> > At the time you try to unsubscribe, is a client with that ID connected?
> > If
> > so, you need to close the existing consumer before unsubscribing.
> >
> > Tim
> >
> > On Sep 16, 2016 8:01 AM, "Lovegiver" <[hidden email]
> > <http:///user/SendEmail.jtp?type=node&node=4716607&i=0>> wrote:
> >
> > > Hello there,I'm trying to *UNSUBSCRIBE durable subscribers from
> > TOPICS*.My
> > > app is a kind of social network : each user is a topic for other users.
> > So,
> > > each time a user is doing something, his friends are notified.Of
> course,
> > a
> > > subscriber may unsubscribe from a topic, wanting to receive
> > notifications
> > > about a user no more.Each time I'm trying to unsubscribe, I've got an
> > error
> > > telling me that : "javax.jms.JMSException: Durable consumer is in
> > use"Here
> > > are my 2 classes, the SENDER one and the RECEIVER one.Can someone tell
> > me
> > > what I'm doing wrong ??I'm just using basic features of
> > > ActiveMQ...Thanx.*SENDER :*package com.citizenweb.classes;import
> > > java.util.Date;import javax.jms.Connection;import
> > > javax.jms.ConnectionFactory;import javax.jms.Destination;import
> > > javax.jms.JMSException;import javax.jms.MessageFormatException;import
> > > javax.jms.MessageProducer;import javax.jms.Session;import
> > > javax.jms.TextMessage;import javax.jms.Topic;import
> > > javax.jms.ObjectMessage;import org.apache.activemq.
> > > ActiveMQConnection;import
> > > org.apache.activemq.ActiveMQConnectionFactory;import
> > > org.apache.activemq.ActiveMQSession;import
> > > com.citizenweb.interfaces.PostIF;import
> > > com.citizenweb.interfaces.UserIF;public class Sender {  private
> > > ActiveMQConnectionFactory factory = null;       private
> > ActiveMQConnection
> > > connection = null;      private ActiveMQSession session = null; private
> > > Destination destination = null; private MessageProducer producer =
> null;
> > > public Sender() {       }       public void connect(){          try{
> > >               factory = new
> > > ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
> > >                // TODO
> > > Mécanisme de sécurité d'ActiveMQ à rétablir en production
> > > factory.setTrustAllPackages(true);                      connection =
> > > (ActiveMQConnection)
> > > factory.createConnection();                     connection.start();
> > >              session =
> > > (ActiveMQSession) connection.createSession(false,
> > > Session.AUTO_ACKNOWLEDGE);
> > > } catch (JMSException e){                       e.printStackTrace();
> > >       }       }               public void
> > > sendPost(UserIF user,PostIF post) {
> > >  if(session==null){connect();}           try {
> > > destination = session.createTopic(user.toString());
> > >  producer =
> > > session.createProducer(destination);                    ObjectMessage
> > > postMessage =
> > > session.createObjectMessage();
> > > postMessage.setObject(post);
> > > producer.send(postMessage);                     System.out.println("\n
> > > SENDER Object message
> > > sent");                                         /*QueueBrowser qb =
> > > session.createBrowser((Queue) destination);
> > > Enumeration<?> msgs = qb.getEnumeration();                      if (
> > > !msgs.hasMoreElements() )
> > > {                           System.out.println("No messages in queue");
> > >              } else {                            while
> > > (msgs.hasMoreElements()) {                              Message tempMsg
> > =
> > > (Message)msgs.nextElement();
> > > System.out.println("Message: " +
> > > tempMsg);                           }                   }*/
> > >                      } catch (MessageFormatException e) {
> > > e.printStackTrace();            } catch (JMSException e) {
> > >       e.printStackTrace();            }       }
> > > public void sendInformation(UserIF user,String info){
> > > if(session==null){connect();}           try {
> > >  destination =
> > > session.createTopic(user.toString());                   producer =
> > > session.createProducer(destination);                    TextMessage
> > > infoMessage =
> > > session.createTextMessage();
> >  infoMessage.setText(info);
> > > producer.send(infoMessage);                     System.out.println("\n
> > > SENDER Information
> > > message sent");         } catch (JMSException e) {
> > > e.printStackTrace();            }       }       /**
> > > * @param args    * @throws Exception     */     public static void
> > > main(String[]
> > > args) throws Exception {                                UserIF u1, u2,
> > > u3;              String[] nom = new
> > > String[5];              String[] prenom = new String[5];
> > > String[] login = new
> > > String[5];              String[] password = new String[5];
> > > Date[] naiss = new Date[5];
> > > String[] mail = new String[5];          for (int i = 0; i < 5; i++) {
> > >              nom[i] =
> > > "nom_" + i;                     prenom[i] = "prenom_" + i;
> > >       login[i] = "login_" + i;
> > > password[i] = "password_" + i;                  naiss[i] = new Date();
> > >               mail[i] = "mail_"
> > > + i;            }               System.out.println("\n SENDER
> > AFFECTATION
> > > DES NOMS");           u1 = new
> > > User(nom[0], prenom[0], login[0], password[0], naiss[0], mail[0]);
> > >       u2 = new
> > > User(nom[1], prenom[1], login[1], password[1], naiss[1], mail[1]);
> > >       u3 = new
> > > User(nom[2], prenom[2], login[2], password[2], naiss[2], mail[2]);
> > > Sender sender = new Sender();
> > >  sender.sendInformation(u1, "U1
> > > notification");         sender.sendInformation(u2, "U2 notification");
> > > sender.sendInformation(u3, "U3 notification");          //PostIF post =
> > new
> > > Post("Mon Post","Ceci est mon message",u1,u1,"Classe Sender",((User)
> > > u1).getIdUser(),0);             //sender.sendPost(user, post);
> > > sender.session.close();
> > > sender.connection.close();      }}*RECEIVER :*package
> > > com.citizenweb.classes;import java.io.Serializable;import
> > > java.util.ArrayList;import java.util.Date;import java.util.List;import
> > > javax.jms.JMSException;import javax.jms.Message;import
> > > javax.jms.MessageConsumer;import javax.jms.MessageListener;import
> > > javax.jms.ObjectMessage;import javax.jms.Session;import
> > > javax.jms.TextMessage;import javax.jms.Topic;import
> > > org.apache.activemq.ActiveMQConnection;import
> > > org.apache.activemq.ActiveMQConnectionFactory;import
> > > org.apache.activemq.ActiveMQSession;import
> > > org.apache.activemq.broker.region.Destination;import
> > > com.citizenweb.interfaces.PostIF;import
> > > com.citizenweb.interfaces.UserIF;import com.citizenweb.classes.Post;
> public
> >
> > > class Receiver implements MessageListener, Serializable {       private
> > > static
> > > final long serialVersionUID = 1L;       private
> > ActiveMQConnectionFactory
> > > factory
> > > = null; private ActiveMQConnection connection = null;   private
> > > ActiveMQSession session = null; private Topic destination = null;
> > >  private
> > > MessageConsumer consumer = null;        UserIF userTopic = new User();
> > > UserIF
> > > userSubscriber = new User();    List listeMsg = new ArrayList();
> > > public
> > > Receiver(UserIF subscriber) {           this.userSubscriber =
> > subscriber;
> > >      }       public
> > > void connect() {                try {                   factory = new
> > > ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
> > >                // TODO
> > > Mécanisme de sécurité d'ActiveMQ à rétablir en production
> > > factory.setTrustAllPackages(true);                      connection =
> > > (ActiveMQConnection)
> > > factory.createConnection();                     // ClientID :
> > >      //
> > > https://qnalist.com/questions/2068823/create-durable-topic-subscriber
> > > connection.setClientID(userSubscriber.toString());
> > > connection.start();
> > > session = (ActiveMQSession) connection.createSession(false,
> > > Session.AUTO_ACKNOWLEDGE);              } catch (JMSException e) {
> > > e.printStackTrace();            }       }       public void
> > > receiveMessage(UserIF topic) {              try {
> > > if (session == null) {                          connect();
> > >       }                       destination =
> > > session.createTopic(topic.toString());                  String
> > > nomAbonnement =
> > > topic.toString() + "->" + userSubscriber.toString();
> > > //String
> > > nomAbonnement = userSubscriber.toString();
> consumer
> > =
> > > session.createDurableSubscriber(destination, nomAbonnement);
> > > consumer.setMessageListener(this);              } catch (JMSException
> > e) {
> > > e.printStackTrace();            }       }       public void
> > > unsubscribe(UserIF topic) {         try {
> > > System.out.println("\n RECEIVER Désinscription du topic " +
> > > topic.toString());                      //consumer.close();
> > >      String nomAbonnement =
> > > topic.toString() + "->" + userSubscriber.toString();
> > > //String
> > > nomAbonnement = userSubscriber.toString();
> > > System.out.println("\n RECEIVER
> > > Abonnement à clore = " + nomAbonnement);
> > > session.unsubscribe(nomAbonnement);
> > >  System.out.println("\n RECEIVER " +
> > > userSubscriber.toString() + " s'est désinscrit de " + nomAbonnement);
> > >      }
> > > catch (JMSException e) {                        e.printStackTrace();
> > >       }       }       @Override       public void
> > > onMessage(Message message) {            System.out.println("\n RECEIVER
> > > OnMessage
> > > triggered for " + userSubscriber.toString());
> > >  listeMsg.add(message);
> > > System.out.println("\n RECEIVER Nombre de messages reçus par " +
> > > userSubscriber + " = " + listeMsg.size());              String classe =
> > > message.getClass().getSimpleName();             System.out.println("\n
> > > RECEIVER Classe
> > > de message : " + classe);               try {                   if
> > > (message instanceof TextMessage) {
> > > TextMessage text = (TextMessage) message;
> > >  System.out.println("\n RECEIVER
> > > Information : " + text.getText());                      }
> > >      if (message instanceof
> > > ObjectMessage) {                                System.out.println("\n
> > > RECEIVER ObjectMessage");
> > > ObjectMessage oMessage = (ObjectMessage) message;
> > >      if
> > > (oMessage.getObject() instanceof PostIF) {
> > >       PostIF post = (PostIF)
> > > oMessage.getObject();                                   String s =
> > ((Post)
> > > post).getCorpsMessage();
> > > System.out.println("\n RECEIVER Post : " + s);
> >  }
> > >                      }               } catch
> > > (JMSException e) {                      e.printStackTrace();
> >  }
> > >      }               public static void
> > > main(String[] args) throws JMSException {               /*
> > *
> > > EACH USER IS A TOPIC FOR
> > > OTHER USERS              * WHATEVER A USER DOES RESULTS IN A
> > NOTIFICATION
> > > TO
> > > SUBSCRIBERS             */                              //CREATE USER
> > >      UserIF u1, u2, u3;              String[] nom = new
> > > String[5];              String[] prenom = new String[5];
> > > String[] login = new
> > > String[5];              String[] password = new String[5];
> > > Date[] naiss = new Date[5];
> > > String[] mail = new String[5];          for (int i = 0; i < 5; i++) {
> > >              nom[i] =
> > > "nom_" + i;                     prenom[i] = "prenom_" + i;
> > >       login[i] = "login_" + i;
> > > password[i] = "password_" + i;                  naiss[i] = new Date();
> > >               mail[i] = "mail_"
> > > + i;            }               u1 = new User(nom[0], prenom[0],
> > login[0],
> > > password[0], naiss[0],
> > > mail[0]);               u2 = new User(nom[1], prenom[1], login[1],
> > > password[1], naiss[1],
> > > mail[1]);               u3 = new User(nom[2], prenom[2], login[2],
> > > password[2], naiss[2],
> > > mail[2]);                               /*               * MAKE EACH
> > USER
> > > A SUBSCRIBER           */             Receiver receiver1 =
> > > new Receiver(u1);               Receiver receiver2 = new Receiver(u2);
> > >       Receiver
> > > receiver3 = new Receiver(u3);           /*               * PUT A
> MESSAGE
> > > LISTENER FOR EACH USER
> > > */              receiver1.receiveMessage(u2);
> > >  receiver1.receiveMessage(u3);
> > > receiver2.receiveMessage(u1);           receiver2.receiveMessage(u3);
> > > receiver3.receiveMessage(u1);           receiver3.receiveMessage(u2);
> > >                      /*               * CALL
> > > THE SENDER CLASS TO SEND MESSAGES                */             try {
> > >              Sender.main(args);              } catch
> > > (Exception e1) {                        e1.printStackTrace();
> > }
> > >                              /*               * A SLEEP TO HAVE ENOUGH
> > > TIME TO LOOK AT THE ACTIVEMQ CONSOLE             * CAN BE REMOVE
> > >        */             try {
> > > Thread.sleep(10000);            } catch (InterruptedException e) {
> > >               // TODO
> > > Auto-generated catch block                      e.printStackTrace();
> > >               return;         }               /*               *
> > > UNSUBSCRIBE SUBSCRIBERS FROM TOPICS              */
> > >  receiver1.unsubscribe(u2);
> > > receiver1.unsubscribe(u3);              receiver2.unsubscribe(u1);
> > > receiver2.unsubscribe(u3);              receiver3.unsubscribe(u1);
> > > receiver3.unsubscribe(u2);      }}
> > >
> > >
> > >
> > > --
> > > View this message in context: http://activemq.2283324.n4.
> > > nabble.com/Unsubscribing-DurableSubscribers-tp4716592.html
> > > Sent from the ActiveMQ - User mailing list archive at Nabble.com.
> >
> >
> > ------------------------------
> > If you reply to this email, your message will be added to the discussion
> > below:
> > http://activemq.2283324.n4.nabble.com/Unsubscribing-DurableSubscribers-
> > tp4716592p4716607.html
> > To unsubscribe from Unsubscribing DurableSubscribers, click here
> > <http://activemq.2283324.n4.nabble.com/template/NamlServlet.jtp?macro=
> unsubscribe_by_code&node=4716592&code=ZnJlZGVyaWMuY291cmNpZXJAZ21haW
> wuY29tfDQ3MTY1OTJ8MjAyNTY3MjE0Nw==>
> > .
> > NAML
> > <http://activemq.2283324.n4.nabble.com/template/
> NamlServlet.jtp?macro=macro_viewer&id=instant_html%
> 21nabble%3Aemail.naml&base=nabble.naml.namespaces.
> BasicNamespace-nabble.view.web.template.NabbleNamespace-
> nabble.view.web.template.NodeNamespace&breadcrumbs=
> notify_subscribers%21nabble%3Aemail.naml-instant_emails%
> 21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml>
> >
>
>
>
>
> --
> View this message in context: http://activemq.2283324.n4.
> nabble.com/Unsubscribing-DurableSubscribers-tp4716592p4716609.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Reply via email to