lokistars opened a new issue, #7565:
URL: https://github.com/apache/rocketmq/issues/7565

   ### Before Creating the Bug Report
   
   - [X] I found a bug, not just asking a question, which should be created in 
[GitHub Discussions](https://github.com/apache/rocketmq/discussions).
   
   - [X] I have searched the [GitHub 
Issues](https://github.com/apache/rocketmq/issues) and [GitHub 
Discussions](https://github.com/apache/rocketmq/discussions)  of this 
repository and believe that this is not a duplicate.
   
   - [X] I have confirmed that this bug belongs to the current repository, not 
other repositories of RocketMQ.
   
   
   ### Runtime platform environment
   
   windows server
   
   ### RocketMQ version
   
   RocketMQ version: 4.9.3 and all other versions
   
   ### JDK Version
   
   jdk: 1.8
   
   ### Describe the Bug
   
   When the pull task in DefaultLitePullConsumerImpl runs, it checks whether
   the message queue is paused before it checks whether the queue has been
   dropped. After a rebalance removes a queue from the assignment, the paused
   check always returns true for that queue, so the task reschedules itself
   and never reaches the dropped check. The task for the removed queue is
   therefore never cleaned up: it stays scheduled forever, which leaks memory.
   
                   // After rebalancing, this queue's entry has been removed from
                   // assignedMessageQueue, so this condition is always true and the
                   // task keeps rescheduling itself without doing any work.
                   if (assignedMessageQueue.isPaused(messageQueue)) {
                       scheduledThreadPoolExecutor.schedule(this,
                           PULL_TIME_DELAY_MILLS_WHEN_PAUSE, TimeUnit.MILLISECONDS);
                       log.debug("Message Queue: {} has been paused!", messageQueue);
                       return;
                   }
   
                   ProcessQueue processQueue =
                       assignedMessageQueue.getProcessQueue(messageQueue);
   
                   if (null == processQueue || processQueue.isDropped()) {
                       log.info("The message queue not be able to poll, because it's dropped. group={}, messageQueue={}",
                           defaultLitePullConsumer.getConsumerGroup(), this.messageQueue);
                       return;
                   }
                  
                   // If assignedMessageQueueState has no entry for this message queue,
                   // isPaused returns true.
                   public boolean isPaused(MessageQueue messageQueue) {
                       MessageQueueState messageQueueState =
                           assignedMessageQueueState.get(messageQueue);
                       if (messageQueueState != null) {
                           return messageQueueState.isPaused();
                       }
                       return true;
                   }
   
         // After rebalancing, any message queue that is no longer assigned to this
         // consumer is marked as dropped and removed from assignedMessageQueueState.
         public void updateAssignedMessageQueue(String topic,
                 Collection<MessageQueue> assigned) {
             synchronized (this.assignedMessageQueueState) {
                 Iterator<Map.Entry<MessageQueue, MessageQueueState>> it =
                     this.assignedMessageQueueState.entrySet().iterator();
                 while (it.hasNext()) {
                     Map.Entry<MessageQueue, MessageQueueState> next = it.next();
                     if (next.getKey().getTopic().equals(topic)) {
                         if (!assigned.contains(next.getKey())) {
                             next.getValue().getProcessQueue().setDropped(true);
                             it.remove();
                         }
                     }
                 }
                 addAssignedMessageQueue(assigned);
             }
         }
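
To make the effect of the check order concrete, here is a minimal sketch. It is a simplified model, not RocketMQ code: `PullGuardSketch`, `Assigned`, `QueueState`, `Action`, and the queue names are hypothetical stand-ins for the classes quoted above, and `droppedFirst` is only one possible reordering, not a confirmed fix.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Simplified model of the pull-task guards; none of these types are RocketMQ classes.
final class PullGuardSketch {

    enum Action { RESCHEDULE, EXIT, PULL }

    // Hypothetical stand-in for MessageQueueState plus its ProcessQueue dropped flag.
    static final class QueueState {
        volatile boolean paused;
        volatile boolean dropped;
    }

    // Hypothetical stand-in for AssignedMessageQueue, keyed by a queue name.
    static final class Assigned {
        private final Map<String, QueueState> states = new ConcurrentHashMap<>();

        // Same rule as isPaused in the issue: a missing entry counts as paused.
        boolean isPaused(String queue) {
            QueueState state = states.get(queue);
            return state == null || state.paused;
        }

        // Mirrors "null == processQueue || processQueue.isDropped()".
        boolean isDroppedOrMissing(String queue) {
            QueueState state = states.get(queue);
            return state == null || state.dropped;
        }

        // Mirrors updateAssignedMessageQueue: mark unassigned queues dropped, then remove them.
        void update(Set<String> assigned) {
            Iterator<Map.Entry<String, QueueState>> it = states.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, QueueState> next = it.next();
                if (!assigned.contains(next.getKey())) {
                    next.getValue().dropped = true;
                    it.remove();
                }
            }
            for (String queue : assigned) {
                states.putIfAbsent(queue, new QueueState());
            }
        }
    }

    // Order described in the issue: the paused check runs first.
    static Action pausedFirst(Assigned a, String queue) {
        if (a.isPaused(queue)) return Action.RESCHEDULE;
        if (a.isDroppedOrMissing(queue)) return Action.EXIT;
        return Action.PULL;
    }

    // Possible alternative (an assumption, not the project's fix): the dropped check runs first.
    static Action droppedFirst(Assigned a, String queue) {
        if (a.isDroppedOrMissing(queue)) return Action.EXIT;
        if (a.isPaused(queue)) return Action.RESCHEDULE;
        return Action.PULL;
    }

    public static void main(String[] args) {
        Assigned assigned = new Assigned();
        assigned.update(new HashSet<>(Arrays.asList("q0", "q1", "q2", "q3")));
        assigned.update(new HashSet<>(Arrays.asList("q0", "q1"))); // rebalance takes q2, q3 away
        System.out.println(pausedFirst(assigned, "q2"));  // RESCHEDULE
        System.out.println(droppedFirst(assigned, "q2")); // EXIT
        System.out.println(droppedFirst(assigned, "q0")); // PULL
    }
}
```

In this model a queue that is still assigned but paused is rescheduled under both orders; only the removed queue behaves differently.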
   
   ### Steps to Reproduce
   
   1. In a running environment, a single consumer consumes four queues
   concurrently.
   2. Add a second consumer in the same consumer group. This triggers a
   rebalance, after which each consumer is assigned two queues.
   3. On the first consumer, the pull tasks for the two queues that were
   taken away are never cancelled and keep being rescheduled.
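
The steps above can be imitated without a broker. The following is a toy sketch, not a real reproduction: `PullTaskLeakRepro`, the queue names, and the 10 ms delay are assumptions made for illustration, and a plain map stands in for the assigned-queue state. It counts how many pull tasks are still alive after two of four queues are taken away.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Toy imitation of the reproduction steps; it does not use any RocketMQ classes.
final class PullTaskLeakRepro {
    private static final long DELAY_MS = 10; // stands in for PULL_TIME_DELAY_MILLS_WHEN_PAUSE

    // Queue name -> paused flag; a missing key means a rebalance removed the queue.
    private final Map<String, Boolean> assigned = new ConcurrentHashMap<>();
    // One worker thread, so the tasks and the simulated rebalance never interleave.
    private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
    private final AtomicInteger liveTasks = new AtomicInteger();

    private boolean isPaused(String queue) {
        Boolean paused = assigned.get(queue);
        return paused == null || paused; // a missing entry counts as paused, as in the issue
    }

    private final class PullTask implements Runnable {
        private final String queue;

        PullTask(String queue) {
            this.queue = queue;
            liveTasks.incrementAndGet();
        }

        @Override
        public void run() {
            if (isPaused(queue)) { // checked first, as in the issue
                executor.schedule(this, DELAY_MS, TimeUnit.MILLISECONDS);
                return;
            }
            if (!assigned.containsKey(queue)) { // "dropped" check; not reached once the entry is gone
                liveTasks.decrementAndGet();
                return;
            }
            executor.schedule(this, DELAY_MS, TimeUnit.MILLISECONDS); // stands in for pull + reschedule
        }
    }

    // Returns how many pull tasks are still alive after the simulated rebalance.
    static int liveTasksAfterRebalance() {
        PullTaskLeakRepro repro = new PullTaskLeakRepro();
        for (String queue : Arrays.asList("q0", "q1", "q2", "q3")) {
            repro.assigned.put(queue, Boolean.FALSE);
            repro.executor.execute(repro.new PullTask(queue));
        }
        // Simulated rebalance: q2 and q3 move to the second consumer.
        repro.executor.schedule(() -> {
            repro.assigned.remove("q2");
            repro.assigned.remove("q3");
        }, 50, TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(300); // gives the q2/q3 tasks many chances to exit
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        int live = repro.liveTasks.get();
        repro.executor.shutdownNow();
        return live;
    }

    public static void main(String[] args) {
        System.out.println("live pull tasks: " + liveTasksAfterRebalance()); // live pull tasks: 4
    }
}
```

Only two queues remain assigned, yet all four tasks stay alive, which matches the behavior described in step 3.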
   
   ### What Did You Expect to See?
   
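   Pull tasks for queues that are no longer assigned to the consumer should
   stop instead of being rescheduled. If the current behavior is not
   intended, this is a bug that needs to be fixed.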
   
   ### What Did You See Instead?
   
   Bug Report
   
   ### Additional Context
   
   _No response_

