At the time you try to unsubscribe, is a client with that ID connected? If so, you need to close the existing consumer before unsubscribing.
Tim On Sep 16, 2016 8:01 AM, "Lovegiver" <frederic.courc...@gmail.com> wrote: > Hello there,I'm trying to *UNSUBSCRIBE durable subscribers from TOPICS*.My > app is a kind of social network : each user is a topic for other users. So, > each time a user is doing something, his friends are notified.Of course, a > subscriber may unsubscribe from a topic, wanting to receive notifications > about a user no more.Each time I'm trying to unsubscribe, I've got an error > telling me that : "javax.jms.JMSException: Durable consumer is in use"Here > are my 2 classes, the SENDER one and the RECEIVER one.Can someone tell me > what I'm doing wrong ??I'm just using basic features of > ActiveMQ...Thanx.*SENDER :*package com.citizenweb.classes;import > java.util.Date;import javax.jms.Connection;import > javax.jms.ConnectionFactory;import javax.jms.Destination;import > javax.jms.JMSException;import javax.jms.MessageFormatException;import > javax.jms.MessageProducer;import javax.jms.Session;import > javax.jms.TextMessage;import javax.jms.Topic;import > javax.jms.ObjectMessage;import org.apache.activemq. > ActiveMQConnection;import > org.apache.activemq.ActiveMQConnectionFactory;import > org.apache.activemq.ActiveMQSession;import > com.citizenweb.interfaces.PostIF;import > com.citizenweb.interfaces.UserIF;public class Sender { private > ActiveMQConnectionFactory factory = null; private ActiveMQConnection > connection = null; private ActiveMQSession session = null; private > Destination destination = null; private MessageProducer producer = null; > public Sender() { } public void connect(){ try{ > factory = new > ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL); > // TODO > Mécanisme de sécurité d'ActiveMQ à rétablir en production > factory.setTrustAllPackages(true); connection = > (ActiveMQConnection) > factory.createConnection(); connection.start(); > session = > (ActiveMQSession) connection.createSession(false, > Session.AUTO_ACKNOWLEDGE); > } catch (JMSException e){ e.printStackTrace(); > } } public void > sendPost(UserIF user,PostIF post) { > if(session==null){connect();} try { > destination = session.createTopic(user.toString()); > producer = > session.createProducer(destination); ObjectMessage > postMessage = > session.createObjectMessage(); > postMessage.setObject(post); > producer.send(postMessage); System.out.println("\n > SENDER Object message > sent"); /*QueueBrowser qb = > session.createBrowser((Queue) destination); > Enumeration<?> msgs = qb.getEnumeration(); if ( > !msgs.hasMoreElements() ) > { System.out.println("No messages in queue"); > } else { while > (msgs.hasMoreElements()) { Message tempMsg = > (Message)msgs.nextElement(); > System.out.println("Message: " + > tempMsg); } }*/ > } catch (MessageFormatException e) { > e.printStackTrace(); } catch (JMSException e) { > e.printStackTrace(); } } > public void sendInformation(UserIF user,String info){ > if(session==null){connect();} try { > destination = > session.createTopic(user.toString()); producer = > session.createProducer(destination); TextMessage > infoMessage = > session.createTextMessage(); infoMessage.setText(info); > producer.send(infoMessage); System.out.println("\n > SENDER Information > message sent"); } catch (JMSException e) { > e.printStackTrace(); } } /** > * @param args * @throws Exception */ public static void > main(String[] > args) throws Exception { UserIF u1, u2, > u3; String[] nom = new > String[5]; String[] prenom = new String[5]; > String[] login = new > String[5]; String[] password = new String[5]; > Date[] naiss = new Date[5]; > String[] mail = new String[5]; for (int i = 0; i < 5; i++) { > nom[i] = > "nom_" + i; prenom[i] = "prenom_" + i; > login[i] = "login_" + i; > password[i] = "password_" + i; naiss[i] = new Date(); > mail[i] = "mail_" > + i; } System.out.println("\n SENDER AFFECTATION > DES NOMS"); u1 = new > User(nom[0], prenom[0], login[0], password[0], naiss[0], mail[0]); > u2 = new > User(nom[1], prenom[1], login[1], password[1], naiss[1], mail[1]); > u3 = new > User(nom[2], prenom[2], login[2], password[2], naiss[2], mail[2]); > Sender sender = new Sender(); > sender.sendInformation(u1, "U1 > notification"); sender.sendInformation(u2, "U2 notification"); > sender.sendInformation(u3, "U3 notification"); //PostIF post = new > Post("Mon Post","Ceci est mon message",u1,u1,"Classe Sender",((User) > u1).getIdUser(),0); //sender.sendPost(user, post); > sender.session.close(); > sender.connection.close(); }}*RECEIVER :*package > com.citizenweb.classes;import java.io.Serializable;import > java.util.ArrayList;import java.util.Date;import java.util.List;import > javax.jms.JMSException;import javax.jms.Message;import > javax.jms.MessageConsumer;import javax.jms.MessageListener;import > javax.jms.ObjectMessage;import javax.jms.Session;import > javax.jms.TextMessage;import javax.jms.Topic;import > org.apache.activemq.ActiveMQConnection;import > org.apache.activemq.ActiveMQConnectionFactory;import > org.apache.activemq.ActiveMQSession;import > org.apache.activemq.broker.region.Destination;import > com.citizenweb.interfaces.PostIF;import > com.citizenweb.interfaces.UserIF;import com.citizenweb.classes.Post;public > class Receiver implements MessageListener, Serializable { private > static > final long serialVersionUID = 1L; private ActiveMQConnectionFactory > factory > = null; private ActiveMQConnection connection = null; private > ActiveMQSession session = null; private Topic destination = null; > private > MessageConsumer consumer = null; UserIF userTopic = new User(); > UserIF > userSubscriber = new User(); List listeMsg = new ArrayList(); > public > Receiver(UserIF subscriber) { this.userSubscriber = subscriber; > } public > void connect() { try { factory = new > ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL); > // TODO > Mécanisme de sécurité d'ActiveMQ à rétablir en production > factory.setTrustAllPackages(true); connection = > (ActiveMQConnection) > factory.createConnection(); // ClientID : > // > https://qnalist.com/questions/2068823/create-durable-topic-subscriber > connection.setClientID(userSubscriber.toString()); > connection.start(); > session = (ActiveMQSession) connection.createSession(false, > Session.AUTO_ACKNOWLEDGE); } catch (JMSException e) { > e.printStackTrace(); } } public void > receiveMessage(UserIF topic) { try { > if (session == null) { connect(); > } destination = > session.createTopic(topic.toString()); String > nomAbonnement = > topic.toString() + "->" + userSubscriber.toString(); > //String > nomAbonnement = userSubscriber.toString(); consumer = > session.createDurableSubscriber(destination, nomAbonnement); > consumer.setMessageListener(this); } catch (JMSException e) { > e.printStackTrace(); } } public void > unsubscribe(UserIF topic) { try { > System.out.println("\n RECEIVER Désinscription du topic " + > topic.toString()); //consumer.close(); > String nomAbonnement = > topic.toString() + "->" + userSubscriber.toString(); > //String > nomAbonnement = userSubscriber.toString(); > System.out.println("\n RECEIVER > Abonnement à clore = " + nomAbonnement); > session.unsubscribe(nomAbonnement); > System.out.println("\n RECEIVER " + > userSubscriber.toString() + " s'est désinscrit de " + nomAbonnement); > } > catch (JMSException e) { e.printStackTrace(); > } } @Override public void > onMessage(Message message) { System.out.println("\n RECEIVER > OnMessage > triggered for " + userSubscriber.toString()); > listeMsg.add(message); > System.out.println("\n RECEIVER Nombre de messages reçus par " + > userSubscriber + " = " + listeMsg.size()); String classe = > message.getClass().getSimpleName(); System.out.println("\n > RECEIVER Classe > de message : " + classe); try { if > (message instanceof TextMessage) { > TextMessage text = (TextMessage) message; > System.out.println("\n RECEIVER > Information : " + text.getText()); } > if (message instanceof > ObjectMessage) { System.out.println("\n > RECEIVER ObjectMessage"); > ObjectMessage oMessage = (ObjectMessage) message; > if > (oMessage.getObject() instanceof PostIF) { > PostIF post = (PostIF) > oMessage.getObject(); String s = ((Post) > post).getCorpsMessage(); > System.out.println("\n RECEIVER Post : " + s); } > } } catch > (JMSException e) { e.printStackTrace(); } > } public static void > main(String[] args) throws JMSException { /* * > EACH USER IS A TOPIC FOR > OTHER USERS * WHATEVER A USER DOES RESULTS IN A NOTIFICATION > TO > SUBSCRIBERS */ //CREATE USER > UserIF u1, u2, u3; String[] nom = new > String[5]; String[] prenom = new String[5]; > String[] login = new > String[5]; String[] password = new String[5]; > Date[] naiss = new Date[5]; > String[] mail = new String[5]; for (int i = 0; i < 5; i++) { > nom[i] = > "nom_" + i; prenom[i] = "prenom_" + i; > login[i] = "login_" + i; > password[i] = "password_" + i; naiss[i] = new Date(); > mail[i] = "mail_" > + i; } u1 = new User(nom[0], prenom[0], login[0], > password[0], naiss[0], > mail[0]); u2 = new User(nom[1], prenom[1], login[1], > password[1], naiss[1], > mail[1]); u3 = new User(nom[2], prenom[2], login[2], > password[2], naiss[2], > mail[2]); /* * MAKE EACH USER > A SUBSCRIBER */ Receiver receiver1 = > new Receiver(u1); Receiver receiver2 = new Receiver(u2); > Receiver > receiver3 = new Receiver(u3); /* * PUT A MESSAGE > LISTENER FOR EACH USER > */ receiver1.receiveMessage(u2); > receiver1.receiveMessage(u3); > receiver2.receiveMessage(u1); receiver2.receiveMessage(u3); > receiver3.receiveMessage(u1); receiver3.receiveMessage(u2); > /* * CALL > THE SENDER CLASS TO SEND MESSAGES */ try { > Sender.main(args); } catch > (Exception e1) { e1.printStackTrace(); } > /* * A SLEEP TO HAVE ENOUGH > TIME TO LOOK AT THE ACTIVEMQ CONSOLE * CAN BE REMOVE > */ try { > Thread.sleep(10000); } catch (InterruptedException e) { > // TODO > Auto-generated catch block e.printStackTrace(); > return; } /* * > UNSUBSCRIBE SUBSCRIBERS FROM TOPICS */ > receiver1.unsubscribe(u2); > receiver1.unsubscribe(u3); receiver2.unsubscribe(u1); > receiver2.unsubscribe(u3); receiver3.unsubscribe(u1); > receiver3.unsubscribe(u2); }} > > > > -- > View this message in context: http://activemq.2283324.n4. > nabble.com/Unsubscribing-DurableSubscribers-tp4716592.html > Sent from the ActiveMQ - User mailing list archive at Nabble.com.