This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 91349f30b9 [ISSUE #7437] Add the CRC check of commitlog  (#7468)
91349f30b9 is described below

commit 91349f30b96db2e16b71d65a535d81f11b60bda5
Author: guyinyou <36399867+guyin...@users.noreply.github.com>
AuthorDate: Wed Oct 25 14:54:00 2023 +0800

    [ISSUE #7437] Add the CRC check of commitlog  (#7468)
    
    * Added CRC32 check for full data
    
    * add unit test
    
    * add MessageExtBrokerInnerTest
    
    * fix codestyle
    
    * fix codestyle
    
    ---------
    
    Co-authored-by: guyinyou <guyinyou....@alibaba-inc.com>
---
 .../java/org/apache/rocketmq/common/UtilAll.java   |  14 ++
 .../rocketmq/common/message/MessageConst.java      |   2 +
 .../rocketmq/common/message/MessageDecoder.java    |  32 +++-
 .../common/message/MessageExtBrokerInner.java      |  49 +++++
 .../rocketmq/common/MessageExtBrokerInnerTest.java |  93 ++++++++++
 .../java/org/apache/rocketmq/store/CommitLog.java  |  87 ++++++++-
 .../apache/rocketmq/store/MessageExtEncoder.java   |  38 +++-
 .../rocketmq/store/config/MessageStoreConfig.java  |  23 +++
 .../apache/rocketmq/store/AppendPropCRCTest.java   | 200 +++++++++++++++++++++
 .../apache/rocketmq/store/BatchPutMessageTest.java |   2 +-
 .../rocketmq/store/MessageExtBrokerInnerTest.java  | 105 +++++++++++
 .../store/ha/autoswitch/AutoSwitchHATest.java      |   2 +-
 .../file/CompositeQueueFlatFileTest.java           |   2 +-
 .../tieredstore/util/MessageBufferUtilTest.java    |   2 +-
 14 files changed, 630 insertions(+), 21 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java 
b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
index d2b7c374b7..95b6b09b41 100644
--- a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
@@ -307,6 +307,20 @@ public class UtilAll {
         return (int) (crc32.getValue() & 0x7FFFFFFF);
     }
 
+    public static int crc32(ByteBuffer byteBuffer) {
+        CRC32 crc32 = new CRC32();
+        crc32.update(byteBuffer);
+        return (int) (crc32.getValue() & 0x7FFFFFFF);
+    }
+
+    public static int crc32(ByteBuffer[] byteBuffers) {
+        CRC32 crc32 = new CRC32();
+        for (ByteBuffer buffer : byteBuffers) {
+            crc32.update(buffer);
+        }
+        return (int) (crc32.getValue() & 0x7FFFFFFF);
+    }
+
     public static String bytes2string(byte[] src) {
         char[] hexChars = new char[src.length * 2];
         for (int j = 0; j < src.length; j++) {
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java 
b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
index 87fed7c192..24f7bdb99a 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
@@ -97,6 +97,7 @@ public class MessageConst {
     public static final String PROPERTY_TIMER_DEL_UNIQKEY = 
"TIMER_DEL_UNIQKEY";
     public static final String PROPERTY_TIMER_DELAY_LEVEL = 
"TIMER_DELAY_LEVEL";
     public static final String PROPERTY_TIMER_DELAY_MS = "TIMER_DELAY_MS";
+    public static final String PROPERTY_CRC32 = "__CRC32#";
 
     /**
      * properties for DLQ
@@ -155,5 +156,6 @@ public class MessageConst {
         STRING_HASH_SET.add(PROPERTY_BORN_TIMESTAMP);
         STRING_HASH_SET.add(PROPERTY_DLQ_ORIGIN_TOPIC);
         STRING_HASH_SET.add(PROPERTY_DLQ_ORIGIN_MESSAGE_ID);
+        STRING_HASH_SET.add(PROPERTY_CRC32);
     }
 }
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java 
b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
index 6de0b69fb7..b053f82759 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.common.message;
 
+import io.netty.buffer.ByteBuf;
 import java.io.IOException;
 import java.net.Inet4Address;
 import java.net.InetAddress;
@@ -152,6 +153,34 @@ public class MessageDecoder {
         return null;
     }
 
+    public static void createCrc32(final ByteBuffer input, int crc32) {
+        
input.put(MessageConst.PROPERTY_CRC32.getBytes(StandardCharsets.UTF_8));
+        input.put((byte) NAME_VALUE_SEPARATOR);
+        for (int i = 0; i < 10; i++) {
+            byte b = '0';
+            if (crc32 > 0) {
+                b += (byte) (crc32 % 10);
+                crc32 /= 10;
+            }
+            input.put(b);
+        }
+        input.put((byte) PROPERTY_SEPARATOR);
+    }
+
+    public static void createCrc32(final ByteBuf input, int crc32) {
+        
input.writeBytes(MessageConst.PROPERTY_CRC32.getBytes(StandardCharsets.UTF_8));
+        input.writeByte((byte) NAME_VALUE_SEPARATOR);
+        for (int i = 0; i < 10; i++) {
+            byte b = '0';
+            if (crc32 > 0) {
+                b += (byte) (crc32 % 10);
+                crc32 /= 10;
+            }
+            input.writeByte(b);
+        }
+        input.writeByte((byte) PROPERTY_SEPARATOR);
+    }
+
     public static MessageExt decode(ByteBuffer byteBuffer) {
         return decode(byteBuffer, true, true, false);
     }
@@ -601,9 +630,6 @@ public class MessageDecoder {
             sb.append(value);
             sb.append(PROPERTY_SEPARATOR);
         }
-        if (sb.length() > 0) {
-            sb.deleteCharAt(sb.length() - 1);
-        }
         return sb.toString();
     }
 
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java
 
b/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java
index 91599653c5..4e5d3419a3 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java
@@ -20,6 +20,9 @@ import java.nio.ByteBuffer;
 
 import org.apache.rocketmq.common.TopicFilterType;
 
+import static 
org.apache.rocketmq.common.message.MessageDecoder.NAME_VALUE_SEPARATOR;
+import static 
org.apache.rocketmq.common.message.MessageDecoder.PROPERTY_SEPARATOR;
+
 public class MessageExtBrokerInner extends MessageExt {
     private static final long serialVersionUID = 7256001576878700634L;
     private String propertiesString;
@@ -55,6 +58,52 @@ public class MessageExtBrokerInner extends MessageExt {
         this.propertiesString = propertiesString;
     }
 
+
+    public void deleteProperty(String name) {
+        super.clearProperty(name);
+        if (propertiesString != null) {
+            int idx0 = 0;
+            int idx1;
+            int idx2;
+            idx1 = propertiesString.indexOf(name, idx0);
+            if (idx1 != -1) {
+                // cropping may be required
+                StringBuilder stringBuilder = new 
StringBuilder(propertiesString.length());
+                while (true) {
+                    int startIdx = idx0;
+                    while (true) {
+                        idx1 = propertiesString.indexOf(name, startIdx);
+                        if (idx1 == -1) {
+                            break;
+                        }
+                        startIdx = idx1 + name.length();
+                        if (idx1 == 0 || propertiesString.charAt(idx1 - 1) == 
PROPERTY_SEPARATOR) {
+                            if (propertiesString.length() > idx1 + 
name.length()
+                                && propertiesString.charAt(idx1 + 
name.length()) == NAME_VALUE_SEPARATOR) {
+                                break;
+                            }
+                        }
+                    }
+                    if (idx1 == -1) {
+                        // there are no characters that need to be skipped. 
Append all remaining characters.
+                        stringBuilder.append(propertiesString, idx0, 
propertiesString.length());
+                        break;
+                    }
+                    // there are characters that need to be cropped
+                    stringBuilder.append(propertiesString, idx0, idx1);
+                    // move idx2 to the end of the cropped character
+                    idx2 = propertiesString.indexOf(PROPERTY_SEPARATOR, idx1 + 
name.length() + 1);
+                    // all subsequent characters will be cropped
+                    if (idx2 == -1) {
+                        break;
+                    }
+                    idx0 = idx2 + 1;
+                }
+                this.setPropertiesString(stringBuilder.toString());
+            }
+        }
+    }
+
     public long getTagsCode() {
         return tagsCode;
     }
diff --git 
a/common/src/test/java/org/apache/rocketmq/common/MessageExtBrokerInnerTest.java
 
b/common/src/test/java/org/apache/rocketmq/common/MessageExtBrokerInnerTest.java
new file mode 100644
index 0000000000..77d69e5ad7
--- /dev/null
+++ 
b/common/src/test/java/org/apache/rocketmq/common/MessageExtBrokerInnerTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common;
+
+import org.apache.rocketmq.common.message.MessageExtBrokerInner;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class MessageExtBrokerInnerTest {
+    @Test
+    public void testDeleteProperty() {
+        MessageExtBrokerInner messageExtBrokerInner = new 
MessageExtBrokerInner();
+        String propertiesString = "";
+        messageExtBrokerInner.setPropertiesString(propertiesString);
+        messageExtBrokerInner.deleteProperty("KeyA");
+        assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("");
+
+        propertiesString = "KeyA\u0001ValueA";
+        messageExtBrokerInner.setPropertiesString(propertiesString);
+        messageExtBrokerInner.deleteProperty("KeyA");
+        assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("");
+
+        propertiesString = "KeyA\u0001ValueA\u0002";
+        messageExtBrokerInner.setPropertiesString(propertiesString);
+        messageExtBrokerInner.deleteProperty("KeyA");
+        assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("");
+
+        propertiesString = "KeyA\u0001ValueA\u0002KeyA\u0001ValueA";
+        messageExtBrokerInner.setPropertiesString(propertiesString);
+        messageExtBrokerInner.deleteProperty("KeyA");
+        assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("");
+
+        propertiesString = "KeyA\u0001ValueA\u0002KeyA\u0001ValueA\u0002";
+        messageExtBrokerInner.setPropertiesString(propertiesString);
+        messageExtBrokerInner.deleteProperty("KeyA");
+        assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("");
+
+        propertiesString = "KeyB\u0001ValueB\u0002KeyA\u0001ValueA";
+        messageExtBrokerInner.setPropertiesString(propertiesString);
+        messageExtBrokerInner.deleteProperty("KeyA");
+        
assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002");
+
+        propertiesString = "KeyB\u0001ValueB\u0002KeyA\u0001ValueA\u0002";
+        messageExtBrokerInner.setPropertiesString(propertiesString);
+        messageExtBrokerInner.deleteProperty("KeyA");
+        
assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002");
+
+        propertiesString = 
"KeyB\u0001ValueB\u0002KeyA\u0001ValueA\u0002KeyB\u0001ValueB\u0002";
+        messageExtBrokerInner.setPropertiesString(propertiesString);
+        messageExtBrokerInner.deleteProperty("KeyA");
+        
assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002KeyB\u0001ValueB\u0002");
+
+        propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueB\u0002";
+        messageExtBrokerInner.setPropertiesString(propertiesString);
+        messageExtBrokerInner.deleteProperty("KeyA");
+        
assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002");
+
+        propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueB";
+        messageExtBrokerInner.setPropertiesString(propertiesString);
+        messageExtBrokerInner.deleteProperty("KeyA");
+        
assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB");
+
+        propertiesString = 
"KeyA\u0001ValueA\u0002KeyB\u0001ValueBKeyA\u0001ValueA\u0002";
+        messageExtBrokerInner.setPropertiesString(propertiesString);
+        messageExtBrokerInner.deleteProperty("KeyA");
+        
assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueBKeyA\u0001ValueA\u0002");
+
+        propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueBKeyA\u0001";
+        messageExtBrokerInner.setPropertiesString(propertiesString);
+        messageExtBrokerInner.deleteProperty("KeyA");
+        
assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueBKeyA\u0001");
+
+        propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueBKeyA";
+        messageExtBrokerInner.setPropertiesString(propertiesString);
+        messageExtBrokerInner.deleteProperty("KeyA");
+        
assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueBKeyA");
+    }
+}
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java 
b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 93102799b7..3d3ee86b8f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -73,6 +73,10 @@ public class CommitLog implements Swappable {
     protected static final Logger log = 
LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
     // End of file empty MAGIC CODE cbd43194
     public final static int BLANK_MAGIC_CODE = -875286124;
+    /**
+     * CRC32 Format: [PROPERTY_CRC32 + NAME_VALUE_SEPARATOR + 10-digit 
fixed-length string + PROPERTY_SEPARATOR]
+     */
+    public static final int CRC32_RESERVED_LEN = 
MessageConst.PROPERTY_CRC32.length() + 1 + 10 + 1;
     protected final MappedFileQueue mappedFileQueue;
     protected final DefaultMessageStore defaultMessageStore;
 
@@ -96,6 +100,8 @@ public class CommitLog implements Swappable {
 
     protected int commitLogSize;
 
+    private final boolean enabledAppendPropCRC;
+
     public CommitLog(final DefaultMessageStore messageStore) {
         String storePath = 
messageStore.getMessageStoreConfig().getStorePathCommitLog();
         if (storePath.contains(MixAll.MULTI_PATH_SPLITTER)) {
@@ -117,7 +123,9 @@ public class CommitLog implements Swappable {
         putMessageThreadLocal = new ThreadLocal<PutMessageThreadLocal>() {
             @Override
             protected PutMessageThreadLocal initialValue() {
-                return new 
PutMessageThreadLocal(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
+                return new PutMessageThreadLocal(
+                    
defaultMessageStore.getMessageStoreConfig().getMaxMessageSize(),
+                    
defaultMessageStore.getMessageStoreConfig().isEnabledAppendPropCRC());
             }
         };
         this.putMessageLock = 
messageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new 
PutMessageReentrantLock() : new PutMessageSpinLock();
@@ -127,6 +135,8 @@ public class CommitLog implements Swappable {
         this.topicQueueLock = new 
TopicQueueLock(messageStore.getMessageStoreConfig().getTopicQueueLockNum());
 
         this.commitLogSize = 
messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
+
+        this.enabledAppendPropCRC = 
messageStore.getMessageStoreConfig().isEnabledAppendPropCRC();
     }
 
     public void setFullStorePaths(Set<String> fullStorePaths) {
@@ -470,10 +480,16 @@ public class CommitLog implements Swappable {
                     byteBuffer.get(bytesContent, 0, bodyLen);
 
                     if (checkCRC) {
-                        int crc = UtilAll.crc32(bytesContent, 0, bodyLen);
-                        if (crc != bodyCRC) {
-                            log.warn("CRC check failed. bodyCRC={}, 
currentCRC={}", crc, bodyCRC);
-                            return new DispatchRequest(-1, false/* success */);
+                        /**
+                         * When the forceVerifyPropCRC = false,
+                         * use original bodyCrc validation.
+                         */
+                        if 
(!this.defaultMessageStore.getMessageStoreConfig().isForceVerifyPropCRC()) {
+                            int crc = UtilAll.crc32(bytesContent, 0, bodyLen);
+                            if (crc != bodyCRC) {
+                                log.warn("CRC check failed. bodyCRC={}, 
currentCRC={}", crc, bodyCRC);
+                                return new DispatchRequest(-1, false/* success 
*/);
+                            }
                         }
                     }
                 } else {
@@ -531,6 +547,43 @@ public class CommitLog implements Swappable {
                 }
             }
 
+            if (checkCRC) {
+                /**
+                 * When the forceVerifyPropCRC = true,
+                 * Crc verification needs to be performed on the entire 
message data (excluding the length reserved at the tail)
+                 */
+                if 
(this.defaultMessageStore.getMessageStoreConfig().isForceVerifyPropCRC()) {
+                    int expectedCRC = -1;
+                    if (propertiesMap != null) {
+                        String crc32Str = 
propertiesMap.get(MessageConst.PROPERTY_CRC32);
+                        if (crc32Str != null) {
+                            expectedCRC = 0;
+                            for (int i = crc32Str.length() - 1; i >= 0; i--) {
+                                int num = crc32Str.charAt(i) - '0';
+                                expectedCRC *= 10;
+                                expectedCRC += num;
+                            }
+                        }
+                    }
+                    if (expectedCRC > 0) {
+                        ByteBuffer tmpBuffer = byteBuffer.duplicate();
+                        tmpBuffer.position(tmpBuffer.position() - totalSize);
+                        tmpBuffer.limit(tmpBuffer.position() + totalSize - 
CommitLog.CRC32_RESERVED_LEN);
+                        int crc = UtilAll.crc32(tmpBuffer);
+                        if (crc != expectedCRC) {
+                            log.warn(
+                                "CommitLog#checkAndDispatchMessage: failed to 
check message CRC, expected "
+                                    + "CRC={}, actual CRC={}", bodyCRC, crc);
+                            return new DispatchRequest(-1, false/* success */);
+                        }
+                    } else {
+                        log.warn(
+                            "CommitLog#checkAndDispatchMessage: failed to 
check message CRC, not found CRC in properties");
+                        return new DispatchRequest(-1, false/* success */);
+                    }
+                }
+            }
+
             int readLength = MessageExtEncoder.calMsgLength(messageVersion, 
sysFlag, bodyLen, topicLen, propertiesLength);
             if (totalSize != readLength) {
                 doNothingForDeadCode(reconsumeTimes);
@@ -846,9 +899,12 @@ public class CommitLog implements Swappable {
         if 
(!defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
             msg.setStoreTimestamp(System.currentTimeMillis());
         }
-
         // Set the message body CRC (consider the most appropriate setting on 
the client)
         msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
+        if (enabledAppendPropCRC) {
+            // delete crc32 properties if exist
+            msg.deleteProperty(MessageConst.PROPERTY_CRC32);
+        }
         // Back to Results
         AppendMessageResult result = null;
 
@@ -1764,6 +1820,7 @@ public class CommitLog implements Swappable {
         private static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4;
         // Store the message content
         private final ByteBuffer msgStoreItemMemory;
+        private final int crc32ReservedLength = CommitLog.CRC32_RESERVED_LEN;
 
         DefaultAppendMessageCallback() {
             this.msgStoreItemMemory = 
ByteBuffer.allocate(END_FILE_MIN_BLANK_LENGTH);
@@ -1837,6 +1894,15 @@ public class CommitLog implements Swappable {
             pos += 8 + 4 + 8 + ipLen;
             // refresh store time stamp in lock
             preEncodeBuffer.putLong(pos, msgInner.getStoreTimestamp());
+            if (enabledAppendPropCRC) {
+                // 18 CRC32
+                int checkSize = msgLen - crc32ReservedLength;
+                ByteBuffer tmpBuffer = preEncodeBuffer.duplicate();
+                tmpBuffer.limit(tmpBuffer.position() + checkSize);
+                int crc32 = UtilAll.crc32(tmpBuffer);
+                tmpBuffer.limit(tmpBuffer.position() + crc32ReservedLength);
+                MessageDecoder.createCrc32(tmpBuffer, crc32);
+            }
 
             final long beginTimeMills = 
CommitLog.this.defaultMessageStore.now();
             
CommitLog.this.getMessageStore().getPerfCounter().startTick("WRITE_MEMORY_TIME_MS");
@@ -1918,6 +1984,15 @@ public class CommitLog implements Swappable {
                 pos += 8 + 4 + 8 + bornHostLength;
                 // refresh store time stamp in lock
                 messagesByteBuff.putLong(pos, 
messageExtBatch.getStoreTimestamp());
+                if (enabledAppendPropCRC) {
+                    //append crc32
+                    int checkSize = msgLen - crc32ReservedLength;
+                    ByteBuffer tmpBuffer = messagesByteBuff.duplicate();
+                    tmpBuffer.position(msgPos).limit(msgPos + checkSize);
+                    int crc32 = UtilAll.crc32(tmpBuffer);
+                    messagesByteBuff.position(msgPos + checkSize);
+                    MessageDecoder.createCrc32(messagesByteBuff, crc32);
+                }
 
                 putMessageContext.getPhyPos()[index++] = wroteOffset + 
totalMsgLen - msgLen;
                 queueOffset++;
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java 
b/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java
index ee609a337b..c1d8087285 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.store;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.UnpooledByteBufAllocator;
+import java.nio.ByteBuffer;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.message.MessageDecoder;
@@ -29,8 +30,6 @@ import org.apache.rocketmq.common.sysflag.MessageSysFlag;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 
-import java.nio.ByteBuffer;
-
 public class MessageExtEncoder {
     protected static final Logger log = 
LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
     private ByteBuf byteBuf;
@@ -38,7 +37,13 @@ public class MessageExtEncoder {
     private int maxMessageBodySize;
     // The maximum length of the full message.
     private int maxMessageSize;
+    private final int crc32ReservedLength;
+
     public MessageExtEncoder(final int maxMessageBodySize) {
+        this(maxMessageBodySize, false);
+    }
+
+    public MessageExtEncoder(final int maxMessageBodySize, boolean 
enabledAppendPropCRC) {
         ByteBufAllocator alloc = UnpooledByteBufAllocator.DEFAULT;
         //Reserve 64kb for encoding buffer outside body
         int maxMessageSize = Integer.MAX_VALUE - maxMessageBodySize >= 64 * 
1024 ?
@@ -46,6 +51,7 @@ public class MessageExtEncoder {
         byteBuf = alloc.directBuffer(maxMessageSize);
         this.maxMessageBodySize = maxMessageBodySize;
         this.maxMessageSize = maxMessageSize;
+        this.crc32ReservedLength = enabledAppendPropCRC ? 
CommitLog.CRC32_RESERVED_LEN : 0;
     }
 
     public static int calMsgLength(MessageVersion messageVersion,
@@ -81,10 +87,13 @@ public class MessageExtEncoder {
         final byte[] propertiesData =
             msgInner.getPropertiesString() == null ? null : 
msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
 
-        final int propertiesLength = propertiesData == null ? 0 : 
propertiesData.length;
+        boolean needAppendLastPropertySeparator = crc32ReservedLength > 0 && 
propertiesData != null && propertiesData.length > 0
+            && propertiesData[propertiesData.length - 1] != 
MessageDecoder.PROPERTY_SEPARATOR;
+
+        final int propertiesLength = (propertiesData == null ? 0 : 
propertiesData.length) + (needAppendLastPropertySeparator ? 1 : 0) + 
crc32ReservedLength;
 
         if (propertiesLength > Short.MAX_VALUE) {
-            log.warn("putMessage message properties length too long. 
length={}", propertiesData.length);
+            log.warn("putMessage message properties length too long. 
length={}", propertiesLength);
             return new 
PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
         }
 
@@ -160,8 +169,14 @@ public class MessageExtEncoder {
 
         // 17 PROPERTIES
         this.byteBuf.writeShort((short) propertiesLength);
-        if (propertiesLength > 0)
+        if (propertiesLength > crc32ReservedLength) {
             this.byteBuf.writeBytes(propertiesData);
+        }
+        if (needAppendLastPropertySeparator) {
+            this.byteBuf.writeByte((byte) MessageDecoder.PROPERTY_SEPARATOR);
+        }
+        // 18 CRC32
+        this.byteBuf.writerIndex(this.byteBuf.writerIndex() + 
crc32ReservedLength);
 
         return null;
     }
@@ -213,10 +228,11 @@ public class MessageExtEncoder {
             final byte[] topicData = 
messageExtBatch.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
 
             final int topicLength = topicData.length;
-            final int topicLengthSize = 
messageExtBatch.getVersion().getTopicLengthSize();
             int totalPropLen = needAppendLastPropertySeparator ?
-                propertiesLen + batchPropLen + topicLengthSize : propertiesLen 
+ batchPropLen;
+                propertiesLen + batchPropLen + 1 : propertiesLen + 
batchPropLen;
 
+            // properties need to add crc32
+            totalPropLen += crc32ReservedLength;
             final int msgLen = calMsgLength(
                 messageExtBatch.getVersion(), messageExtBatch.getSysFlag(), 
bodyLen, topicLength, totalPropLen);
 
@@ -278,6 +294,7 @@ public class MessageExtEncoder {
                 }
                 this.byteBuf.writeBytes(batchPropData, 0, batchPropLen);
             }
+            this.byteBuf.writerIndex(this.byteBuf.writerIndex() + 
crc32ReservedLength);
         }
         putMessageContext.setBatchSize(batchSize);
         putMessageContext.setPhyPos(new long[batchSize]);
@@ -304,8 +321,13 @@ public class MessageExtEncoder {
     static class PutMessageThreadLocal {
         private final MessageExtEncoder encoder;
         private final StringBuilder keyBuilder;
+
         PutMessageThreadLocal(int size) {
-            encoder = new MessageExtEncoder(size);
+            this(size, false);
+        }
+
+        PutMessageThreadLocal(int size, boolean enabledAppendPropCRC) {
+            encoder = new MessageExtEncoder(size, enabledAppendPropCRC);
             keyBuilder = new StringBuilder();
         }
 
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java 
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 028facbdc6..8cb3ea6e9e 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -270,6 +270,12 @@ public class MessageStoreConfig {
      */
     private boolean autoMessageVersionOnTopicLen = true;
 
+    /**
+     * It cannot be changed after the broker is started.
+     * Modifications need to be restarted to take effect.
+     */
+    private boolean enabledAppendPropCRC = false;
+    private boolean forceVerifyPropCRC = false;
     private int travelCqFileNumWhenGetMessage = 1;
     // Sleep interval between to corrections
     private int correctLogicMinOffsetSleepInterval = 1;
@@ -405,6 +411,14 @@ public class MessageStoreConfig {
 
     private int topicQueueLockNum = 32;
 
+    public boolean isEnabledAppendPropCRC() {
+        return enabledAppendPropCRC;
+    }
+
+    public void setEnabledAppendPropCRC(boolean enabledAppendPropCRC) {
+        this.enabledAppendPropCRC = enabledAppendPropCRC;
+    }
+
     public boolean isDebugLockEnable() {
         return debugLockEnable;
     }
@@ -640,6 +654,15 @@ public class MessageStoreConfig {
         this.checkCRCOnRecover = checkCRCOnRecover;
     }
 
+    public boolean isForceVerifyPropCRC() {
+        return forceVerifyPropCRC;
+    }
+
+    public void setForceVerifyPropCRC(boolean forceVerifyPropCRC) {
+        this.forceVerifyPropCRC = forceVerifyPropCRC;
+    }
+
+
     public String getStorePathCommitLog() {
         if (storePathCommitLog == null) {
             return storePathRootDir + File.separator + "commitlog";
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java 
b/store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java
new file mode 100644
index 0000000000..c8ed4d74db
--- /dev/null
+++ b/store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store;
+
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageExtBatch;
+import org.apache.rocketmq.common.message.MessageExtBrokerInner;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class AppendPropCRCTest {
+
+    AppendMessageCallback callback;
+
+    MessageExtEncoder encoder;
+
+    CommitLog commitLog;
+
+    @Before
+    public void init() throws Exception {
+        MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+        messageStoreConfig.setMappedFileSizeCommitLog(1024 * 8);
+        messageStoreConfig.setMappedFileSizeConsumeQueue(1024 * 4);
+        messageStoreConfig.setMaxHashSlotNum(100);
+        messageStoreConfig.setMaxIndexNum(100 * 10);
+        
messageStoreConfig.setStorePathRootDir(System.getProperty("java.io.tmpdir") + 
File.separator + "unitteststore");
+        
messageStoreConfig.setStorePathCommitLog(System.getProperty("java.io.tmpdir") + 
File.separator + "unitteststore" + File.separator + "commitlog");
+        messageStoreConfig.setForceVerifyPropCRC(true);
+        messageStoreConfig.setEnabledAppendPropCRC(true);
+        //too much reference
+        DefaultMessageStore messageStore = new 
DefaultMessageStore(messageStoreConfig, null, null, new BrokerConfig(), new 
ConcurrentHashMap<>());
+        commitLog = new CommitLog(messageStore);
+        encoder = new MessageExtEncoder(10 * 1024 * 1024, true);
+        callback = commitLog.new DefaultAppendMessageCallback();
+    }
+
+    @After
+    public void destroy() {
+        UtilAll.deleteFile(new File(System.getProperty("user.home") + 
File.separator + "unitteststore"));
+    }
+
+    @Test
+    public void testAppendMessageSucc() throws Exception {
+        String topic = "test-topic";
+        int queue = 0;
+        int msgNum = 10;
+        int propertiesLen = 0;
+        Message msg = new Message();
+        msg.setBody("body".getBytes());
+        msg.setTopic(topic);
+        msg.setTags("abc");
+        msg.putUserProperty("a", "aaaaaaaa");
+        msg.putUserProperty("b", "bbbbbbbb");
+        msg.putUserProperty("c", "cccccccc");
+        msg.putUserProperty("d", "dddddddd");
+        msg.putUserProperty("e", "eeeeeeee");
+        msg.putUserProperty("f", "ffffffff");
+
+        MessageExtBrokerInner messageExtBrokerInner = new 
MessageExtBrokerInner();
+        messageExtBrokerInner.setTopic(topic);
+        messageExtBrokerInner.setQueueId(queue);
+        messageExtBrokerInner.setBornTimestamp(System.currentTimeMillis());
+        messageExtBrokerInner.setBornHost(new InetSocketAddress("127.0.0.1", 
123));
+        messageExtBrokerInner.setStoreHost(new InetSocketAddress("127.0.0.1", 
124));
+        messageExtBrokerInner.setBody(msg.getBody());
+        
messageExtBrokerInner.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
+        propertiesLen = messageExtBrokerInner.getPropertiesString().length();
+
+        ByteBuffer buff = ByteBuffer.allocate(1024 * 10);
+        for (int i = 0; i < msgNum; i++) {
+            encoder.encode(messageExtBrokerInner);
+            messageExtBrokerInner.setEncodedBuff(encoder.getEncoderBuffer());
+            AppendMessageResult allresult = callback.doAppend(0, buff, 1024 * 
10, messageExtBrokerInner, null);
+            assertEquals(AppendMessageStatus.PUT_OK, allresult.getStatus());
+        }
+        // Expected to pass when message is not modified
+        buff.flip();
+        for (int i = 0; i < msgNum - 1; i++) {
+            DispatchRequest request = 
commitLog.checkMessageAndReturnSize(buff, true, false);
+            assertTrue(request.isSuccess());
+        }
+        // Modify the properties of the last message and expect the 
verification to fail.
+        int idx = buff.limit() - (propertiesLen / 2);
+        buff.put(idx, (byte) (buff.get(idx) + 1));
+        DispatchRequest request = commitLog.checkMessageAndReturnSize(buff, 
true, false);
+        assertFalse(request.isSuccess());
+    }
+
+    @Test
+    public void testAppendMessageBatchSucc() throws Exception {
+        List<Message> messages = new ArrayList<>();
+        String topic = "test-topic";
+        int queue = 0;
+        int propertiesLen = 0;
+        for (int i = 0; i < 10; i++) {
+            Message msg = new Message();
+            msg.setBody("body".getBytes());
+            msg.setTopic(topic);
+            msg.setTags("abc");
+            msg.putUserProperty("a", "aaaaaaaa");
+            msg.putUserProperty("b", "bbbbbbbb");
+            msg.putUserProperty("c", "cccccccc");
+            msg.putUserProperty("d", "dddddddd");
+            msg.putUserProperty("e", "eeeeeeee");
+            msg.putUserProperty("f", "ffffffff");
+            String propertiesString = 
MessageDecoder.messageProperties2String(msg.getProperties());
+            propertiesLen = propertiesString.length();
+            messages.add(msg);
+        }
+        MessageExtBatch messageExtBatch = new MessageExtBatch();
+        messageExtBatch.setTopic(topic);
+        messageExtBatch.setQueueId(queue);
+        messageExtBatch.setBornTimestamp(System.currentTimeMillis());
+        messageExtBatch.setBornHost(new InetSocketAddress("127.0.0.1", 123));
+        messageExtBatch.setStoreHost(new InetSocketAddress("127.0.0.1", 124));
+        messageExtBatch.setBody(MessageDecoder.encodeMessages(messages));
+
+        PutMessageContext putMessageContext = new PutMessageContext(topic + 
"-" + queue);
+        messageExtBatch.setEncodedBuff(encoder.encode(messageExtBatch, 
putMessageContext));
+        ByteBuffer buff = ByteBuffer.allocate(1024 * 10);
+        //encounter end of file when append half of the data
+        AppendMessageResult allresult =
+            callback.doAppend(0, buff, 1024 * 10, messageExtBatch, 
putMessageContext);
+
+        assertEquals(AppendMessageStatus.PUT_OK, allresult.getStatus());
+        assertEquals(0, allresult.getWroteOffset());
+        assertEquals(0, allresult.getLogicsOffset());
+        assertEquals(buff.position(), allresult.getWroteBytes());
+
+        assertEquals(messages.size(), allresult.getMsgNum());
+
+        Set<String> msgIds = new HashSet<>();
+        for (String msgId : allresult.getMsgId().split(",")) {
+            assertEquals(32, msgId.length());
+            msgIds.add(msgId);
+        }
+        assertEquals(messages.size(), msgIds.size());
+
+        List<MessageExt> decodeMsgs = MessageDecoder.decodes((ByteBuffer) 
buff.flip());
+        assertEquals(decodeMsgs.size(), decodeMsgs.size());
+        long queueOffset = decodeMsgs.get(0).getQueueOffset();
+        long storeTimeStamp = decodeMsgs.get(0).getStoreTimestamp();
+        for (int i = 0; i < messages.size(); i++) {
+            assertEquals(messages.get(i).getTopic(), 
decodeMsgs.get(i).getTopic());
+            assertEquals(new String(messages.get(i).getBody()), new 
String(decodeMsgs.get(i).getBody()));
+            assertEquals(messages.get(i).getTags(), 
decodeMsgs.get(i).getTags());
+
+            assertEquals(messageExtBatch.getBornHostNameString(), 
decodeMsgs.get(i).getBornHostNameString());
+
+            assertEquals(messageExtBatch.getBornTimestamp(), 
decodeMsgs.get(i).getBornTimestamp());
+            assertEquals(storeTimeStamp, 
decodeMsgs.get(i).getStoreTimestamp());
+            assertEquals(queueOffset++, decodeMsgs.get(i).getQueueOffset());
+        }
+
+        // Expected to pass when message is not modified
+        buff.flip();
+        for (int i = 0; i < messages.size() - 1; i++) {
+            DispatchRequest request = 
commitLog.checkMessageAndReturnSize(buff, true, false);
+            assertTrue(request.isSuccess());
+        }
+        // Modify the properties of the last message and expect the 
verification to fail.
+        int idx = buff.limit() - (propertiesLen / 2);
+        buff.put(idx, (byte) (buff.get(idx) + 1));
+        DispatchRequest request = commitLog.checkMessageAndReturnSize(buff, 
true, false);
+        assertFalse(request.isSuccess());
+    }
+}
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java 
b/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java
index 43ca38eb48..768029ca1a 100644
--- a/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/BatchPutMessageTest.java
@@ -108,7 +108,7 @@ public class BatchPutMessageTest {
             short propertiesLength = (short) propertiesBytes.length;
             final byte[] topicData = 
msg.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
             final int topicLength = topicData.length;
-            msgLengthArr[j] = calMsgLength(msg.getBody().length, topicLength, 
propertiesLength + batchPropLen + 1) + msgLengthArr[j - 1];
+            msgLengthArr[j] = calMsgLength(msg.getBody().length, topicLength, 
propertiesLength + batchPropLen) + msgLengthArr[j - 1];
             j++;
         }
         byte[] batchMessageBody = MessageDecoder.encodeMessages(messages);
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/MessageExtBrokerInnerTest.java 
b/store/src/test/java/org/apache/rocketmq/store/MessageExtBrokerInnerTest.java
new file mode 100644
index 0000000000..415dc38117
--- /dev/null
+++ 
b/store/src/test/java/org/apache/rocketmq/store/MessageExtBrokerInnerTest.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store;
+
+import org.apache.rocketmq.common.message.MessageExtBrokerInner;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class MessageExtBrokerInnerTest {
+    @Test
+    public void testDeleteProperty() {
+        MessageExtBrokerInner messageExtBrokerInner = new 
MessageExtBrokerInner();
+        String propertiesString = "";
+        messageExtBrokerInner.setPropertiesString(propertiesString);
+        messageExtBrokerInner.deleteProperty("KeyA");
+        assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("");
+
+        propertiesString = "KeyA\u0001ValueA";
+        messageExtBrokerInner.setPropertiesString(propertiesString);
+        messageExtBrokerInner.deleteProperty("KeyA");
+        assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("");
+
+        propertiesString = "KeyA\u0001ValueA\u0002";
+        messageExtBrokerInner.setPropertiesString(propertiesString);
+        messageExtBrokerInner.deleteProperty("KeyA");
+        assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("");
+
+        propertiesString = "KeyA\u0001ValueA\u0002KeyA\u0001ValueA";
+        messageExtBrokerInner.setPropertiesString(propertiesString);
+        messageExtBrokerInner.deleteProperty("KeyA");
+        assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("");
+
+        propertiesString = "KeyA\u0001ValueA\u0002KeyA\u0001ValueA\u0002";
+        messageExtBrokerInner.setPropertiesString(propertiesString);
+        messageExtBrokerInner.deleteProperty("KeyA");
+        assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("");
+
+        propertiesString = "KeyB\u0001ValueB\u0002KeyA\u0001ValueA";
+        messageExtBrokerInner.setPropertiesString(propertiesString);
+        messageExtBrokerInner.deleteProperty("KeyA");
+        
assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002");
+
+        propertiesString = "KeyB\u0001ValueB\u0002KeyA\u0001ValueA\u0002";
+        messageExtBrokerInner.setPropertiesString(propertiesString);
+        messageExtBrokerInner.deleteProperty("KeyA");
+        
assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002");
+
+        propertiesString = 
"KeyB\u0001ValueB\u0002KeyA\u0001ValueA\u0002KeyB\u0001ValueB\u0002";
+        messageExtBrokerInner.setPropertiesString(propertiesString);
+        messageExtBrokerInner.deleteProperty("KeyA");
+        
assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002KeyB\u0001ValueB\u0002");
+
+        propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueB\u0002";
+        messageExtBrokerInner.setPropertiesString(propertiesString);
+        messageExtBrokerInner.deleteProperty("KeyA");
+        
assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB\u0002");
+
+        propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueB";
+        messageExtBrokerInner.setPropertiesString(propertiesString);
+        messageExtBrokerInner.deleteProperty("KeyA");
+        
assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueB");
+
+        propertiesString = 
"KeyA\u0001ValueA\u0002KeyB\u0001ValueBKeyA\u0001ValueA\u0002";
+        messageExtBrokerInner.setPropertiesString(propertiesString);
+        messageExtBrokerInner.deleteProperty("KeyA");
+        
assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueBKeyA\u0001ValueA\u0002");
+
+        propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueBKeyA\u0001";
+        messageExtBrokerInner.setPropertiesString(propertiesString);
+        messageExtBrokerInner.deleteProperty("KeyA");
+        
assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueBKeyA\u0001");
+
+        propertiesString = "KeyA\u0001ValueA\u0002KeyB\u0001ValueBKeyA";
+        messageExtBrokerInner.setPropertiesString(propertiesString);
+        messageExtBrokerInner.deleteProperty("KeyA");
+        
assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo("KeyB\u0001ValueBKeyA");
+
+        propertiesString = "__CRC32#\u0001";
+        messageExtBrokerInner.setPropertiesString(propertiesString);
+        messageExtBrokerInner.deleteProperty("__CRC32#");
+        assertThat(messageExtBrokerInner.getPropertiesString()).isEmpty();
+
+        propertiesString = "__CRC32#";
+        messageExtBrokerInner.setPropertiesString(propertiesString);
+        messageExtBrokerInner.deleteProperty("__CRC32#");
+        
assertThat(messageExtBrokerInner.getPropertiesString()).isEqualTo(propertiesString);
+    }
+
+}
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
 
b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
index db5c5af4cd..7d659d2f6a 100644
--- 
a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
+++ 
b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
@@ -465,7 +465,7 @@ public class AutoSwitchHATest {
 
         // Step2: check flag SynchronizingSyncStateSet
         Assert.assertTrue(masterHAService.isSynchronizingSyncStateSet());
-        Assert.assertEquals(this.messageStore1.getConfirmOffset(), 1570);
+        Assert.assertEquals(this.messageStore1.getConfirmOffset(), 1580);
         Set<Long> syncStateSet = masterHAService.getSyncStateSet();
         Assert.assertEquals(syncStateSet.size(), 2);
         Assert.assertTrue(syncStateSet.contains(1L));
diff --git 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
index 2e028ada32..5884243048 100644
--- 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
+++ 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
@@ -74,7 +74,7 @@ public class CompositeQueueFlatFileTest {
         ByteBuffer message = MessageBufferUtilTest.buildMockedMessageBuffer();
         AppendResult result = flatFile.appendCommitLog(message);
         Assert.assertEquals(AppendResult.SUCCESS, result);
-        Assert.assertEquals(122L, 
flatFile.commitLog.getFlatFile().getFileToWrite().getAppendPosition());
+        Assert.assertEquals(123L, 
flatFile.commitLog.getFlatFile().getFileToWrite().getAppendPosition());
         Assert.assertEquals(0L, 
flatFile.commitLog.getFlatFile().getFileToWrite().getCommitPosition());
 
         flatFile = new CompositeQueueFlatFile(tieredFileAllocator, mq);
diff --git 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java
 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java
index 1f38d4f6c5..a413f2113e 100644
--- 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java
+++ 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java
@@ -47,7 +47,7 @@ public class MessageBufferUtilTest {
         + 8 //Prepared Transaction Offset
         + 4 + 0 //BODY
         + 2 + 0 //TOPIC
-        + 2 + 30 //properties
+        + 2 + 31 //properties
         + 0;
 
     public static ByteBuffer buildMockedMessageBuffer() {

Reply via email to