zwcclub opened a new issue, #196:
URL: https://github.com/apache/rocketmq-mqtt/issues/196

   使用 RocketMQ 5.1.0 + RocketMQ MQTT 1.0.1 版本进行消息推送，RocketMQ 生产的消息主题分别为 mqttconsumertopic/r1/ 和 mqttconsumertopic/r/wc/。在 MQTT 中使用 mqttconsumertopic/r1/ 和 mqttconsumertopic/r/wc/ 订阅主题可以正常接收到消息，但是使用 mqttconsumertopic/# 进行订阅时，无法收到订阅的消息。
   生产者代码:
   ```java
   public class RocketMQProducer {
       private static DefaultMQProducer producer;
       private static String firstTopic = "mqttconsumertopic";
       private static String recvClientId = "recv01";
   
       public static void main(String[] args) throws Exception {
           producer = new DefaultMQProducer("Rocket_MQ_Producer");
           producer.setNamesrvAddr("24.82.64.248:9876");
           producer.start();
           for (int i = 0; i < 1000; i++) {
               try {
                   sendMessage(i);
                   Thread.sleep(1000);
                   sendWithWildcardMessage(i);
                   Thread.sleep(1000);
               } catch (Exception e) {
                   e.printStackTrace();
               }
           }
           // Shut down once the producer instance is not longer in use.
           producer.shutdown();
       }
   
       private static void setLmq(Message msg, Set<String> queues) {
           msg.putUserProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH, 
StringUtils.join(queues.stream().map(s -> StringUtils.replace(s, "/", 
"%")).map(s -> MixAll.LMQ_PREFIX + s).collect(Collectors.toSet()), 
MixAll.MULTI_DISPATCH_QUEUE_SPLITTER));
       }
   
       private static void sendMessage(int i) throws MQBrokerException, 
RemotingException, InterruptedException, MQClientException {
           Message msg = new Message(firstTopic, "MQ2MQTT", ("MQ_" + 
System.currentTimeMillis() + "_" + i).getBytes(StandardCharsets.UTF_8));
           String secondTopic = "/r1";
           System.out.println(TopicUtils.wrapLmq(firstTopic, secondTopic));
           setLmq(msg, new 
HashSet<>(Arrays.asList(TopicUtils.wrapLmq(firstTopic, secondTopic))));
           SendResult sendResult = producer.send(msg);
           System.out.println(now() + "sendMessage: " + new 
String(msg.getBody()));
       }
   
       private static void sendWithWildcardMessage(int i) throws 
MQBrokerException, RemotingException, InterruptedException, MQClientException {
           Message msg = new Message(firstTopic, "MQ2MQTT", ("MQwc_" + 
System.currentTimeMillis() + "_" + i).getBytes(StandardCharsets.UTF_8));
           String secondTopic = "/r/wc";
           Set<String> lmqSet = new HashSet<>();
           lmqSet.add(TopicUtils.wrapLmq(firstTopic, secondTopic));
           lmqSet.addAll(mapWildCardLmq(firstTopic, secondTopic));
           setLmq(msg, lmqSet);
           SendResult sendResult = producer.send(msg);
           System.out.println(now() + "sendWcMessage: " + new 
String(msg.getBody()));
       }
   
       private static Set<String> mapWildCardLmq(String firstTopic, String 
secondTopic) {
           System.out.println(TopicUtils.wrapLmq(firstTopic, secondTopic));
           // todo by yourself
           return new HashSet<>(Arrays.asList(TopicUtils.wrapLmq(firstTopic, 
secondTopic)));
       }
   
       private static String now() {
           SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd 
HH:mm:ss:SSS");
           return sf.format(new Date()) + "\t";
       }
   
   }
   ```
   消费者代码:
   ```java
   public class MqttConsumer {
       public static void main(String[] args) throws MqttException, 
NoSuchAlgorithmException, InvalidKeyException {
           String brokerUrl = "tcp://24.82.64.248:1883";
           String firstTopic = "mqttconsumertopic";
           MemoryPersistence memoryPersistence = new MemoryPersistence();
           String recvClientId = "test-mqtt-Consumer1";
           MqttConnectOptions mqttConnectOptions = 
buildMqttConnectOptions(recvClientId);
           MqttClient mqttClient = new MqttClient(brokerUrl, recvClientId, 
memoryPersistence);
           mqttClient.setTimeToWait(5000L);
           mqttClient.setCallback(new MqttCallbackExtended() {
               @Override
               public void connectComplete(boolean reconnect, String serverURI) 
{
                   System.out.println(recvClientId + " connect success to " + 
serverURI);
                   try {
                       final String topicFilter[] = {firstTopic + "/#"};
                       final int[] qos = {1};
                       mqttClient.subscribe(topicFilter, qos);
                   } catch (Exception e) {
                       e.printStackTrace();
                   }
               }
   
               @Override
               public void connectionLost(Throwable throwable) {
                   throwable.printStackTrace();
               }
   
               @Override
               public void messageArrived(String topic, MqttMessage 
mqttMessage) throws Exception {
                   try {
                       String payload = new String(mqttMessage.getPayload());
                       String[] ss = payload.split("_");
                       System.out.println(now() + "receive:" + topic + "," + 
payload);
                   } catch (Exception e) {
                       e.printStackTrace();
                   }
               }
   
               @Override
               public void deliveryComplete(IMqttDeliveryToken 
iMqttDeliveryToken) {
                   System.out.println(11);
               }
           });
   
           try {
               mqttClient.connect(mqttConnectOptions);
           } catch (Exception e) {
               e.printStackTrace();
               System.out.println("connect fail");
           }
       }
   
       private static MqttConnectOptions buildMqttConnectOptions(String 
clientId) throws NoSuchAlgorithmException, InvalidKeyException {
           MqttConnectOptions connOpts = new MqttConnectOptions();
           connOpts.setCleanSession(true);
           connOpts.setKeepAliveInterval(60);
           connOpts.setAutomaticReconnect(true);
           connOpts.setMaxInflight(10000);
           connOpts.setUserName("admin");
           connOpts.setPassword(HmacSHA1Util.macSignature(clientId, 
"123456").toCharArray());
           return connOpts;
       }
   
       private static String now() {
           SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd 
HH:mm:ss:SSS");
           return sf.format(new Date()) + "\t";
       }
   }
   ```
   RocketMQ KV 配置:
   ```json
   {
        "configTable": {
                "LMQ": {
                        "LMQ_CONNECT_NODES": "24.82.64.248",
                        "test-topic": "test-topic/+",
                        "mqttconsumertopic": "mqttconsumertopic/#",
                        "clientRetryTopic": "clientRetryTopic/#",
                        "ALL_FIRST_TOPICS": 
"eventNotifyRetryTopic,clientRetryTopic,mqttconsumertopic",
                        "eventNotifyRetryTopic": "eventNotifyRetryTopic/#"
                }
        }
   }
   ```
   
![image](https://github.com/apache/rocketmq-mqtt/assets/65881251/56901a87-45df-46df-9c8c-0a006fe3eee5)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to