From my understanding, our previous subscription “#” should be changed into 2 different subscriptions: “#” and “$EDC/#”.
Now the “$EDC/#” subscription is not working due to ARTEMIS-3801, right? If so, I think it’s not a big problem: we should manage 2 different subscriptions instead of one.

Thanks

From: Justin Bertram <jbert...@apache.org>
Date: Tuesday, 26 April 2022 22:24
To: users@activemq.apache.org <users@activemq.apache.org>
Subject: Re: Artemis 2.21+ - MQTT devices don’t receive the messages matching their subscriptions

I believe I see why this is working for you in ActiveMQ "Classic." By default ActiveMQ "Classic" will block any messages published to an MQTT topic starting with '$' [1]. However, a configuration element named "publishDollarTopics" was added [2] a long time ago to assist "legacy MQTT applications" (presumably those migrating from 3.1 to 3.1.1 since the 3.1 spec makes no mention of topics starting with '$'). You are actually setting this parameter in Kapua [3].

A similar parameter could potentially be added to ActiveMQ Artemis, but it's been 8 years since MQTT 3.1.1 was released and 3 years since MQTT 5 was released. Any application written against MQTT 3.1 has had ample time to be updated, and it's not like this is a gray area in the spec. Both 3.1.1 and 5 specifications are very clear about how to treat topics that start with '$'.

Is it reasonable to expect that Kapua would change to be spec compliant?


Justin

[1] https://github.com/apache/activemq/blob/59dfbc3302c43054d24a96e699b0eaf1b6d1bf98/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java#L159
[2] https://issues.apache.org/jira/browse/AMQ-5292
[3] https://github.com/eclipse/kapua/blob/master/org.eclipse.kapua.assembly/src/main/resources/conf/broker/activemq.xml#L434

On Tue, Apr 26, 2022 at 9:42 AM Justin Bertram <jbert...@apache.org> wrote:

> > It’s a huge problem for us since in Kapua we are doing # subscriptions
> > (for internal components) and using $ as topic prefix (also Kura is using
> > this in its namespace) and worked perfectly with ActiveMQ 5.x.
>
> This use-case is clearly prohibited by the MQTT specification. It
> specifically says:
>
> A subscription to “#” will not receive any messages published to a topic
> beginning with a $
>
> If this is working on ActiveMQ 5.x then ActiveMQ 5.x should be fixed to
> enforce the MQTT specification appropriately.
>
> > I should evaluate the impact but I’m not so sure we can move to Artemis
> > with this limitation.
>
> I'm not sure I would categorize this as a "limitation" unless you believe
> enforcing the specification is a limitation.
>
> For what it's worth the MQTT specification also says:
>
> Applications cannot use a topic with a leading $ character for their own
> purposes
>
> At this point I recommend you change your application to conform with the
> specification.
>
>
> Justin
>
> On Tue, Apr 26, 2022 at 2:19 AM Modanese, Riccardo
> <riccardo.modan...@eurotech.com.invalid> wrote:
>
>> It’s a huge problem for us since in Kapua we are doing # subscriptions
>> (for internal components) and using $ as topic prefix (also Kura is using
>> this in its namespace) and worked perfectly with ActiveMQ 5.x.
>>
>> I should evaluate the impact but I’m not so sure we can move to Artemis
>> with this limitation.
>>
>> Riccardo
>>
>> From: Justin Bertram <jbert...@apache.org>
>> Date: Tuesday, 26 April 2022 00:18
>> To: users@activemq.apache.org <users@activemq.apache.org>
>> Subject: Re: Artemis 2.21+ - MQTT devices don’t receive the messages
>> matching their subscriptions
>>
>> I opened ARTEMIS-3801 [1] and sent a PR to fix what was broken.
>>
>> However, it's worth noting that your code will still not work as it did
>> before because the previous behavior violated the MQTT 3.1.1
>> specification.
>> As noted previously, both MQTT 3.1.1 and 5 specifications contain
>> MQTT-4.7.2-1 which states:
>>
>> > The Server MUST NOT match Topic Filters starting with a wildcard
>> > character (# or +) with Topic Names beginning with a $ character.
>>
>> Previously your "test-client-admin" client was subscribing to `#` and
>> receiving messages from topics beginning with `$`. This won't work
>> anymore.
>>
>>
>> Justin
>>
>> [1] https://issues.apache.org/jira/browse/ARTEMIS-3801
>>
>> On Wed, Apr 20, 2022 at 4:29 AM Modanese, Riccardo
>> <riccardo.modan...@eurotech.com.invalid> wrote:
>>
>> >
>> > Hello,
>> > switching from Artemis broker 2.20 to 2.21 we experienced an issue
>> > about message delivering.
>> > It looks like MQTT devices don’t receive the messages matching their
>> > subscriptions.
>> >
>> > The test code (***) run with an Artemis broker 2.19 or 2.20 as target
>> > (wildcard addresses modified as (**)) produces the correct output:
>> > waiting for messages
>> > Client: test-client-1 - Delivery completed: $EDC/kapua-sys/test-client-1/MQTT/BIRTH
>> > Client: test-client-admin - Message arrived on topic: $EDC/kapua-sys/test-client-1/MQTT/BIRTH - message: test
>> > Client: test-client-1 - Message arrived on topic: $EDC/kapua-sys/test-client-1/MQTT/BIRTH - message: test
>> > ===
>> > Client: test-client-2 - Delivery completed: $EDC/kapua-sys/test-client-2/MQTT/BIRTH
>> > Client: test-client-2 - Message arrived on topic: $EDC/kapua-sys/test-client-2/MQTT/BIRTH - message: test
>> > Client: test-client-admin - Message arrived on topic: $EDC/kapua-sys/test-client-2/MQTT/BIRTH - message: test
>> > ===
>> > Client: test-client-admin - Delivery completed: $EDC/kapua-sys/test-client-1/MQTT/APPS
>> > Client: test-client-admin - Message arrived on topic: $EDC/kapua-sys/test-client-1/MQTT/APPS - message: test
>> > Client: test-client-1 - Message arrived on topic: $EDC/kapua-sys/test-client-1/MQTT/APPS - message: test
>> > …
>> >
>> > With broker 2.21 or 2.22 (configuration changes described in (*)) as
>> > target the output is:
>> > waiting for messages
>> > ===
>> > Client: test-client-1 - Delivery completed: $EDC/kapua-sys/test-client-1/MQTT/BIRTH
>> > ===
>> > Client: test-client-2 - Delivery completed: $EDC/kapua-sys/test-client-2/MQTT/BIRTH
>> > ===
>> > Client: test-client-admin - Delivery completed: $EDC/kapua-sys/test-client-1/MQTT/APPS
>> > …
>> >
>> > So the broker doesn’t send any message to the clients.
>> >
>> > May be we missed to configure something needed by 2.21 versions onward?
>> >
>> > Regards,
>> > Riccardo Modanese
>> >
>> >
>> >
>> > (*) The 2.21 and 2.22 default broker.xml configuration file has changed in
>> > this way:
>> > set the broker name (message-broker)
>> > removed double connector bound to 1883 (the broker with the default
>> > configuration crashed)
>> > allow only MQTT protocol for connector bound to 1883 port
>> > removed broadcast connector and configuration
>> > added custom wildcard configuration (**)
>> >
>> > (**) <wildcard-addresses>
>> >          <routing-enabled>true</routing-enabled>
>> >          <delimiter>/</delimiter>
>> >          <any-words>#</any-words>
>> >          <single-word>+</single-word>
>> >      </wildcard-addresses>
>> >
>> > (***)
>> >
>> > /*******************************************************************************
>> >  * Copyright (c) 2021, 2022 Eurotech and/or its affiliates and others
>> >  *
>> >  * This program and the accompanying materials are made
>> >  * available under the terms of the Eclipse Public License 2.0
>> >  * which is available at https://www.eclipse.org/legal/epl-2.0/
>> >  *
>> >  * SPDX-License-Identifier: EPL-2.0
>> >  *
>> >  * Contributors:
>> >  *     Eurotech - initial API and implementation
>> >  *******************************************************************************/
>> > package org.eclipse.kapua.qa.common;
>> >
>> > import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
>> > import org.eclipse.paho.client.mqttv3.MqttCallback;
>> > import org.eclipse.paho.client.mqttv3.MqttClient;
>> > import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
>> > import org.eclipse.paho.client.mqttv3.MqttException;
>> > import org.eclipse.paho.client.mqttv3.MqttMessage;
>> > import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
>> > import org.slf4j.Logger;
>> > import org.slf4j.LoggerFactory;
>> >
>> > public class TestMqttClient {
>> >
>> >     protected static Logger logger = LoggerFactory.getLogger(TestMqttClient.class);
>> >
>> >     private static final String SERVER_URI = "tcp://localhost:1883";
>> >     private static final String CLIENT_ID_ADMIN = "test-client-admin";
>> >     private static final String CLIENT_ID_1 = "test-client-1";
>> >     private static final String CLIENT_ID_2 = "test-client-2";
>> >     private static final String USERNAME = "kapua-broker";
>> >     private static final String PASSWORD = "kapua-password";
>> >     private static final String USERNAME_ADMIN = "kapua-sys";
>> >     private static final String PASSWORD_ADMIN = "kapua-password";
>> >
>> >     private TestMqttClient() {
>> >     }
>> >
>> >     public static void main(String argv[]) throws MqttException {
>> >         MqttClient clientAdmin = new MqttClient(SERVER_URI, CLIENT_ID_ADMIN, new MemoryPersistence());
>> >         MqttClient client1 = new MqttClient(SERVER_URI, CLIENT_ID_1, new MemoryPersistence());
>> >         MqttClient client2 = new MqttClient(SERVER_URI, CLIENT_ID_2, new MemoryPersistence());
>> >         clientAdmin.setCallback(new TestMqttClientCallback(CLIENT_ID_ADMIN));
>> >         client1.setCallback(new TestMqttClientCallback(CLIENT_ID_1));
>> >         client2.setCallback(new TestMqttClientCallback(CLIENT_ID_2));
>> >
>> >         clientAdmin.connect(getMqttConnectOptions(USERNAME_ADMIN, PASSWORD_ADMIN));
>> >         client1.connect(getMqttConnectOptions(USERNAME, PASSWORD));
>> >         client2.connect(getMqttConnectOptions(USERNAME, PASSWORD));
>> >         System.out.println("waiting for messages");
>> >         client1.subscribe("$EDC/kapua-sys/" + CLIENT_ID_1 + "/#");
>> >         client2.subscribe("$EDC/kapua-sys/" + CLIENT_ID_2 + "/#");
>> >         clientAdmin.subscribe("#");
>> >
>> >         client1.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/BIRTH", new MqttMessage("test".getBytes()));
>> >         System.out.println("===");
>> >         client2.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/BIRTH", new MqttMessage("test".getBytes()));
>> >         System.out.println("===");
>> >         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/APPS", new MqttMessage("test".getBytes()));
>> >         System.out.println("===");
>> >         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/APPS", new MqttMessage("test".getBytes()));
>> >         System.out.println("===");
>> >
>> >         client1.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/DC", new MqttMessage("test".getBytes()));
>> >         System.out.println("===");
>> >         client2.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/DC", new MqttMessage("test".getBytes()));
>> >         System.out.println("===");
>> >         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/DC", new MqttMessage("test".getBytes()));
>> >         System.out.println("===");
>> >         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/DC", new MqttMessage("test".getBytes()));
>> >         System.out.println("===");
>> >
>> >         clientAdmin.disconnect();
>> >         client1.disconnect();
>> >         client2.disconnect();
>> >     }
>> >
>> >     private static MqttConnectOptions getMqttConnectOptions(String username, String password) {
>> >         MqttConnectOptions options = new MqttConnectOptions();
>> >         options.setCleanSession(true);
>> >         options.setUserName(username);
>> >         options.setPassword(password.toCharArray());
>> >         return options;
>> >     }
>> > }
>> >
>> > class TestMqttClientCallback implements MqttCallback {
>> >
>> >     private String clientId;
>> >
>> >     TestMqttClientCallback(String clientId) {
>> >         this.clientId = clientId;
>> >     }
>> >
>> >     @Override
>> >     public void messageArrived(String topic, MqttMessage message) throws Exception {
>> >         System.out.println("Client: " + clientId + " - Message arrived on topic: " + topic + " - message: " + new String(message.getPayload()));
>> >     }
>> >
>> >     @Override
>> >     public void deliveryComplete(IMqttDeliveryToken token) {
>> >         System.out.println("Client: " + clientId + " - Delivery completed: " + token.getTopics()[0]);
>> >     }
>> >
>> >     @Override
>> >     public void connectionLost(Throwable cause) {
>> >         System.out.println("Client: " + clientId + " - Connection lost: " + cause.getMessage());
>> >         cause.printStackTrace();
>> >     }
>> > }
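
The rule quoted in this thread (MQTT-4.7.2-1) and the two-subscription approach ("#" plus "$EDC/#") can be illustrated with a small, self-contained Java sketch. This is not code from Artemis, ActiveMQ "Classic", or Paho: the class TopicFilterSketch and its methods matches and matchesAny are hypothetical names introduced only for illustration, and the matcher assumes well-formed filters (it does not check that '#' is the last level).

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; not part of any broker or client library.
final class TopicFilterSketch {

    private TopicFilterSketch() {
    }

    /** Returns true if the MQTT topic filter matches the topic name. */
    static boolean matches(String filter, String topic) {
        // MQTT-4.7.2-1: a filter starting with a wildcard (# or +) must not
        // match a topic name beginning with '$'.
        if (topic.startsWith("$") && (filter.startsWith("#") || filter.startsWith("+"))) {
            return false;
        }
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                // Multi-level wildcard: matches the parent level and everything below it.
                return true;
            }
            if (i >= t.length) {
                return false; // the filter has more levels than the topic
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false; // literal level differs
            }
        }
        // No '#' was seen, so the level counts must be equal.
        return f.length == t.length;
    }

    /** Returns true if at least one of the subscribed filters matches the topic. */
    static boolean matchesAny(List<String> filters, String topic) {
        for (String filter : filters) {
            if (matches(filter, topic)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        String topic = "$EDC/kapua-sys/test-client-1/MQTT/BIRTH";
        List<String> single = Arrays.asList("#");
        List<String> pair = Arrays.asList("#", "$EDC/#");
        System.out.println("\"#\" alone matches: " + matchesAny(single, topic));
        System.out.println("\"#\" plus \"$EDC/#\" matches: " + matchesAny(pair, topic));
    }
}
```

Under these assumptions the single "#" subscription does not match the $EDC topic, while the pair of subscriptions does, which is the behavior a spec-compliant broker is expected to show for the "test-client-admin" client.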