> It’s a huge problem for us since in Kapua we are doing # subscriptions
> (for internal components) and using $ as topic prefix (also Kura is using
> this in its namespace) and worked perfectly with ActiveMQ 5.x.

This use-case is clearly prohibited by the MQTT specification. It
specifically says:

  A subscription to “#” will not receive any messages published to a topic
beginning with a $

If this is working on ActiveMQ 5.x then ActiveMQ 5.x should be fixed to
enforce the MQTT specification appropriately.

> I should evaluate the impact but I’m not so sure we can move to Artemis
> with this limitation.

I'm not sure I would categorize this as a "limitation" unless you believe
enforcing the specification is a limitation.

For what it's worth the MQTT specification also says:

  Applications cannot use a topic with a leading $ character for their own
purposes

At this point I recommend you change your application to conform with the
specification.
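
To make the matching rule concrete, here is a minimal sketch of how a
topic filter could be checked against a topic name with the leading-$
rule applied. It is not the Artemis implementation; the class name
TopicFilterSketch and the method matches are made up for illustration,
and the sketch assumes the filter is already syntactically valid (for
example, "#" only appears as the last level).

```java
public class TopicFilterSketch {

    // Hypothetical helper: returns true if the topic name matches the topic
    // filter under the MQTT wildcard rules, including MQTT-4.7.2-1.
    static boolean matches(String filter, String topic) {
        // MQTT-4.7.2-1: a filter starting with a wildcard (# or +) must not
        // match a topic name beginning with '$'.
        if (topic.startsWith("$") && (filter.startsWith("#") || filter.startsWith("+"))) {
            return false;
        }
        String[] filterLevels = filter.split("/", -1);
        String[] topicLevels = topic.split("/", -1);
        for (int i = 0; i < filterLevels.length; i++) {
            if (filterLevels[i].equals("#")) {
                // Multi-level wildcard: matches the parent and all remaining levels.
                return true;
            }
            if (i >= topicLevels.length) {
                return false; // the filter has more levels than the topic
            }
            if (!filterLevels[i].equals("+") && !filterLevels[i].equals(topicLevels[i])) {
                return false; // literal level mismatch
            }
        }
        // No '#' was seen, so the number of levels must be identical.
        return filterLevels.length == topicLevels.length;
    }

    public static void main(String[] args) {
        String topic = "$EDC/kapua-sys/test-client-1/MQTT/BIRTH";
        System.out.println(matches("#", topic));                              // false
        System.out.println(matches("+/kapua-sys/#", topic));                  // false
        System.out.println(matches("$EDC/#", topic));                         // true
        System.out.println(matches("$EDC/kapua-sys/test-client-1/#", topic)); // true
    }
}
```

Under this reading, the "#" subscription of test-client-admin no longer
sees the $EDC topics, while a filter that names the $ prefix explicitly
still matches. Whether an application should be using $-prefixed topics
at all is a separate question, covered by the second quote above.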


Justin

On Tue, Apr 26, 2022 at 2:19 AM Modanese, Riccardo
<riccardo.modan...@eurotech.com.invalid> wrote:

> It’s a huge problem for us since in Kapua we are doing # subscriptions
> (for internal components) and using $ as topic prefix (also Kura is using
> this in its namespace) and worked perfectly with ActiveMQ 5.x.
>
> I should evaluate the impact but I’m not so sure we can move to Artemis
> with this limitation.
>
> Riccardo
>
> From: Justin Bertram <jbert...@apache.org>
> Date: Tuesday, 26 April 2022 00:18
> To: users@activemq.apache.org <users@activemq.apache.org>
> Subject: Re: Artemis 2.21+ - MQTT devices don’t receive the messages
> matching their subscriptions
> I opened ARTEMIS-3801 [1] and sent a PR to fix what was broken.
>
> However, it's worth noting that your code will still not work as it did
> before because the previous behavior violated the MQTT 3.1.1 specification.
> As noted previously, both MQTT 3.1.1 and 5 specifications contain
> MQTT-4.7.2-1 which states:
>
> > The Server MUST NOT match Topic Filters starting with a wildcard
> > character (# or +) with Topic Names beginning with a $ character.
>
> Previously your "test-client-admin" client was subscribing to `#` and
> receiving messages from topics beginning with `$`. This won't work anymore.
>
>
> Justin
>
> [1] https://issues.apache.org/jira/browse/ARTEMIS-3801
>
> On Wed, Apr 20, 2022 at 4:29 AM Modanese, Riccardo
> <riccardo.modan...@eurotech.com.invalid> wrote:
>
> >
> > Hello,
> > After switching from Artemis broker 2.20 to 2.21, we experienced an issue
> > with message delivery.
> > It looks like MQTT devices don’t receive the messages matching their
> > subscriptions.
> >
> > The test code (***) run with an Artemis broker 2.19 or 2.20 as target
> > (wildcard addresses modified as (**)) produces the correct output:
> > waiting for messages
> > Client: test-client-1 - Delivery completed:
> > $EDC/kapua-sys/test-client-1/MQTT/BIRTH
> > Client: test-client-admin - Message arrived on topic:
> > $EDC/kapua-sys/test-client-1/MQTT/BIRTH - message: test
> > Client: test-client-1 - Message arrived on topic:
> > $EDC/kapua-sys/test-client-1/MQTT/BIRTH - message: test
> > ===
> > Client: test-client-2 - Delivery completed:
> > $EDC/kapua-sys/test-client-2/MQTT/BIRTH
> > Client: test-client-2 - Message arrived on topic:
> > $EDC/kapua-sys/test-client-2/MQTT/BIRTH - message: test
> > Client: test-client-admin - Message arrived on topic:
> > $EDC/kapua-sys/test-client-2/MQTT/BIRTH - message: test
> > ===
> > Client: test-client-admin - Delivery completed:
> > $EDC/kapua-sys/test-client-1/MQTT/APPS
> > Client: test-client-admin - Message arrived on topic:
> > $EDC/kapua-sys/test-client-1/MQTT/APPS - message: test
> > Client: test-client-1 - Message arrived on topic:
> > $EDC/kapua-sys/test-client-1/MQTT/APPS - message: test
> > …
> >
> > With broker 2.21 or 2.22 (configuration changes described in (*) ) as
> > target the output is:
> > waiting for messages
> > ===
> > Client: test-client-1 - Delivery completed:
> > $EDC/kapua-sys/test-client-1/MQTT/BIRTH
> > ===
> > Client: test-client-2 - Delivery completed:
> > $EDC/kapua-sys/test-client-2/MQTT/BIRTH
> > ===
> > Client: test-client-admin - Delivery completed:
> > $EDC/kapua-sys/test-client-1/MQTT/APPS
> > …
> >
> > So the broker doesn’t send any messages to the clients.
> >
> > Maybe we missed some configuration that is needed from version 2.21 onward?
> >
> > Regards,
> > Riccardo Modanese
> >
> >
> >
> > (*) The 2.21 and 2.22 default broker.xml configuration file has been
> > changed as follows:
> > set the broker name (message-broker)
> > removed double connector bound to 1883 (the broker with the default
> > configuration crashed)
> > allow only MQTT protocol for connector bound to 1883 port
> > removed broadcast connector and configuration
> > added custom wildcard configuration (**)
> >
> > (**) <wildcard-addresses>
> >          <routing-enabled>true</routing-enabled>
> >          <delimiter>/</delimiter>
> >           <any-words>#</any-words>
> >           <single-word>+</single-word>
> >       </wildcard-addresses>
> >
> > (***)
> >
> > /*******************************************************************************
> > * Copyright (c) 2021, 2022 Eurotech and/or its affiliates and others
> > *
> > * This program and the accompanying materials are made
> > * available under the terms of the Eclipse Public License 2.0
> > * which is available at https://www.eclipse.org/legal/epl-2.0/
> > *
> > * SPDX-License-Identifier: EPL-2.0
> > *
> > * Contributors:
> > *     Eurotech - initial API and implementation
> > *******************************************************************************/
> > package org.eclipse.kapua.qa.common;
> >
> > import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
> > import org.eclipse.paho.client.mqttv3.MqttCallback;
> > import org.eclipse.paho.client.mqttv3.MqttClient;
> > import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
> > import org.eclipse.paho.client.mqttv3.MqttException;
> > import org.eclipse.paho.client.mqttv3.MqttMessage;
> > import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
> > import org.slf4j.Logger;
> > import org.slf4j.LoggerFactory;
> >
> > public class TestMqttClient {
> >
> >     protected static Logger logger =
> > LoggerFactory.getLogger(TestMqttClient.class);
> >
> >     private static final String SERVER_URI = "tcp://localhost:1883";
> >     private static final String CLIENT_ID_ADMIN = "test-client-admin";
> >     private static final String CLIENT_ID_1 = "test-client-1";
> >     private static final String CLIENT_ID_2 = "test-client-2";
> >     private static final String USERNAME = "kapua-broker";
> >     private static final String PASSWORD = "kapua-password";
> >     private static final String USERNAME_ADMIN = "kapua-sys";
> >     private static final String PASSWORD_ADMIN = "kapua-password";
> >
> >     private TestMqttClient() {
> >     }
> >
> >     public static void main(String argv[]) throws MqttException {
> >         MqttClient clientAdmin = new MqttClient(SERVER_URI,
> > CLIENT_ID_ADMIN, new MemoryPersistence());
> >         MqttClient client1 = new MqttClient(SERVER_URI, CLIENT_ID_1, new
> > MemoryPersistence());
> >         MqttClient client2 = new MqttClient(SERVER_URI, CLIENT_ID_2, new
> > MemoryPersistence());
> >         clientAdmin.setCallback(new
> > TestMqttClientCallback(CLIENT_ID_ADMIN));
> >         client1.setCallback(new TestMqttClientCallback(CLIENT_ID_1));
> >         client2.setCallback(new TestMqttClientCallback(CLIENT_ID_2));
> >
> >         clientAdmin.connect(getMqttConnectOptions(USERNAME_ADMIN,
> > PASSWORD_ADMIN));
> >         client1.connect(getMqttConnectOptions(USERNAME, PASSWORD));
> >         client2.connect(getMqttConnectOptions(USERNAME, PASSWORD));
> >         System.out.println("waiting for messages");
> >         client1.subscribe("$EDC/kapua-sys/" + CLIENT_ID_1 + "/#");
> >         client2.subscribe("$EDC/kapua-sys/" + CLIENT_ID_2 + "/#");
> >         clientAdmin.subscribe("#");
> >
> >         client1.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/BIRTH",
> > new MqttMessage("test".getBytes()));
> >         System.out.println("===");
> >         client2.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/BIRTH",
> > new MqttMessage("test".getBytes()));
> >         System.out.println("===");
> >         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 +
> > "/MQTT/APPS", new MqttMessage("test".getBytes()));
> >         System.out.println("===");
> >         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 +
> > "/MQTT/APPS", new MqttMessage("test".getBytes()));
> >         System.out.println("===");
> >
> >         client1.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/DC", new
> > MqttMessage("test".getBytes()));
> >         System.out.println("===");
> >         client2.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/DC", new
> > MqttMessage("test".getBytes()));
> >         System.out.println("===");
> >         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/DC",
> > new MqttMessage("test".getBytes()));
> >         System.out.println("===");
> >         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/DC",
> > new MqttMessage("test".getBytes()));
> >         System.out.println("===");
> >
> >         clientAdmin.disconnect();
> >         client1.disconnect();
> >         client2.disconnect();
> >     }
> >
> >     private static MqttConnectOptions getMqttConnectOptions(String
> > username, String password) {
> >         MqttConnectOptions options = new MqttConnectOptions();
> >         options.setCleanSession(true);
> >         options.setUserName(username);
> >         options.setPassword(password.toCharArray());
> >         return options;
> >     }
> > }
> >
> > class TestMqttClientCallback implements MqttCallback {
> >
> >     private String clientId;
> >
> >     TestMqttClientCallback(String clientId) {
> >         this.clientId = clientId;
> >     }
> >
> >     @Override
> >     public void messageArrived(String topic, MqttMessage message) throws
> > Exception {
> >         System.out.println("Client: " + clientId + " - Message arrived on topic: "
> >                 + topic + " - message: " + new String(message.getPayload()));
> >     }
> >
> >     @Override
> >     public void deliveryComplete(IMqttDeliveryToken token) {
> >         System.out.println("Client: " + clientId + " - Delivery completed: "
> >                 + token.getTopics()[0]);
> >     }
> >
> >     @Override
> >     public void connectionLost(Throwable cause) {
> >         System.out.println("Client: " + clientId + " - Connection lost: "
> > + cause.getMessage());
> >         cause.printStackTrace();
> >     }
> > }
> >
>
