powertoredstar opened a new issue, #1092:
URL: https://github.com/apache/rocketmq-clients/issues/1092

   ### Before Creating the Bug Report
   
   - [x] I found a bug, not just asking a question, which should be created in 
[GitHub Discussions](https://github.com/apache/rocketmq-clients/discussions).
   
   - [x] I have searched the [GitHub 
Issues](https://github.com/apache/rocketmq-clients/issues) and [GitHub 
Discussions](https://github.com/apache/rocketmq-clients/discussions)  of this 
repository and believe that this is not a duplicate.
   
   - [x] I have confirmed that this bug belongs to the current repository, not 
other repositories of RocketMQ.
   
   
   ### Programming Language of the Client
   
   Java
   
   ### Runtime Platform Environment
   
   Ubuntu 24.04.2 LTS
   
   ### RocketMQ Version of the Client/Server
   
   RocketMQ version: 5.3.2
   rocketmq-client-java version: 5.0.8
   
   ### Run or Compiler Version
   
   JDK 21
   
   ### Describe the Bug
   
   When a PushConsumer initializes, FIFO messages are consumed out of order 
instead of in sequence.
   
   ### Steps to Reproduce
   
   1. I create a topic named AI-CHAT-TOPIC (message type: FIFO).
   ```java
       @Test
       public void createTopic() throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
           DefaultMQAdminExt admin = new DefaultMQAdminExt();
           admin.setNamesrvAddr("10.30.18.78:19876");
           admin.start();
           String clusterName = "DefaultCluster";
           String topicName = "AI-CHAT-TOPIC";
           Map<String, String> map = new HashMap<>();
           admin.deleteTopic(topicName,clusterName);
           map.put("+message.type", MessageType.FIFO.name());
           admin.createTopic(clusterName, topicName, 1, map);
           admin.shutdown();
       }
   ```
   
   2. I verify that the topic's message type is FIFO.
   ```java
   
       @Test
       public void getTopicMessageType() throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
           DefaultMQAdminExt admin = new DefaultMQAdminExt();
           admin.setNamesrvAddr("10.30.18.78:19876");
           admin.start();
           String topicName = "AI-CHAT-TOPIC";
           String clusterName = "DefaultCluster";
           TopicRouteData topicRouteData = admin.examineTopicRouteInfo(topicName);
           for (BrokerData brokerData : topicRouteData.getBrokerDatas()) {
               for(String ip :brokerData.getBrokerAddrs().values()){
                   System.out.println(ip+" is "+brokerData.getBrokerName());
                   TopicConfig topicConfig = admin.examineTopicConfig(ip, topicName);
                   System.out.println("message type:" + topicConfig.getTopicMessageType());
               }
           }
           admin.shutdown();
       }
   
   ``` 
   
   <img width="668" height="244" alt="Image" src="https://github.com/user-attachments/assets/1603ff29-626e-44f2-bfbe-5d96b5bd09bd" />
   
   3. I start a producer that sends sequenced messages, each carrying an 
incrementing number in its body.
   ```java
    @Test
       public void test() throws ClientException {
           final ClientServiceProvider provider = ClientServiceProvider.loadService();
   
           String tag = "CHAT";
           String clusterName = "DefaultCluster";
           String topicName = "AI-CHAT-TOPIC";
           String endpoints = "10.30.18.78:18081";
           // Credential provider is optional for client configuration.
           // This parameter is necessary only when the server ACL is enabled. Otherwise,
           // it does not need to be set by default.
           ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                   .setEndpoints(endpoints)
                   // On some Windows platforms, you may encounter SSL compatibility issues. If SSL is
                   // not essential, try turning off the SSL option in the client configuration.
                   .enableSsl(false)
                   .build();
           final ProducerBuilder builder = provider.newProducerBuilder()
                   .setClientConfiguration(clientConfiguration)
                   // Set the topic name(s), which is optional but recommended. It lets the producer
                   // prefetch the topic route before message publishing.
                   .setTopics(topicName);
           Producer producer = builder.build();
           int count1 = 1;
           while(true){
               // Define your message body.
               String content = "seq:"+count1++;
               byte[] body = content.getBytes(StandardCharsets.UTF_8);
               final Message message = provider.newMessageBuilder()
                       .setTopic(topicName)
                       .setTag(tag)
                       .setKeys(UUID.randomUUID().toString())
                       .setBody(body)
                       .setMessageGroup("group1")
                       .build();
               try {
                   final SendReceipt sendReceipt = producer.send(message);
                   log.info("Send message successfully, messageId={}, body={}", sendReceipt.getMessageId(), content);
                   Thread.sleep(500);
               } catch (Throwable t) {
                   log.error("Failed to send message", t);
               }
   
           }
       }
   ```
   4. I start a PushConsumer to consume the messages.
   ```java
   @Test
       public void test() throws ClientException, InterruptedException, IOException {
           final ClientServiceProvider provider = ClientServiceProvider.loadService();
           Lock lock = new ReentrantLock();
           String endpoints = "10.30.18.78:18081";
           ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                   .setEndpoints(endpoints)
                   .build();
           String consumerGroup = "ai-chat-consumer";
           String tag = "*";
           FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
           String topic = "AI-CHAT-TOPIC";
           // In most cases, you don't need to create too many consumers; the singleton pattern is recommended.
           PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                   .setClientConfiguration(clientConfiguration)
                   // Set the consumer group name.
                   .setConsumerGroup(consumerGroup)
                   // Set the subscription for the consumer.
                   .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                   .setMessageListener(message -> {
                       // Handle the received message and return consume result
                       ByteBuffer body = message.getBody();
                       // Deserialize the message body correctly.
                       byte[] bytes = new byte[body.remaining()];
                       body.get(bytes);
                       String content = new String(bytes, StandardCharsets.UTF_8);
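                       // Acquire the lock outside the guarded block so unlock() only runs when the lock is held.
                       try {
                           lock.lockInterruptibly();
                       } catch (InterruptedException e) {
                           Thread.currentThread().interrupt();
                           throw new RuntimeException(e);
                       }
                       try {
                           log.info("Message content: " + content);
                       } finally {
                           lock.unlock();
                       }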
                       return ConsumeResult.SUCCESS;
                   })
                   .build();
           // Block the main thread, no need for production environment.
           Thread.sleep(Long.MAX_VALUE);
           // Close the push consumer when you don't need it anymore.
           // You could close it manually or add this into the JVM shutdown hook.
           pushConsumer.close();
       }
   ```
   5. I check the console and find that the first 30 messages are consumed out 
of order; only after that are messages consumed in order.
   
   <img width="1825" height="586" alt="Image" src="https://github.com/user-attachments/assets/24e2c721-14a7-4671-ba77-e5a6810143f4" />
   
   <img width="1816" height="567" alt="Image" src="https://github.com/user-attachments/assets/5e93921f-2358-47cf-b8e4-20716bb42f31" />
   
   <img width="1895" height="576" alt="Image" src="https://github.com/user-attachments/assets/dfac6997-1c1b-4786-aa1a-7f3e12ae9a9d" />
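   
   To rule out console log interleaving as the source of the apparent 
disorder, the arrival order could be captured in memory rather than read off 
the log. The following is only a minimal sketch: `ArrivalRecorder` and 
`Arrival` are hypothetical helper names, not part of rocketmq-client-java, and 
the assumption is that `record` would be called from the message listener in 
step 4.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   /** Hypothetical helper (not a RocketMQ class): records messages in the order the listener sees them. */
   public class ArrivalRecorder {
   
       /** One observed message: its arrival position, body text, and consuming thread. */
       public static final class Arrival {
           public final int position;
           public final String content;
           public final String threadName;
   
           Arrival(int position, String content, String threadName) {
               this.position = position;
               this.content = content;
               this.threadName = threadName;
           }
       }
   
       private final List<Arrival> arrivals = new ArrayList<>();
   
       /** Appends the message atomically and returns its zero-based arrival position. */
       public synchronized int record(String content) {
           int position = arrivals.size();
           arrivals.add(new Arrival(position, content, Thread.currentThread().getName()));
           return position;
       }
   
       /** Returns a copy so callers can inspect arrivals while consumption continues. */
       public synchronized List<Arrival> snapshot() {
           return new ArrayList<>(arrivals);
       }
   }
   ```
   
   Calling `recorder.record(content)` inside the listener would store each 
body together with the consuming thread's name, which may help show whether 
the early messages are handled by several threads at once.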
   
   ### What Did You Expect to See?
   
   I expect the PushConsumer to consume FIFO messages in order from the moment 
it is initialized.
   
   ### What Did You See Instead?
   
   The console shows that the first 30 messages are consumed out of order; 
only after that are messages consumed in order.
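   
   The out-of-order window can also be located mechanically instead of by 
reading the console. The sketch below uses a hypothetical `SeqOrderChecker` 
class (not part of any RocketMQ library) and assumes message bodies follow the 
`seq:N` format produced in step 3; it reports every position whose sequence 
number is not greater than the one before it.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   /** Hypothetical helper: finds positions where "seq:N" bodies break ascending order. */
   public class SeqOrderChecker {
   
       /** Parses the number from a body such as "seq:17" (format assumed from the producer test). */
       public static long parseSeq(String content) {
           if (!content.startsWith("seq:")) {
               throw new IllegalArgumentException("Unexpected body: " + content);
           }
           return Long.parseLong(content.substring("seq:".length()).trim());
       }
   
       /** Returns the zero-based indexes whose sequence number is not greater than the previous one. */
       public static List<Integer> outOfOrderIndexes(List<String> consumed) {
           List<Integer> violations = new ArrayList<>();
           long previous = Long.MIN_VALUE;
           for (int i = 0; i < consumed.size(); i++) {
               long current = parseSeq(consumed.get(i));
               if (current <= previous) {
                   violations.add(i);
               }
               previous = current;
           }
           return violations;
       }
   
       public static void main(String[] args) {
           // Made-up sample data for illustration, not taken from the screenshots.
           List<String> consumed = List.of("seq:1", "seq:3", "seq:2", "seq:4");
           System.out.println(outOfOrderIndexes(consumed)); // prints [2]
       }
   }
   ```
   
   An empty result means the consumed bodies were strictly ascending; for this 
issue, the expectation would be violations only among the first 30 entries.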
   
   ### Additional Context
   
   _No response_

