powertoredstar opened a new issue, #1092: URL: https://github.com/apache/rocketmq-clients/issues/1092
### Before Creating the Bug Report

- [x] I found a bug, not just asking a question, which should be created in [GitHub Discussions](https://github.com/apache/rocketmq-clients/discussions).
- [x] I have searched the [GitHub Issues](https://github.com/apache/rocketmq-clients/issues) and [GitHub Discussions](https://github.com/apache/rocketmq-clients/discussions) of this repository and believe that this is not a duplicate.
- [x] I have confirmed that this bug belongs to the current repository, not other repositories of RocketMQ.

### Programming Language of the Client

Java

### Runtime Platform Environment

Ubuntu 24.04.2 LTS

### RocketMQ Version of the Client/Server

rocketmq version: 5.3.2
rocketmq-client-java version: 5.0.8

### Run or Compiler Version

jdk21

### Describe the Bug

When a PushConsumer initializes, FIFO messages are consumed out of order instead of in sequence.

### Steps to Reproduce

1. I create a topic named AI-CHAT-TOPIC (message type is FIFO).

```java
@Test
public void createTopic() throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
    DefaultMQAdminExt admin = new DefaultMQAdminExt();
    admin.setNamesrvAddr("10.30.18.78:19876");
    admin.start();
    String clusterName = "DefaultCluster";
    String topicName = "AI-CHAT-TOPIC";
    Map<String, String> map = new HashMap<>();
    admin.deleteTopic(topicName, clusterName);
    map.put("+message.type", MessageType.FIFO.name());
    admin.createTopic(clusterName, topicName, 1, map);
    admin.shutdown();
}
```

2. I check that the topic's type is FIFO.

```java
@Test
public void getTopicMessageType() throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
    DefaultMQAdminExt admin = new DefaultMQAdminExt();
    admin.setNamesrvAddr("10.30.18.78:19876");
    admin.start();
    String topicName = "AI-CHAT-TOPIC";
    String clusterName = "DefaultCluster";
    TopicRouteData topicRouteData = admin.examineTopicRouteInfo(topicName);
    for (BrokerData brokerData : topicRouteData.getBrokerDatas()) {
        for (String ip : brokerData.getBrokerAddrs().values()) {
            System.out.println(ip + " is " + brokerData.getBrokerName());
            TopicConfig topicConfig = admin.examineTopicConfig(ip, topicName);
            System.out.println("message type:" + topicConfig.getTopicMessageType());
        }
    }
    admin.shutdown();
}
```

<img width="668" height="244" alt="Image" src="https://github.com/user-attachments/assets/1603ff29-626e-44f2-bfbe-5d96b5bd09bd" />

3. I start a producer that sends messages whose bodies carry an increasing sequence number.

```java
@Test
public void test() throws ClientException {
    final ClientServiceProvider provider = ClientServiceProvider.loadService();
    String tag = "CHAT";
    String clusterName = "DefaultCluster";
    String topicName = "AI-CHAT-TOPIC";
    String endpoints = "10.30.18.78:18081";
    // Credential provider is optional for client configuration.
    // This parameter is necessary only when the server ACL is enabled. Otherwise,
    // it does not need to be set by default.
    ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
        .setEndpoints(endpoints)
        // On some Windows platforms, you may encounter SSL compatibility issues. Try turning off the SSL option in
        // client configuration to solve the problem please if SSL is not essential.
        .enableSsl(false)
        .build();
    final ProducerBuilder builder = provider.newProducerBuilder()
        .setClientConfiguration(clientConfiguration)
        // Set the topic name(s), which is optional but recommended. It makes producer could prefetch
        // the topic route before message publishing.
        .setTopics(topicName);
    Producer producer = builder.build();
    int count1 = 1;
    while (true) {
        // Define your message body.
        String content = "seq:" + count1++;
        byte[] body = content.getBytes(StandardCharsets.UTF_8);
        final Message message = provider.newMessageBuilder()
            .setTopic(topicName)
            .setTag(tag)
            .setKeys(UUID.randomUUID().toString())
            .setBody(body)
            .setMessageGroup("group1")
            .build();
        try {
            final SendReceipt sendReceipt = producer.send(message);
            log.info("Send message successfully, messageId={}", content);
            Thread.sleep(500);
        } catch (Throwable t) {
            log.error("Failed to send message", t);
        }
    }
}
```

4. I start a PushConsumer to consume the messages.

```java
@Test
public void test() throws ClientException, InterruptedException, IOException {
    final ClientServiceProvider provider = ClientServiceProvider.loadService();
    Lock lock = new ReentrantLock();
    String endpoints = "10.30.18.78:18081";
    ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
        .setEndpoints(endpoints)
        .build();
    String consumerGroup = "ai-chat-consumer";
    String tag = "*";
    FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
    String topic = "AI-CHAT-TOPIC";
    // In most case, you don't need to create too many consumers, singleton pattern is recommended.
    PushConsumer pushConsumer = provider.newPushConsumerBuilder()
        .setClientConfiguration(clientConfiguration)
        // Set the consumer group name.
        .setConsumerGroup(consumerGroup)
        // Set the subscription for the consumer.
        .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
        .setMessageListener(message -> {
            // Handle the received message and return consume result
            ByteBuffer body = message.getBody();
            // Deserialize the message content correctly
            byte[] bytes = new byte[body.remaining()];
            body.get(bytes);
            String content = new String(bytes, StandardCharsets.UTF_8);
            try {
                lock.lockInterruptibly();
                log.info("数据内容:" + content); // "数据内容" means "data content"
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            return ConsumeResult.SUCCESS;
        })
        .build();
    // Block the main thread, no need for production environment.
    Thread.sleep(Long.MAX_VALUE);
    // Close the push consumer when you don't need it anymore.
    // You could close it manually or add this into the JVM shutdown hook.
    pushConsumer.close();
}
```

5. I check the console and find that the first 30 messages are consumed out of order; only after that do they arrive in order.

<img width="1825" height="586" alt="Image" src="https://github.com/user-attachments/assets/24e2c721-14a7-4671-ba77-e5a6810143f4" />
<img width="1816" height="567" alt="Image" src="https://github.com/user-attachments/assets/5e93921f-2358-47cf-b8e4-20716bb42f31" />
<img width="1895" height="576" alt="Image" src="https://github.com/user-attachments/assets/dfac6997-1c1b-4786-aa1a-7f3e12ae9a9d" />

### What Did You Expect to See?

I expect the PushConsumer to consume FIFO messages in order from the moment it is initialized.

### What Did You See Instead?

The first 30 messages shown in the console are out of order; only the messages after that are consumed in order.

### Additional Context

_No response_

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at: [email protected]
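
One way to confirm the out-of-order delivery described in this report without reading the console by eye is to track the sequence numbers inside the message listener. The following is only a minimal sketch: `SequenceOrderChecker` is a hypothetical helper, not part of rocketmq-client-java, and it assumes every body has the `seq:<number>` form produced in step 3.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper (not part of rocketmq-client-java): records the
 * "seq:<number>" bodies seen by a listener and notes every message that
 * arrives with a number lower than or equal to one already seen.
 */
class SequenceOrderChecker {
    // Highest sequence number seen so far; -1 means nothing has been recorded yet.
    private long lastSeq = -1;
    private final List<String> violations = new ArrayList<>();

    /** Records one body such as "seq:42"; returns false if it arrived out of order. */
    public synchronized boolean record(String content) {
        // Assumption: the body always starts with the "seq:" prefix used by the producer.
        long seq = Long.parseLong(content.substring("seq:".length()));
        boolean inOrder = seq > lastSeq;
        if (inOrder) {
            lastSeq = seq;
        } else {
            violations.add("got " + seq + " after " + lastSeq);
        }
        return inOrder;
    }

    /** Returns a copy of the out-of-order observations recorded so far. */
    public synchronized List<String> violations() {
        return new ArrayList<>(violations);
    }

    public static void main(String[] args) {
        SequenceOrderChecker checker = new SequenceOrderChecker();
        for (String body : new String[] {"seq:1", "seq:3", "seq:2", "seq:4"}) {
            checker.record(body);
        }
        System.out.println(checker.violations()); // prints [got 2 after 3]
    }
}
```

In the consumer from step 4, calling `checker.record(content)` inside the listener and logging `checker.violations()` would give a precise count of out-of-order deliveries. The method is synchronized because the listener may be invoked from more than one thread.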
