This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 01a51231b3 [ISSUE #9015] Sync SysFlag and message body inflation status; allow omit of message body (#9016)
01a51231b3 is described below

commit 01a51231b3e8a0e72b2803d537bfbeb4f8e45719
Author: Zhanhui Li <lizhan...@apache.org>
AuthorDate: Thu Dec 5 15:06:35 2024 +0800

    [ISSUE #9015] Sync SysFlag and message body inflation status; allow omit of message body (#9016)
---
 .../client/impl/consumer/ProcessQueue.java         | 12 ++++--
 .../client/producer/ProduceAccumulator.java        | 50 ++++++++++++++--------
 .../trace/hook/SendMessageOpenTracingHookImpl.java |  6 +--
 .../trace/hook/SendMessageTraceHookImpl.java       |  3 +-
 .../rocketmq/common/message/MessageDecoder.java    |  4 +-
 5 files changed, 50 insertions(+), 25 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
index 33e698b00c..bc1b5eff2f 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
@@ -137,7 +137,7 @@ public class ProcessQueue {
                     if (null == old) {
                         validMsgCnt++;
                         this.queueOffsetMax = msg.getQueueOffset();
-                        msgSize.addAndGet(msg.getBody().length);
+                        msgSize.addAndGet(null == msg.getBody() ? 0 : 
msg.getBody().length);
                     }
                 }
                 msgCount.addAndGet(validMsgCnt);
@@ -198,7 +198,10 @@ public class ProcessQueue {
                         MessageExt prev = 
msgTreeMap.remove(msg.getQueueOffset());
                         if (prev != null) {
                             removedCnt--;
-                            msgSize.addAndGet(-msg.getBody().length);
+                            long bodySize = null == msg.getBody() ? 0 : 
msg.getBody().length;
+                            if (bodySize > 0) {
+                                msgSize.addAndGet(-bodySize);
+                            }
                         }
                     }
                     if (msgCount.addAndGet(removedCnt) == 0) {
@@ -270,7 +273,10 @@ public class ProcessQueue {
                     msgSize.set(0);
                 } else {
                     for (MessageExt msg : 
this.consumingMsgOrderlyTreeMap.values()) {
-                        msgSize.addAndGet(-msg.getBody().length);
+                        int bodySize = null == msg.getBody() ? 0 : 
msg.getBody().length;
+                        if (bodySize > 0) {
+                            msgSize.addAndGet(-bodySize);
+                        }
                     }
                 }
                 this.consumingMsgOrderlyTreeMap.clear();
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/ProduceAccumulator.java b/client/src/main/java/org/apache/rocketmq/client/producer/ProduceAccumulator.java
index 46dfcf71d2..809830e464 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/ProduceAccumulator.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/ProduceAccumulator.java
@@ -52,9 +52,9 @@ public class ProduceAccumulator {
     private final Logger log = 
LoggerFactory.getLogger(DefaultMQProducer.class);
     private final GuardForSyncSendService guardThreadForSyncSend;
     private final GuardForAsyncSendService guardThreadForAsyncSend;
-    private Map<AggregateKey, MessageAccumulation> syncSendBatchs = new 
ConcurrentHashMap<AggregateKey, MessageAccumulation>();
-    private Map<AggregateKey, MessageAccumulation> asyncSendBatchs = new 
ConcurrentHashMap<AggregateKey, MessageAccumulation>();
-    private AtomicLong currentlyHoldSize = new AtomicLong(0);
+    private final Map<AggregateKey, MessageAccumulation> syncSendBatchs = new 
ConcurrentHashMap<AggregateKey, MessageAccumulation>();
+    private final Map<AggregateKey, MessageAccumulation> asyncSendBatchs = new 
ConcurrentHashMap<AggregateKey, MessageAccumulation>();
+    private final AtomicLong currentlyHoldSize = new AtomicLong(0);
     private final String instanceName;
 
     public ProduceAccumulator(String instanceName) {
@@ -70,11 +70,13 @@ public class ProduceAccumulator {
             serviceName = String.format("Client_%s_GuardForSyncSend", 
clientInstanceName);
         }
 
-        @Override public String getServiceName() {
+        @Override
+        public String getServiceName() {
             return serviceName;
         }
 
-        @Override public void run() {
+        @Override
+        public void run() {
             log.info(this.getServiceName() + " service started");
 
             while (!this.isStopped()) {
@@ -115,11 +117,13 @@ public class ProduceAccumulator {
             serviceName = String.format("Client_%s_GuardForAsyncSend", 
clientInstanceName);
         }
 
-        @Override public String getServiceName() {
+        @Override
+        public String getServiceName() {
             return serviceName;
         }
 
-        @Override public void run() {
+        @Override
+        public void run() {
             log.info(this.getServiceName() + " service started");
 
             while (!this.isStopped()) {
@@ -276,7 +280,10 @@ public class ProduceAccumulator {
     boolean tryAddMessage(Message message) {
         synchronized (currentlyHoldSize) {
             if (currentlyHoldSize.get() < totalHoldSize) {
-                currentlyHoldSize.addAndGet(message.getBody().length);
+                int bodySize = null == message.getBody() ? 0 : 
message.getBody().length;
+                if (bodySize > 0) {
+                    currentlyHoldSize.addAndGet(bodySize);
+                }
                 return true;
             } else {
                 return false;
@@ -305,7 +312,8 @@ public class ProduceAccumulator {
             this.tag = tag;
         }
 
-        @Override public boolean equals(Object o) {
+        @Override
+        public boolean equals(Object o) {
             if (this == o)
                 return true;
             if (o == null || getClass() != o.getClass())
@@ -314,7 +322,8 @@ public class ProduceAccumulator {
             return waitStoreMsgOK == key.waitStoreMsgOK && 
topic.equals(key.topic) && Objects.equals(mq, key.mq) && Objects.equals(tag, 
key.tag);
         }
 
-        @Override public int hashCode() {
+        @Override
+        public int hashCode() {
             return Objects.hash(topic, mq, waitStoreMsgOK, tag);
         }
     }
@@ -324,7 +333,7 @@ public class ProduceAccumulator {
         private LinkedList<Message> messages;
         private LinkedList<SendCallback> sendCallbacks;
         private Set<String> keys;
-        private AtomicBoolean closed;
+        private final AtomicBoolean closed;
         private SendResult[] sendResults;
         private AggregateKey aggregateKey;
         private AtomicInteger messagesSize;
@@ -351,8 +360,7 @@ public class ProduceAccumulator {
             return false;
         }
 
-        public int add(
-            Message msg) throws InterruptedException, MQBrokerException, 
RemotingException, MQClientException {
+        public int add(Message msg) throws InterruptedException, 
MQBrokerException, RemotingException, MQClientException {
             int ret = -1;
             synchronized (this.closed) {
                 if (this.closed.get()) {
@@ -360,7 +368,10 @@ public class ProduceAccumulator {
                 }
                 ret = this.count++;
                 this.messages.add(msg);
-                messagesSize.addAndGet(msg.getBody().length);
+                int bodySize = null == msg.getBody() ? 0 : 
msg.getBody().length;
+                if (bodySize > 0) {
+                    messagesSize.addAndGet(bodySize);
+                }
                 String msgKeys = msg.getKeys();
                 if (msgKeys != null) {
                     
this.keys.addAll(Arrays.asList(msgKeys.split(MessageConst.KEY_SEPARATOR)));
@@ -388,7 +399,10 @@ public class ProduceAccumulator {
                 this.count++;
                 this.messages.add(msg);
                 this.sendCallbacks.add(sendCallback);
-                messagesSize.getAndAdd(msg.getBody().length);
+                int bodySize = null == msg.getBody() ? 0 : 
msg.getBody().length;
+                if (bodySize > 0) {
+                    messagesSize.addAndGet(bodySize);
+                }
             }
             if (readyToSend()) {
                 this.send(sendCallback);
@@ -472,7 +486,8 @@ public class ProduceAccumulator {
                 if (defaultMQProducer != null) {
                     final int size = messagesSize.get();
                     defaultMQProducer.sendDirect(messageBatch, 
aggregateKey.mq, new SendCallback() {
-                        @Override public void onSuccess(SendResult sendResult) 
{
+                        @Override
+                        public void onSuccess(SendResult sendResult) {
                             try {
                                 splitSendResults(sendResult);
                                 int i = 0;
@@ -490,7 +505,8 @@ public class ProduceAccumulator {
                             }
                         }
 
-                        @Override public void onException(Throwable e) {
+                        @Override
+                        public void onException(Throwable e) {
                             for (SendCallback v : sendCallbacks) {
                                 v.onException(e);
                             }
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageOpenTracingHookImpl.java b/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageOpenTracingHookImpl.java
index 3cb6493384..0f828f2b4e 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageOpenTracingHookImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageOpenTracingHookImpl.java
@@ -48,8 +48,8 @@ public class SendMessageOpenTracingHookImpl implements 
SendMessageHook {
         }
         Message msg = context.getMessage();
         Tracer.SpanBuilder spanBuilder = tracer
-                .buildSpan(TraceConstants.TO_PREFIX + msg.getTopic())
-                .withTag(Tags.SPAN_KIND, Tags.SPAN_KIND_PRODUCER);
+            .buildSpan(TraceConstants.TO_PREFIX + msg.getTopic())
+            .withTag(Tags.SPAN_KIND, Tags.SPAN_KIND_PRODUCER);
         SpanContext spanContext = tracer.extract(Format.Builtin.TEXT_MAP, new 
TextMapAdapter(msg.getProperties()));
         if (spanContext != null) {
             spanBuilder.asChildOf(spanContext);
@@ -62,7 +62,7 @@ public class SendMessageOpenTracingHookImpl implements 
SendMessageHook {
         span.setTag(TraceConstants.ROCKETMQ_KEYS, msg.getKeys());
         span.setTag(TraceConstants.ROCKETMQ_STORE_HOST, 
context.getBrokerAddr());
         span.setTag(TraceConstants.ROCKETMQ_MSG_TYPE, 
context.getMsgType().name());
-        span.setTag(TraceConstants.ROCKETMQ_BODY_LENGTH, msg.getBody().length);
+        span.setTag(TraceConstants.ROCKETMQ_BODY_LENGTH, null == msg.getBody() 
? 0 : msg.getBody().length);
         context.setMqTraceContext(span);
     }
 
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java b/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java
index dba04b593f..61738928bb 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java
@@ -58,7 +58,8 @@ public class SendMessageTraceHookImpl implements 
SendMessageHook {
         traceBean.setTags(context.getMessage().getTags());
         traceBean.setKeys(context.getMessage().getKeys());
         traceBean.setStoreHost(context.getBrokerAddr());
-        traceBean.setBodyLength(context.getMessage().getBody().length);
+        int bodyLength = null == context.getMessage().getBody() ? 0 : 
context.getMessage().getBody().length;
+        traceBean.setBodyLength(bodyLength);
         traceBean.setMsgType(context.getMsgType());
         traceContext.getTraceBeans().add(traceBean);
     }
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
index f5491e192a..713f9405ea 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
@@ -516,13 +516,15 @@ public class MessageDecoder {
                         }
                     }
 
-                    // uncompress body
+                    // inflate body
                     if (deCompressBody && (sysFlag & 
MessageSysFlag.COMPRESSED_FLAG) == MessageSysFlag.COMPRESSED_FLAG) {
                         Compressor compressor = 
CompressorFactory.getCompressor(MessageSysFlag.getCompressionType(sysFlag));
                         body = compressor.decompress(body);
+                        sysFlag &= ~MessageSysFlag.COMPRESSED_FLAG;
                     }
 
                     msgExt.setBody(body);
+                    msgExt.setSysFlag(sysFlag);
                 } else {
                     byteBuffer.position(byteBuffer.position() + bodyLen);
                 }

Reply via email to