lokistars opened a new issue, #7565: URL: https://github.com/apache/rocketmq/issues/7565
### Before Creating the Bug Report

- [X] I found a bug, not just asking a question, which should be created in [GitHub Discussions](https://github.com/apache/rocketmq/discussions).
- [X] I have searched the [GitHub Issues](https://github.com/apache/rocketmq/issues) and [GitHub Discussions](https://github.com/apache/rocketmq/discussions) of this repository and believe that this is not a duplicate.
- [X] I have confirmed that this bug belongs to the current repository, not other repositories of RocketMQ.

### Runtime platform environment

Windows Server

### RocketMQ version

RocketMQ version: 4.9.3 and all versions

### JDK Version

JDK: 1.8

### Describe the Bug

When the pull task in DefaultLitePullConsumerImpl runs, it first checks whether the queue is paused and only afterwards checks whether the queue has been dropped. For a queue that was removed by rebalancing, the pause check always succeeds, so the task reschedules itself and returns before it ever reaches the dropped check. As a result, the discarded task is never cleaned up, the scheduled task is never released, and memory leaks.

    // After rebalancing, the queue is removed from assignedMessageQueue, so this condition
    // is always true and an empty scheduled task keeps running indefinitely.
    if (assignedMessageQueue.isPaused(messageQueue)) {
        scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_PAUSE, TimeUnit.MILLISECONDS);
        log.debug("Message Queue: {} has been paused!", messageQueue);
        return;
    }

    ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue);

    if (null == processQueue || processQueue.isDropped()) {
        log.info("The message queue not be able to poll, because it's dropped. group={}, messageQueue={}", defaultLitePullConsumer.getConsumerGroup(), this.messageQueue);
        return;
    }

    // If assignedMessageQueueState has no entry for the message queue, this method returns true.
    public boolean isPaused(MessageQueue messageQueue) {
        MessageQueueState messageQueueState = assignedMessageQueueState.get(messageQueue);
        if (messageQueueState != null) {
            return messageQueueState.isPaused();
        }
        return true;
    }

    // After rebalancing, any message queue that is no longer assigned to this consumer
    // is removed from assignedMessageQueueState.
    public void updateAssignedMessageQueue(String topic, Collection<MessageQueue> assigned) {
        synchronized (this.assignedMessageQueueState) {
            Iterator<Map.Entry<MessageQueue, MessageQueueState>> it = this.assignedMessageQueueState.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<MessageQueue, MessageQueueState> next = it.next();
                if (next.getKey().getTopic().equals(topic)) {
                    if (!assigned.contains(next.getKey())) {
                        next.getValue().getProcessQueue().setDropped(true);
                        it.remove();
                    }
                }
            }
            addAssignedMessageQueue(assigned);
        }
    }

### Steps to Reproduce

1. Start with a running environment in which a single consumer concurrently consumes four queues.
2. Add a second consumer. This triggers a rebalance, after which each consumer is assigned two queues on average.
3. On the first consumer, the pull tasks for the two queues that were reassigned are never cancelled, so their scheduled tasks are not cleaned up.

### What Did You Expect to See?

If this is not intended behavior, there is indeed a bug that needs to be fixed.

### What Did You See Instead?

Bug Report

### Additional Context

_No response_

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
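
The ordering problem described in this report can be reproduced in isolation. The following is a minimal sketch, not RocketMQ code: `PullTaskOrderSketch`, `QueueState`, `Outcome`, `reportedOrder`, and `dropCheckFirst` are hypothetical names introduced only for illustration, and a plain `Map` keyed by a string stands in for `assignedMessageQueueState`. It assumes, as the report states, that a missing entry is treated as paused.

```java
import java.util.HashMap;
import java.util.Map;

public class PullTaskOrderSketch {

    /** Hypothetical stand-in for the per-queue state (paused flag plus dropped flag). */
    static final class QueueState {
        boolean paused;
        boolean dropped;
    }

    /** What a single run of the pull task decides to do. */
    enum Outcome { RESCHEDULED, EXITED, PULLED }

    /** Mirrors the reported isPaused(): a queue with no entry counts as paused. */
    static boolean isPaused(Map<String, QueueState> assigned, String queue) {
        QueueState state = assigned.get(queue);
        return state == null || state.paused;
    }

    /** Order described in the report: the pause check runs before the dropped check. */
    static Outcome reportedOrder(Map<String, QueueState> assigned, String queue) {
        if (isPaused(assigned, queue)) {
            return Outcome.RESCHEDULED; // a removed queue always ends up here
        }
        QueueState state = assigned.get(queue);
        if (state == null || state.dropped) {
            return Outcome.EXITED;
        }
        return Outcome.PULLED;
    }

    /** Assumed alternative: check for a missing or dropped queue first. */
    static Outcome dropCheckFirst(Map<String, QueueState> assigned, String queue) {
        QueueState state = assigned.get(queue);
        if (state == null || state.dropped) {
            return Outcome.EXITED; // the task stops instead of rescheduling itself
        }
        if (state.paused) {
            return Outcome.RESCHEDULED;
        }
        return Outcome.PULLED;
    }

    public static void main(String[] args) {
        Map<String, QueueState> assigned = new HashMap<>();
        assigned.put("queue-0", new QueueState());
        // "queue-2" has no entry, as if a rebalance had removed it.
        System.out.println(reportedOrder(assigned, "queue-2"));  // prints RESCHEDULED
        System.out.println(dropCheckFirst(assigned, "queue-2")); // prints EXITED
    }
}
```

With the pause check first, a task for a removed queue reschedules itself on every run. With the dropped check first, the same task exits, while a queue that is still assigned but paused is rescheduled in both orderings.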