Which client are you using? Can you put together a small unit test that
shows this?
I could not reproduce it with the small test I tried.
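
For anyone narrowing this down: the first snippet quoted below passes
"topic.test" twice in a single subscribe call. Whether that duplicate is
what triggers the "Durable consumer is in use" warning is an assumption;
nothing in this thread confirms it. A minimal client-side sketch that drops
duplicate topic filters before subscribing might look like the following.
The class name TopicFilters and the helper distinct() are hypothetical and
not part of the mqttv3 client API.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

public class TopicFilters {
    // Hypothetical helper: returns the topic filters with duplicates removed,
    // keeping the first occurrence of each and preserving the original order.
    static String[] distinct(String[] topics) {
        Set<String> seen = new LinkedHashSet<String>(Arrays.asList(topics));
        return seen.toArray(new String[0]);
    }

    public static void main(String[] args) {
        // Same topic list as in the quoted report, including the repeated entry.
        String[] topics = {"topic.test", "topic.test1", "topic.test2", "topic.test"};
        String[] unique = distinct(topics);
        System.out.println(Arrays.toString(unique));
        // The de-duplicated array would then be passed to client.subscribe(unique).
    }
}
```

This only guards the client side. If the broker rejects a repeated
subscription on its own, that would still need a reproducing test against
the broker.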


On Wed, Jul 3, 2013 at 2:09 AM, linuxcrazy <linuxcr...@126.com> wrote:

> Client: mqttv3, Server: apache-activemq-5.9.0.redhat-610-20130702.003414-4-bin.zip
>
> code:
>     String clientId = "mqtt_test";
>     MqttClient client = new MqttClient("tcp://localhost:1883", clientId);
>     CallBack callback = new CallBack();
>     client.setCallback(callback);
>     MqttConnectOptions conOptions = new MqttConnectOptions();
>     conOptions.setUserName("mqttMessage");
>     conOptions.setPassword("mqttMessage".toCharArray());
>     conOptions.setCleanSession(false);
>     client.connect(conOptions);
>     String[] topics = {"topic.test", "topic.test1", "topic.test2", "topic.test"};
>     client.subscribe(topics);
>
>
> AMQ Server Errors:
> =======
> WARN | Async error occurred: javax.jms.JMSException: Durable consumer is in use for client: mqtt_test and subscriptionName: mqtt_test topic.test
> javax.jms.JMSException: Durable consumer is in use for client: mqtt_test and subscriptionName: mqtt_test
>         at org.apache.activemq.broker.region.TopicRegion.addConsumer(TopicRegion.java:123)
>         at org.apache.activemq.broker.region.RegionBroker.addConsumer(RegionBroker.java:390)
>         at org.apache.activemq.broker.jmx.ManagedRegionBroker.addConsumer(ManagedRegionBroker.java:229)
>         at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:97)
>         at org.apache.activemq.advisory.AdvisoryBroker.addConsumer(AdvisoryBroker.java:76)
>         at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:97)
>         at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:97)
>         at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:97)
>         at org.apache.activemq.broker.MutableBrokerFilter.addConsumer(MutableBrokerFilter.java:102)
>         at org.apache.activemq.broker.TransportConnection.processAddConsumer(TransportConnection.java:619)
>         at org.apache.activemq.command.ConsumerInfo.visit(ConsumerInfo.java:347)
>         at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:329)
>         at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:184)
>         at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:45)
>         at org.apache.activemq.transport.mqtt.MQTTInactivityMonitor.onCommand(MQTTInactivityMonitor.java:147)
>         at org.apache.activemq.transport.mqtt.MQTTTransportFilter.sendToActiveMQ(MQTTTransportFilter.java:91)
>         at org.apache.activemq.transport.mqtt.MQTTProtocolConverter.sendToActiveMQ(MQTTProtocolConverter.java:133)
>         at org.apache.activemq.transport.mqtt.MQTTProtocolConverter.onSubscribe(MQTTProtocolConverter.java:325)
>         at org.apache.activemq.transport.mqtt.MQTTProtocolConverter.onSubscribe(MQTTProtocolConverter.java:292)
>         at org.apache.activemq.transport.mqtt.MQTTProtocolConverter.onMQTTCommand(MQTTProtocolConverter.java:167)
>         at org.apache.activemq.transport.mqtt.MQTTTransportFilter.onCommand(MQTTTransportFilter.java:79)
>         at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
>         at org.apache.activemq.transport.mqtt.MQTTCodec.processCommand(MQTTCodec.java:114)
>         at org.apache.activemq.transport.mqtt.MQTTCodec.parse(MQTTCodec.java:84)
>         at org.apache.activemq.transport.mqtt.MQTTNIOTransport.serviceRead(MQTTNIOTransport.java:105)
>         at org.apache.activemq.transport.mqtt.MQTTNIOTransport.access$000(MQTTNIOTransport.java:43)
>         at org.apache.activemq.transport.mqtt.MQTTNIOTransport$1.onSelect(MQTTNIOTransport.java:66)
>         at org.apache.activemq.transport.nio.SelectorSelection.onSelect(SelectorSelection.java:94)
>         at org.apache.activemq.transport.nio.SelectorWorker$1.run(SelectorWorker.java:119)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>         at java.lang.Thread.run(Thread.java:662)
>
>
>
> Connect to ActiveMq Server again.
>
> code:
>
>     String clientId = "mqtt_test";
>     MqttClient client = new MqttClient("tcp://localhost:1883", clientId);
>     CallBack callback = new CallBack();
>     client.setCallback(callback);
>     MqttConnectOptions conOptions = new MqttConnectOptions();
>     conOptions.setUserName("mqttMessage");
>     conOptions.setPassword("mqttMessage".toCharArray());
>     conOptions.setCleanSession(false);
>     client.connect(conOptions);
>     String[] topics = {"topic.test", "topic.test1", "topic.test2"};
>     client.subscribe(topics);
>
>  ActiveMq Server logs:
>  WARN | Exception occurred processing:null: javax.jms.JMSException: Durable consumer is in use for client: mqtt_test and subscriptionName: mqtt_testtopic.test
>  WARN | Failed to add Connection ID:alphae6400-4104-1372812313970-2:3, reason: javax.jms.InvalidClientIDException: Broker: localhost - Client: mqtt_test already connected from null
>
> The client (clientId: mqtt_test) can't connect to the AMQ server again.
>
>
>
> --
> View this message in context:
> http://activemq.2283324.n4.nabble.com/Problem-about-Repeat-subscribe-message-by-mqttv3-and-the-same-clientId-tp4668829.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>



-- 
*Christian Posta*
http://www.christianposta.com/blog
twitter: @christianposta
