RapperCL opened a new issue, #8354: URL: https://github.com/apache/rocketmq/issues/8354
### Before Creating the Bug Report - [X] I found a bug, not just asking a question, which should be created in [GitHub Discussions](https://github.com/apache/rocketmq/discussions). - [X] I have searched the [GitHub Issues](https://github.com/apache/rocketmq/issues) and [GitHub Discussions](https://github.com/apache/rocketmq/discussions) of this repository and believe that this is not a duplicate. - [X] I have confirmed that this bug belongs to the current repository, not other repositories of RocketMQ. ### Runtime platform environment windows ### RocketMQ version develop ### JDK Version jdk 1.8 ### Describe the Bug 问题: 消费者线程池对外开放了核心线程数,最大线程数,但由于阻塞队列为无界队列,导致线程数无法扩增到最多线程数。 org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService 无效的动态调整任务: 沿着这个问题去查看源码,发现内部有个动态调整线程池的定时任务: org.apache.rocketmq.client.impl.factory.MQClientInstance#adjustThreadPool 频率: 一分钟执行一次 任务:通过遍历当前消费者实例下的消费者所订阅主题分区的本地队列中的消息数,判断是否超过了调整阈 值adjustThreadPoolNumsThreshold = 100000(默认),如果达到了就增加当前线程池的线程数,低于就减少当前线程数。 ![image](https://github.com/apache/rocketmq/assets/44110731/1f02f0e5-0e54-4def-a4cd-bf009290943f) 存在问题:虽然提供了这种机制,但当前所有的ConsumeMessageService 都没有实现这两个方法,这就导致实际上这个任务实际上是个无用的任务。 其次,感知迟钝:定时1min执行一次,如果当前并发消费情况下consumeThreadMin = 20,consumeThreadMax= 64(某线上配置) ,在遇到突发消息时,要想扩容到最大线程数,此时如果递增, 需要轮询44分钟,完全不能及时感知流量。 解决方案: 结合了拉取消息的流控和消费逻辑,设计了下面两种方案。 一种是沿用现有的这种机制,二种是去掉这种机制,使用线程池自带的线程扩缩机制。 方案一:沿用现有机制 其实我不很是建议,因为adjustThreadPoolNumsThreshold这个阈值并不好设置。 其一,因为这个值受消费者订阅主题分区数影响: 当选择默认值10000时,消费者订阅主题分区数过少,可能本地消息永远达不到100000,(拉取消息的限流默认单队列小于1000条,大小小于100m),导致线程数无法得到扩充。 其次,感知迟钝:定时1min执行一次,如果当前并发消费情况下consumeThreadMin = 20,consumeThreadMax= 64(某线上配置) ,在遇到突发消息时,要想扩容到最大线程数,此时如果递增, 需要轮询44分钟,完全不能及时感知流量。 方案二:使用线程池自带的线程扩缩机制 该方案主要结合了消息拉取的流控配置:pullThresholdSizeForQueue (本地单个分区队列的最大消息数) 1 pullThresholdSizeForQueue拉取阈值大小作为当前消费者线程池的大小 这样会有什么问题呢? 容易频繁触发线程池拒绝。 原因:在拉取消息时,通过当前分区队列的消息数与消费队列进行比较,在消费者订阅了多个主题分区情况下,会容易频繁触发线程池的拒绝策略,导致消息拉取流控效果差,拉取和消费失衡。 参数: consumeMessageBatchMaxSize = 1 pullBatchSize = 32 pullThresholdSizeForQueue = 1000 订阅主题数=2, 单个主题分区分配数=3 此时的消费者线程池容量: 1000 当前消费者订阅了6个分区队列,当前消费队列中已经有999条消息正在消费,当前判断通过,此时会继续拉取,拉取之后,将消息投递,然后会触发拒绝,延迟5s重新投递。 下一个分区继续拉取,当消费速度比较慢的时候,此时线程池中已有990条消息,此时又会触发线程池拒绝策略。 2 pullThresholdSizeForQueue * 订阅主题分区数 参数同上: consumeMessageBatchMaxSize = 1 pullBatchSize = 32 pullThresholdSizeForQueue = 1000 订阅主题数=2, 单个主题分区分配数=3 此时的消费者线程池容量: 6000 设置的队列大小符合限流的流控,当订阅了多主题分区时,容易导致容量设置的过大,但阻塞队列采用的是双端阻塞队列,从某种程度上是可以避免内存占用的, 但在消费压力比较大的情况下,消费者本地会积攒过多消息,(其实当前rocketmq的版本就会导致本地积攒消息过多的情况)所以遇到这种情况下,推荐扩容。 另外需要考虑的一点是该订阅的主题分区在运行期间可能会发生变化,如果动态调整阻塞队列的大小就有点过于麻烦了。 3 pullThresholdSizeForQueue * 订阅主题数 综合1,2方案的缺陷,此方案只会受订阅主题数的影响,正常订阅的主题是不会变化的,所以调整线程池数=pullThresholdSizeForQueue * 订阅主题数,是比较合理的。既可以避免频繁的触发拒绝策略,也可以避免受主题分区的影响。 ### Steps to Reproduce Start the consumer for concurrent consumption. ### What Did You Expect to See? The number of threads can be expanded to the maximum thread count in real time based on the current message situation. ### What Did You See Instead? read code ### Additional Context _No response_ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org