mobaijaavaer opened a new issue, #7242:
URL: https://github.com/apache/rocketmq/issues/7242

   ### Before Creating the Bug Report
   
   - [X] I found a bug, not just asking a question, which should be created in 
[GitHub Discussions](https://github.com/apache/rocketmq/discussions).
   
   - [X] I have searched the [GitHub 
Issues](https://github.com/apache/rocketmq/issues) and [GitHub 
Discussions](https://github.com/apache/rocketmq/discussions)  of this 
repository and believe that this is not a duplicate.
   
   - [X] I have confirmed that this bug belongs to the current repository, not 
other repositories of RocketMQ.
   
   
   ### Runtime platform environment
   
   Macos Montery 12.1(m1)
   
   ### RocketMQ version
   
   4.5.2
   
   ### JDK Version
   
   Oracle JDK 1.8
   
   ### Describe the Bug
   
   您好,我在使用Rocketmq 4.5.2版本客户端进行顺序消息消费场景测试时发现一个诡异的问题: 
顺序消息客户端刚启动时有极大概率存在几十秒的延迟时间,在这时间内无法拉取消息进行消费,以下是我的测试的代码:
   `java
   public static void main(String[] args) throws MQClientException, 
MQBrokerException, RemotingException, InterruptedException {
           SimpleDateFormat simpleDateFormat=new SimpleDateFormat("yyyy-MM-dd 
HH:mm:ss");
           String group = "";
           RPCHook aclRPCHook = new AclClientRPCHook(new 
SessionCredentials("ak","sk"));
           DefaultMQPushConsumer mqClient = new DefaultMQPushConsumer(group,
                   aclRPCHook,
                   new AllocateMessageQueueAveragely(), true, null);
           mqClient.setAccessChannel(AccessChannel.CLOUD);
           mqClient.setInstanceName("instanceId");
           mqClient.setNamesrvAddr("namesrc");
           mqClient.subscribe("topic", "*");
           mqClient.setConsumeTimeout(10000);
           mqClient.registerMessageListener(new MessageListenerOrderly() {
               @Override
               public ConsumeOrderlyStatus consumeMessage(List<MessageExt> 
msgs, ConsumeOrderlyContext context) {
                   MessageExt messageExt = msgs.get(0);
                   String time = new String(messageExt.getBody(), 
StandardCharsets.UTF_8);
                   long now = System.currentTimeMillis();
                   long duration = now - (Long.parseLong(time));
                   
System.out.println("消息id:"+messageExt.getMsgId()+",发送时间:"+simpleDateFormat.format(new
 Date(Long.parseLong(time)))+
                           ",存储时间:"+simpleDateFormat.format(new 
Date(messageExt.getStoreTimestamp()))+",客户端消费时间:"+simpleDateFormat.format(new 
Date(now))+",延时:"+duration);
                   return ConsumeOrderlyStatus.SUCCESS;
               }
           });
           mqClient.start();
           //生产者
           DefaultMQProducer mqProducer = new 
DefaultMQProducer("gaoding-message-producer",
                   new AclClientRPCHook(new 
SessionCredentials("LTAI4G4tf1j4TwH7c9BvkfaU","tN6B4fsNnEjZPCbINvnzL5T4RAna2T")),
                   true, null);
           mqProducer.setAccessChannel(AccessChannel.CLOUD);
           mqProducer.setInstanceName("instanceId");
           mqProducer.setNamesrvAddr("namesrc");
           mqProducer.start();
           //连续发送3条顺序消息
           for (int i = 0; i < 3; i++) {
               Message test = new 
Message("MQ_INST_125036_BXbcy8xH%pt-gaoding-message-test-order-dev",
                       "*",
                       "test",
                       (System.currentTimeMillis() + 
"").getBytes(StandardCharsets.UTF_8));
               test.putUserProperty("__SHARDINGKEY", "test");
               mqProducer.send(test, new MessageQueueSelector() {
                   @Override
                   public MessageQueue select(List<MessageQueue> mqs, Message 
msg, Object arg) {
                       int select = Math.abs(arg.hashCode());
                       if (select < 0) {
                           select = 0;
                       }
                       return mqs.get(select % mqs.size());
                   }
               }, "test");
           }
   
           Thread.sleep(10000L);
       }
   `
   这是控制台截图:
   
![image](https://github.com/apache/rocketmq/assets/121411673/b4324a76-ebe7-4830-a2ab-9a585c822c8a)
   
   可以发现broker端存储消息时间和发送时间几乎一致,而实际客户端消费时间存在较大延迟,所以排除掉是 生产者发送延迟的问题;
   
其次,通过断点调试发现,顺序消息消费会对当前客户端分配到的所有队列尝试向broker申请加锁,只有加锁成功的队列才会允许向broker发起消息拉取动作,目前上锁是由
 ConsumeMessageOrderlyService#start  
方法中一个定时线程去调用lockMQPeriodically方法,默认20s执行一次,通过在对应上锁代码中打印消息发现,原因系 
第一次启动时本地没有任何队列,所以请求直接返回了,需要等客户端rebalance成功后才有队列去申请上锁:
   
![image](https://github.com/apache/rocketmq/assets/121411673/b6ccebbe-a07b-4dd6-8d03-70a6c735218d)
   
![image](https://github.com/apache/rocketmq/assets/121411673/d09aacec-0502-4d9a-bed0-529995877f5c)
   
   
   
   ### Steps to Reproduce
   
   如上所述
   
   ### What Did You Expect to See?
   
   消费消息应该尽可能快,低延时
   
   ### What Did You See Instead?
   
   客户端延迟了几十秒才开始拉取消息
   
   ### Additional Context
   
   _No response_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to