This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new c833ff6e9a [ISSUE #7538] fix wrong cachedMsgSize if msg body is changed in consumer callback c833ff6e9a is described below commit c833ff6e9a022704b86bee90729c6710bb5c37a6 Author: yuz10 <845238...@qq.com> AuthorDate: Fri Feb 2 12:34:13 2024 +0800 [ISSUE #7538] fix wrong cachedMsgSize if msg body is changed in consumer callback --- .../rocketmq/client/impl/consumer/ProcessQueue.java | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java index ebc208a8d8..33e698b00c 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java @@ -198,10 +198,12 @@ public class ProcessQueue { MessageExt prev = msgTreeMap.remove(msg.getQueueOffset()); if (prev != null) { removedCnt--; - msgSize.addAndGet(0 - msg.getBody().length); + msgSize.addAndGet(-msg.getBody().length); } } - msgCount.addAndGet(removedCnt); + if (msgCount.addAndGet(removedCnt) == 0) { + msgSize.set(0); + } if (!msgTreeMap.isEmpty()) { result = msgTreeMap.firstKey(); @@ -264,9 +266,12 @@ public class ProcessQueue { this.treeMapLock.writeLock().lockInterruptibly(); try { Long offset = this.consumingMsgOrderlyTreeMap.lastKey(); - msgCount.addAndGet(0 - this.consumingMsgOrderlyTreeMap.size()); - for (MessageExt msg : this.consumingMsgOrderlyTreeMap.values()) { - msgSize.addAndGet(0 - msg.getBody().length); + if (msgCount.addAndGet(-this.consumingMsgOrderlyTreeMap.size()) == 0) { + msgSize.set(0); + } else { + for (MessageExt msg : this.consumingMsgOrderlyTreeMap.values()) { + msgSize.addAndGet(-msg.getBody().length); + } } this.consumingMsgOrderlyTreeMap.clear(); if (offset != null) { @@ -426,8 +431,8 @@ public class ProcessQueue { info.setCachedMsgMinOffset(this.msgTreeMap.firstKey()); info.setCachedMsgMaxOffset(this.msgTreeMap.lastKey()); info.setCachedMsgCount(this.msgTreeMap.size()); - info.setCachedMsgSizeInMiB((int) (this.msgSize.get() / (1024 * 1024))); } + info.setCachedMsgSizeInMiB((int) (this.msgSize.get() / (1024 * 1024))); if (!this.consumingMsgOrderlyTreeMap.isEmpty()) { info.setTransactionMsgMinOffset(this.consumingMsgOrderlyTreeMap.firstKey());