Client:mqttv3,Server:
apache-activemq-5.9.0.redhat-610-20130702.003414-4-bin.zip 

code:
                        String clientId = "mqtt_test";
                        MqttClient client = new 
MqttClient("tcp://localhost:1883",clientId);
                        CallBack callback = new CallBack();
                        client.setCallback(callback);
                        MqttConnectOptions conOptions = new 
MqttConnectOptions();
                        conOptions.setUserName("mqttMessage");
                        conOptions.setPassword("mqttMessage".toCharArray());
                        conOptions.setCleanSession(false);
                        client.connect(conOptions);
            String[] topics =
{"topic.test","topic.test1","topic.test2","topic.test"};
            client.subscribe(topics);


AMQ Server Errors:
=======                 
WARN | Async error occurred: javax.jms.JMSException: Durable consumer is in
use
 for client: mqtt_test and subscriptionName: mqtt_test topic.test
javax.jms.JMSException: Durable consumer is in use for client: mqtt_test and
subscriptionName: mqtt_test
        at
org.apache.activemq.broker.region.TopicRegion.addConsumer(TopicRegion.java:123)
        at
org.apache.activemq.broker.region.RegionBroker.addConsumer(RegionBroker.java:390)
        at
org.apache.activemq.broker.jmx.ManagedRegionBroker.addConsumer(ManagedRegionBroker.java:229)
        at
org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:97)
        at
org.apache.activemq.advisory.AdvisoryBroker.addConsumer(AdvisoryBroker.java:76)
        at
org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:97)
        at
org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:97)
        at
org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:97)
        at
org.apache.activemq.broker.MutableBrokerFilter.addConsumer(MutableBrokerFilter.java:102)
        at
org.apache.activemq.broker.TransportConnection.processAddConsumer(TransportConnection.java:619)
        at
org.apache.activemq.command.ConsumerInfo.visit(ConsumerInfo.java:347)
        at
org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:329)
        at
org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:184)
        at
org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:45)
        at
org.apache.activemq.transport.mqtt.MQTTInactivityMonitor.onCommand(MQTTInactivityMonitor.java:147)
        at
org.apache.activemq.transport.mqtt.MQTTTransportFilter.sendToActiveMQ(MQTTTransportFilter.java:91)
        at
org.apache.activemq.transport.mqtt.MQTTProtocolConverter.sendToActiveMQ(MQTTProtocolConverter.java:133)
        at
org.apache.activemq.transport.mqtt.MQTTProtocolConverter.onSubscribe(MQTTProtocolConverter.java:325)
        at
org.apache.activemq.transport.mqtt.MQTTProtocolConverter.onSubscribe(MQTTProtocolConverter.java:292)
        at
org.apache.activemq.transport.mqtt.MQTTProtocolConverter.onMQTTCommand(MQTTProtocolConverter.java:167)
        at
org.apache.activemq.transport.mqtt.MQTTTransportFilter.onCommand(MQTTTransportFilter.java:79)
        at
org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
        at
org.apache.activemq.transport.mqtt.MQTTCodec.processCommand(MQTTCodec.java:114)
        at
org.apache.activemq.transport.mqtt.MQTTCodec.parse(MQTTCodec.java:84)
        at
org.apache.activemq.transport.mqtt.MQTTNIOTransport.serviceRead(MQTTNIOTransport.java:105)
        at
org.apache.activemq.transport.mqtt.MQTTNIOTransport.access$000(MQTTNIOTransport.java:43)
        at
org.apache.activemq.transport.mqtt.MQTTNIOTransport$1.onSelect(MQTTNIOTransport.java:66)
        at
org.apache.activemq.transport.nio.SelectorSelection.onSelect(SelectorSelection.java:94)
        at
org.apache.activemq.transport.nio.SelectorWorker$1.run(SelectorWorker.java:119)
        at
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:662)
 
 
 
Connect to ActiveMq Server again. 

code:

                        String clientId = "mqtt_test";
                        MqttClient client = new 
MqttClient("tcp://localhost:1883",clientId);
                        CallBack callback = new CallBack();
                        client.setCallback(callback);
                        MqttConnectOptions conOptions = new 
MqttConnectOptions();
                        conOptions.setUserName("mqttMessage");
                        conOptions.setPassword("mqttMessage".toCharArray());
                        conOptions.setCleanSession(false);
                        client.connect(conOptions);
            String[] topics = {"topic.test","topic.test1","topic.test2"};
            client.subscribe(topics);

 ActiveMq Server logs:
 WARN | Exception occurred processing:null: javax.jms.JMSException: Durable
consumer is in use for client: mqtt_test and subscriptionName:
mqtt_testtopic.test
 WARN | Failed to add Connection ID:alphae6400-4104-1372812313970-2:3,
reason: javax.jms.InvalidClientIDException: Broker: localhost - Client:
mqtt_test already connected from null

The Client(clientId:mqtt_test) can't connect to the AmqServer agian.



--
View this message in context: 
http://activemq.2283324.n4.nabble.com/Problem-about-Repeat-subscribe-message-by-mqttv3-and-the-same-clientId-tp4668829.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Reply via email to