CodingOX opened a new issue, #7999:
URL: https://github.com/apache/rocketmq/issues/7999

   ### Before Creating the Bug Report
   
   - [X] I found a bug, not just asking a question, which should be created in 
[GitHub Discussions](https://github.com/apache/rocketmq/discussions).
   
   - [X] I have searched the [GitHub 
Issues](https://github.com/apache/rocketmq/issues) and [GitHub 
Discussions](https://github.com/apache/rocketmq/discussions)  of this 
repository and believe that this is not a duplicate.
   
   - [X] I have confirmed that this bug belongs to the current repository, not 
other repositories of RocketMQ.
   
   
   ### Runtime platform environment
   
   Ubuntu 20.04
   
   
   ### RocketMQ version
   
   RocketMQ 5.2
   
   ### JDK Version
   
   JDK 21
   
   ### Describe the Bug
   
   When consuming messages in order with the RocketMQ 5 client against a 
cluster of 2 masters and no slaves, the consumer does not receive the 
messages sequentially.
   
   ### Steps to Reproduce
   
   Producer code, written in Kotlin:
   ```
   
   import cn.hutool.core.lang.generator.SnowflakeGenerator
   import org.apache.rocketmq.client.apis.ClientConfiguration
   import org.apache.rocketmq.client.apis.ClientServiceProvider
   import org.slf4j.Logger
   import org.slf4j.LoggerFactory
   import java.util.concurrent.atomic.AtomicLong
   
   fun main() {
   
       val log: Logger = LoggerFactory.getLogger("OCP")
       val provider = ClientServiceProvider.loadService()
   
       val endpoints = "rmq5-broker-1.middle:8081;rmq5-broker-2.middle:8081"
       val clientConfiguration = ClientConfiguration.newBuilder()
           .setEndpoints(endpoints)
           .build()
   
       val topic = "FIFOTopic"
       val tag = "tag-2"
   
       val producer = provider.newProducerBuilder()
           .setTopics(topic)
           .setClientConfiguration(clientConfiguration)
           .build()
   
       val atomicLong = AtomicLong()
       val snowflakeGenerator = SnowflakeGenerator()
       val messageGroup = "OrderedGroup-1"
       try {
           val ts = System.currentTimeMillis()
           repeat(30) {
               val id = snowflakeGenerator.next()
               val curIndex = atomicLong.incrementAndGet()
               val message = provider
                   .newMessageBuilder()
                   .setTag(tag)
                   .setKeys(id.toString())
                   .setTopic(topic)
                   .setMessageGroup(messageGroup) // Set MessageGroup For FIFO
                   .setBody(("$ts - $curIndex").toByteArray())
                   .build()
   
               val receipt = producer.send(message)
   
               log.info("Send message: $curIndex - $id - ${receipt.messageId}")
           }
       } catch (ex: Exception) {
           log.error("Send message error", ex)
       }
   }
   ```
      - The producer side seems to be correctly setting the message group for 
ensuring ordered message delivery.
      - Messages are being sent to the specified topic with message group 
`OrderedGroup-1`.
   
   
   Consumer Code
   ```
   
   import cn.hutool.core.thread.ThreadUtil
   import org.apache.rocketmq.client.apis.ClientConfiguration
   import org.apache.rocketmq.client.apis.ClientServiceProvider
   import org.apache.rocketmq.client.apis.consumer.ConsumeResult
   import org.apache.rocketmq.client.apis.consumer.FilterExpression
   import org.apache.rocketmq.client.apis.consumer.FilterExpressionType
   import org.slf4j.Logger
   import org.slf4j.LoggerFactory
   import java.nio.charset.StandardCharsets
   import java.util.concurrent.ConcurrentLinkedQueue
   import java.util.concurrent.TimeUnit
   
   
   fun main() {
       val log: Logger = LoggerFactory.getLogger("ORC")
   
       val provider = ClientServiceProvider.loadService()
       val endpoints = "rmq5-broker-1.middle:8081;rmq5-broker-2.middle:8081"
       val clientConfiguration = ClientConfiguration.newBuilder()
           .setEndpoints(endpoints)
           .build()
   
       val topic = "FIFOTopic"
       val tag = "tag-2"
       val topicToTag = mapOf(topic to FilterExpression(tag, FilterExpressionType.TAG))
        
       val cq = ConcurrentLinkedQueue<String>()
   
       provider.newPushConsumerBuilder()
           .setClientConfiguration(clientConfiguration)
           .setSubscriptionExpressions(topicToTag)
           .setConsumerGroup("OrderConsumerGroupV3")
           .setMessageListener {
               val msg = StandardCharsets.UTF_8.decode(it!!.body)
               cq.offer(msg.toString())
               ConsumeResult.SUCCESS
           }.build()
   
       ThreadUtil.sleep(10, TimeUnit.SECONDS)
   
       log.info(cq.joinToString("\n"))
   
       ThreadUtil.sleep(1, TimeUnit.HOURS)
   }
   
   ```
      - The consumer side is using a `ConcurrentLinkedQueue` to store received 
messages temporarily for logging purposes.
      - Messages are being consumed from the topic `FIFOTopic` with tag `tag-2`.
      - After a waiting period of 10 seconds, the received messages are logged 
using `cq.joinToString("\n")`.
   
   
   
   ### What Did You Expect to See?
   
   The messages should be consumed in the order they were sent. However, the 
consumer does not achieve this expected behavior.
   
   ### What Did You See Instead?
   
   Messages are not consumed in the order in which they were sent. In the 
consumer log below, for example, 6 arrives before 5, and 10, 11, 12 arrive 
before 9 and 8:
   
   ```
   16:55:02.686 [main] INFO ORC -- 1712134499102 - 1
   1712134499102 - 2
   1712134499102 - 3
   1712134499102 - 4
   1712134499102 - 6
   1712134499102 - 5
   1712134499102 - 7
   1712134499102 - 10
   1712134499102 - 11
   1712134499102 - 12
   1712134499102 - 9
   1712134499102 - 8
   1712134499102 - 13
   1712134499102 - 14
   1712134499102 - 15
   1712134499102 - 16
   1712134499102 - 17
   1712134499102 - 21
   1712134499102 - 18
   1712134499102 - 19
   1712134499102 - 22
   1712134499102 - 23
   1712134499102 - 20
   1712134499102 - 24
   1712134499102 - 25
   1712134499102 - 26
   1712134499102 - 27
   1712134499102 - 28
   1712134499102 - 29
   1712134499102 - 30
   
   ```
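   
   The disorder in the log can also be checked mechanically. The following is 
only a minimal sketch: `outOfOrderIndices` is a hypothetical helper that is 
not part of the reproduction code, and it assumes every body has the 
`<timestamp> - <index>` form produced by the producer above. It returns each 
index that arrived after a higher index had already been seen.
   
   ```kotlin
   // Hypothetical helper (not part of the original reproduction code).
   // Assumes each body looks like "<timestamp> - <index>".
   fun outOfOrderIndices(bodies: List<String>): List<Long> {
       var maxSeen = Long.MIN_VALUE
       val late = mutableListOf<Long>()
       for (body in bodies) {
           // The send index is the number after the last " - " separator.
           val index = body.substringAfterLast(" - ").trim().toLong()
           // An index lower than one already seen was delivered late.
           if (index < maxSeen) late.add(index) else maxSeen = index
       }
       return late
   }
   
   fun main() {
       val ts = 1712134499102L
       // Arrival order of the first 12 messages, copied from the log above.
       val observed = listOf(1, 2, 3, 4, 6, 5, 7, 10, 11, 12, 9, 8).map { "$ts - $it" }
       println(outOfOrderIndices(observed)) // prints [5, 9, 8]
   }
   ```
   
   In the consumer above, the same function could be applied to `cq.toList()` 
before logging; an empty result would mean the messages arrived in send order.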
   
   ### Additional Context
   
   _No response_

