I opened ARTEMIS-3801 [1] and sent a PR to fix what was broken. However, it's worth noting that your code will still not work as it did before, because the previous behavior violated the MQTT 3.1.1 specification. As noted previously, both the MQTT 3.1.1 and MQTT 5 specifications contain the normative statement [MQTT-4.7.2-1], which states:
> The Server MUST NOT match Topic Filters starting with a wildcard character (# or +) with Topic Names beginning with a $ character. Previously your "test-client-admin" client was subscribing to `#` and receiving messages from topics beginning with `$`. This won't work anymore. Justin [1] https://issues.apache.org/jira/browse/ARTEMIS-3801 On Wed, Apr 20, 2022 at 4:29 AM Modanese, Riccardo <riccardo.modan...@eurotech.com.invalid> wrote: > > Hello, > switching from Artemis broker 2.20 to 2.21 we experienced an issue > about message delivering. > It looks like MQTT devices don’t receive the messages matching their > subscriptions. > > The test code (***) run with an Artemis broker 2.19 or 2.20 as target > (wildcard addresses modified as (**)) produces the correct output: > waiting for messages > Client: test-client-1 - Delivery completed: > $EDC/kapua-sys/test-client-1/MQTT/BIRTH > Client: test-client-admin - Message arrived on topic: > $EDC/kapua-sys/test-client-1/MQTT/BIRTH - message: test > Client: test-client-1 - Message arrived on topic: > $EDC/kapua-sys/test-client-1/MQTT/BIRTH - message: test > === > Client: test-client-2 - Delivery completed: > $EDC/kapua-sys/test-client-2/MQTT/BIRTH > Client: test-client-2 - Message arrived on topic: > $EDC/kapua-sys/test-client-2/MQTT/BIRTH - message: test > Client: test-client-admin - Message arrived on topic: > $EDC/kapua-sys/test-client-2/MQTT/BIRTH - message: test > === > Client: test-client-admin - Delivery completed: > $EDC/kapua-sys/test-client-1/MQTT/APPS > Client: test-client-admin - Message arrived on topic: > $EDC/kapua-sys/test-client-1/MQTT/APPS - message: test > Client: test-client-1 - Message arrived on topic: > $EDC/kapua-sys/test-client-1/MQTT/APPS - message: test > … > > With broker 2.21 or 2.22 (configuration changes described in (*) ) as > target the output is: > waiting for messages > === > Client: test-client-1 - Delivery completed: > $EDC/kapua-sys/test-client-1/MQTT/BIRTH > === > Client: test-client-2 
- Delivery completed: > $EDC/kapua-sys/test-client-2/MQTT/BIRTH > === > Client: test-client-admin - Delivery completed: > $EDC/kapua-sys/test-client-1/MQTT/APPS > … > > So the broker doesn’t send any message to the clients. > > May be we missed to configure something needed by 2.21 versions onward? > > Regards, > Riccardo Modanese > > > > (*) The 2.21 and 2.22 default broker.xml configuration file has changed in > this way: > set the broker name (message-broker) > removed double connector bound to 1883 (the broker with the default > configuration crashed) > allow only MQTT protocol for connector bound to 1883 port > removed broadcast connector and configuration > added custom wildcard configuration (**) > > (**) <wildcard-addresses> > <routing-enabled>true</routing-enabled> > <delimiter>/</delimiter> > <any-words>#</any-words> > <single-word>+</single-word> > </wildcard-addresses> > > (***) > > /******************************************************************************* > * Copyright (c) 2021, 2022 Eurotech and/or its affiliates and others > * > * This program and the accompanying materials are made > * available under the terms of the Eclipse Public License 2.0 > * which is available at https://www.eclipse.org/legal/epl-2.0/ > * > * SPDX-License-Identifier: EPL-2.0 > * > * Contributors: > * Eurotech - initial API and implementation > > *******************************************************************************/ > package org.eclipse.kapua.qa.common; > > import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; > import org.eclipse.paho.client.mqttv3.MqttCallback; > import org.eclipse.paho.client.mqttv3.MqttClient; > import org.eclipse.paho.client.mqttv3.MqttConnectOptions; > import org.eclipse.paho.client.mqttv3.MqttException; > import org.eclipse.paho.client.mqttv3.MqttMessage; > import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; > import org.slf4j.Logger; > import org.slf4j.LoggerFactory; > > public class TestMqttClient { > > protected 
static Logger logger = > LoggerFactory.getLogger(TestMqttClient.class); > > private static final String SERVER_URI = "tcp://localhost:1883"; > private static final String CLIENT_ID_ADMIN = "test-client-admin"; > private static final String CLIENT_ID_1 = "test-client-1"; > private static final String CLIENT_ID_2 = "test-client-2"; > private static final String USERNAME = "kapua-broker"; > private static final String PASSWORD = "kapua-password"; > private static final String USERNAME_ADMIN = "kapua-sys"; > private static final String PASSWORD_ADMIN = "kapua-password"; > > private TestMqttClient() { > } > > public static void main(String argv[]) throws MqttException { > MqttClient clientAdmin = new MqttClient(SERVER_URI, > CLIENT_ID_ADMIN, new MemoryPersistence()); > MqttClient client1 = new MqttClient(SERVER_URI, CLIENT_ID_1, new > MemoryPersistence()); > MqttClient client2 = new MqttClient(SERVER_URI, CLIENT_ID_2, new > MemoryPersistence()); > clientAdmin.setCallback(new > TestMqttClientCallback(CLIENT_ID_ADMIN)); > client1.setCallback(new TestMqttClientCallback(CLIENT_ID_1)); > client2.setCallback(new TestMqttClientCallback(CLIENT_ID_2)); > > clientAdmin.connect(getMqttConnectOptions(USERNAME_ADMIN, > PASSWORD_ADMIN)); > client1.connect(getMqttConnectOptions(USERNAME, PASSWORD)); > client2.connect(getMqttConnectOptions(USERNAME, PASSWORD)); > System.out.println("waiting for messages"); > client1.subscribe("$EDC/kapua-sys/" + CLIENT_ID_1 + "/#"); > client2.subscribe("$EDC/kapua-sys/" + CLIENT_ID_2 + "/#"); > clientAdmin.subscribe("#"); > > client1.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/BIRTH", > new MqttMessage("test".getBytes())); > System.out.println("==="); > client2.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/BIRTH", > new MqttMessage("test".getBytes())); > System.out.println("==="); > clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + > "/MQTT/APPS", new MqttMessage("test".getBytes())); > System.out.println("==="); > 
clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + > "/MQTT/APPS", new MqttMessage("test".getBytes())); > System.out.println("==="); > > client1.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/DC", new > MqttMessage("test".getBytes())); > System.out.println("==="); > client2.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/DC", new > MqttMessage("test".getBytes())); > System.out.println("==="); > clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/DC", > new MqttMessage("test".getBytes())); > System.out.println("==="); > clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/DC", > new MqttMessage("test".getBytes())); > System.out.println("==="); > > clientAdmin.disconnect(); > client1.disconnect(); > client2.disconnect(); > } > > private static MqttConnectOptions getMqttConnectOptions(String > username, String password) { > MqttConnectOptions options = new MqttConnectOptions(); > options.setCleanSession(true); > options.setUserName(username); > options.setPassword(password.toCharArray()); > return options; > } > } > > class TestMqttClientCallback implements MqttCallback { > > private String clientId; > > TestMqttClientCallback(String clientId) { > this.clientId = clientId; > } > > @Override > public void messageArrived(String topic, MqttMessage message) throws > Exception { > System.out.println("Client: " + clientId + " - Message arrived on > topic: " + topic + " - message: " + new String(message.getPayload())); > } > > @Override > public void deliveryComplete(IMqttDeliveryToken token) { > System.out.println("Client: " + clientId + " - Delivery completed: > " + token.getTopics()[0]); > } > > @Override > public void connectionLost(Throwable cause) { > System.out.println("Client: " + clientId + " - Connection lost: " > + cause.getMessage()); > cause.printStackTrace(); > } > } >