Which client are you using? can you put together a small unit test that shows this? I cannot reproduce with the small test I tried.
On Wed, Jul 3, 2013 at 2:09 AM, linuxcrazy <linuxcr...@126.com> wrote: > Client:mqttv3,Server: > apache-activemq-5.9.0.redhat-610-20130702.003414-4-bin.zip > > code: > String clientId = "mqtt_test"; > MqttClient client = new > MqttClient("tcp://localhost:1883",clientId); > CallBack callback = new CallBack(); > client.setCallback(callback); > MqttConnectOptions conOptions = new > MqttConnectOptions(); > conOptions.setUserName("mqttMessage"); > > conOptions.setPassword("mqttMessage".toCharArray()); > conOptions.setCleanSession(false); > client.connect(conOptions); > String[] topics = > {"topic.test","topic.test1","topic.test2","topic.test"}; > client.subscribe(topics); > > > AMQ Server Errors: > ======= > WARN | Async error occurred: javax.jms.JMSException: Durable consumer is in > use > for client: mqtt_test and subscriptionName: mqtt_test topic.test > javax.jms.JMSException: Durable consumer is in use for client: mqtt_test > and > subscriptionName: mqtt_test > at > > org.apache.activemq.broker.region.TopicRegion.addConsumer(TopicRegion.java:123) > at > > org.apache.activemq.broker.region.RegionBroker.addConsumer(RegionBroker.java:390) > at > > org.apache.activemq.broker.jmx.ManagedRegionBroker.addConsumer(ManagedRegionBroker.java:229) > at > org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:97) > at > > org.apache.activemq.advisory.AdvisoryBroker.addConsumer(AdvisoryBroker.java:76) > at > org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:97) > at > org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:97) > at > org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:97) > at > > org.apache.activemq.broker.MutableBrokerFilter.addConsumer(MutableBrokerFilter.java:102) > at > > org.apache.activemq.broker.TransportConnection.processAddConsumer(TransportConnection.java:619) > at > org.apache.activemq.command.ConsumerInfo.visit(ConsumerInfo.java:347) > at > > org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:329) > at > > org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:184) > at > > org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:45) > at > > org.apache.activemq.transport.mqtt.MQTTInactivityMonitor.onCommand(MQTTInactivityMonitor.java:147) > at > > org.apache.activemq.transport.mqtt.MQTTTransportFilter.sendToActiveMQ(MQTTTransportFilter.java:91) > at > > org.apache.activemq.transport.mqtt.MQTTProtocolConverter.sendToActiveMQ(MQTTProtocolConverter.java:133) > at > > org.apache.activemq.transport.mqtt.MQTTProtocolConverter.onSubscribe(MQTTProtocolConverter.java:325) > at > > org.apache.activemq.transport.mqtt.MQTTProtocolConverter.onSubscribe(MQTTProtocolConverter.java:292) > at > > org.apache.activemq.transport.mqtt.MQTTProtocolConverter.onMQTTCommand(MQTTProtocolConverter.java:167) > at > > org.apache.activemq.transport.mqtt.MQTTTransportFilter.onCommand(MQTTTransportFilter.java:79) > at > > org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83) > at > > org.apache.activemq.transport.mqtt.MQTTCodec.processCommand(MQTTCodec.java:114) > at > org.apache.activemq.transport.mqtt.MQTTCodec.parse(MQTTCodec.java:84) > at > > org.apache.activemq.transport.mqtt.MQTTNIOTransport.serviceRead(MQTTNIOTransport.java:105) > at > > org.apache.activemq.transport.mqtt.MQTTNIOTransport.access$000(MQTTNIOTransport.java:43) > at > > org.apache.activemq.transport.mqtt.MQTTNIOTransport$1.onSelect(MQTTNIOTransport.java:66) > at > > org.apache.activemq.transport.nio.SelectorSelection.onSelect(SelectorSelection.java:94) > at > > org.apache.activemq.transport.nio.SelectorWorker$1.run(SelectorWorker.java:119) > at > > java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) > at > > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) > at java.lang.Thread.run(Thread.java:662) > > > > Connect to ActiveMq Server again. > > code: > > String clientId = "mqtt_test"; > MqttClient client = new > MqttClient("tcp://localhost:1883",clientId); > CallBack callback = new CallBack(); > client.setCallback(callback); > MqttConnectOptions conOptions = new > MqttConnectOptions(); > conOptions.setUserName("mqttMessage"); > > conOptions.setPassword("mqttMessage".toCharArray()); > conOptions.setCleanSession(false); > client.connect(conOptions); > String[] topics = {"topic.test","topic.test1","topic.test2"}; > client.subscribe(topics); > > ActiveMq Server logs: > WARN | Exception occurred processing:null: javax.jms.JMSException: Durable > consumer is in use for client: mqtt_test and subscriptionName: > mqtt_testtopic.test > WARN | Failed to add Connection ID:alphae6400-4104-1372812313970-2:3, > reason: javax.jms.InvalidClientIDException: Broker: localhost - Client: > mqtt_test already connected from null > > The Client(clientId:mqtt_test) can't connect to the AmqServer agian. > > > > -- > View this message in context: > http://activemq.2283324.n4.nabble.com/Problem-about-Repeat-subscribe-message-by-mqttv3-and-the-same-clientId-tp4668829.html > Sent from the ActiveMQ - User mailing list archive at Nabble.com. > -- *Christian Posta* http://www.christianposta.com/blog twitter: @christianposta