I've reproduced this problem and I'm investigating. At first glance it
appears related to the implementation of MQTT-4.7.2-1 which states:

> The Server MUST NOT match Topic Filters starting with a wildcard
> character (# or +) with Topic Names beginning with a $ character.

This is part of both the MQTT 3.1.1 and MQTT 5 specifications, but it
wasn't implemented previously. I implemented it as part of the MQTT 5
work, which is why the problem is showing up in the latest release.
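
As a rough illustration of what that rule means for topic matching, here
is a standalone sketch. It is not the broker's actual code; the class and
method names (TopicFilterSketch, matches) are made up for this example,
and it assumes the standard MQTT "/" level separator.

```java
public class TopicFilterSketch {

    // Hypothetical helper, not Artemis code: returns true if an MQTT topic
    // filter matches a topic name, including the MQTT-4.7.2-1 restriction.
    public static boolean matches(String filter, String topic) {
        // MQTT-4.7.2-1: a filter starting with a wildcard (# or +) must not
        // match a topic name beginning with '$'.
        if (topic.startsWith("$")
                && (filter.startsWith("#") || filter.startsWith("+"))) {
            return false;
        }
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                // Multi-level wildcard: matches all remaining levels
                // (and the parent level itself).
                return true;
            }
            if (i >= t.length) {
                // The filter has more levels than the topic name.
                return false;
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                // Neither a single-level wildcard nor a literal match.
                return false;
            }
        }
        // Without a trailing '#', the level counts must be equal.
        return f.length == t.length;
    }

    public static void main(String[] args) {
        String topic = "$EDC/kapua-sys/test-client-1/MQTT/BIRTH";
        System.out.println(matches("#", topic));
        System.out.println(matches("$EDC/kapua-sys/test-client-1/#", topic));
        System.out.println(matches("+/kapua-sys/#", topic));
    }
}
```

In this sketch only filters that begin with a wildcard, such as a bare
"#", are affected by the rule. Filters that spell out the "$EDC/" prefix
are not covered by it.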


Justin

On Wed, Apr 20, 2022 at 4:29 AM Modanese, Riccardo
<riccardo.modan...@eurotech.com.invalid> wrote:

>
> Hello,
>     after switching from Artemis broker 2.20 to 2.21 we experienced an
> issue with message delivery.
> It looks like MQTT devices don't receive the messages matching their
> subscriptions.
>
> The test code (***) run with an Artemis broker 2.19 or 2.20 as target
> (wildcard addresses modified as (**)) produces the correct output:
> waiting for messages
> Client: test-client-1 - Delivery completed:
> $EDC/kapua-sys/test-client-1/MQTT/BIRTH
> Client: test-client-admin - Message arrived on topic:
> $EDC/kapua-sys/test-client-1/MQTT/BIRTH - message: test
> Client: test-client-1 - Message arrived on topic:
> $EDC/kapua-sys/test-client-1/MQTT/BIRTH - message: test
> ===
> Client: test-client-2 - Delivery completed:
> $EDC/kapua-sys/test-client-2/MQTT/BIRTH
> Client: test-client-2 - Message arrived on topic:
> $EDC/kapua-sys/test-client-2/MQTT/BIRTH - message: test
> Client: test-client-admin - Message arrived on topic:
> $EDC/kapua-sys/test-client-2/MQTT/BIRTH - message: test
> ===
> Client: test-client-admin - Delivery completed:
> $EDC/kapua-sys/test-client-1/MQTT/APPS
> Client: test-client-admin - Message arrived on topic:
> $EDC/kapua-sys/test-client-1/MQTT/APPS - message: test
> Client: test-client-1 - Message arrived on topic:
> $EDC/kapua-sys/test-client-1/MQTT/APPS - message: test
> …
>
> With broker 2.21 or 2.22 (configuration changes described in (*) ) as
> target the output is:
> waiting for messages
> ===
> Client: test-client-1 - Delivery completed:
> $EDC/kapua-sys/test-client-1/MQTT/BIRTH
> ===
> Client: test-client-2 - Delivery completed:
> $EDC/kapua-sys/test-client-2/MQTT/BIRTH
> ===
> Client: test-client-admin - Delivery completed:
> $EDC/kapua-sys/test-client-1/MQTT/APPS
> …
>
> So the broker doesn’t send any message to the clients.
>
> Maybe we missed some configuration that is needed from version 2.21 onward?
>
> Regards,
> Riccardo Modanese
>
>
>
> (*) The default broker.xml configuration file for 2.21 and 2.22 was
> changed as follows:
> - set the broker name (message-broker)
> - removed the duplicate connector bound to port 1883 (the broker crashed
>   with the default configuration)
> - allowed only the MQTT protocol on the connector bound to port 1883
> - removed the broadcast connector and its configuration
> - added a custom wildcard configuration (**)
>
> (**) <wildcard-addresses>
>          <routing-enabled>true</routing-enabled>
>          <delimiter>/</delimiter>
>          <any-words>#</any-words>
>          <single-word>+</single-word>
>      </wildcard-addresses>
>
> (***)
>
> /*******************************************************************************
> * Copyright (c) 2021, 2022 Eurotech and/or its affiliates and others
> *
> * This program and the accompanying materials are made
> * available under the terms of the Eclipse Public License 2.0
> * which is available at https://www.eclipse.org/legal/epl-2.0/
> *
> * SPDX-License-Identifier: EPL-2.0
> *
> * Contributors:
> *     Eurotech - initial API and implementation
>
> *******************************************************************************/
> package org.eclipse.kapua.qa.common;
>
> import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
> import org.eclipse.paho.client.mqttv3.MqttCallback;
> import org.eclipse.paho.client.mqttv3.MqttClient;
> import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
> import org.eclipse.paho.client.mqttv3.MqttException;
> import org.eclipse.paho.client.mqttv3.MqttMessage;
> import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> public class TestMqttClient {
>
>     protected static Logger logger =
> LoggerFactory.getLogger(TestMqttClient.class);
>
>     private static final String SERVER_URI = "tcp://localhost:1883";
>     private static final String CLIENT_ID_ADMIN = "test-client-admin";
>     private static final String CLIENT_ID_1 = "test-client-1";
>     private static final String CLIENT_ID_2 = "test-client-2";
>     private static final String USERNAME = "kapua-broker";
>     private static final String PASSWORD = "kapua-password";
>     private static final String USERNAME_ADMIN = "kapua-sys";
>     private static final String PASSWORD_ADMIN = "kapua-password";
>
>     private TestMqttClient() {
>     }
>
>     public static void main(String argv[]) throws MqttException {
>         MqttClient clientAdmin = new MqttClient(SERVER_URI,
> CLIENT_ID_ADMIN, new MemoryPersistence());
>         MqttClient client1 = new MqttClient(SERVER_URI, CLIENT_ID_1, new
> MemoryPersistence());
>         MqttClient client2 = new MqttClient(SERVER_URI, CLIENT_ID_2, new
> MemoryPersistence());
>         clientAdmin.setCallback(new
> TestMqttClientCallback(CLIENT_ID_ADMIN));
>         client1.setCallback(new TestMqttClientCallback(CLIENT_ID_1));
>         client2.setCallback(new TestMqttClientCallback(CLIENT_ID_2));
>
>         clientAdmin.connect(getMqttConnectOptions(USERNAME_ADMIN,
> PASSWORD_ADMIN));
>         client1.connect(getMqttConnectOptions(USERNAME, PASSWORD));
>         client2.connect(getMqttConnectOptions(USERNAME, PASSWORD));
>         System.out.println("waiting for messages");
>         client1.subscribe("$EDC/kapua-sys/" + CLIENT_ID_1 + "/#");
>         client2.subscribe("$EDC/kapua-sys/" + CLIENT_ID_2 + "/#");
>         clientAdmin.subscribe("#");
>
>         client1.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/BIRTH",
> new MqttMessage("test".getBytes()));
>         System.out.println("===");
>         client2.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/BIRTH",
> new MqttMessage("test".getBytes()));
>         System.out.println("===");
>         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 +
> "/MQTT/APPS", new MqttMessage("test".getBytes()));
>         System.out.println("===");
>         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 +
> "/MQTT/APPS", new MqttMessage("test".getBytes()));
>         System.out.println("===");
>
>         client1.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/DC", new
> MqttMessage("test".getBytes()));
>         System.out.println("===");
>         client2.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/DC", new
> MqttMessage("test".getBytes()));
>         System.out.println("===");
>         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/DC",
> new MqttMessage("test".getBytes()));
>         System.out.println("===");
>         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/DC",
> new MqttMessage("test".getBytes()));
>         System.out.println("===");
>
>         clientAdmin.disconnect();
>         client1.disconnect();
>         client2.disconnect();
>     }
>
>     private static MqttConnectOptions getMqttConnectOptions(String
> username, String password) {
>         MqttConnectOptions options = new MqttConnectOptions();
>         options.setCleanSession(true);
>         options.setUserName(username);
>         options.setPassword(password.toCharArray());
>         return options;
>     }
> }
>
> class TestMqttClientCallback implements MqttCallback {
>
>     private String clientId;
>
>     TestMqttClientCallback(String clientId) {
>         this.clientId = clientId;
>     }
>
>     @Override
>     public void messageArrived(String topic, MqttMessage message) throws Exception {
>         System.out.println("Client: " + clientId + " - Message arrived on topic: " + topic
>                 + " - message: " + new String(message.getPayload()));
>     }
>
>     @Override
>     public void deliveryComplete(IMqttDeliveryToken token) {
>         System.out.println("Client: " + clientId + " - Delivery completed: "
>                 + token.getTopics()[0]);
>     }
>
>     @Override
>     public void connectionLost(Throwable cause) {
>         System.out.println("Client: " + clientId + " - Connection lost: "
> + cause.getMessage());
>         cause.printStackTrace();
>     }
> }
>
