CodingOX opened a new issue, #7999: URL: https://github.com/apache/rocketmq/issues/7999
### Before Creating the Bug Report

- [X] I found a bug, not just asking a question, which should be created in [GitHub Discussions](https://github.com/apache/rocketmq/discussions).
- [X] I have searched the [GitHub Issues](https://github.com/apache/rocketmq/issues) and [GitHub Discussions](https://github.com/apache/rocketmq/discussions) of this repository and believe that this is not a duplicate.
- [X] I have confirmed that this bug belongs to the current repository, not other repositories of RocketMQ.

### Runtime platform environment

Ubuntu 20.04

### RocketMQ version

RocketMQ 5.2

### JDK Version

JDK 21

### Describe the Bug

When consuming messages in order with the RocketMQ 5 client against a cluster of two masters and no slaves, the consumer does not receive the messages sequentially.

### Steps to Reproduce

Producer code, written in Kotlin:

```
import cn.hutool.core.lang.generator.SnowflakeGenerator
import org.apache.rocketmq.client.apis.ClientConfiguration
import org.apache.rocketmq.client.apis.ClientServiceProvider
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.util.concurrent.atomic.AtomicLong

fun main() {
    val log: Logger = LoggerFactory.getLogger("OCP")
    val provider = ClientServiceProvider.loadService()
    val endpoints = "rmq5-broker-1.middle:8081;rmq5-broker-2.middle:8081"
    val clientConfiguration = ClientConfiguration.newBuilder()
        .setEndpoints(endpoints)
        .build()
    val topic = "FIFOTopic"
    val tag = "tag-2"
    val producer = provider.newProducerBuilder()
        .setTopics(topic)
        .setClientConfiguration(clientConfiguration)
        .build()
    val atomicLong = AtomicLong()
    val snowflakeGenerator = SnowflakeGenerator()
    val messageGroup = "OrderedGroup-1"
    try {
        val ts = System.currentTimeMillis()
        // Send 30 messages one after another from a single thread
        repeat(30) {
            val id = snowflakeGenerator.next()
            val curIndex = atomicLong.incrementAndGet()
            val message = provider
                .newMessageBuilder()
                .setTag(tag)
                .setKeys(id.toString())
                .setTopic(topic)
                .setMessageGroup(messageGroup) // Set MessageGroup For FIFO
                .setBody(("$ts - $curIndex").toByteArray())
                .build()
            val receipt = producer.send(message)
            log.info("Send message: $curIndex - $id - ${receipt.messageId}")
        }
    } catch (ex: Exception) {
        log.error("Send message error", ex)
    }
}
```

- The producer appears to set the message group correctly, which is what ordered delivery requires.
- All messages are sent to the specified topic with the message group `OrderedGroup-1`.

Consumer code:

```
import cn.hutool.core.thread.ThreadUtil
import org.apache.rocketmq.client.apis.ClientConfiguration
import org.apache.rocketmq.client.apis.ClientServiceProvider
import org.apache.rocketmq.client.apis.consumer.ConsumeResult
import org.apache.rocketmq.client.apis.consumer.FilterExpression
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.nio.charset.StandardCharsets
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.TimeUnit

fun main() {
    val log: Logger = LoggerFactory.getLogger("ORC")
    val provider = ClientServiceProvider.loadService()
    val endpoints = "rmq5-broker-1.middle:8081;rmq5-broker-2.middle:8081"
    val clientConfiguration = ClientConfiguration.newBuilder()
        .setEndpoints(endpoints)
        .build()
    val topic = "FIFOTopic"
    val tag = "tag-2"
    val topicToTag = mapOf(topic to FilterExpression(tag, FilterExpressionType.TAG))
    // Collects message bodies in the order the listener receives them
    val cq = ConcurrentLinkedQueue<String>()
    provider.newPushConsumerBuilder()
        .setClientConfiguration(clientConfiguration)
        .setSubscriptionExpressions(topicToTag)
        .setConsumerGroup("OrderConsumerGroupV3")
        .setMessageListener {
            val msg = StandardCharsets.UTF_8.decode(it!!.body)
            cq.offer(msg.toString())
            ConsumeResult.SUCCESS
        }.build()
    ThreadUtil.sleep(10, TimeUnit.SECONDS)
    log.info(cq.joinToString("\n"))
    ThreadUtil.sleep(1, TimeUnit.HOURS)
}
```

- The consumer uses a `ConcurrentLinkedQueue` to store received messages temporarily for logging purposes.
- Messages are consumed from the topic `FIFOTopic` with tag `tag-2`.
- After a waiting period of 10 seconds, the received messages are logged using `cq.joinToString("\n")`.

### What Did You Expect to See?

The messages should be consumed in the order they were sent. However, the consumer does not achieve this expected behavior.

### What Did You See Instead?

Messages are not consumed in the order in which they were sent:

```
16:55:02.686 [main] INFO ORC -- 1712134499102 - 1
1712134499102 - 2
1712134499102 - 3
1712134499102 - 4
1712134499102 - 6
1712134499102 - 5
1712134499102 - 7
1712134499102 - 10
1712134499102 - 11
1712134499102 - 12
1712134499102 - 9
1712134499102 - 8
1712134499102 - 13
1712134499102 - 14
1712134499102 - 15
1712134499102 - 16
1712134499102 - 17
1712134499102 - 21
1712134499102 - 18
1712134499102 - 19
1712134499102 - 22
1712134499102 - 23
1712134499102 - 20
1712134499102 - 24
1712134499102 - 25
1712134499102 - 26
1712134499102 - 27
1712134499102 - 28
1712134499102 - 29
1712134499102 - 30
```

### Additional Context

_No response_

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org
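
The reordering reported in this issue can also be located programmatically instead of by reading the log. The following is only a minimal sketch: `findOutOfOrder` is a hypothetical helper, not part of the RocketMQ client API, and it assumes every message body has the `"$ts - $curIndex"` shape that the producer in this report writes.

```kotlin
// Hypothetical helper for illustration; not part of any RocketMQ API.
// Assumes each body looks like "<timestamp> - <index>", as built by the producer.
// Returns the 0-based positions whose index is smaller than the one just before it.
fun findOutOfOrder(bodies: List<String>): List<Int> {
    // Take the number after the last " - " separator.
    val indices = bodies.map { it.substringAfterLast(" - ").trim().toInt() }
    return indices.zipWithNext()
        .withIndex()
        // Keep neighbouring pairs where the later message has the smaller index.
        .filter { (_, pair) -> pair.second < pair.first }
        // Position i in the pair list refers to elements i and i + 1.
        .map { it.index + 1 }
}
```

For the four bodies ending in 4, 6, 5, 7, the function returns `[2]`, the position of the message with index 5. In the consumer above it could be called as `findOutOfOrder(cq.toList())` after the 10 second wait; an empty result would mean the messages were received in send order.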