This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new ead3d90501 [ISSUE #7511] Lock granularity issue causing LMQ message 
loss (#7525)
ead3d90501 is described below

commit ead3d905016d9db4785a46beaa555c7fafd4f9bb
Author: Dongyuan Pan <dongyuanp...@gmail.com>
AuthorDate: Wed Nov 8 10:40:52 2023 +0800

    [ISSUE #7511] Lock granularity issue causing LMQ message loss (#7525)
    
    * bug fix: assignOffset and increaseOffset in LMQ has concurrency issues in 
topicQueueLock, should be in putMessageLock
    
    * fix MultiDispatchTest
    
    * fix MultiDispatchTest
    
    * fix unit test
---
 .../common/message/MessageExtBrokerInner.java      |  10 ++
 .../java/org/apache/rocketmq/store/CommitLog.java  |  94 ++++++++++++++--
 .../org/apache/rocketmq/store/ConsumeQueue.java    |  44 +-------
 .../apache/rocketmq/store/DefaultMessageStore.java |   1 -
 .../apache/rocketmq/store/MessageExtEncoder.java   | 118 ++++++++++++++++++---
 .../org/apache/rocketmq/store/MultiDispatch.java   |  77 ++++++++++++++
 .../store/queue/AbstractConsumeQueueStore.java     |  10 ++
 .../store/queue/ConsumeQueueInterface.java         |   1 -
 .../store/queue/ConsumeQueueStoreInterface.java    |  14 +++
 ...{MultiDispatch.java => MultiDispatchUtils.java} |  17 +--
 .../rocketmq/store/queue/QueueOffsetOperator.java  |   6 +-
 .../rocketmq/store/queue/RocksDBConsumeQueue.java  |  42 --------
 .../apache/rocketmq/store/AppendCallbackTest.java  |   6 +-
 .../apache/rocketmq/store/AppendPropCRCTest.java   |   5 +-
 .../apache/rocketmq/store/MultiDispatchTest.java   |  12 +--
 .../rocketmq/store/kv/CompactionLogTest.java       |   2 +-
 16 files changed, 322 insertions(+), 137 deletions(-)

diff --git 
a/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java
 
b/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java
index 52501dbca0..147f23f123 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java
@@ -28,6 +28,8 @@ public class MessageExtBrokerInner extends MessageExt {
 
     private ByteBuffer encodedBuff;
 
+    private volatile boolean encodeCompleted;
+
     private MessageVersion version = MessageVersion.MESSAGE_VERSION_V1;
 
     public ByteBuffer getEncodedBuff() {
@@ -92,4 +94,12 @@ public class MessageExtBrokerInner extends MessageExt {
             
this.setPropertiesString(MessageDecoder.messageProperties2String(this.getProperties()));
         }
     }
+
+    public boolean isEncodeCompleted() {
+        return encodeCompleted;
+    }
+
+    public void setEncodeCompleted(boolean encodeCompleted) {
+        this.encodeCompleted = encodeCompleted;
+    }
 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java 
b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 6c3afde70f..35c1d0e2d7 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -35,6 +35,7 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import com.sun.jna.NativeLong;
 import com.sun.jna.Pointer;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.ServiceThread;
 import org.apache.rocketmq.common.SystemClock;
@@ -56,6 +57,7 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.store.MessageExtEncoder.PutMessageThreadLocal;
 import org.apache.rocketmq.store.config.BrokerRole;
 import org.apache.rocketmq.store.config.FlushDiskType;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.apache.rocketmq.store.ha.HAService;
 import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
 import org.apache.rocketmq.store.logfile.MappedFile;
@@ -101,6 +103,7 @@ public class CommitLog implements Swappable {
     protected int commitLogSize;
 
     private final boolean enabledAppendPropCRC;
+    protected final MultiDispatch multiDispatch;
 
     public CommitLog(final DefaultMessageStore messageStore) {
         String storePath = 
messageStore.getMessageStoreConfig().getStorePathCommitLog();
@@ -119,13 +122,11 @@ public class CommitLog implements Swappable {
         this.flushManager = new DefaultFlushManager();
         this.coldDataCheckService = new ColdDataCheckService();
 
-        this.appendMessageCallback = new DefaultAppendMessageCallback();
+        this.appendMessageCallback = new 
DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig());
         putMessageThreadLocal = new ThreadLocal<PutMessageThreadLocal>() {
             @Override
             protected PutMessageThreadLocal initialValue() {
-                return new PutMessageThreadLocal(
-                    
defaultMessageStore.getMessageStoreConfig().getMaxMessageSize(),
-                    
defaultMessageStore.getMessageStoreConfig().isEnabledAppendPropCRC());
+                return new 
PutMessageThreadLocal(defaultMessageStore.getMessageStoreConfig());
             }
         };
         this.putMessageLock = 
messageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new 
PutMessageReentrantLock() : new PutMessageSpinLock();
@@ -137,6 +138,8 @@ public class CommitLog implements Swappable {
         this.commitLogSize = 
messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
 
         this.enabledAppendPropCRC = 
messageStore.getMessageStoreConfig().isEnabledAppendPropCRC();
+
+        this.multiDispatch = new MultiDispatch(defaultMessageStore);
     }
 
     public void setFullStorePaths(Set<String> fullStorePaths) {
@@ -1830,15 +1833,84 @@ public class CommitLog implements Swappable {
         // Store the message content
         private final ByteBuffer msgStoreItemMemory;
         private final int crc32ReservedLength = CommitLog.CRC32_RESERVED_LEN;
+        private final MessageStoreConfig messageStoreConfig;
 
-        DefaultAppendMessageCallback() {
+        DefaultAppendMessageCallback(MessageStoreConfig messageStoreConfig) {
             this.msgStoreItemMemory = 
ByteBuffer.allocate(END_FILE_MIN_BLANK_LENGTH);
+            this.messageStoreConfig = messageStoreConfig;
+        }
+
+        public AppendMessageResult handlePropertiesForLmqMsg(ByteBuffer 
preEncodeBuffer, final MessageExtBrokerInner msgInner) {
+            if (msgInner.isEncodeCompleted()) {
+                return null;
+            }
+
+            multiDispatch.wrapMultiDispatch(msgInner);
+
+            
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
+
+            final byte[] propertiesData =
+                    msgInner.getPropertiesString() == null ? null : 
msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
+
+            boolean needAppendLastPropertySeparator = enabledAppendPropCRC && 
propertiesData != null && propertiesData.length > 0
+                    && propertiesData[propertiesData.length - 1] != 
MessageDecoder.PROPERTY_SEPARATOR;
+
+            final int propertiesLength = (propertiesData == null ? 0 : 
propertiesData.length) + (needAppendLastPropertySeparator ? 1 : 0) + 
crc32ReservedLength;
+
+            if (propertiesLength > Short.MAX_VALUE) {
+                log.warn("putMessage message properties length too long. 
length={}", propertiesData.length);
+                return new 
AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
+            }
+
+            int msgLenWithoutProperties = preEncodeBuffer.getInt(0);
+
+            int msgLen = msgLenWithoutProperties + 2 + propertiesLength;
+
+            // Exceeds the maximum message
+            if (msgLen > this.messageStoreConfig.getMaxMessageSize()) {
+                log.warn("message size exceeded, msg total size: " + msgLen + 
", maxMessageSize: " + this.messageStoreConfig.getMaxMessageSize());
+                return new 
AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
+            }
+
+            // Back filling total message length
+            preEncodeBuffer.putInt(0, msgLen);
+            // Modify position to msgLenWithoutProperties
+            preEncodeBuffer.position(msgLenWithoutProperties);
+
+            preEncodeBuffer.putShort((short) propertiesLength);
+
+            if (propertiesLength > crc32ReservedLength) {
+                preEncodeBuffer.put(propertiesData);
+            }
+
+            if (needAppendLastPropertySeparator) {
+                preEncodeBuffer.put((byte) MessageDecoder.PROPERTY_SEPARATOR);
+            }
+            // 18 CRC32
+            preEncodeBuffer.position(preEncodeBuffer.position() + 
crc32ReservedLength);
+
+            msgInner.setEncodeCompleted(true);
+
+            return null;
         }
 
         public AppendMessageResult doAppend(final long fileFromOffset, final 
ByteBuffer byteBuffer, final int maxBlank,
             final MessageExtBrokerInner msgInner, PutMessageContext 
putMessageContext) {
             // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>
 
+            ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff();
+            boolean isMultiDispatchMsg = 
messageStoreConfig.isEnableMultiDispatch() && 
CommitLog.isMultiDispatchMsg(msgInner);
+            if (isMultiDispatchMsg) {
+                AppendMessageResult appendMessageResult = 
handlePropertiesForLmqMsg(preEncodeBuffer, msgInner);
+                if (appendMessageResult != null) {
+                    return appendMessageResult;
+                }
+            }
+
+            final int msgLen = preEncodeBuffer.getInt(0);
+            preEncodeBuffer.position(0);
+            preEncodeBuffer.limit(msgLen);
+
             // PHY OFFSET
             long wroteOffset = fileFromOffset + byteBuffer.position();
 
@@ -1872,9 +1944,6 @@ public class CommitLog implements Swappable {
                     break;
             }
 
-            ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff();
-            final int msgLen = preEncodeBuffer.getInt(0);
-
             // Determines whether there is sufficient free space
             if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
                 this.msgStoreItemMemory.clear();
@@ -1919,6 +1988,11 @@ public class CommitLog implements Swappable {
             byteBuffer.put(preEncodeBuffer);
             
CommitLog.this.getMessageStore().getPerfCounter().endTick("WRITE_MEMORY_TIME_MS");
             msgInner.setEncodedBuff(null);
+
+            if (isMultiDispatchMsg) {
+                CommitLog.this.multiDispatch.updateMultiQueueOffset(msgInner);
+            }
+
             return new AppendMessageResult(AppendMessageStatus.PUT_OK, 
wroteOffset, msgLen, msgIdSupplier,
                 msgInner.getStoreTimestamp(), queueOffset, 
CommitLog.this.defaultMessageStore.now() - beginTimeMills, messageNum);
         }
@@ -2159,6 +2233,10 @@ public class CommitLog implements Swappable {
         return flushManager;
     }
 
+    public static boolean isMultiDispatchMsg(MessageExtBrokerInner msg) {
+        return 
StringUtils.isNoneBlank(msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH))
 && !msg.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX);
+    }
+
     private boolean isCloseReadAhead() {
         return !MixAll.isWindows() && 
!defaultMessageStore.getMessageStoreConfig().isDataReadAheadEnable();
     }
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java 
b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index 623509c8bf..453c9d1dc7 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -27,7 +27,6 @@ import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.Pair;
 import org.apache.rocketmq.common.attribute.CQType;
 import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.message.MessageAccessor;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageExtBrokerInner;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
@@ -38,7 +37,7 @@ import org.apache.rocketmq.store.logfile.MappedFile;
 import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
 import org.apache.rocketmq.store.queue.CqUnit;
 import org.apache.rocketmq.store.queue.FileQueueLifeCycle;
-import org.apache.rocketmq.store.queue.MultiDispatch;
+import org.apache.rocketmq.store.queue.MultiDispatchUtils;
 import org.apache.rocketmq.store.queue.QueueOffsetOperator;
 import org.apache.rocketmq.store.queue.ReferredIterator;
 
@@ -702,7 +701,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, 
FileQueueLifeCycle {
                     
this.messageStore.getStoreCheckpoint().setPhysicMsgTimestamp(request.getStoreTimestamp());
                 }
                 
this.messageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
-                if 
(MultiDispatch.checkMultiDispatchQueue(this.messageStore.getMessageStoreConfig(),
 request)) {
+                if 
(MultiDispatchUtils.checkMultiDispatchQueue(this.messageStore.getMessageStoreConfig(),
 request)) {
                     multiDispatchLmqQueue(request, maxRetries);
                 }
                 return;
@@ -776,28 +775,6 @@ public class ConsumeQueue implements 
ConsumeQueueInterface, FileQueueLifeCycle {
         String topicQueueKey = getTopic() + "-" + getQueueId();
         long queueOffset = queueOffsetOperator.getQueueOffset(topicQueueKey);
         msg.setQueueOffset(queueOffset);
-
-
-        // Handling the multi dispatch message. In the context of a light 
message queue (as defined in RIP-28),
-        // light message queues are constructed based on message properties, 
which requires special handling of queue offset of the light message queue.
-        if 
(!MultiDispatch.isNeedHandleMultiDispatch(this.messageStore.getMessageStoreConfig(),
 msg.getTopic())) {
-            return;
-        }
-        String multiDispatchQueue = 
msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
-        if (StringUtils.isBlank(multiDispatchQueue)) {
-            return;
-        }
-        String[] queues = 
multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
-        Long[] queueOffsets = new Long[queues.length];
-        for (int i = 0; i < queues.length; i++) {
-            if (this.messageStore.getMessageStoreConfig().isEnableLmq() && 
MixAll.isLmq(queues[i])) {
-                String key = MultiDispatch.lmqQueueKey(queues[i]);
-                queueOffsets[i] = queueOffsetOperator.getLmqOffset(key);
-            }
-        }
-        MessageAccessor.putProperty(msg, 
MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET,
-            StringUtils.join(queueOffsets, 
MixAll.MULTI_DISPATCH_QUEUE_SPLITTER));
-        msg.removeWaitStorePropertyString();
     }
 
     @Override
@@ -805,23 +782,6 @@ public class ConsumeQueue implements 
ConsumeQueueInterface, FileQueueLifeCycle {
         short messageNum) {
         String topicQueueKey = getTopic() + "-" + getQueueId();
         queueOffsetOperator.increaseQueueOffset(topicQueueKey, messageNum);
-
-        // Handling the multi dispatch message. In the context of a light 
message queue (as defined in RIP-28),
-        // light message queues are constructed based on message properties, 
which requires special handling of queue offset of the light message queue.
-        if 
(!MultiDispatch.isNeedHandleMultiDispatch(this.messageStore.getMessageStoreConfig(),
 msg.getTopic())) {
-            return;
-        }
-        String multiDispatchQueue = 
msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
-        if (StringUtils.isBlank(multiDispatchQueue)) {
-            return;
-        }
-        String[] queues = 
multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
-        for (int i = 0; i < queues.length; i++) {
-            if (this.messageStore.getMessageStoreConfig().isEnableLmq() && 
MixAll.isLmq(queues[i])) {
-                String key = MultiDispatch.lmqQueueKey(queues[i]);
-                queueOffsetOperator.increaseLmqOffset(key, (short) 1);
-            }
-        }
     }
 
     private boolean putMessagePositionInfo(final long offset, final int size, 
final long tagsCode,
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 99a54e2d7f..dc5f312e5a 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -2112,7 +2112,6 @@ public class DefaultMessageStore implements MessageStore {
         }
     }
 
-
     @Override
     public void increaseOffset(MessageExtBrokerInner msg, short messageNum) {
         final int tranType = 
MessageSysFlag.getTransactionValue(msg.getSysFlag());
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java 
b/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java
index c1d8087285..20e9a652b7 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageExtEncoder.java
@@ -29,6 +29,7 @@ import org.apache.rocketmq.common.message.MessageVersion;
 import org.apache.rocketmq.common.sysflag.MessageSysFlag;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
 
 public class MessageExtEncoder {
     protected static final Logger log = 
LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
@@ -38,20 +39,22 @@ public class MessageExtEncoder {
     // The maximum length of the full message.
     private int maxMessageSize;
     private final int crc32ReservedLength;
+    private MessageStoreConfig messageStoreConfig;
 
-    public MessageExtEncoder(final int maxMessageBodySize) {
-        this(maxMessageBodySize, false);
+    public MessageExtEncoder(final int maxMessageBodySize, final 
MessageStoreConfig messageStoreConfig) {
+        this(messageStoreConfig);
     }
 
-    public MessageExtEncoder(final int maxMessageBodySize, boolean 
enabledAppendPropCRC) {
+    public MessageExtEncoder(final MessageStoreConfig messageStoreConfig) {
         ByteBufAllocator alloc = UnpooledByteBufAllocator.DEFAULT;
+        this.messageStoreConfig = messageStoreConfig;
+        this.maxMessageBodySize = messageStoreConfig.getMaxMessageSize();
         //Reserve 64kb for encoding buffer outside body
         int maxMessageSize = Integer.MAX_VALUE - maxMessageBodySize >= 64 * 
1024 ?
             maxMessageBodySize + 64 * 1024 : Integer.MAX_VALUE;
         byteBuf = alloc.directBuffer(maxMessageSize);
-        this.maxMessageBodySize = maxMessageBodySize;
         this.maxMessageSize = maxMessageSize;
-        this.crc32ReservedLength = enabledAppendPropCRC ? 
CommitLog.CRC32_RESERVED_LEN : 0;
+        this.crc32ReservedLength = messageStoreConfig.isEnabledAppendPropCRC() 
? CommitLog.CRC32_RESERVED_LEN : 0;
     }
 
     public static int calMsgLength(MessageVersion messageVersion,
@@ -79,8 +82,103 @@ public class MessageExtEncoder {
             + 2 + (Math.max(propertiesLength, 0)); //propertiesLength
     }
 
+    public static int calMsgLengthNoProperties(MessageVersion messageVersion,
+                                               int sysFlag, int bodyLength, 
int topicLength) {
+
+        int bornhostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 
? 8 : 20;
+        int storehostAddressLength = (sysFlag & 
MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 8 : 20;
+
+        return 4 //TOTALSIZE
+                + 4 //MAGICCODE
+                + 4 //BODYCRC
+                + 4 //QUEUEID
+                + 4 //FLAG
+                + 8 //QUEUEOFFSET
+                + 8 //PHYSICALOFFSET
+                + 4 //SYSFLAG
+                + 8 //BORNTIMESTAMP
+                + bornhostLength //BORNHOST
+                + 8 //STORETIMESTAMP
+                + storehostAddressLength //STOREHOSTADDRESS
+                + 4 //RECONSUMETIMES
+                + 8 //Prepared Transaction Offset
+                + 4 + (Math.max(bodyLength, 0)) //BODY
+                + messageVersion.getTopicLengthSize() + topicLength; //TOPIC
+    }
+
+    public PutMessageResult encodeWithoutProperties(MessageExtBrokerInner 
msgInner) {
+
+        final byte[] topicData = 
msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
+        final int topicLength = topicData.length;
+
+        final int bodyLength = msgInner.getBody() == null ? 0 : 
msgInner.getBody().length;
+
+        // Exceeds the maximum message body
+        if (bodyLength > this.maxMessageBodySize) {
+            CommitLog.log.warn("message body size exceeded, msg body size: " + 
bodyLength
+                    + ", maxMessageSize: " + this.maxMessageBodySize);
+            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, 
null);
+        }
+
+        final int msgLenNoProperties = 
calMsgLengthNoProperties(msgInner.getVersion(), msgInner.getSysFlag(), 
bodyLength, topicLength);
+
+        // 1 TOTALSIZE
+        this.byteBuf.writeInt(msgLenNoProperties);
+        // 2 MAGICCODE
+        this.byteBuf.writeInt(msgInner.getVersion().getMagicCode());
+        // 3 BODYCRC
+        this.byteBuf.writeInt(msgInner.getBodyCRC());
+        // 4 QUEUEID
+        this.byteBuf.writeInt(msgInner.getQueueId());
+        // 5 FLAG
+        this.byteBuf.writeInt(msgInner.getFlag());
+        // 6 QUEUEOFFSET, need update later
+        this.byteBuf.writeLong(0);
+        // 7 PHYSICALOFFSET, need update later
+        this.byteBuf.writeLong(0);
+        // 8 SYSFLAG
+        this.byteBuf.writeInt(msgInner.getSysFlag());
+        // 9 BORNTIMESTAMP
+        this.byteBuf.writeLong(msgInner.getBornTimestamp());
+
+        // 10 BORNHOST
+        ByteBuffer bornHostBytes = msgInner.getBornHostBytes();
+        this.byteBuf.writeBytes(bornHostBytes.array());
+
+        // 11 STORETIMESTAMP
+        this.byteBuf.writeLong(msgInner.getStoreTimestamp());
+
+        // 12 STOREHOSTADDRESS
+        ByteBuffer storeHostBytes = msgInner.getStoreHostBytes();
+        this.byteBuf.writeBytes(storeHostBytes.array());
+
+        // 13 RECONSUMETIMES
+        this.byteBuf.writeInt(msgInner.getReconsumeTimes());
+        // 14 Prepared Transaction Offset
+        this.byteBuf.writeLong(msgInner.getPreparedTransactionOffset());
+        // 15 BODY
+        this.byteBuf.writeInt(bodyLength);
+        if (bodyLength > 0)
+            this.byteBuf.writeBytes(msgInner.getBody());
+
+        // 16 TOPIC
+        if (MessageVersion.MESSAGE_VERSION_V2.equals(msgInner.getVersion())) {
+            this.byteBuf.writeShort((short) topicLength);
+        } else {
+            this.byteBuf.writeByte((byte) topicLength);
+        }
+        this.byteBuf.writeBytes(topicData);
+
+        return null;
+    }
+
     public PutMessageResult encode(MessageExtBrokerInner msgInner) {
         this.byteBuf.clear();
+
+        if (messageStoreConfig.isEnableMultiDispatch() && 
CommitLog.isMultiDispatchMsg(msgInner)) {
+            return encodeWithoutProperties(msgInner);
+        }
+
         /**
          * Serialize message
          */
@@ -303,7 +401,7 @@ public class MessageExtEncoder {
     }
 
     public ByteBuffer getEncoderBuffer() {
-        return this.byteBuf.nioBuffer();
+        return this.byteBuf.nioBuffer(0, this.byteBuf.capacity());
     }
 
     public int getMaxMessageBodySize() {
@@ -322,12 +420,8 @@ public class MessageExtEncoder {
         private final MessageExtEncoder encoder;
         private final StringBuilder keyBuilder;
 
-        PutMessageThreadLocal(int size) {
-            this(size, false);
-        }
-
-        PutMessageThreadLocal(int size, boolean enabledAppendPropCRC) {
-            encoder = new MessageExtEncoder(size, enabledAppendPropCRC);
+        PutMessageThreadLocal(MessageStoreConfig messageStoreConfig) {
+            encoder = new MessageExtEncoder(messageStoreConfig);
             keyBuilder = new StringBuilder();
         }
 
diff --git a/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java 
b/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java
new file mode 100644
index 0000000000..5bc587a8e0
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/MultiDispatch.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExtBrokerInner;
+
+/**
+ * MultiDispatch for lmq, not-thread-safe
+ */
+public class MultiDispatch {
+    private final StringBuilder keyBuilder = new StringBuilder();
+    private final DefaultMessageStore messageStore;
+    private static final short VALUE_OF_EACH_INCREMENT = 1;
+
+    public MultiDispatch(DefaultMessageStore messageStore) {
+        this.messageStore = messageStore;
+    }
+
+    public String queueKey(String queueName, MessageExtBrokerInner msgInner) {
+        keyBuilder.delete(0, keyBuilder.length());
+        keyBuilder.append(queueName);
+        keyBuilder.append('-');
+        int queueId = msgInner.getQueueId();
+        if (messageStore.getMessageStoreConfig().isEnableLmq() && 
MixAll.isLmq(queueName)) {
+            queueId = 0;
+        }
+        keyBuilder.append(queueId);
+        return keyBuilder.toString();
+    }
+
+    public void wrapMultiDispatch(final MessageExtBrokerInner msg) {
+
+        String multiDispatchQueue = 
msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
+        String[] queues = 
multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
+        Long[] queueOffsets = new Long[queues.length];
+        if (messageStore.getMessageStoreConfig().isEnableLmq()) {
+            for (int i = 0; i < queues.length; i++) {
+                String key = queueKey(queues[i], msg);
+                if (MixAll.isLmq(key)) {
+                    queueOffsets[i] = 
messageStore.getQueueStore().getLmqQueueOffset(key);
+                }
+            }
+        }
+        MessageAccessor.putProperty(msg, 
MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET,
+                StringUtils.join(queueOffsets, 
MixAll.MULTI_DISPATCH_QUEUE_SPLITTER));
+        msg.removeWaitStorePropertyString();
+    }
+
+    public void updateMultiQueueOffset(final MessageExtBrokerInner msgInner) {
+        String multiDispatchQueue = 
msgInner.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
+        String[] queues = 
multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
+        for (String queue : queues) {
+            String key = queueKey(queue, msgInner);
+            if (messageStore.getMessageStoreConfig().isEnableLmq() && 
MixAll.isLmq(key)) {
+                messageStore.getQueueStore().increaseLmqOffset(key, 
VALUE_OF_EACH_INCREMENT);
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/queue/AbstractConsumeQueueStore.java
 
b/store/src/main/java/org/apache/rocketmq/store/queue/AbstractConsumeQueueStore.java
index 30054fa509..d76b055773 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/queue/AbstractConsumeQueueStore.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/queue/AbstractConsumeQueueStore.java
@@ -74,6 +74,16 @@ public abstract class AbstractConsumeQueueStore implements 
ConsumeQueueStoreInte
         consumeQueue.increaseQueueOffset(this.queueOffsetOperator, msg, 
messageNum);
     }
 
+    @Override
+    public void increaseLmqOffset(String queueKey, short messageNum) {
+        queueOffsetOperator.increaseLmqOffset(queueKey, messageNum);
+    }
+
+    @Override
+    public long getLmqQueueOffset(String queueKey) {
+        return queueOffsetOperator.getLmqOffset(queueKey);
+    }
+
     @Override
     public void removeTopicQueueTable(String topic, Integer queueId) {
         this.queueOffsetOperator.remove(topic, queueId);
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
 
b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
index c65f2a68b0..768c782b1d 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
@@ -181,7 +181,6 @@ public interface ConsumeQueueInterface extends 
FileQueueLifeCycle {
      */
     void assignQueueOffset(QueueOffsetOperator queueOffsetAssigner, 
MessageExtBrokerInner msg) throws RocksDBException;
 
-
     /**
      * Increase queue offset.
      * @param queueOffsetAssigner the delegated queue offset assigner
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStoreInterface.java
 
b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStoreInterface.java
index 268803dcca..e68880a828 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStoreInterface.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStoreInterface.java
@@ -183,6 +183,20 @@ public interface ConsumeQueueStoreInterface {
      */
     void increaseQueueOffset(MessageExtBrokerInner msg, short messageNum);
 
+    /**
+     * Increase lmq offset
+     * @param queueKey
+     * @param messageNum
+     */
+    void increaseLmqOffset(String queueKey, short messageNum);
+
+    /**
+     * get lmq queue offset
+     * @param queueKey
+     * @return
+     */
+    long getLmqQueueOffset(String queueKey);
+
     /**
      * recover topicQueue table by minPhyOffset
      * @param minPhyOffset
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/queue/MultiDispatch.java 
b/store/src/main/java/org/apache/rocketmq/store/queue/MultiDispatchUtils.java
similarity index 78%
rename from 
store/src/main/java/org/apache/rocketmq/store/queue/MultiDispatch.java
rename to 
store/src/main/java/org/apache/rocketmq/store/queue/MultiDispatchUtils.java
index d6291d9087..44397a2fce 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/MultiDispatch.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/queue/MultiDispatchUtils.java
@@ -16,8 +16,6 @@
  */
 package org.apache.rocketmq.store.queue;
 
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.lang3.StringUtils;
@@ -27,7 +25,7 @@ import org.apache.rocketmq.common.topic.TopicValidator;
 import org.apache.rocketmq.store.DispatchRequest;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 
-public class MultiDispatch {
+public class MultiDispatchUtils {
 
     public static String lmqQueueKey(String queueName) {
         StringBuilder keyBuilder = new StringBuilder();
@@ -60,17 +58,4 @@ public class MultiDispatch {
         }
         return true;
     }
-
-    public static List<DispatchRequest> 
checkMultiDispatchQueue(MessageStoreConfig messageStoreConfig, 
List<DispatchRequest> dispatchRequests) {
-        if (!messageStoreConfig.isEnableMultiDispatch() || dispatchRequests == 
null || dispatchRequests.size() == 0) {
-            return null;
-        }
-        List<DispatchRequest> result = new ArrayList<>();
-        for (DispatchRequest dispatchRequest : dispatchRequests) {
-            if (checkMultiDispatchQueue(messageStoreConfig, dispatchRequest)) {
-                result.add(dispatchRequest);
-            }
-        }
-        return dispatchRequests;
-    }
 }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetOperator.java 
b/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetOperator.java
index 8da3748281..5b4bf994e0 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetOperator.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetOperator.java
@@ -71,9 +71,9 @@ public class QueueOffsetOperator {
         return this.lmqTopicQueueTable.get(topicQueueKey);
     }
 
-    public void increaseLmqOffset(String topicQueueKey, short messageNum) {
-        Long lmqOffset = 
ConcurrentHashMapUtils.computeIfAbsent(this.lmqTopicQueueTable, topicQueueKey, 
k -> 0L);
-        this.lmqTopicQueueTable.put(topicQueueKey, lmqOffset + messageNum);
+    public void increaseLmqOffset(String queueKey, short messageNum) {
+        Long lmqOffset = 
ConcurrentHashMapUtils.computeIfAbsent(this.lmqTopicQueueTable, queueKey, k -> 
0L);
+        this.lmqTopicQueueTable.put(queueKey, lmqOffset + messageNum);
     }
 
     public long currentQueueOffset(String topicQueueKey) {
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java 
b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java
index 759be395d5..5a981bb4df 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java
@@ -19,14 +19,10 @@ package org.apache.rocketmq.store.queue;
 import java.nio.ByteBuffer;
 import java.util.List;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.BoundaryType;
-import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.Pair;
 import org.apache.rocketmq.common.attribute.CQType;
 import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.message.MessageAccessor;
-import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageExtBrokerInner;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -217,50 +213,12 @@ public class RocksDBConsumeQueue implements 
ConsumeQueueInterface {
             queueOffsetOperator.updateQueueOffset(topicQueueKey, queueOffset);
         }
         msg.setQueueOffset(queueOffset);
-
-        // Handling the multi dispatch message. In the context of a light 
message queue (as defined in RIP-28),
-        // light message queues are constructed based on message properties, 
which requires special handling of queue offset of the light message queue.
-        if 
(!MultiDispatch.isNeedHandleMultiDispatch(this.messageStore.getMessageStoreConfig(),
 msg.getTopic())) {
-            return;
-        }
-        String multiDispatchQueue = 
msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
-        if (StringUtils.isBlank(multiDispatchQueue)) {
-            return;
-        }
-        String[] queues = 
multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
-        Long[] queueOffsets = new Long[queues.length];
-        for (int i = 0; i < queues.length; i++) {
-            if (this.messageStore.getMessageStoreConfig().isEnableLmq() && 
MixAll.isLmq(queues[i])) {
-                String key = MultiDispatch.lmqQueueKey(queues[i]);
-                queueOffsets[i] = 
queueOffsetOperator.getLmqTopicQueueNextOffset(key);
-            }
-        }
-        MessageAccessor.putProperty(msg, 
MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET,
-            StringUtils.join(queueOffsets, 
MixAll.MULTI_DISPATCH_QUEUE_SPLITTER));
-        msg.removeWaitStorePropertyString();
     }
 
     @Override
     public void increaseQueueOffset(QueueOffsetOperator queueOffsetOperator, 
MessageExtBrokerInner msg, short messageNum) {
         String topicQueueKey = getTopic() + "-" + getQueueId();
         queueOffsetOperator.increaseQueueOffset(topicQueueKey, messageNum);
-
-        // Handling the multi dispatch message. In the context of a light 
message queue (as defined in RIP-28),
-        // light message queues are constructed based on message properties, 
which requires special handling of queue offset of the light message queue.
-        if 
(!MultiDispatch.isNeedHandleMultiDispatch(this.messageStore.getMessageStoreConfig(),
 msg.getTopic())) {
-            return;
-        }
-        String multiDispatchQueue = 
msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
-        if (StringUtils.isBlank(multiDispatchQueue)) {
-            return;
-        }
-        String[] queues = 
multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
-        for (int i = 0; i < queues.length; i++) {
-            if (this.messageStore.getMessageStoreConfig().isEnableLmq() && 
MixAll.isLmq(queues[i])) {
-                String key = MultiDispatch.lmqQueueKey(queues[i]);
-                queueOffsetOperator.increaseLmqOffset(key, (short) 1);
-            }
-        }
     }
 
     @Override
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java 
b/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java
index 87bfe85da2..3748571496 100644
--- a/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java
@@ -44,7 +44,7 @@ public class AppendCallbackTest {
 
     AppendMessageCallback callback;
 
-    MessageExtEncoder batchEncoder = new MessageExtEncoder(10 * 1024 * 1024);
+    MessageExtEncoder batchEncoder;
 
     @Before
     public void init() throws Exception {
@@ -53,12 +53,14 @@ public class AppendCallbackTest {
         messageStoreConfig.setMappedFileSizeConsumeQueue(1024 * 4);
         messageStoreConfig.setMaxHashSlotNum(100);
         messageStoreConfig.setMaxIndexNum(100 * 10);
+        messageStoreConfig.setMaxMessageSize(10 * 1024 * 1024);
         
messageStoreConfig.setStorePathRootDir(System.getProperty("java.io.tmpdir") + 
File.separator + "unitteststore");
         
messageStoreConfig.setStorePathCommitLog(System.getProperty("java.io.tmpdir") + 
File.separator + "unitteststore" + File.separator + "commitlog");
         //too much reference
         DefaultMessageStore messageStore = new 
DefaultMessageStore(messageStoreConfig, null, null, new BrokerConfig(), new 
ConcurrentHashMap<>());
         CommitLog commitLog = new CommitLog(messageStore);
-        callback = commitLog.new DefaultAppendMessageCallback();
+        callback = commitLog.new 
DefaultAppendMessageCallback(messageStoreConfig);
+        batchEncoder = new MessageExtEncoder(messageStoreConfig);
     }
 
     @After
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java 
b/store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java
index c8ed4d74db..d882fc9d9b 100644
--- a/store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/AppendPropCRCTest.java
@@ -56,6 +56,7 @@ public class AppendPropCRCTest {
         messageStoreConfig.setMappedFileSizeConsumeQueue(1024 * 4);
         messageStoreConfig.setMaxHashSlotNum(100);
         messageStoreConfig.setMaxIndexNum(100 * 10);
+        messageStoreConfig.setMaxMessageSize(10 * 1024 * 1024);
         
messageStoreConfig.setStorePathRootDir(System.getProperty("java.io.tmpdir") + 
File.separator + "unitteststore");
         
messageStoreConfig.setStorePathCommitLog(System.getProperty("java.io.tmpdir") + 
File.separator + "unitteststore" + File.separator + "commitlog");
         messageStoreConfig.setForceVerifyPropCRC(true);
@@ -63,8 +64,8 @@ public class AppendPropCRCTest {
         //too much reference
         DefaultMessageStore messageStore = new 
DefaultMessageStore(messageStoreConfig, null, null, new BrokerConfig(), new 
ConcurrentHashMap<>());
         commitLog = new CommitLog(messageStore);
-        encoder = new MessageExtEncoder(10 * 1024 * 1024, true);
-        callback = commitLog.new DefaultAppendMessageCallback();
+        encoder = new MessageExtEncoder(messageStoreConfig);
+        callback = commitLog.new 
DefaultAppendMessageCallback(messageStoreConfig);
     }
 
     @After
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java 
b/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java
index 2447bbf68f..eae5eaa07a 100644
--- a/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/MultiDispatchTest.java
@@ -28,20 +28,19 @@ import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExtBrokerInner;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
-import org.apache.rocketmq.store.queue.MultiDispatch;
+import org.apache.rocketmq.store.queue.MultiDispatchUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.rocksdb.RocksDBException;
 
-import static 
org.apache.rocketmq.store.config.StorePathConfigHelper.getStorePathConsumeQueue;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class MultiDispatchTest {
 
-    private ConsumeQueue consumeQueue;
+    private MultiDispatch multiDispatch;
 
     private DefaultMessageStore messageStore;
 
@@ -61,8 +60,7 @@ public class MultiDispatchTest {
         BrokerConfig brokerConfig = new BrokerConfig();
         //too much reference
         messageStore = new DefaultMessageStore(messageStoreConfig, null, null, 
brokerConfig, new ConcurrentHashMap<>());
-        consumeQueue = new ConsumeQueue("xxx", 0,
-            
getStorePathConsumeQueue(messageStoreConfig.getStorePathRootDir()), 
messageStoreConfig.getMappedFileSizeConsumeQueue(), messageStore);
+        multiDispatch = new MultiDispatch(messageStore);
     }
 
     @After
@@ -74,14 +72,14 @@ public class MultiDispatchTest {
     public void lmqQueueKey() {
         MessageExtBrokerInner messageExtBrokerInner = 
mock(MessageExtBrokerInner.class);
         when(messageExtBrokerInner.getQueueId()).thenReturn(2);
-        String ret = MultiDispatch.lmqQueueKey("%LMQ%lmq123");
+        String ret = MultiDispatchUtils.lmqQueueKey("%LMQ%lmq123");
         assertEquals(ret, "%LMQ%lmq123-0");
     }
 
     @Test
     public void wrapMultiDispatch() throws RocksDBException {
         MessageExtBrokerInner messageExtBrokerInner = buildMessageMultiQueue();
-        messageStore.assignOffset(messageExtBrokerInner);
+        multiDispatch.wrapMultiDispatch(messageExtBrokerInner);
         
assertEquals(messageExtBrokerInner.getProperty(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET),
 "0,0");
     }
 
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/kv/CompactionLogTest.java 
b/store/src/test/java/org/apache/rocketmq/store/kv/CompactionLogTest.java
index df3c31c6ed..e113b18f1e 100644
--- a/store/src/test/java/org/apache/rocketmq/store/kv/CompactionLogTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/kv/CompactionLogTest.java
@@ -86,7 +86,7 @@ public class CompactionLogTest {
     int compactionCqFileSize = 1024;
 
 
-    private static MessageExtEncoder encoder = new MessageExtEncoder(1024);
+    private static MessageExtEncoder encoder = new MessageExtEncoder(1024, new 
MessageStoreConfig());
     private static SocketAddress storeHost;
     private static SocketAddress bornHost;
 


Reply via email to