Hello, I have the problem below.
What I do:
- There is an MQTT client, an Android application. It connects to the server and creates a topic named ServiceTracker.Inbound.userName (the name may be needed for log analysis).
- I send a message there with all default settings and payload {"name":"1st message"} from the ActiveMQ console. The message is delivered successfully. In Active Subscribers, enqueued = 1 and dequeued = 1.
- I turn on Airplane mode on the Android device, so there is no real connection, but ActiveMQ still shows the client in Active Subscribers.
- I send a message WITH the Persistent delivery checkbox set and payload {"name":"persistent"}. In Active Subscribers, enqueued = 2 and dequeued = 2.
- I send a message WITHOUT the Persistent delivery checkbox set and payload {"name":"not persistent"}. In Active Subscribers, enqueued = 3 and dequeued = 2.
- I turn off Airplane mode on the Android device; the connection is restored and only one message is delivered: the non-persistent one.
What I tried:
- Restarting ActiveMQ.
- Sending a few persistent and a few non-persistent messages: the non-persistent ones are delivered, the persistent ones are not.
- Deleting ActiveMQ and extracting it again.
- Rebuilding the Android application and clearing all data on the phone.
Messages sent after the keepAliveInterval has elapsed are delivered successfully. I reproduced this at least 20 times. I found http://activemq.2283324.n4.nabble.com/Random-persistent-messages-lost-tt3826331.html, but it seems that I have a different problem.
Configuration ActiveMQ version: apache-activemq-5.10.0 $ java -version java version "1.6.0_65" Java(TM) SE Runtime Environment (build 1.6.0_65-b14-462-11M4609) Java HotSpot(TM) 64-Bit Server VM (build 20.65-b04-462, mixed mode) OS: Maс OS X 10.9.5 conf/activemq.xml - default, with change 1 line (twice added “+nio") <transportConnector name="mqtt+nio" uri="mqtt+nio://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> data/activemq.log attached My code for Android: (if it’s not enough, let me know) public boolean connectIfNecessary() throws MqttException, IOException { synchronized (synchLock) { MqttConnectOptions connectionOptions = new MqttConnectOptions(); connectionOptions.setCleanSession(false); connectionOptions.setUserName(userName); connectionOptions.setPassword(password.toCharArray()); if (mqttClient == null) { stash = new MessageStash(applicationRoot + "/" + userName); mqttClient = new MqttAsyncClient( brokerUrl, userName, new MqttDefaultFilePersistence(applicationRoot + "/" + userName) ); Log.d(TAG, "Broker URL: " + brokerUrl); Log.d(TAG, "Connection clientId: " + userName); Log.d(TAG, "Application path: " + applicationRoot); mqttClient.setCallback(new MqttCallback() { @Override public void connectionLost(Throwable cause) { Log.e(TAG, "Connection lost. 
Cause: " + cause.toString()); service.onConnectionLost(); notification.updateStatus(Notification.STATUS_DISCONNECTED); } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { ConnectionBinder recipient = recipients.get(getTopicFromInbound(topic)); if (recipient != null) recipient.onMessageReceived(message.toString()); Log.d(TAG, "Message " + message + " received"); } @Override public void deliveryComplete(IMqttDeliveryToken token) { Log.d(TAG, "Message delivery complete"); } }); } if(mqttClient.isConnected()) // connection is already active - we can subscribe to the topic synchronously (see connect method) return true; if (connecting) // connecting was earlier initiated from a different thread - just let things take their course return false; connecting = true; notification.updateStatus(Notification.STATUS_CONNECTING); mqttClient.connect(connectionOptions, null, new IMqttActionListener() { @Override public void onSuccess(IMqttToken asyncActionToken) { connecting = false; Log.d(TAG, "connected"); for (MessageStash.Message message : stash.get()) { try { send(message.topic(), message.body()); message.commit(); } catch (IOException e) { // we can safely ignore it here because this code is executed after the connection is restored // so there will be no need to stash the message, but even the connection will be lost while // resubmitting messages here there will be no need to worry - the message will remain stashed // because message.commit will not be executed } } for (Map.Entry<String, ConnectionBinder> binder : recipients.entrySet()) subscribe(binder.getKey()); notification.updateStatus(Notification.STATUS_CONNECTED); } @Override public void onFailure(IMqttToken asyncActionToken, Throwable exception) { connecting = false; // todo: onConnectionLost only on recoverable exceptions Log.e(TAG, "Failed to connect :" + exception.toString()); service.onConnectionLost(); notification.updateStatus(Notification.STATUS_DISCONNECTED); } }); 
return false; } } private void subscribe(String topic) { final String inboundTopic = getInboundTopic(topic); try { mqttClient.subscribe(inboundTopic, QoS_EXACLY_ONCE, null, new IMqttActionListener() { @Override public void onSuccess(IMqttToken iMqttToken) { Log.d(TAG, "Successfully subscribed to " + inboundTopic); } @Override public void onFailure(IMqttToken iMqttToken, Throwable throwable) { Log.e(TAG, "Subscribe to " + inboundTopic + " failed: " + throwable.toString()); } }); } catch (MqttException e) { logger.log(String.format("Exception subscribing to %s", inboundTopic), e); } } public void unregisterSubscriber(String topic) { String inboundTopic = getInboundTopic(topic); if (mqttClient.isConnected()) try { mqttClient.unsubscribe(inboundTopic); } catch (MqttException e) { logger.log(String.format("Exception unsubscribing from %s", inboundTopic), e); } recipients.remove(topic); } public void send(String topic, String message) throws IOException { String outboundTopic = getOutboundTopic(topic); try { mqttClient.publish(outboundTopic, message.getBytes(), QoS_EXACLY_ONCE, true); Log.d(TAG, "published to " + outboundTopic + " :" + message); } catch (MqttException e) { switch (e.getReasonCode()) { // todo: double check this is the only recoverable failure // it seems likely that REASON_CODE_CLIENT_DISCONNECTING should also be here // I am not 100% sure, but I've seen a message 'Publish of blah failed ' with this reason code case MqttException.REASON_CODE_CLIENT_DISCONNECTING: case MqttException.REASON_CODE_CLIENT_NOT_CONNECTED: stash.put(topic, message); // stash it for when the connection comes back online; break; default: logger.log(String.format("Exception publishing to %s", outboundTopic), e); break; } } } public static final String TAG = "MQTT Connection"; private static final int QoS_EXACLY_ONCE = 2; private final Service service; private final Notification notification; private final Object synchLock = new Object(); private final String applicationRoot; 
private final ILogger logger; private MqttAsyncClient mqttClient; private HashMap<String, ConnectionBinder> recipients = new HashMap<String, ConnectionBinder>(); private MessageStash stash; private boolean connecting; private String brokerUrl = null; private String userName; private String password; -- View this message in context: http://activemq.2283324.n4.nabble.com/Lost-persistent-message-sent-before-keepAliveInterval-exceed-tp4688049.html Sent from the ActiveMQ - User mailing list archive at Nabble.com.