This is an automated email from the ASF dual-hosted git repository. lizhimin pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 01a51231b3 [ISSUE #9015] Sync SysFlag and message body inflation status; allow omit of message body (#9016) 01a51231b3 is described below commit 01a51231b3e8a0e72b2803d537bfbeb4f8e45719 Author: Zhanhui Li <lizhan...@apache.org> AuthorDate: Thu Dec 5 15:06:35 2024 +0800 [ISSUE #9015] Sync SysFlag and message body inflation status; allow omit of message body (#9016) --- .../client/impl/consumer/ProcessQueue.java | 12 ++++-- .../client/producer/ProduceAccumulator.java | 50 ++++++++++++++-------- .../trace/hook/SendMessageOpenTracingHookImpl.java | 6 +-- .../trace/hook/SendMessageTraceHookImpl.java | 3 +- .../rocketmq/common/message/MessageDecoder.java | 4 +- 5 files changed, 50 insertions(+), 25 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java index 33e698b00c..bc1b5eff2f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java @@ -137,7 +137,7 @@ public class ProcessQueue { if (null == old) { validMsgCnt++; this.queueOffsetMax = msg.getQueueOffset(); - msgSize.addAndGet(msg.getBody().length); + msgSize.addAndGet(null == msg.getBody() ? 0 : msg.getBody().length); } } msgCount.addAndGet(validMsgCnt); @@ -198,7 +198,10 @@ public class ProcessQueue { MessageExt prev = msgTreeMap.remove(msg.getQueueOffset()); if (prev != null) { removedCnt--; - msgSize.addAndGet(-msg.getBody().length); + long bodySize = null == msg.getBody() ? 0 : msg.getBody().length; + if (bodySize > 0) { + msgSize.addAndGet(-bodySize); + } } } if (msgCount.addAndGet(removedCnt) == 0) { @@ -270,7 +273,10 @@ public class ProcessQueue { msgSize.set(0); } else { for (MessageExt msg : this.consumingMsgOrderlyTreeMap.values()) { - msgSize.addAndGet(-msg.getBody().length); + int bodySize = null == msg.getBody() ? 0 : msg.getBody().length; + if (bodySize > 0) { + msgSize.addAndGet(-bodySize); + } } } this.consumingMsgOrderlyTreeMap.clear(); diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/ProduceAccumulator.java b/client/src/main/java/org/apache/rocketmq/client/producer/ProduceAccumulator.java index 46dfcf71d2..809830e464 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/ProduceAccumulator.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/ProduceAccumulator.java @@ -52,9 +52,9 @@ public class ProduceAccumulator { private final Logger log = LoggerFactory.getLogger(DefaultMQProducer.class); private final GuardForSyncSendService guardThreadForSyncSend; private final GuardForAsyncSendService guardThreadForAsyncSend; - private Map<AggregateKey, MessageAccumulation> syncSendBatchs = new ConcurrentHashMap<AggregateKey, MessageAccumulation>(); - private Map<AggregateKey, MessageAccumulation> asyncSendBatchs = new ConcurrentHashMap<AggregateKey, MessageAccumulation>(); - private AtomicLong currentlyHoldSize = new AtomicLong(0); + private final Map<AggregateKey, MessageAccumulation> syncSendBatchs = new ConcurrentHashMap<AggregateKey, MessageAccumulation>(); + private final Map<AggregateKey, MessageAccumulation> asyncSendBatchs = new ConcurrentHashMap<AggregateKey, MessageAccumulation>(); + private final AtomicLong currentlyHoldSize = new AtomicLong(0); private final String instanceName; public ProduceAccumulator(String instanceName) { @@ -70,11 +70,13 @@ public class ProduceAccumulator { serviceName = String.format("Client_%s_GuardForSyncSend", clientInstanceName); } - @Override public String getServiceName() { + @Override + public String getServiceName() { return serviceName; } - @Override public void run() { + @Override + public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { @@ -115,11 +117,13 @@ public class ProduceAccumulator { serviceName = String.format("Client_%s_GuardForAsyncSend", clientInstanceName); } - @Override public String getServiceName() { + @Override + public String getServiceName() { return serviceName; } - @Override public void run() { + @Override + public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { @@ -276,7 +280,10 @@ public class ProduceAccumulator { boolean tryAddMessage(Message message) { synchronized (currentlyHoldSize) { if (currentlyHoldSize.get() < totalHoldSize) { - currentlyHoldSize.addAndGet(message.getBody().length); + int bodySize = null == message.getBody() ? 0 : message.getBody().length; + if (bodySize > 0) { + currentlyHoldSize.addAndGet(bodySize); + } return true; } else { return false; @@ -305,7 +312,8 @@ public class ProduceAccumulator { this.tag = tag; } - @Override public boolean equals(Object o) { + @Override + public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) @@ -314,7 +322,8 @@ public class ProduceAccumulator { return waitStoreMsgOK == key.waitStoreMsgOK && topic.equals(key.topic) && Objects.equals(mq, key.mq) && Objects.equals(tag, key.tag); } - @Override public int hashCode() { + @Override + public int hashCode() { return Objects.hash(topic, mq, waitStoreMsgOK, tag); } } @@ -324,7 +333,7 @@ public class ProduceAccumulator { private LinkedList<Message> messages; private LinkedList<SendCallback> sendCallbacks; private Set<String> keys; - private AtomicBoolean closed; + private final AtomicBoolean closed; private SendResult[] sendResults; private AggregateKey aggregateKey; private AtomicInteger messagesSize; @@ -351,8 +360,7 @@ public class ProduceAccumulator { return false; } - public int add( - Message msg) throws InterruptedException, MQBrokerException, RemotingException, MQClientException { + public int add(Message msg) throws InterruptedException, MQBrokerException, RemotingException, MQClientException { int ret = -1; synchronized (this.closed) { if (this.closed.get()) { @@ -360,7 +368,10 @@ public class ProduceAccumulator { } ret = this.count++; this.messages.add(msg); - messagesSize.addAndGet(msg.getBody().length); + int bodySize = null == msg.getBody() ? 0 : msg.getBody().length; + if (bodySize > 0) { + messagesSize.addAndGet(bodySize); + } String msgKeys = msg.getKeys(); if (msgKeys != null) { this.keys.addAll(Arrays.asList(msgKeys.split(MessageConst.KEY_SEPARATOR))); @@ -388,7 +399,10 @@ public class ProduceAccumulator { this.count++; this.messages.add(msg); this.sendCallbacks.add(sendCallback); - messagesSize.getAndAdd(msg.getBody().length); + int bodySize = null == msg.getBody() ? 0 : msg.getBody().length; + if (bodySize > 0) { + messagesSize.addAndGet(bodySize); + } } if (readyToSend()) { this.send(sendCallback); @@ -472,7 +486,8 @@ public class ProduceAccumulator { if (defaultMQProducer != null) { final int size = messagesSize.get(); defaultMQProducer.sendDirect(messageBatch, aggregateKey.mq, new SendCallback() { - @Override public void onSuccess(SendResult sendResult) { + @Override + public void onSuccess(SendResult sendResult) { try { splitSendResults(sendResult); int i = 0; @@ -490,7 +505,8 @@ public class ProduceAccumulator { } } - @Override public void onException(Throwable e) { + @Override + public void onException(Throwable e) { for (SendCallback v : sendCallbacks) { v.onException(e); } diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageOpenTracingHookImpl.java b/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageOpenTracingHookImpl.java index 3cb6493384..0f828f2b4e 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageOpenTracingHookImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageOpenTracingHookImpl.java @@ -48,8 +48,8 @@ public class SendMessageOpenTracingHookImpl implements SendMessageHook { } Message msg = context.getMessage(); Tracer.SpanBuilder spanBuilder = tracer - .buildSpan(TraceConstants.TO_PREFIX + msg.getTopic()) - .withTag(Tags.SPAN_KIND, Tags.SPAN_KIND_PRODUCER); + .buildSpan(TraceConstants.TO_PREFIX + msg.getTopic()) + .withTag(Tags.SPAN_KIND, Tags.SPAN_KIND_PRODUCER); SpanContext spanContext = tracer.extract(Format.Builtin.TEXT_MAP, new TextMapAdapter(msg.getProperties())); if (spanContext != null) { spanBuilder.asChildOf(spanContext); @@ -62,7 +62,7 @@ public class SendMessageOpenTracingHookImpl implements SendMessageHook { span.setTag(TraceConstants.ROCKETMQ_KEYS, msg.getKeys()); span.setTag(TraceConstants.ROCKETMQ_STORE_HOST, context.getBrokerAddr()); span.setTag(TraceConstants.ROCKETMQ_MSG_TYPE, context.getMsgType().name()); - span.setTag(TraceConstants.ROCKETMQ_BODY_LENGTH, msg.getBody().length); + span.setTag(TraceConstants.ROCKETMQ_BODY_LENGTH, null == msg.getBody() ? 0 : msg.getBody().length); context.setMqTraceContext(span); } diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java b/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java index dba04b593f..61738928bb 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java @@ -58,7 +58,8 @@ public class SendMessageTraceHookImpl implements SendMessageHook { traceBean.setTags(context.getMessage().getTags()); traceBean.setKeys(context.getMessage().getKeys()); traceBean.setStoreHost(context.getBrokerAddr()); - traceBean.setBodyLength(context.getMessage().getBody().length); + int bodyLength = null == context.getMessage().getBody() ? 0 : context.getMessage().getBody().length; + traceBean.setBodyLength(bodyLength); traceBean.setMsgType(context.getMsgType()); traceContext.getTraceBeans().add(traceBean); } diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java index f5491e192a..713f9405ea 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java @@ -516,13 +516,15 @@ public class MessageDecoder { } } - // uncompress body + // inflate body if (deCompressBody && (sysFlag & MessageSysFlag.COMPRESSED_FLAG) == MessageSysFlag.COMPRESSED_FLAG) { Compressor compressor = CompressorFactory.getCompressor(MessageSysFlag.getCompressionType(sysFlag)); body = compressor.decompress(body); + sysFlag &= ~MessageSysFlag.COMPRESSED_FLAG; } msgExt.setBody(body); + msgExt.setSysFlag(sysFlag); } else { byteBuffer.position(byteBuffer.position() + bodyLen); }