This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch 4.9.x in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/4.9.x by this push:
     new fc6c899abc [ISSUE #7538] fix wrong cachedMsgSize if msg body is changed in consumer callback (#7820)
fc6c899abc is described below

commit fc6c899abc8262251c7838af92e7c5d4267bcf87
Author: yuz10 <845238...@qq.com>
AuthorDate: Tue Feb 6 09:04:56 2024 +0800

    [ISSUE #7538] fix wrong cachedMsgSize if msg body is changed in consumer callback (#7820)
---
 .../rocketmq/client/impl/consumer/ProcessQueue.java | 17 +++++++++++------
 1 file changed, 11 insertions(+), 6 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
index ba00aaef99..e0a3cd5ad2 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
@@ -202,10 +202,12 @@ public class ProcessQueue {
                         MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());
                         if (prev != null) {
                             removedCnt--;
-                            msgSize.addAndGet(0 - msg.getBody().length);
+                            msgSize.addAndGet(-msg.getBody().length);
                         }
                     }
-                    msgCount.addAndGet(removedCnt);
+                    if (msgCount.addAndGet(removedCnt) == 0) {
+                        msgSize.set(0);
+                    }
 
                     if (!msgTreeMap.isEmpty()) {
                         result = msgTreeMap.firstKey();
@@ -268,9 +270,12 @@ public class ProcessQueue {
             this.treeMapLock.writeLock().lockInterruptibly();
             try {
                 Long offset = this.consumingMsgOrderlyTreeMap.lastKey();
-                msgCount.addAndGet(0 - this.consumingMsgOrderlyTreeMap.size());
-                for (MessageExt msg : this.consumingMsgOrderlyTreeMap.values()) {
-                    msgSize.addAndGet(0 - msg.getBody().length);
+                if (msgCount.addAndGet(-this.consumingMsgOrderlyTreeMap.size()) == 0) {
+                    msgSize.set(0);
+                } else {
+                    for (MessageExt msg : this.consumingMsgOrderlyTreeMap.values()) {
+                        msgSize.addAndGet(-msg.getBody().length);
+                    }
                 }
                 this.consumingMsgOrderlyTreeMap.clear();
                 if (offset != null) {
@@ -409,8 +414,8 @@ public class ProcessQueue {
                 info.setCachedMsgMinOffset(this.msgTreeMap.firstKey());
                 info.setCachedMsgMaxOffset(this.msgTreeMap.lastKey());
                 info.setCachedMsgCount(this.msgTreeMap.size());
-                info.setCachedMsgSizeInMiB((int) (this.msgSize.get() / (1024 * 1024)));
             }
+            info.setCachedMsgSizeInMiB((int) (this.msgSize.get() / (1024 * 1024)));
 
             if (!this.consumingMsgOrderlyTreeMap.isEmpty()) {
                 info.setTransactionMsgMinOffset(this.consumingMsgOrderlyTreeMap.firstKey());