> Now the $EDC/# is not working due to ARTEMIS-3801 right? I believe that is the case, yes.
Justin On Fri, Apr 29, 2022 at 2:40 AM Modanese, Riccardo <riccardo.modan...@eurotech.com.invalid> wrote: > From my understanding our previous subscription > ?#? > Should be changed into 2 different subscriptions: > ?#? > ?$EDC/#? > > Now the $EDC/# is not working due to ARTEMIS-3801 right? > If so I think it?s not a big problem we should manage 2 different > subscriptions instead of one. > > Thanks > > > Da: Justin Bertram <jbert...@apache.org> > Data: marted?, 26 aprile 2022 22:24 > A: users@activemq.apache.org <users@activemq.apache.org> > Oggetto: Re: Artemis 2.21+ - MQTT devices don?t receive the messages > matching their subscriptions > I believe I see why this is working for you in ActiveMQ "Classic." > > By default ActiveMQ "Classic" will block any messages published to an MQTT > topic starting with '$' [1]. However, a configuration element named > "publishDollarTopics" > was added [2] a long time ago to assist "legacy MQTT applications" > (presumably those migrating from 3.1 to 3.1.1 since the 3.1 spec makes no > mention of topics starting with '$'). You are actually setting this > parameter in Kapua [3]. > > A similar parameter could potentially be added to ActiveMQ Artemis, but > it's been 8 years since MQTT 3.1.1 was released and 3 years since MQTT 5 > was released. Any application written against MQTT 3.1 has had ample time > to be updated, and it's not like this is a gray area in the spec. Both > 3.1.1 and 5 specifications are very clear about how to treat topics that > start with '$'. > > Is it reasonable to expect that Kapua would change to be spec compliant? > > > Justin > > [1] > > https://github.com/apache/activemq/blob/59dfbc3302c43054d24a96e699b0eaf1b6d1bf98/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java#L159 > [2] https://issues.apache.org/jira/browse/AMQ-5292 > [3] > > https://github.com/eclipse/kapua/blob/master/org.eclipse.kapua.assembly/src/main/resources/conf/broker/activemq.xml#L434 > > On Tue, Apr 26, 2022 at 9:42 AM Justin Bertram <jbert...@apache.org> > wrote: > > > > It?s a huge problem for us since in Kapua we are doing # subscriptions > > (for internal components) and using $ as topic prefix (also Kura is using > > this in its namespace) and worked perfectly with ActiveMQ 5.x. > > > > This use-case is clearly prohibited by the MQTT specification. It > > specifically says: > > > > A subscription to ?#? will not receive any messages published to a > topic > > beginning with a $ > > > > If this is working on ActiveMQ 5.x then ActiveMQ 5.x should be fixed to > > enforce the MQTT specification appropriately. > > > > > I should evaluate the impact but I?m not so sure we can move to Artemis > > with this limitation. > > > > I'm not sure I would categorize this as a "limitation" unless you believe > > enforcing the specification is a limitation. > > > > For what it's worth the MQTT specification also says: > > > > Applications cannot use a topic with a leading $ character for their > own > > purposes > > > > At this point I recommend you change your application to conform with the > > specification. > > > > > > Justin > > > > On Tue, Apr 26, 2022 at 2:19 AM Modanese, Riccardo > > <riccardo.modan...@eurotech.com.invalid> wrote: > > > >> It?s a huge problem for us since in Kapua we are doing # subscriptions > >> (for internal components) and using $ as topic prefix (also Kura is > using > >> this in its namespace) and worked perfectly with ActiveMQ 5.x. > >> > >> I should evaluate the impact but I?m not so sure we can move to Artemis > >> with this limitation. > >> > >> Riccardo > >> > >> Da: Justin Bertram <jbert...@apache.org> > >> Data: marted?, 26 aprile 2022 00:18 > >> A: users@activemq.apache.org <users@activemq.apache.org> > >> Oggetto: Re: Artemis 2.21+ - MQTT devices don?t receive the messages > >> matching their subscriptions > >> I opened ARTEMIS-3801 [1] and sent a PR to fix what was broken. > >> > >> However, it's worth noting that your code will still not work as it did > >> before because the previous behavior violated the MQTT 3.1.1 > >> specification. > >> As noted previously, both MQTT 3.1.1 and 5 specifications contain > >> MQTT-4.7.2-1 which states: > >> > >> > The Server MUST NOT match Topic Filters starting with a wildcard > >> character (# or +) with Topic Names beginning with a $ character. > >> > >> Previously your "test-client-admin" client was subscribing to `#` and > >> receiving messages from topics beginning with `$`. This won't work > >> anymore. > >> > >> > >> Justin > >> > >> [1] https://issues.apache.org/jira/browse/ARTEMIS-3801 > >> > >> On Wed, Apr 20, 2022 at 4:29 AM Modanese, Riccardo > >> <riccardo.modan...@eurotech.com.invalid> wrote: > >> > >> > > >> > Hello, > >> > switching from Artemis broker 2.20 to 2.21 we experienced an issue > >> > about message delivering. > >> > It looks like MQTT devices don?t receive the messages matching their > >> > subscriptions. > >> > > >> > The test code (***) run with an Artemis broker 2.19 or 2.20 as target > >> > (wildcard addresses modified as (**)) produces the correct output: > >> > waiting for messages > >> > Client: test-client-1 - Delivery completed: > >> > $EDC/kapua-sys/test-client-1/MQTT/BIRTH > >> > Client: test-client-admin - Message arrived on topic: > >> > $EDC/kapua-sys/test-client-1/MQTT/BIRTH - message: test > >> > Client: test-client-1 - Message arrived on topic: > >> > $EDC/kapua-sys/test-client-1/MQTT/BIRTH - message: test > >> > === > >> > Client: test-client-2 - Delivery completed: > >> > $EDC/kapua-sys/test-client-2/MQTT/BIRTH > >> > Client: test-client-2 - Message arrived on topic: > >> > $EDC/kapua-sys/test-client-2/MQTT/BIRTH - message: test > >> > Client: test-client-admin - Message arrived on topic: > >> > $EDC/kapua-sys/test-client-2/MQTT/BIRTH - message: test > >> > === > >> > Client: test-client-admin - Delivery completed: > >> > $EDC/kapua-sys/test-client-1/MQTT/APPS > >> > Client: test-client-admin - Message arrived on topic: > >> > $EDC/kapua-sys/test-client-1/MQTT/APPS - message: test > >> > Client: test-client-1 - Message arrived on topic: > >> > $EDC/kapua-sys/test-client-1/MQTT/APPS - message: test > >> > ? > >> > > >> > With broker 2.21 or 2.22 (configuration changes described in (*) ) as > >> > target the output is: > >> > waiting for messages > >> > === > >> > Client: test-client-1 - Delivery completed: > >> > $EDC/kapua-sys/test-client-1/MQTT/BIRTH > >> > === > >> > Client: test-client-2 - Delivery completed: > >> > $EDC/kapua-sys/test-client-2/MQTT/BIRTH > >> > === > >> > Client: test-client-admin - Delivery completed: > >> > $EDC/kapua-sys/test-client-1/MQTT/APPS > >> > ? > >> > > >> > So the broker doesn?t send any message to the clients. > >> > > >> > May be we missed to configure something needed by 2.21 versions > onward? > >> > > >> > Regards, > >> > Riccardo Modanese > >> > > >> > > >> > > >> > (*) The 2.21 and 2.22 default broker.xml configuration file has > changed > >> in > >> > this way: > >> > set the broker name (message-broker) > >> > removed double connector bound to 1883 (the broker with the default > >> > configuration crashed) > >> > allow only MQTT protocol for connector bound to 1883 port > >> > removed broadcast connector and configuration > >> > added custom wildcard configuration (**) > >> > > >> > (**) <wildcard-addresses> > >> > <routing-enabled>true</routing-enabled> > >> > <delimiter>/</delimiter> > >> > <any-words>#</any-words> > >> > <single-word>+</single-word> > >> > </wildcard-addresses> > >> > > >> > (***) > >> > > >> > > >> > /******************************************************************************* > >> > * Copyright (c) 2021, 2022 Eurotech and/or its affiliates and others > >> > * > >> > * This program and the accompanying materials are made > >> > * available under the terms of the Eclipse Public License 2.0 > >> > * which is available at https://www.eclipse.org/legal/epl-2.0/ > >> > * > >> > * SPDX-License-Identifier: EPL-2.0 > >> > * > >> > * Contributors: > >> > * Eurotech - initial API and implementation > >> > > >> > > >> > *******************************************************************************/ > >> > package org.eclipse.kapua.qa.common; > >> > > >> > import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; > >> > import org.eclipse.paho.client.mqttv3.MqttCallback; > >> > import org.eclipse.paho.client.mqttv3.MqttClient; > >> > import org.eclipse.paho.client.mqttv3.MqttConnectOptions; > >> > import org.eclipse.paho.client.mqttv3.MqttException; > >> > import org.eclipse.paho.client.mqttv3.MqttMessage; > >> > import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; > >> > import org.slf4j.Logger; > >> > import org.slf4j.LoggerFactory; > >> > > >> > public class TestMqttClient { > >> > > >> > protected static Logger logger = > >> > LoggerFactory.getLogger(TestMqttClient.class); > >> > > >> > private static final String SERVER_URI = "tcp://localhost:1883"; > >> > private static final String CLIENT_ID_ADMIN = "test-client-admin"; > >> > private static final String CLIENT_ID_1 = "test-client-1"; > >> > private static final String CLIENT_ID_2 = "test-client-2"; > >> > private static final String USERNAME = "kapua-broker"; > >> > private static final String PASSWORD = "kapua-password"; > >> > private static final String USERNAME_ADMIN = "kapua-sys"; > >> > private static final String PASSWORD_ADMIN = "kapua-password"; > >> > > >> > private TestMqttClient() { > >> > } > >> > > >> > public static void main(String argv[]) throws MqttException { > >> > MqttClient clientAdmin = new MqttClient(SERVER_URI, > >> > CLIENT_ID_ADMIN, new MemoryPersistence()); > >> > MqttClient client1 = new MqttClient(SERVER_URI, CLIENT_ID_1, > new > >> > MemoryPersistence()); > >> > MqttClient client2 = new MqttClient(SERVER_URI, CLIENT_ID_2, > new > >> > MemoryPersistence()); > >> > clientAdmin.setCallback(new > >> > TestMqttClientCallback(CLIENT_ID_ADMIN)); > >> > client1.setCallback(new TestMqttClientCallback(CLIENT_ID_1)); > >> > client2.setCallback(new TestMqttClientCallback(CLIENT_ID_2)); > >> > > >> > clientAdmin.connect(getMqttConnectOptions(USERNAME_ADMIN, > >> > PASSWORD_ADMIN)); > >> > client1.connect(getMqttConnectOptions(USERNAME, PASSWORD)); > >> > client2.connect(getMqttConnectOptions(USERNAME, PASSWORD)); > >> > System.out.println("waiting for messages"); > >> > client1.subscribe("$EDC/kapua-sys/" + CLIENT_ID_1 + "/#"); > >> > client2.subscribe("$EDC/kapua-sys/" + CLIENT_ID_2 + "/#"); > >> > clientAdmin.subscribe("#"); > >> > > >> > client1.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + > "/MQTT/BIRTH", > >> > new MqttMessage("test".getBytes())); > >> > System.out.println("==="); > >> > client2.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + > "/MQTT/BIRTH", > >> > new MqttMessage("test".getBytes())); > >> > System.out.println("==="); > >> > clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + > >> > "/MQTT/APPS", new MqttMessage("test".getBytes())); > >> > System.out.println("==="); > >> > clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + > >> > "/MQTT/APPS", new MqttMessage("test".getBytes())); > >> > System.out.println("==="); > >> > > >> > client1.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/DC", > >> new > >> > MqttMessage("test".getBytes())); > >> > System.out.println("==="); > >> > client2.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/DC", > >> new > >> > MqttMessage("test".getBytes())); > >> > System.out.println("==="); > >> > clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + > >> "/MQTT/DC", > >> > new MqttMessage("test".getBytes())); > >> > System.out.println("==="); > >> > clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + > >> "/MQTT/DC", > >> > new MqttMessage("test".getBytes())); > >> > System.out.println("==="); > >> > > >> > clientAdmin.disconnect(); > >> > client1.disconnect(); > >> > client2.disconnect(); > >> > } > >> > > >> > private static MqttConnectOptions getMqttConnectOptions(String > >> > username, String password) { > >> > MqttConnectOptions options = new MqttConnectOptions(); > >> > options.setCleanSession(true); > >> > options.setUserName(username); > >> > options.setPassword(password.toCharArray()); > >> > return options; > >> > } > >> > } > >> > > >> > class TestMqttClientCallback implements MqttCallback { > >> > > >> > private String clientId; > >> > > >> > TestMqttClientCallback(String clientId) { > >> > this.clientId = clientId; > >> > } > >> > > >> > @Override > >> > public void messageArrived(String topic, MqttMessage message) > throws > >> > Exception { > >> > System.out.println("Client: " + clientId + " - Message arrived > >> on > >> > topic: " + topic + " - message: " + new String(message.getPayload())); > >> > } > >> > > >> > @Override > >> > public void deliveryComplete(IMqttDeliveryToken token) { > >> > System.out.println("Client: " + clientId + " - Delivery > >> completed: > >> > " + token.getTopics()[0]); > >> > } > >> > > >> > @Override > >> > public void connectionLost(Throwable cause) { > >> > System.out.println("Client: " + clientId + " - Connection > lost: > >> " > >> > + cause.getMessage()); > >> > cause.printStackTrace(); > >> > } > >> > } > >> > > >> > > >