> It’s a huge problem for us since in Kapua we are doing # subscriptions (for internal components) and using $ as topic prefix (also Kura is using this in its namespace) and worked perfectly with ActiveMQ 5.x.

This use case is clearly prohibited by the MQTT specification. It specifically says:

  A subscription to “#” will not receive any messages published to a topic beginning with a $

If this is working on ActiveMQ 5.x then ActiveMQ 5.x should be fixed to enforce the MQTT specification appropriately.

> I should evaluate the impact but I’m not so sure we can move to Artemis with this limitation.

I'm not sure I would categorize this as a "limitation" unless you believe enforcing the specification is a limitation. For what it's worth, the MQTT specification also says:

  Applications cannot use a topic with a leading $ character for their own purposes

At this point I recommend you change your application to conform with the specification.


Justin

On Tue, Apr 26, 2022 at 2:19 AM Modanese, Riccardo <riccardo.modan...@eurotech.com.invalid> wrote:

> It’s a huge problem for us since in Kapua we are doing # subscriptions (for internal components) and using $ as topic prefix (also Kura is using this in its namespace) and worked perfectly with ActiveMQ 5.x.
>
> I should evaluate the impact but I’m not so sure we can move to Artemis with this limitation.
>
> Riccardo
>
> From: Justin Bertram <jbert...@apache.org>
> Date: Tuesday, 26 April 2022 00:18
> To: users@activemq.apache.org <users@activemq.apache.org>
> Subject: Re: Artemis 2.21+ - MQTT devices don’t receive the messages matching their subscriptions
>
> I opened ARTEMIS-3801 [1] and sent a PR to fix what was broken.
>
> However, it's worth noting that your code will still not work as it did before because the previous behavior violated the MQTT 3.1.1 specification. As noted previously, both MQTT 3.1.1 and 5 specifications contain MQTT-4.7.2-1 which states:
>
> > The Server MUST NOT match Topic Filters starting with a wildcard character (# or +) with Topic Names beginning with a $ character.
>
> Previously your "test-client-admin" client was subscribing to `#` and receiving messages from topics beginning with `$`. This won't work anymore.
>
>
> Justin
>
> [1] https://issues.apache.org/jira/browse/ARTEMIS-3801
>
> On Wed, Apr 20, 2022 at 4:29 AM Modanese, Riccardo <riccardo.modan...@eurotech.com.invalid> wrote:
>
> > Hello,
> > switching from Artemis broker 2.20 to 2.21 we experienced an issue with message delivery.
> > It looks like MQTT devices don’t receive the messages matching their subscriptions.
> >
> > The test code (***) run with an Artemis broker 2.19 or 2.20 as target (wildcard addresses modified as (**)) produces the correct output:
> > waiting for messages
> > Client: test-client-1 - Delivery completed: $EDC/kapua-sys/test-client-1/MQTT/BIRTH
> > Client: test-client-admin - Message arrived on topic: $EDC/kapua-sys/test-client-1/MQTT/BIRTH - message: test
> > Client: test-client-1 - Message arrived on topic: $EDC/kapua-sys/test-client-1/MQTT/BIRTH - message: test
> > ===
> > Client: test-client-2 - Delivery completed: $EDC/kapua-sys/test-client-2/MQTT/BIRTH
> > Client: test-client-2 - Message arrived on topic: $EDC/kapua-sys/test-client-2/MQTT/BIRTH - message: test
> > Client: test-client-admin - Message arrived on topic: $EDC/kapua-sys/test-client-2/MQTT/BIRTH - message: test
> > ===
> > Client: test-client-admin - Delivery completed: $EDC/kapua-sys/test-client-1/MQTT/APPS
> > Client: test-client-admin - Message arrived on topic: $EDC/kapua-sys/test-client-1/MQTT/APPS - message: test
> > Client: test-client-1 - Message arrived on topic: $EDC/kapua-sys/test-client-1/MQTT/APPS - message: test
> > …
> >
> > With broker 2.21 or 2.22 (configuration changes described in (*)) as target the output is:
> > waiting for messages
> > ===
> > Client: test-client-1 - Delivery completed: $EDC/kapua-sys/test-client-1/MQTT/BIRTH
> > ===
> > Client: test-client-2 - Delivery completed: $EDC/kapua-sys/test-client-2/MQTT/BIRTH
> > ===
> > Client: test-client-admin - Delivery completed: $EDC/kapua-sys/test-client-1/MQTT/APPS
> > …
> >
> > So the broker doesn’t send any message to the clients.
> >
> > Maybe we missed some configuration needed from version 2.21 onward?
> >
> > Regards,
> > Riccardo Modanese
> >
> >
> > (*) The 2.21 and 2.22 default broker.xml configuration file has changed in this way:
> >   set the broker name (message-broker)
> >   removed double connector bound to 1883 (the broker with the default configuration crashed)
> >   allow only MQTT protocol for connector bound to 1883 port
> >   removed broadcast connector and configuration
> >   added custom wildcard configuration (**)
> >
> > (**) <wildcard-addresses>
> >         <routing-enabled>true</routing-enabled>
> >         <delimiter>/</delimiter>
> >         <any-words>#</any-words>
> >         <single-word>+</single-word>
> >      </wildcard-addresses>
> >
> > (***)
> >
> > /*******************************************************************************
> >  * Copyright (c) 2021, 2022 Eurotech and/or its affiliates and others
> >  *
> >  * This program and the accompanying materials are made
> >  * available under the terms of the Eclipse Public License 2.0
> >  * which is available at https://www.eclipse.org/legal/epl-2.0/
> >  *
> >  * SPDX-License-Identifier: EPL-2.0
> >  *
> >  * Contributors:
> >  *     Eurotech - initial API and implementation
> >  *******************************************************************************/
> > package org.eclipse.kapua.qa.common;
> >
> > import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
> > import org.eclipse.paho.client.mqttv3.MqttCallback;
> > import org.eclipse.paho.client.mqttv3.MqttClient;
> > import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
> > import org.eclipse.paho.client.mqttv3.MqttException;
> > import org.eclipse.paho.client.mqttv3.MqttMessage;
> > import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
> > import org.slf4j.Logger;
> > import org.slf4j.LoggerFactory;
> >
> > public class TestMqttClient {
> >
> >     protected static Logger logger = LoggerFactory.getLogger(TestMqttClient.class);
> >
> >     private static final String SERVER_URI = "tcp://localhost:1883";
> >     private static final String CLIENT_ID_ADMIN = "test-client-admin";
> >     private static final String CLIENT_ID_1 = "test-client-1";
> >     private static final String CLIENT_ID_2 = "test-client-2";
> >     private static final String USERNAME = "kapua-broker";
> >     private static final String PASSWORD = "kapua-password";
> >     private static final String USERNAME_ADMIN = "kapua-sys";
> >     private static final String PASSWORD_ADMIN = "kapua-password";
> >
> >     private TestMqttClient() {
> >     }
> >
> >     public static void main(String argv[]) throws MqttException {
> >         MqttClient clientAdmin = new MqttClient(SERVER_URI, CLIENT_ID_ADMIN, new MemoryPersistence());
> >         MqttClient client1 = new MqttClient(SERVER_URI, CLIENT_ID_1, new MemoryPersistence());
> >         MqttClient client2 = new MqttClient(SERVER_URI, CLIENT_ID_2, new MemoryPersistence());
> >         clientAdmin.setCallback(new TestMqttClientCallback(CLIENT_ID_ADMIN));
> >         client1.setCallback(new TestMqttClientCallback(CLIENT_ID_1));
> >         client2.setCallback(new TestMqttClientCallback(CLIENT_ID_2));
> >
> >         clientAdmin.connect(getMqttConnectOptions(USERNAME_ADMIN, PASSWORD_ADMIN));
> >         client1.connect(getMqttConnectOptions(USERNAME, PASSWORD));
> >         client2.connect(getMqttConnectOptions(USERNAME, PASSWORD));
> >         System.out.println("waiting for messages");
> >         client1.subscribe("$EDC/kapua-sys/" + CLIENT_ID_1 + "/#");
> >         client2.subscribe("$EDC/kapua-sys/" + CLIENT_ID_2 + "/#");
> >         clientAdmin.subscribe("#");
> >
> >         client1.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/BIRTH", new MqttMessage("test".getBytes()));
> >         System.out.println("===");
> >         client2.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/BIRTH", new MqttMessage("test".getBytes()));
> >         System.out.println("===");
> >         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/APPS", new MqttMessage("test".getBytes()));
> >         System.out.println("===");
> >         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/APPS", new MqttMessage("test".getBytes()));
> >         System.out.println("===");
> >
> >         client1.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/DC", new MqttMessage("test".getBytes()));
> >         System.out.println("===");
> >         client2.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/DC", new MqttMessage("test".getBytes()));
> >         System.out.println("===");
> >         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/DC", new MqttMessage("test".getBytes()));
> >         System.out.println("===");
> >         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/DC", new MqttMessage("test".getBytes()));
> >         System.out.println("===");
> >
> >         clientAdmin.disconnect();
> >         client1.disconnect();
> >         client2.disconnect();
> >     }
> >
> >     private static MqttConnectOptions getMqttConnectOptions(String username, String password) {
> >         MqttConnectOptions options = new MqttConnectOptions();
> >         options.setCleanSession(true);
> >         options.setUserName(username);
> >         options.setPassword(password.toCharArray());
> >         return options;
> >     }
> > }
> >
> > class TestMqttClientCallback implements MqttCallback {
> >
> >     private String clientId;
> >
> >     TestMqttClientCallback(String clientId) {
> >         this.clientId = clientId;
> >     }
> >
> >     @Override
> >     public void messageArrived(String topic, MqttMessage message) throws Exception {
> >         System.out.println("Client: " + clientId + " - Message arrived on topic: " + topic + " - message: " + new String(message.getPayload()));
> >     }
> >
> >     @Override
> >     public void deliveryComplete(IMqttDeliveryToken token) {
> >         System.out.println("Client: " + clientId + " - Delivery completed: " + token.getTopics()[0]);
> >     }
> >
> >     @Override
> >     public void connectionLost(Throwable cause) {
> >         System.out.println("Client: " + clientId + " - Connection lost: " + cause.getMessage());
> >         cause.printStackTrace();
> >     }
> > }
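
To make the MQTT-4.7.2-1 rule quoted above concrete, here is a minimal, broker-independent sketch of MQTT topic filter matching. It is not Artemis code; the class name `TopicFilterSketch` and the method `matches` are hypothetical and exist only for illustration. It assumes a well-formed filter (`#` only as the last level) and `/` as the level separator, as in the wildcard configuration from the thread.

```java
public class TopicFilterSketch {

    /**
     * Hypothetical helper: returns true if the MQTT topic filter matches the topic name.
     * Assumes the filter is well formed ("#" appears only as the last level).
     */
    public static boolean matches(String filter, String topic) {
        // MQTT-4.7.2-1: a filter starting with a wildcard (# or +) must not
        // match a topic name beginning with $.
        if (topic.startsWith("$") && (filter.startsWith("#") || filter.startsWith("+"))) {
            return false;
        }
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                return true; // multi-level wildcard: matches the parent and all levels below
            }
            if (i >= t.length) {
                return false; // filter has more levels than the topic
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false; // literal level differs
            }
        }
        return f.length == t.length; // no trailing "#": level counts must agree
    }

    public static void main(String[] args) {
        String topic = "$EDC/kapua-sys/test-client-1/MQTT/BIRTH";
        String[] filters = {"#", "+/kapua-sys/#", "$EDC/#", "$EDC/kapua-sys/test-client-1/#"};
        for (String filter : filters) {
            System.out.println(filter + " -> " + matches(filter, topic));
        }
    }
}
```

Under this reading, only filters that begin with a wildcard are excluded from `$` topics. The per-device filters in the test (`$EDC/kapua-sys/test-client-1/#`) begin with a literal level and still match, and an explicit filter such as `$EDC/#` would match as well. The specification's separate statement that applications should not use a leading `$` for their own purposes still applies.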