This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new c833ff6e9a [ISSUE #7538] fix wrong cachedMsgSize if msg body is 
changed in consumer callback
c833ff6e9a is described below

commit c833ff6e9a022704b86bee90729c6710bb5c37a6
Author: yuz10 <845238...@qq.com>
AuthorDate: Fri Feb 2 12:34:13 2024 +0800

    [ISSUE #7538] fix wrong cachedMsgSize if msg body is changed in consumer 
callback
---
 .../rocketmq/client/impl/consumer/ProcessQueue.java     | 17 +++++++++++------
 1 file changed, 11 insertions(+), 6 deletions(-)

diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
index ebc208a8d8..33e698b00c 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
@@ -198,10 +198,12 @@ public class ProcessQueue {
                         MessageExt prev = 
msgTreeMap.remove(msg.getQueueOffset());
                         if (prev != null) {
                             removedCnt--;
-                            msgSize.addAndGet(0 - msg.getBody().length);
+                            msgSize.addAndGet(-msg.getBody().length);
                         }
                     }
-                    msgCount.addAndGet(removedCnt);
+                    if (msgCount.addAndGet(removedCnt) == 0) {
+                        msgSize.set(0);
+                    }
 
                     if (!msgTreeMap.isEmpty()) {
                         result = msgTreeMap.firstKey();
@@ -264,9 +266,12 @@ public class ProcessQueue {
             this.treeMapLock.writeLock().lockInterruptibly();
             try {
                 Long offset = this.consumingMsgOrderlyTreeMap.lastKey();
-                msgCount.addAndGet(0 - this.consumingMsgOrderlyTreeMap.size());
-                for (MessageExt msg : 
this.consumingMsgOrderlyTreeMap.values()) {
-                    msgSize.addAndGet(0 - msg.getBody().length);
+                if 
(msgCount.addAndGet(-this.consumingMsgOrderlyTreeMap.size()) == 0) {
+                    msgSize.set(0);
+                } else {
+                    for (MessageExt msg : 
this.consumingMsgOrderlyTreeMap.values()) {
+                        msgSize.addAndGet(-msg.getBody().length);
+                    }
                 }
                 this.consumingMsgOrderlyTreeMap.clear();
                 if (offset != null) {
@@ -426,8 +431,8 @@ public class ProcessQueue {
                 info.setCachedMsgMinOffset(this.msgTreeMap.firstKey());
                 info.setCachedMsgMaxOffset(this.msgTreeMap.lastKey());
                 info.setCachedMsgCount(this.msgTreeMap.size());
-                info.setCachedMsgSizeInMiB((int) (this.msgSize.get() / (1024 * 
1024)));
             }
+            info.setCachedMsgSizeInMiB((int) (this.msgSize.get() / (1024 * 
1024)));
 
             if (!this.consumingMsgOrderlyTreeMap.isEmpty()) {
                 
info.setTransactionMsgMinOffset(this.consumingMsgOrderlyTreeMap.firstKey());

Reply via email to