I've reproduced this problem and I'm investigating. At first glance it appears related to the implementation of MQTT-4.7.2-1 which states:
> The Server MUST NOT match Topic Filters starting with a wildcard character (# or +) with Topic Names beginning with a $ character. This is part of both MQTT 3.1.1 and 5 specifications, but it wasn't implemented previously. I implemented it as part of the work for MQTT 5 which is why the problem is showing up in the latest release. Justin On Wed, Apr 20, 2022 at 4:29 AM Modanese, Riccardo <riccardo.modan...@eurotech.com.invalid> wrote: > > Hello, > switching from Artemis broker 2.20 to 2.21 we experienced an issue > about message delivering. > It looks like MQTT devices don’t receive the messages matching their > subscriptions. > > The test code (***) run with an Artemis broker 2.19 or 2.20 as target > (wildcard addresses modified as (**)) produces the correct output: > waiting for messages > Client: test-client-1 - Delivery completed: > $EDC/kapua-sys/test-client-1/MQTT/BIRTH > Client: test-client-admin - Message arrived on topic: > $EDC/kapua-sys/test-client-1/MQTT/BIRTH - message: test > Client: test-client-1 - Message arrived on topic: > $EDC/kapua-sys/test-client-1/MQTT/BIRTH - message: test > === > Client: test-client-2 - Delivery completed: > $EDC/kapua-sys/test-client-2/MQTT/BIRTH > Client: test-client-2 - Message arrived on topic: > $EDC/kapua-sys/test-client-2/MQTT/BIRTH - message: test > Client: test-client-admin - Message arrived on topic: > $EDC/kapua-sys/test-client-2/MQTT/BIRTH - message: test > === > Client: test-client-admin - Delivery completed: > $EDC/kapua-sys/test-client-1/MQTT/APPS > Client: test-client-admin - Message arrived on topic: > $EDC/kapua-sys/test-client-1/MQTT/APPS - message: test > Client: test-client-1 - Message arrived on topic: > $EDC/kapua-sys/test-client-1/MQTT/APPS - message: test > … > > With broker 2.21 or 2.22 (configuration changes described in (*) ) as > target the output is: > waiting for messages > === > Client: test-client-1 - Delivery completed: > $EDC/kapua-sys/test-client-1/MQTT/BIRTH > === > Client: test-client-2 - Delivery completed: > $EDC/kapua-sys/test-client-2/MQTT/BIRTH > === > Client: test-client-admin - Delivery completed: > $EDC/kapua-sys/test-client-1/MQTT/APPS > … > > So the broker doesn’t send any message to the clients. > > May be we missed to configure something needed by 2.21 versions onward? > > Regards, > Riccardo Modanese > > > > (*) The 2.21 and 2.22 default broker.xml configuration file has changed in > this way: > set the broker name (message-broker) > removed double connector bound to 1883 (the broker with the default > configuration crashed) > allow only MQTT protocol for connector bound to 1883 port > removed broadcast connector and configuration > added custom wildcard configuration (**) > > (**) <wildcard-addresses> > <routing-enabled>true</routing-enabled> > <delimiter>/</delimiter> > <any-words>#</any-words> > <single-word>+</single-word> > </wildcard-addresses> > > (***) > > /******************************************************************************* > * Copyright (c) 2021, 2022 Eurotech and/or its affiliates and others > * > * This program and the accompanying materials are made > * available under the terms of the Eclipse Public License 2.0 > * which is available at https://www.eclipse.org/legal/epl-2.0/ > * > * SPDX-License-Identifier: EPL-2.0 > * > * Contributors: > * Eurotech - initial API and implementation > > *******************************************************************************/ > package org.eclipse.kapua.qa.common; > > import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; > import org.eclipse.paho.client.mqttv3.MqttCallback; > import org.eclipse.paho.client.mqttv3.MqttClient; > import org.eclipse.paho.client.mqttv3.MqttConnectOptions; > import org.eclipse.paho.client.mqttv3.MqttException; > import org.eclipse.paho.client.mqttv3.MqttMessage; > import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; > import org.slf4j.Logger; > import org.slf4j.LoggerFactory; > > public class TestMqttClient { > > protected static Logger logger = > LoggerFactory.getLogger(TestMqttClient.class); > > private static final String SERVER_URI = "tcp://localhost:1883"; > private static final String CLIENT_ID_ADMIN = "test-client-admin"; > private static final String CLIENT_ID_1 = "test-client-1"; > private static final String CLIENT_ID_2 = "test-client-2"; > private static final String USERNAME = "kapua-broker"; > private static final String PASSWORD = "kapua-password"; > private static final String USERNAME_ADMIN = "kapua-sys"; > private static final String PASSWORD_ADMIN = "kapua-password"; > > private TestMqttClient() { > } > > public static void main(String argv[]) throws MqttException { > MqttClient clientAdmin = new MqttClient(SERVER_URI, > CLIENT_ID_ADMIN, new MemoryPersistence()); > MqttClient client1 = new MqttClient(SERVER_URI, CLIENT_ID_1, new > MemoryPersistence()); > MqttClient client2 = new MqttClient(SERVER_URI, CLIENT_ID_2, new > MemoryPersistence()); > clientAdmin.setCallback(new > TestMqttClientCallback(CLIENT_ID_ADMIN)); > client1.setCallback(new TestMqttClientCallback(CLIENT_ID_1)); > client2.setCallback(new TestMqttClientCallback(CLIENT_ID_2)); > > clientAdmin.connect(getMqttConnectOptions(USERNAME_ADMIN, > PASSWORD_ADMIN)); > client1.connect(getMqttConnectOptions(USERNAME, PASSWORD)); > client2.connect(getMqttConnectOptions(USERNAME, PASSWORD)); > System.out.println("waiting for messages"); > client1.subscribe("$EDC/kapua-sys/" + CLIENT_ID_1 + "/#"); > client2.subscribe("$EDC/kapua-sys/" + CLIENT_ID_2 + "/#"); > clientAdmin.subscribe("#"); > > client1.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/BIRTH", > new MqttMessage("test".getBytes())); > System.out.println("==="); > client2.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/BIRTH", > new MqttMessage("test".getBytes())); > System.out.println("==="); > clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + > "/MQTT/APPS", new MqttMessage("test".getBytes())); > System.out.println("==="); > clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + > "/MQTT/APPS", new MqttMessage("test".getBytes())); > System.out.println("==="); > > client1.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/DC", new > MqttMessage("test".getBytes())); > System.out.println("==="); > client2.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/DC", new > MqttMessage("test".getBytes())); > System.out.println("==="); > clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/DC", > new MqttMessage("test".getBytes())); > System.out.println("==="); > clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/DC", > new MqttMessage("test".getBytes())); > System.out.println("==="); > > clientAdmin.disconnect(); > client1.disconnect(); > client2.disconnect(); > } > > private static MqttConnectOptions getMqttConnectOptions(String > username, String password) { > MqttConnectOptions options = new MqttConnectOptions(); > options.setCleanSession(true); > options.setUserName(username); > options.setPassword(password.toCharArray()); > return options; > } > } > > class TestMqttClientCallback implements MqttCallback { > > private String clientId; > > TestMqttClientCallback(String clientId) { > this.clientId = clientId; > } > > @Override > public void messageArrived(String topic, MqttMessage message) throws > Exception { > System.out.println("Client: " + clientId + " - Message arrived on > topic: " + topic + " - message: " + new String(message.getPayload())); > } > > @Override > public void deliveryComplete(IMqttDeliveryToken token) { > System.out.println("Client: " + clientId + " - Delivery completed: > " + token.getTopics()[0]); > } > > @Override > public void connectionLost(Throwable cause) { > System.out.println("Client: " + clientId + " - Connection lost: " > + cause.getMessage()); > cause.printStackTrace(); > } > } >