RapperCL opened a new issue, #8356: URL: https://github.com/apache/rocketmq/issues/8356
### Before Creating the Bug Report - [X] I found a bug, not just asking a question, which should be created in [GitHub Discussions](https://github.com/apache/rocketmq/discussions). - [X] I have searched the [GitHub Issues](https://github.com/apache/rocketmq/issues) and [GitHub Discussions](https://github.com/apache/rocketmq/discussions) of this repository and believe that this is not a duplicate. - [X] I have confirmed that this bug belongs to the current repository, not other repositories of RocketMQ. ### Runtime platform environment windows ### RocketMQ version develop ### JDK Version jdk 1.8 ### Describe the Bug issue: pull消费模式下有序消费时,线程数无法扩充到最大线程数 背景: 结合第一篇的描述可以看到相关问题前提;这里的问题同样是线程数无法扩充到最大线程数 问题: rocketmq中为了保证有序消费,在消费任务中,以队列分区的粒度进行了加锁,并在run方法中拉取当前分区前面的消息进行消费,保证了分区的有序消费。 为了避免竞争,在对拉取的消息进行消费之前,会判断当前分区是否已经在进行消费了,如果已经在进行消费了,就不会继续交给线程池消费了。 这样其实可以发现,线程池中的任务一直会比较少,应该会<= 当前消费者所定义的分区数。 所以这种模式下,最大并行度应该也是<= 当前分区数。 这样一来,就不能采用和前面一样的方式了,需要区别对待。 org.apache.rocketmq.client.impl.consumer.ConsumeMessagePopConcurrentlyService 方案: 以分区的维度来动态调整线程池核心线程数的大小 参数: consumeMessageBatchMaxSize = 1 pullBatchSize = 32 pullThresholdSizeForQueue = 1000 consumeThreadMin=4 consumeThreadMax=10 订阅主题数=5, 单个主题分区分配数=2 以上面的参数为例,消费者实例分配到的分区数=10,按照当前的逻辑,线程数最多扩充到4,此时最大的并行度为4,其实并行度可以达到10的。 由于主题分区数会动态变化的,所以我们可以考虑通过定时任务更新线程池核心线程数,由于变更频率很低,定时任务可能存在大量的无效执行,所以也可以考虑添加变更后置事件:rebalance之后,调用后置事件,完成核心线程数的变更。 正好rebalance已经提供了后置处理接口, org.apache.rocketmq.client.impl.consumer.RebalanceImpl#messageQueueChanged 于是调整逻辑: 调整核心线程数=当前分配的主题分区数 this.defaultMQPushConsumer.setMessageQueueListener(new MessageQueueListener() { @Override public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqAssigned) { int queueSize = defaultMQPushConsumerImpl.getRebalanceImpl().processQueueTable.size(); updateCorePoolSize(queueSize); } }); 又由于在消费者实例启动时,会出发重平衡,此时会不会导致频繁的更新线程池核心线程数? 当前的更新逻辑确实会导致这样的情况:只要corePoolSize 在0~ Math.min(ConsumeThreadMax, Short.MAX_VALUE) 之间就会。 public void updateCorePoolSize(int corePoolSize) { if (corePoolSize > 0 && corePoolSize <= Short.MAX_VALUE && corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax()) { this.consumeExecutor.setCorePoolSize(corePoolSize); } } 优化思路: corePoolSize > this.defaultMQPushConsumer.getConsumeThreadMin() 针对当前的场景,我们应该控制corePoolSize的范围在设置的min~max之间才是合理的。 这样一来,也不用担心在启动时,corePoolSize被频繁更新了。 简单测试: 1 在有序消费ConsumeMessageOrderlyService的重平衡监听中打印当前线程池的情况。 2 启动时,重平衡; 3 在控制台调整主题分区,触发重平衡。 测试结果,符合预期效果 Connected to the target VM, address: '127.0.0.1:54211', transport: 'socket' Consumer Started. consumeExecutor queueSize: 24, corePoolSize: 24 consumeExecutor queueSize: 25, corePoolSize: 25 consumeExecutor queueSize: 29, corePoolSize: 29 ### Steps to Reproduce Start the consumer for orderly consumption ### What Did You Expect to See? The number of threads can dynamically change with the quantity of subscribed topic partitions to enhance the parallelism of orderly consumption. ### What Did You See Instead? read code ### Additional Context _No response_ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org