From my understanding, our previous subscription
    “#”
should be changed into 2 different subscriptions:
    “#”
    “$EDC/#”

The $EDC/# subscription is currently not working because of ARTEMIS-3801, right?
If so, I don't think it's a big problem: we would just have to manage 2 different
subscriptions instead of one.

Thanks


From: Justin Bertram <jbert...@apache.org>
Date: Tuesday, 26 April 2022 22:24
To: users@activemq.apache.org <users@activemq.apache.org>
Subject: Re: Artemis 2.21+ - MQTT devices don’t receive the messages matching
their subscriptions
I believe I see why this is working for you in ActiveMQ "Classic."

By default ActiveMQ "Classic" will block any messages published to an MQTT
topic starting with '$' [1]. However, a configuration element named
"publishDollarTopics" was added [2] a long time ago to assist "legacy MQTT
applications" (presumably those migrating from 3.1 to 3.1.1 since the 3.1
spec makes no mention of topics starting with '$'). You are actually setting
this parameter in Kapua [3].

A similar parameter could potentially be added to ActiveMQ Artemis, but
it's been 8 years since MQTT 3.1.1 was released and 3 years since MQTT 5
was released. Any application written against MQTT 3.1 has had ample time
to be updated, and it's not like this is a gray area in the spec. Both
3.1.1 and 5 specifications are very clear about how to treat topics that
start with '$'.
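
To make the rule concrete, here is a minimal sketch of the matching behavior in
plain Java. It is not the Artemis implementation: the class name
`DollarTopicMatch` and its methods are made up for illustration, and it assumes
well-formed filters (it does not check, for example, that `#` is the last level).
It only shows why a `#` subscription alone no longer sees `$EDC/...` topics while
an explicit `$EDC/#` subscription does.

```java
import java.util.List;

// Hypothetical helper, for illustration only (not part of Artemis or Paho).
public final class DollarTopicMatch {

    private DollarTopicMatch() {
    }

    /** Returns true if the MQTT topic filter matches the topic name. */
    public static boolean matches(String filter, String topic) {
        // MQTT-4.7.2-1: a filter starting with a wildcard (# or +) must not
        // match a topic name beginning with '$'.
        if (topic.startsWith("$")
                && (filter.startsWith("#") || filter.startsWith("+"))) {
            return false;
        }
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                // Multi-level wildcard: matches the parent level and all below.
                return true;
            }
            if (i >= t.length) {
                return false;
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false;
            }
        }
        return f.length == t.length;
    }

    /** Returns true if at least one of the filters matches the topic name. */
    public static boolean matchesAny(List<String> filters, String topic) {
        return filters.stream().anyMatch(f -> matches(f, topic));
    }

    public static void main(String[] args) {
        String birth = "$EDC/kapua-sys/test-client-1/MQTT/BIRTH";
        System.out.println(matches("#", birth));                       // false
        System.out.println(matches("$EDC/#", birth));                  // true
        System.out.println(matchesAny(List.of("#", "$EDC/#"), birth)); // true
    }
}
```

With this reading of the spec, an admin-style client that wants both ordinary
topics and the `$EDC` namespace needs the pair of filters `#` and `$EDC/#`.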

Is it reasonable to expect that Kapua would change to be spec compliant?


Justin

[1]
https://github.com/apache/activemq/blob/59dfbc3302c43054d24a96e699b0eaf1b6d1bf98/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java#L159
[2] https://issues.apache.org/jira/browse/AMQ-5292
[3]
https://github.com/eclipse/kapua/blob/master/org.eclipse.kapua.assembly/src/main/resources/conf/broker/activemq.xml#L434

On Tue, Apr 26, 2022 at 9:42 AM Justin Bertram <jbert...@apache.org> wrote:

> > It’s a huge problem for us since in Kapua we are doing # subscriptions
> (for internal components) and using $ as topic prefix (also Kura is using
> this in its namespace) and worked perfectly with ActiveMQ 5.x.
>
> This use-case is clearly prohibited by the MQTT specification. It
> specifically says:
>
>   A subscription to “#” will not receive any messages published to a topic
> beginning with a $
>
> If this is working on ActiveMQ 5.x then ActiveMQ 5.x should be fixed to
> enforce the MQTT specification appropriately.
>
> > I should evaluate the impact but I’m not so sure we can move to Artemis
> with this limitation.
>
> I'm not sure I would categorize this as a "limitation" unless you believe
> enforcing the specification is a limitation.
>
> For what it's worth the MQTT specification also says:
>
>   Applications cannot use a topic with a leading $ character for their own
> purposes
>
> At this point I recommend you change your application to conform with the
> specification.
>
>
> Justin
>
> On Tue, Apr 26, 2022 at 2:19 AM Modanese, Riccardo
> <riccardo.modan...@eurotech.com.invalid> wrote:
>
>> It’s a huge problem for us since in Kapua we are doing # subscriptions
>> (for internal components) and using $ as topic prefix (also Kura is using
>> this in its namespace) and worked perfectly with ActiveMQ 5.x.
>>
>> I should evaluate the impact but I’m not so sure we can move to Artemis
>> with this limitation.
>>
>> Riccardo
>>
>> From: Justin Bertram <jbert...@apache.org>
>> Date: Tuesday, 26 April 2022 00:18
>> To: users@activemq.apache.org <users@activemq.apache.org>
>> Subject: Re: Artemis 2.21+ - MQTT devices don’t receive the messages
>> matching their subscriptions
>> I opened ARTEMIS-3801 [1] and sent a PR to fix what was broken.
>>
>> However, it's worth noting that your code will still not work as it did
>> before because the previous behavior violated the MQTT 3.1.1
>> specification.
>> As noted previously, both MQTT 3.1.1 and 5 specifications contain
>> MQTT-4.7.2-1 which states:
>>
>> > The Server MUST NOT match Topic Filters starting with a wildcard
>> character (# or +) with Topic Names beginning with a $ character.
>>
>> Previously your "test-client-admin" client was subscribing to `#` and
>> receiving messages from topics beginning with `$`. This won't work
>> anymore.
>>
>>
>> Justin
>>
>> [1] https://issues.apache.org/jira/browse/ARTEMIS-3801
>>
>> On Wed, Apr 20, 2022 at 4:29 AM Modanese, Riccardo
>> <riccardo.modan...@eurotech.com.invalid> wrote:
>>
>> >
>> > Hello,
>> >     after switching from Artemis broker 2.20 to 2.21 we experienced an
>> > issue with message delivery.
>> > It looks like MQTT devices don’t receive the messages matching their
>> > subscriptions.
>> >
>> > Running the test code (***) against an Artemis 2.19 or 2.20 broker
>> > (wildcard addresses modified as in (**)) produces the correct output:
>> > waiting for messages
>> > Client: test-client-1 - Delivery completed:
>> > $EDC/kapua-sys/test-client-1/MQTT/BIRTH
>> > Client: test-client-admin - Message arrived on topic:
>> > $EDC/kapua-sys/test-client-1/MQTT/BIRTH - message: test
>> > Client: test-client-1 - Message arrived on topic:
>> > $EDC/kapua-sys/test-client-1/MQTT/BIRTH - message: test
>> > ===
>> > Client: test-client-2 - Delivery completed:
>> > $EDC/kapua-sys/test-client-2/MQTT/BIRTH
>> > Client: test-client-2 - Message arrived on topic:
>> > $EDC/kapua-sys/test-client-2/MQTT/BIRTH - message: test
>> > Client: test-client-admin - Message arrived on topic:
>> > $EDC/kapua-sys/test-client-2/MQTT/BIRTH - message: test
>> > ===
>> > Client: test-client-admin - Delivery completed:
>> > $EDC/kapua-sys/test-client-1/MQTT/APPS
>> > Client: test-client-admin - Message arrived on topic:
>> > $EDC/kapua-sys/test-client-1/MQTT/APPS - message: test
>> > Client: test-client-1 - Message arrived on topic:
>> > $EDC/kapua-sys/test-client-1/MQTT/APPS - message: test
>> > …
>> >
>> > Against a 2.21 or 2.22 broker (with the configuration changes described
>> > in (*)) the output is:
>> > waiting for messages
>> > ===
>> > Client: test-client-1 - Delivery completed:
>> > $EDC/kapua-sys/test-client-1/MQTT/BIRTH
>> > ===
>> > Client: test-client-2 - Delivery completed:
>> > $EDC/kapua-sys/test-client-2/MQTT/BIRTH
>> > ===
>> > Client: test-client-admin - Delivery completed:
>> > $EDC/kapua-sys/test-client-1/MQTT/APPS
>> > …
>> >
>> > So the broker doesn’t send any messages to the clients.
>> >
>> > Maybe we missed some configuration that is needed from version 2.21 onward?
>> >
>> > Regards,
>> > Riccardo Modanese
>> >
>> >
>> >
>> > (*) The default broker.xml configuration file for 2.21 and 2.22 was changed
>> > as follows:
>> > set the broker name (message-broker)
>> > removed the duplicate connector bound to port 1883 (the broker crashed
>> > with the default configuration)
>> > allowed only the MQTT protocol on the connector bound to port 1883
>> > removed the broadcast connector and its configuration
>> > added a custom wildcard configuration (**)
>> >
>> > (**) <wildcard-addresses>
>> >          <routing-enabled>true</routing-enabled>
>> >          <delimiter>/</delimiter>
>> >          <any-words>#</any-words>
>> >          <single-word>+</single-word>
>> >      </wildcard-addresses>
>> >
>> > (***)
>> >
>> >
>> > /*******************************************************************************
>> > * Copyright (c) 2021, 2022 Eurotech and/or its affiliates and others
>> > *
>> > * This program and the accompanying materials are made
>> > * available under the terms of the Eclipse Public License 2.0
>> > * which is available at https://www.eclipse.org/legal/epl-2.0/
>> > *
>> > * SPDX-License-Identifier: EPL-2.0
>> > *
>> > * Contributors:
>> > *     Eurotech - initial API and implementation
>> > *******************************************************************************/
>> > package org.eclipse.kapua.qa.common;
>> >
>> > import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
>> > import org.eclipse.paho.client.mqttv3.MqttCallback;
>> > import org.eclipse.paho.client.mqttv3.MqttClient;
>> > import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
>> > import org.eclipse.paho.client.mqttv3.MqttException;
>> > import org.eclipse.paho.client.mqttv3.MqttMessage;
>> > import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
>> > import org.slf4j.Logger;
>> > import org.slf4j.LoggerFactory;
>> >
>> > public class TestMqttClient {
>> >
>> >     protected static Logger logger =
>> >             LoggerFactory.getLogger(TestMqttClient.class);
>> >
>> >     private static final String SERVER_URI = "tcp://localhost:1883";
>> >     private static final String CLIENT_ID_ADMIN = "test-client-admin";
>> >     private static final String CLIENT_ID_1 = "test-client-1";
>> >     private static final String CLIENT_ID_2 = "test-client-2";
>> >     private static final String USERNAME = "kapua-broker";
>> >     private static final String PASSWORD = "kapua-password";
>> >     private static final String USERNAME_ADMIN = "kapua-sys";
>> >     private static final String PASSWORD_ADMIN = "kapua-password";
>> >
>> >     private TestMqttClient() {
>> >     }
>> >
>> >     public static void main(String[] argv) throws MqttException {
>> >         MqttClient clientAdmin = new MqttClient(SERVER_URI,
>> >                 CLIENT_ID_ADMIN, new MemoryPersistence());
>> >         MqttClient client1 = new MqttClient(SERVER_URI, CLIENT_ID_1,
>> >                 new MemoryPersistence());
>> >         MqttClient client2 = new MqttClient(SERVER_URI, CLIENT_ID_2,
>> >                 new MemoryPersistence());
>> >         clientAdmin.setCallback(
>> >                 new TestMqttClientCallback(CLIENT_ID_ADMIN));
>> >         client1.setCallback(new TestMqttClientCallback(CLIENT_ID_1));
>> >         client2.setCallback(new TestMqttClientCallback(CLIENT_ID_2));
>> >
>> >         clientAdmin.connect(getMqttConnectOptions(USERNAME_ADMIN,
>> >                 PASSWORD_ADMIN));
>> >         client1.connect(getMqttConnectOptions(USERNAME, PASSWORD));
>> >         client2.connect(getMqttConnectOptions(USERNAME, PASSWORD));
>> >         System.out.println("waiting for messages");
>> >         client1.subscribe("$EDC/kapua-sys/" + CLIENT_ID_1 + "/#");
>> >         client2.subscribe("$EDC/kapua-sys/" + CLIENT_ID_2 + "/#");
>> >         clientAdmin.subscribe("#");
>> >
>> >         client1.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/BIRTH",
>> >                 new MqttMessage("test".getBytes()));
>> >         System.out.println("===");
>> >         client2.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/BIRTH",
>> >                 new MqttMessage("test".getBytes()));
>> >         System.out.println("===");
>> >         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/APPS",
>> >                 new MqttMessage("test".getBytes()));
>> >         System.out.println("===");
>> >         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/APPS",
>> >                 new MqttMessage("test".getBytes()));
>> >         System.out.println("===");
>> >
>> >         client1.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/DC",
>> >                 new MqttMessage("test".getBytes()));
>> >         System.out.println("===");
>> >         client2.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/DC",
>> >                 new MqttMessage("test".getBytes()));
>> >         System.out.println("===");
>> >         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/DC",
>> >                 new MqttMessage("test".getBytes()));
>> >         System.out.println("===");
>> >         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/DC",
>> >                 new MqttMessage("test".getBytes()));
>> >         System.out.println("===");
>> >
>> >         clientAdmin.disconnect();
>> >         client1.disconnect();
>> >         client2.disconnect();
>> >     }
>> >
>> >     private static MqttConnectOptions getMqttConnectOptions(
>> >             String username, String password) {
>> >         MqttConnectOptions options = new MqttConnectOptions();
>> >         options.setCleanSession(true);
>> >         options.setUserName(username);
>> >         options.setPassword(password.toCharArray());
>> >         return options;
>> >     }
>> > }
>> >
>> > class TestMqttClientCallback implements MqttCallback {
>> >
>> >     private String clientId;
>> >
>> >     TestMqttClientCallback(String clientId) {
>> >         this.clientId = clientId;
>> >     }
>> >
>> >     @Override
>> >     public void messageArrived(String topic, MqttMessage message)
>> >             throws Exception {
>> >         System.out.println("Client: " + clientId
>> >                 + " - Message arrived on topic: " + topic
>> >                 + " - message: " + new String(message.getPayload()));
>> >     }
>> >
>> >     @Override
>> >     public void deliveryComplete(IMqttDeliveryToken token) {
>> >         System.out.println("Client: " + clientId
>> >                 + " - Delivery completed: " + token.getTopics()[0]);
>> >     }
>> >
>> >     @Override
>> >     public void connectionLost(Throwable cause) {
>> >         System.out.println("Client: " + clientId
>> >                 + " - Connection lost: " + cause.getMessage());
>> >         cause.printStackTrace();
>> >     }
>> > }
>> >
>>
>
