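At the time you try to unsubscribe, is a client with that client ID still
connected with an active consumer on the durable subscription?  If so, you
need to close the existing consumer (consumer.close()) before calling
session.unsubscribe().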

Tim

On Sep 16, 2016 8:01 AM, "Lovegiver" <frederic.courc...@gmail.com> wrote:

> Hello there,
>
> I'm trying to *UNSUBSCRIBE durable subscribers from TOPICS*.
>
> My app is a kind of social network: each user is a topic for other users. So,
> each time a user does something, his friends are notified. Of course, a
> subscriber may unsubscribe from a topic when he no longer wants to receive
> notifications about a user.
>
> Each time I try to unsubscribe, I get an error telling me:
> "javax.jms.JMSException: Durable consumer is in use"
>
> Here are my 2 classes, the SENDER one and the RECEIVER one. Can someone tell me
> what I'm doing wrong? I'm just using basic features of ActiveMQ...
>
> Thanks.
>
> *SENDER :*
> package com.citizenweb.classes;import
> java.util.Date;import javax.jms.Connection;import
> javax.jms.ConnectionFactory;import javax.jms.Destination;import
> javax.jms.JMSException;import javax.jms.MessageFormatException;import
> javax.jms.MessageProducer;import javax.jms.Session;import
> javax.jms.TextMessage;import javax.jms.Topic;import
> javax.jms.ObjectMessage;import org.apache.activemq.
> ActiveMQConnection;import
> org.apache.activemq.ActiveMQConnectionFactory;import
> org.apache.activemq.ActiveMQSession;import
> com.citizenweb.interfaces.PostIF;import
> com.citizenweb.interfaces.UserIF;public class Sender {  private
> ActiveMQConnectionFactory factory = null;       private ActiveMQConnection
> connection = null;      private ActiveMQSession session = null; private
> Destination destination = null; private MessageProducer producer = null;
> public Sender() {       }       public void connect(){          try{
>               factory = new
> ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
>                // TODO
> Mécanisme de sécurité d'ActiveMQ à rétablir en production
> factory.setTrustAllPackages(true);                      connection =
> (ActiveMQConnection)
> factory.createConnection();                     connection.start();
>              session =
> (ActiveMQSession) connection.createSession(false,
> Session.AUTO_ACKNOWLEDGE);
> } catch (JMSException e){                       e.printStackTrace();
>       }       }               public void
> sendPost(UserIF user,PostIF post) {
>  if(session==null){connect();}           try {
> destination = session.createTopic(user.toString());
>  producer =
> session.createProducer(destination);                    ObjectMessage
> postMessage =
> session.createObjectMessage();
> postMessage.setObject(post);
> producer.send(postMessage);                     System.out.println("\n
> SENDER Object message
> sent");                                         /*QueueBrowser qb =
> session.createBrowser((Queue) destination);
> Enumeration<?> msgs = qb.getEnumeration();                      if (
> !msgs.hasMoreElements() )
> {                           System.out.println("No messages in queue");
>              } else {                            while
> (msgs.hasMoreElements()) {                              Message tempMsg =
> (Message)msgs.nextElement();
> System.out.println("Message: " +
> tempMsg);                           }                   }*/
>                      } catch (MessageFormatException e) {
> e.printStackTrace();            } catch (JMSException e) {
>       e.printStackTrace();            }       }
> public void sendInformation(UserIF user,String info){
> if(session==null){connect();}           try {
>  destination =
> session.createTopic(user.toString());                   producer =
> session.createProducer(destination);                    TextMessage
> infoMessage =
> session.createTextMessage();                    infoMessage.setText(info);
> producer.send(infoMessage);                     System.out.println("\n
> SENDER Information
> message sent");         } catch (JMSException e) {
> e.printStackTrace();            }       }       /**
> * @param args    * @throws Exception     */     public static void
> main(String[]
> args) throws Exception {                                UserIF u1, u2,
> u3;              String[] nom = new
> String[5];              String[] prenom = new String[5];
> String[] login = new
> String[5];              String[] password = new String[5];
> Date[] naiss = new Date[5];
> String[] mail = new String[5];          for (int i = 0; i < 5; i++) {
>              nom[i] =
> "nom_" + i;                     prenom[i] = "prenom_" + i;
>       login[i] = "login_" + i;
> password[i] = "password_" + i;                  naiss[i] = new Date();
>               mail[i] = "mail_"
> + i;            }               System.out.println("\n SENDER AFFECTATION
> DES NOMS");           u1 = new
> User(nom[0], prenom[0], login[0], password[0], naiss[0], mail[0]);
>       u2 = new
> User(nom[1], prenom[1], login[1], password[1], naiss[1], mail[1]);
>       u3 = new
> User(nom[2], prenom[2], login[2], password[2], naiss[2], mail[2]);
> Sender sender = new Sender();
>  sender.sendInformation(u1, "U1
> notification");         sender.sendInformation(u2, "U2 notification");
> sender.sendInformation(u3, "U3 notification");          //PostIF post = new
> Post("Mon Post","Ceci est mon message",u1,u1,"Classe Sender",((User)
> u1).getIdUser(),0);             //sender.sendPost(user, post);
> sender.session.close();
> sender.connection.close();      }}*RECEIVER :*package
> com.citizenweb.classes;import java.io.Serializable;import
> java.util.ArrayList;import java.util.Date;import java.util.List;import
> javax.jms.JMSException;import javax.jms.Message;import
> javax.jms.MessageConsumer;import javax.jms.MessageListener;import
> javax.jms.ObjectMessage;import javax.jms.Session;import
> javax.jms.TextMessage;import javax.jms.Topic;import
> org.apache.activemq.ActiveMQConnection;import
> org.apache.activemq.ActiveMQConnectionFactory;import
> org.apache.activemq.ActiveMQSession;import
> org.apache.activemq.broker.region.Destination;import
> com.citizenweb.interfaces.PostIF;import
> com.citizenweb.interfaces.UserIF;import com.citizenweb.classes.Post;public
> class Receiver implements MessageListener, Serializable {       private
> static
> final long serialVersionUID = 1L;       private ActiveMQConnectionFactory
> factory
> = null; private ActiveMQConnection connection = null;   private
> ActiveMQSession session = null; private Topic destination = null;
>  private
> MessageConsumer consumer = null;        UserIF userTopic = new User();
> UserIF
> userSubscriber = new User();    List listeMsg = new ArrayList();
> public
> Receiver(UserIF subscriber) {           this.userSubscriber = subscriber;
>      }       public
> void connect() {                try {                   factory = new
> ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
>                // TODO
> Mécanisme de sécurité d'ActiveMQ à rétablir en production
> factory.setTrustAllPackages(true);                      connection =
> (ActiveMQConnection)
> factory.createConnection();                     // ClientID :
>      //
> https://qnalist.com/questions/2068823/create-durable-topic-subscriber
> connection.setClientID(userSubscriber.toString());
> connection.start();
> session = (ActiveMQSession) connection.createSession(false,
> Session.AUTO_ACKNOWLEDGE);              } catch (JMSException e) {
> e.printStackTrace();            }       }       public void
> receiveMessage(UserIF topic) {              try {
> if (session == null) {                          connect();
>       }                       destination =
> session.createTopic(topic.toString());                  String
> nomAbonnement =
> topic.toString() + "->" + userSubscriber.toString();
> //String
> nomAbonnement = userSubscriber.toString();                      consumer =
> session.createDurableSubscriber(destination, nomAbonnement);
> consumer.setMessageListener(this);              } catch (JMSException e) {
> e.printStackTrace();            }       }       public void
> unsubscribe(UserIF topic) {         try {
> System.out.println("\n RECEIVER Désinscription du topic " +
> topic.toString());                      //consumer.close();
>      String nomAbonnement =
> topic.toString() + "->" + userSubscriber.toString();
> //String
> nomAbonnement = userSubscriber.toString();
> System.out.println("\n RECEIVER
> Abonnement à clore = " + nomAbonnement);
> session.unsubscribe(nomAbonnement);
>  System.out.println("\n RECEIVER " +
> userSubscriber.toString() + " s'est désinscrit de " + nomAbonnement);
>      }
> catch (JMSException e) {                        e.printStackTrace();
>       }       }       @Override       public void
> onMessage(Message message) {            System.out.println("\n RECEIVER
> OnMessage
> triggered for " + userSubscriber.toString());
>  listeMsg.add(message);
> System.out.println("\n RECEIVER Nombre de messages reçus par " +
> userSubscriber + " = " + listeMsg.size());              String classe =
> message.getClass().getSimpleName();             System.out.println("\n
> RECEIVER Classe
> de message : " + classe);               try {                   if
> (message instanceof TextMessage) {
> TextMessage text = (TextMessage) message;
>  System.out.println("\n RECEIVER
> Information : " + text.getText());                      }
>      if (message instanceof
> ObjectMessage) {                                System.out.println("\n
> RECEIVER ObjectMessage");
> ObjectMessage oMessage = (ObjectMessage) message;
>      if
> (oMessage.getObject() instanceof PostIF) {
>       PostIF post = (PostIF)
> oMessage.getObject();                                   String s = ((Post)
> post).getCorpsMessage();
> System.out.println("\n RECEIVER Post : " + s);                          }
>                      }               } catch
> (JMSException e) {                      e.printStackTrace();            }
>      }               public static void
> main(String[] args) throws JMSException {               /*               *
> EACH USER IS A TOPIC FOR
> OTHER USERS              * WHATEVER A USER DOES RESULTS IN A NOTIFICATION
> TO
> SUBSCRIBERS             */                              //CREATE USER
>      UserIF u1, u2, u3;              String[] nom = new
> String[5];              String[] prenom = new String[5];
> String[] login = new
> String[5];              String[] password = new String[5];
> Date[] naiss = new Date[5];
> String[] mail = new String[5];          for (int i = 0; i < 5; i++) {
>              nom[i] =
> "nom_" + i;                     prenom[i] = "prenom_" + i;
>       login[i] = "login_" + i;
> password[i] = "password_" + i;                  naiss[i] = new Date();
>               mail[i] = "mail_"
> + i;            }               u1 = new User(nom[0], prenom[0], login[0],
> password[0], naiss[0],
> mail[0]);               u2 = new User(nom[1], prenom[1], login[1],
> password[1], naiss[1],
> mail[1]);               u3 = new User(nom[2], prenom[2], login[2],
> password[2], naiss[2],
> mail[2]);                               /*               * MAKE EACH USER
> A SUBSCRIBER           */             Receiver receiver1 =
> new Receiver(u1);               Receiver receiver2 = new Receiver(u2);
>       Receiver
> receiver3 = new Receiver(u3);           /*               * PUT A MESSAGE
> LISTENER FOR EACH USER
> */              receiver1.receiveMessage(u2);
>  receiver1.receiveMessage(u3);
> receiver2.receiveMessage(u1);           receiver2.receiveMessage(u3);
> receiver3.receiveMessage(u1);           receiver3.receiveMessage(u2);
>                      /*               * CALL
> THE SENDER CLASS TO SEND MESSAGES                */             try {
>              Sender.main(args);              } catch
> (Exception e1) {                        e1.printStackTrace();           }
>                              /*               * A SLEEP TO HAVE ENOUGH
> TIME TO LOOK AT THE ACTIVEMQ CONSOLE             * CAN BE REMOVE
>        */             try {
> Thread.sleep(10000);            } catch (InterruptedException e) {
>               // TODO
> Auto-generated catch block                      e.printStackTrace();
>               return;         }               /*               *
> UNSUBSCRIBE SUBSCRIBERS FROM TOPICS              */
>  receiver1.unsubscribe(u2);
> receiver1.unsubscribe(u3);              receiver2.unsubscribe(u1);
> receiver2.unsubscribe(u3);              receiver3.unsubscribe(u1);
> receiver3.unsubscribe(u2);      }}
>
>
>
> --
> View this message in context:
> http://activemq.2283324.n4.nabble.com/Unsubscribing-DurableSubscribers-tp4716592.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Reply via email to