This is an automated email from the ASF dual-hosted git repository. zhouxzhan pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit ee6cddfdb25e8c33082a85acdbff217631eb8985
Author: nowinkey <[email protected]>
AuthorDate: Mon Feb 13 10:51:46 2023 +0800

    Combine the process of decoding byteBuffer into preCheckMessageAndReturnSize method
---
 .../apache/rocketmq/store/DefaultMessageStore.java | 54 +++++++++++++---------
 1 file changed, 31 insertions(+), 23 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index ba4b53064..43052e2a8 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -3020,28 +3020,7 @@ public class DefaultMessageStore implements MessageStore {
 
                     for (int readSize = 0; readSize < result.getSize() && reputFromOffset < DefaultMessageStore.this.getConfirmOffset() && doNext; ) {
                         ByteBuffer byteBuffer = result.getByteBuffer();
-                        byteBuffer.mark();
-
-                        int totalSize = byteBuffer.getInt();
-                        if (reputFromOffset + totalSize > DefaultMessageStore.this.getConfirmOffset()) {
-                            doNext = false;
-                            break;
-                        }
-
-                        int magicCode = byteBuffer.getInt();
-                        switch (magicCode) {
-                            case MessageDecoder.MESSAGE_MAGIC_CODE:
-                            case MessageDecoder.MESSAGE_MAGIC_CODE_V2:
-                                break;
-                            case MessageDecoder.BLANK_MAGIC_CODE:
-                                totalSize = 0;
-                                break;
-                            default:
-                                totalSize = -1;
-                                doNext = false;
-                        }
-
-                        byteBuffer.reset();
+                        int totalSize = preCheckMessageAndReturnSize(byteBuffer);
 
                         if (totalSize > 0) {
                             if (batchDispatchRequestStart == -1) {
@@ -3058,9 +3037,9 @@ public class DefaultMessageStore implements MessageStore {
                             this.reputFromOffset += totalSize;
                             readSize += totalSize;
                         } else {
+                            doNext = false;
                             if (totalSize == 0) {
                                 this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
-                                readSize = result.getSize();
                             }
                             this.createBatchDispatchRequest(byteBuffer, batchDispatchRequestStart, batchDispatchRequestSize);
                             batchDispatchRequestStart = -1;
@@ -3083,6 +3062,35 @@ public class DefaultMessageStore implements MessageStore {
             }
         }
 
+        /**
+         * pre-check the message and returns the message size
+         *
+         * @return 0 Come to the end of file // >0 Normal messages // -1 Message checksum failure
+         */
+        public int preCheckMessageAndReturnSize(ByteBuffer byteBuffer) {
+            byteBuffer.mark();
+
+            int totalSize = byteBuffer.getInt();
+            if (reputFromOffset + totalSize > DefaultMessageStore.this.getConfirmOffset()) {
+                return -1;
+            }
+
+            int magicCode = byteBuffer.getInt();
+            switch (magicCode) {
+                case MessageDecoder.MESSAGE_MAGIC_CODE:
+                case MessageDecoder.MESSAGE_MAGIC_CODE_V2:
+                    break;
+                case MessageDecoder.BLANK_MAGIC_CODE:
+                    return 0;
+                default:
+                    return -1;
+            }
+
+            byteBuffer.reset();
+
+            return totalSize;
+        }
+
         @Override
         public void shutdown() {
             for (int i = 0; i < 50 && this.isCommitLogAvailable(); i++) {
