This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by 
this push:
     new e952dd6  Finish the produce and consume test for remapped static topic
e952dd6 is described below

commit e952dd67f19a6b849d5d980fd24f9ebcd64fd356
Author: dongeforever <[email protected]>
AuthorDate: Wed Dec 1 16:36:07 2021 +0800

    Finish the produce and consume test for remapped static topic
---
 .../broker/processor/AdminBrokerProcessor.java     |  40 +++----
 .../broker/processor/PullMessageProcessor.java     | 125 +++++++++++++++------
 .../broker/processor/SendMessageProcessor.java     |   4 +-
 .../broker/topic/TopicQueueMappingManager.java     |  35 +++---
 .../client/impl/consumer/PullAPIWrapper.java       |   4 +
 .../common/statictopic/LogicQueueMappingItem.java  |   7 ++
 .../statictopic/TopicQueueMappingContext.java      |  63 ++++-------
 .../statictopic/TopicQueueMappingDetail.java       |  22 ----
 .../common/statictopic/TopicQueueMappingUtils.java |  54 +++++++++
 .../apache/rocketmq/test/smoke/StaticTopicIT.java  |  19 +++-
 10 files changed, 219 insertions(+), 154 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index d9684e7..16bf677 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -109,6 +109,7 @@ import 
org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
 import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
+import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
 import org.apache.rocketmq.common.stats.StatsItem;
 import org.apache.rocketmq.common.stats.StatsSnapshot;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
@@ -621,14 +622,10 @@ public class AdminBrokerProcessor extends 
AsyncNettyRequestProcessor implements
             if (mappingContext.getMappingDetail() == null) {
                 return null;
             }
-            if (requestHeader.getPhysical() != null && 
requestHeader.getPhysical()) {
-                return null;
-            }
 
             TopicQueueMappingDetail mappingDetail = 
mappingContext.getMappingDetail();
             List<LogicQueueMappingItem> mappingItems = 
mappingContext.getMappingItemList();
-            if (mappingItems == null
-                    || mappingItems.isEmpty()) {
+            if (!mappingContext.isLeader()) {
                 return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, 
String.format("%s-%d does not exit in request process of current broker %s", 
mappingContext.getTopic(), mappingContext.getGlobalId(), 
mappingDetail.getBname()));
             }
             //TODO should make sure the timestampOfOffset is equal or bigger 
than the searched timestamp
@@ -702,14 +699,10 @@ public class AdminBrokerProcessor extends 
AsyncNettyRequestProcessor implements
         if (mappingContext.getMappingDetail() == null) {
             return null;
         }
-        if (requestHeader.getPhysical() != null && 
requestHeader.getPhysical()) {
-            return null;
-        }
 
         TopicQueueMappingDetail mappingDetail = 
mappingContext.getMappingDetail();
-        LogicQueueMappingItem mappingItem = mappingContext.getMappingItem();
-        if (mappingItem == null
-                || !mappingDetail.getBname().equals(mappingItem.getBname())) {
+        LogicQueueMappingItem mappingItem = mappingContext.getLeaderItem();
+        if (!mappingContext.isLeader()) {
             return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, 
String.format("%s-%d does not exit in request process of current broker %s", 
mappingContext.getTopic(), mappingContext.getGlobalId(), 
mappingDetail.getBname()));
         }
         long offset = 
this.brokerController.getMessageStore().getMaxOffsetInQueue(mappingContext.getTopic(),
 mappingItem.getQueueId());
@@ -750,16 +743,15 @@ public class AdminBrokerProcessor extends 
AsyncNettyRequestProcessor implements
         if (mappingContext.getMappingDetail() == null) {
             return null;
         }
-        if (requestHeader.getPhysical() != null && 
requestHeader.getPhysical()) {
-            return null;
-        }
 
         TopicQueueMappingDetail mappingDetail = 
mappingContext.getMappingDetail();
-        LogicQueueMappingItem mappingItem = mappingContext.getMappingItem();
-        if (mappingItem == null) {
+        if (!mappingContext.isLeader()) {
             //this may not
             return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, 
String.format("%s-%d does not exit in request process of current broker %s", 
mappingContext.getTopic(), mappingContext.getGlobalId(), 
mappingDetail.getBname()));
         };
+
+        LogicQueueMappingItem mappingItem = 
TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(),
 0L, true);
+        assert  mappingItem != null;
         try {
             requestHeader.setBname(mappingItem.getBname());
             requestHeader.setPhysical(true);
@@ -786,7 +778,6 @@ public class AdminBrokerProcessor extends 
AsyncNettyRequestProcessor implements
             response.setRemark(null);
             return response;
         } catch (Throwable t) {
-            t.printStackTrace();
             log.error("rewriteRequestForStaticTopic failed", t);
             return buildErrorResponse(ResponseCode.SYSTEM_ERROR, 
t.getMessage());
         }
@@ -800,7 +791,7 @@ public class AdminBrokerProcessor extends 
AsyncNettyRequestProcessor implements
             (GetMinOffsetRequestHeader) 
request.decodeCommandCustomHeader(GetMinOffsetRequestHeader.class);
 
 
-        TopicQueueMappingContext mappingContext = 
this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader,
 false, 0L);
+        TopicQueueMappingContext mappingContext = 
this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader,
 false);
         RemotingCommand rewriteResult = 
rewriteRequestForStaticTopic(requestHeader, mappingContext);
         if (rewriteResult != null) {
             return rewriteResult;
@@ -818,19 +809,18 @@ public class AdminBrokerProcessor extends 
AsyncNettyRequestProcessor implements
         if (mappingContext.getMappingDetail() == null) {
             return null;
         }
-        if (requestHeader.getPhysical() != null && 
requestHeader.getPhysical()) {
-            return null;
-        }
+
         TopicQueueMappingDetail mappingDetail = 
mappingContext.getMappingDetail();
-        LogicQueueMappingItem mappingItem = mappingContext.getMappingItem();
-        if (mappingItem == null) {
+        if (!mappingContext.isLeader()) {
             return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, 
String.format("%s-%d does not exit in request process of current broker %s", 
mappingContext.getTopic(), mappingContext.getGlobalId(), 
mappingDetail.getBname()));
         };
+        LogicQueueMappingItem mappingItem = 
TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(),
 0L, true);
+        assert mappingItem != null;
         try {
             requestHeader.setBname(mappingItem.getBname());
             requestHeader.setPhysical(true);
             RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_MIN_OFFSET, 
requestHeader, null);
-            //TODO check if it is leader
+            //TODO check if it is in current broker
             RpcResponse rpcResponse = 
this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, 
this.brokerController.getBrokerConfig().getForwardTimeout()).get();
             if (rpcResponse.getException() != null) {
                 throw rpcResponse.getException();
@@ -855,7 +845,7 @@ public class AdminBrokerProcessor extends 
AsyncNettyRequestProcessor implements
         final GetEarliestMsgStoretimeRequestHeader requestHeader =
             (GetEarliestMsgStoretimeRequestHeader) 
request.decodeCommandCustomHeader(GetEarliestMsgStoretimeRequestHeader.class);
 
-        TopicQueueMappingContext mappingContext = 
this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader,
 false, 0L);
+        TopicQueueMappingContext mappingContext = 
this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader,
 false);
         RemotingCommand rewriteResult = 
rewriteRequestForStaticTopic(requestHeader, mappingContext);
         if (rewriteResult != null) {
             return rewriteResult;
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index b98295c..aa6879b 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -54,6 +54,7 @@ import org.apache.rocketmq.common.rpc.RpcResponse;
 import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
+import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.common.sysflag.MessageSysFlag;
 import org.apache.rocketmq.common.sysflag.PullSysFlag;
@@ -68,7 +69,6 @@ import 
org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
 import org.apache.rocketmq.remoting.netty.RequestTask;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.store.GetMessageResult;
-import org.apache.rocketmq.store.GetMessageStatus;
 import org.apache.rocketmq.store.MessageExtBrokerInner;
 import org.apache.rocketmq.store.MessageFilter;
 import org.apache.rocketmq.store.PutMessageResult;
@@ -79,7 +79,6 @@ import java.nio.ByteBuffer;
 import java.util.List;
 
 import static 
org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse;
-import static org.apache.rocketmq.remoting.protocol.RemotingCommand.decode;
 
 public class PullMessageProcessor extends AsyncNettyRequestProcessor 
implements NettyRequestProcessor {
     private static final InternalLogger log = 
InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
@@ -111,15 +110,14 @@ public class PullMessageProcessor extends 
AsyncNettyRequestProcessor implements
             TopicQueueMappingDetail mappingDetail =  
mappingContext.getMappingDetail();
             String topic = mappingContext.getTopic();
             Integer globalId = mappingContext.getGlobalId();
-            Long globalOffset = mappingContext.getGlobalOffset();
-
-            LogicQueueMappingItem mappingItem = 
mappingContext.getMappingItem();
-            if (mappingItem == null) {
+            // if the leader? consider the order consumer, which will lock the 
mq
+            if (!mappingContext.isLeader()) {
                 return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, 
String.format("%s-%d cannot find mapping item in request process of current 
broker %s", topic, globalId, mappingDetail.getBname()));
             }
+            Long globalOffset = requestHeader.getQueueOffset();
 
-            //TODO Check if the leader? consider the order consumer, which 
will lock the mq
-            //
+            LogicQueueMappingItem mappingItem = 
TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(),
 globalOffset, true);
+            mappingContext.setCurrentItem(mappingItem);
 
             if (globalOffset < mappingItem.getLogicOffset()) {
                 //handleOffsetMoved
@@ -174,68 +172,122 @@ public class PullMessageProcessor extends 
AsyncNettyRequestProcessor implements
     private RemotingCommand 
rewriteResponseForStaticTopic(PullMessageRequestHeader requestHeader, 
PullMessageResponseHeader responseHeader,
                                                           
TopicQueueMappingContext mappingContext, final int code) {
         try {
-            if (mappingContext == null) {
+            if (mappingContext.getMappingDetail() == null) {
                 return null;
             }
             TopicQueueMappingDetail mappingDetail = 
mappingContext.getMappingDetail();
-            LogicQueueMappingItem mappingItem = 
mappingContext.getMappingItem();
+            LogicQueueMappingItem leaderItem = mappingContext.getLeaderItem();
+
+            LogicQueueMappingItem currentItem = 
mappingContext.getCurrentItem();
+
+            LogicQueueMappingItem earlistItem = 
TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(),
 0L, true);
+
+            assert currentItem.getLogicOffset() >= 0;
 
             long requestOffset = requestHeader.getQueueOffset();
             long nextBeginOffset = responseHeader.getNextBeginOffset();
             long minOffset = responseHeader.getMinOffset();
             long maxOffset = responseHeader.getMaxOffset();
             int responseCode = code;
-            if (responseCode != ResponseCode.SUCCESS
-                    && responseCode != ResponseCode.PULL_RETRY_IMMEDIATELY) {
-                if (mappingContext.isLeader()) {
+            //consider the following situations
+            // 1. read from slave, currently not supported
+            // 2. the middle queue is truncated because of deleting commitlog
+            if (code != ResponseCode.SUCCESS) {
+                //note the currentItem maybe both the leader and  the earliest
+                if (leaderItem.getGen() == currentItem.getGen()) {
+                    //read the leader
+                    if (requestOffset > maxOffset) {
+                        //actually, we need do nothing, but keep the code 
structure here
+                        if (code == ResponseCode.PULL_OFFSET_MOVED) {
+                            responseCode = ResponseCode.PULL_OFFSET_MOVED;
+                        } else {
+                            //maybe current broker is the slave
+                            responseCode = code;
+                        }
+                    } else  if (requestOffset < minOffset) {
+                        nextBeginOffset = minOffset;
+                        responseCode = ResponseCode.PULL_RETRY_IMMEDIATELY;
+                    } else {
+                        responseCode = code;
+                    }
+                }
+                //note the currentItem maybe both the leader and  the earliest
+                if (earlistItem.getGen() == currentItem.getGen()) {
+                    //read the earliest one
                     if (requestOffset < minOffset) {
+                        if (code == ResponseCode.PULL_OFFSET_MOVED) {
+                            responseCode = ResponseCode.PULL_OFFSET_MOVED;
+                        } else {
+                            //maybe read from slave, but we still set it to 
moved
+                            responseCode = ResponseCode.PULL_OFFSET_MOVED;
+                        }
                         nextBeginOffset = minOffset;
-                        responseCode = ResponseCode.PULL_NOT_FOUND;
-                    } else if (requestOffset > maxOffset) {
-                        responseCode = ResponseCode.PULL_OFFSET_MOVED;
-                    } else if (requestOffset == maxOffset) {
-                        responseCode = ResponseCode.PULL_NOT_FOUND;
+                    } else if (requestOffset >= maxOffset) {
+                        //just move to another item
+                        LogicQueueMappingItem nextItem = 
TopicQueueMappingUtils.findNext(mappingContext.getMappingItemList(), 
currentItem, true);
+                        if (nextItem != null) {
+                            currentItem = nextItem;
+                            nextBeginOffset = currentItem.getStartOffset();
+                            minOffset = currentItem.getStartOffset();
+                            maxOffset = minOffset;
+                            responseCode = ResponseCode.PULL_RETRY_IMMEDIATELY;
+                        } else {
+                            //maybe the next one's logic offset is -1
+                            responseCode = ResponseCode.PULL_NOT_FOUND;
+                        }
                     } else {
                         //let it go
+                        responseCode = code;
                     }
-                } else {
+                }
+
+                //read from the middle item, ignore the PULL_OFFSET_MOVED
+                if (leaderItem.getGen() != currentItem.getGen()
+                    && earlistItem.getGen() != currentItem.getGen()) {
                     if (requestOffset < minOffset) {
                         nextBeginOffset = minOffset;
-                        responseCode = ResponseCode.PULL_NOT_FOUND;
+                        responseCode = ResponseCode.PULL_RETRY_IMMEDIATELY;
                     } else if (requestOffset >= maxOffset) {
-                        responseCode = ResponseCode.PULL_NOT_FOUND;
                         //just move to another item
-                        mappingItem = mappingContext.findNext();
-                        assert  mappingItem != null;
-                        nextBeginOffset = mappingItem.getStartOffset();
-                        minOffset = mappingItem.getStartOffset();
-                        maxOffset = minOffset;
+                        LogicQueueMappingItem nextItem = 
TopicQueueMappingUtils.findNext(mappingContext.getMappingItemList(), 
currentItem, true);
+                        if (nextItem != null) {
+                            currentItem = nextItem;
+                            nextBeginOffset = currentItem.getStartOffset();
+                            minOffset = currentItem.getStartOffset();
+                            maxOffset = minOffset;
+                            responseCode = ResponseCode.PULL_RETRY_IMMEDIATELY;
+                        } else {
+                            //maybe the next one's logic offset is -1
+                            responseCode = ResponseCode.PULL_NOT_FOUND;
+                        }
+                    } else {
+                        responseCode = code;
                     }
                 }
             }
 
             //handle nextBeginOffset
             //the next begin offset should no more than the end offset
-            if (mappingItem.checkIfEndOffsetDecided()
-                    && nextBeginOffset >= mappingItem.getEndOffset()) {
-                nextBeginOffset = mappingItem.getEndOffset();
+            if (currentItem.checkIfEndOffsetDecided()
+                    && nextBeginOffset >= currentItem.getEndOffset()) {
+                nextBeginOffset = currentItem.getEndOffset();
             }
-            
responseHeader.setNextBeginOffset(mappingItem.computeStaticQueueOffsetUpToEnd(nextBeginOffset));
+            
responseHeader.setNextBeginOffset(currentItem.computeStaticQueueOffsetUpToEnd(nextBeginOffset));
             //handle min offset
-            
responseHeader.setMinOffset(mappingItem.computeStaticQueueOffsetUpToEnd(Math.max(mappingItem.getStartOffset(),
 minOffset)));
+            
responseHeader.setMinOffset(currentItem.computeStaticQueueOffsetUpToEnd(Math.max(currentItem.getStartOffset(),
 minOffset)));
             //handle max offset
-            
responseHeader.setMaxOffset(Math.max(mappingItem.computeStaticQueueOffsetUpToEnd(maxOffset),
+            
responseHeader.setMaxOffset(Math.max(currentItem.computeStaticQueueOffsetUpToEnd(maxOffset),
                     
TopicQueueMappingDetail.computeMaxOffsetFromMapping(mappingDetail, 
mappingContext.getGlobalId())));
             //set the offsetDelta
-            responseHeader.setOffsetDelta(mappingItem.computeOffsetDelta());
+            responseHeader.setOffsetDelta(currentItem.computeOffsetDelta());
 
-            if (code != ResponseCode.SUCCESS
-                && code != ResponseCode.PULL_RETRY_IMMEDIATELY) {
+            if (code != ResponseCode.SUCCESS) {
                 return 
RemotingCommand.createResponseCommandWithHeader(responseCode, responseHeader);
             } else {
                 return null;
             }
         } catch (Throwable t) {
+            t.printStackTrace();
             return buildErrorResponse(ResponseCode.SYSTEM_ERROR, 
t.getMessage());
         }
     }
@@ -292,7 +344,8 @@ public class PullMessageProcessor extends 
AsyncNettyRequestProcessor implements
             return response;
         }
 
-        TopicQueueMappingContext mappingContext = 
this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader,
 false, requestHeader.getQueueOffset());
+
+        TopicQueueMappingContext mappingContext = 
this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader,
 false);
 
         {
             RemotingCommand rewriteResult = 
rewriteRequestForStaticTopic(requestHeader, mappingContext);
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index d254b8a..03a5dba 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -101,7 +101,7 @@ public class SendMessageProcessor extends 
AbstractSendMessageProcessor implement
                 if (requestHeader == null) {
                     return CompletableFuture.completedFuture(null);
                 }
-                TopicQueueMappingContext mappingContext = 
this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader,
 true, Long.MAX_VALUE);
+                TopicQueueMappingContext mappingContext = 
this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader,
 true);
                 RemotingCommand rewriteResult =  
this.brokerController.getTopicQueueMappingManager().rewriteRequestForStaticTopic(requestHeader,
 mappingContext);
                 if (rewriteResult != null) {
                     return CompletableFuture.completedFuture(rewriteResult);
@@ -130,7 +130,7 @@ public class SendMessageProcessor extends 
AbstractSendMessageProcessor implement
             }
             TopicQueueMappingDetail mappingDetail = 
mappingContext.getMappingDetail();
 
-            LogicQueueMappingItem mappingItem = 
mappingContext.getMappingItem();
+            LogicQueueMappingItem mappingItem = mappingContext.getLeaderItem();
             if (mappingItem == null) {
                 return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, 
String.format("%s-%d does not exit in request process of current broker %s", 
mappingContext.getTopic(), mappingContext.getGlobalId(), 
mappingDetail.getBname()));
             }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
index 5fa3a31..1dd9cbf 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
@@ -17,7 +17,6 @@
 package org.apache.rocketmq.broker.topic;
 
 import com.alibaba.fastjson.JSON;
-import com.google.common.collect.ImmutableList;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.BrokerPathConfigHelper;
 import org.apache.rocketmq.common.ConfigManager;
@@ -164,24 +163,24 @@ public class TopicQueueMappingManager extends 
ConfigManager {
     }
 
     public TopicQueueMappingContext 
buildTopicQueueMappingContext(TopicQueueRequestHeader requestHeader) {
-        return buildTopicQueueMappingContext(requestHeader, false, 
Long.MAX_VALUE);
+        return buildTopicQueueMappingContext(requestHeader, false);
     }
 
     //Do not return a null context
-    public TopicQueueMappingContext 
buildTopicQueueMappingContext(TopicQueueRequestHeader requestHeader, boolean 
selectOneWhenMiss,  Long globalOffset) {
+    public TopicQueueMappingContext 
buildTopicQueueMappingContext(TopicQueueRequestHeader requestHeader, boolean 
selectOneWhenMiss) {
         if (requestHeader.getPhysical() != null
                 && Boolean.TRUE.equals(requestHeader.getPhysical())) {
-            return new TopicQueueMappingContext(requestHeader.getTopic(), 
requestHeader.getQueueId(), null, null, null, null);
+            return new TopicQueueMappingContext(requestHeader.getTopic(), 
requestHeader.getQueueId(), null, null, null);
         }
         TopicQueueMappingDetail mappingDetail = 
getTopicQueueMapping(requestHeader.getTopic());
         if (mappingDetail == null) {
             //it is not static topic
-            return new TopicQueueMappingContext(requestHeader.getTopic(), 
requestHeader.getQueueId(), null, null, null, null);
+            return new TopicQueueMappingContext(requestHeader.getTopic(), 
requestHeader.getQueueId(), null, null, null);
         }
         //If not find mappingItem, it encounters some errors
         Integer globalId = requestHeader.getQueueId();
         if (globalId < 0 && !selectOneWhenMiss) {
-            return new TopicQueueMappingContext(requestHeader.getTopic(), 
globalId, globalOffset, mappingDetail, null, null);
+            return new TopicQueueMappingContext(requestHeader.getTopic(), 
globalId, mappingDetail, null, null);
         }
 
         if (globalId < 0) {
@@ -194,24 +193,17 @@ public class TopicQueueMappingManager extends 
ConfigManager {
             }
         }
         if (globalId < 0) {
-            return new TopicQueueMappingContext(requestHeader.getTopic(), 
globalId, globalOffset, mappingDetail, null, null);
+            return new TopicQueueMappingContext(requestHeader.getTopic(), 
globalId,  mappingDetail, null, null);
         }
 
         List<LogicQueueMappingItem> mappingItemList = null;
-        LogicQueueMappingItem mappingItem = null;
-
-        if (globalOffset == null
-                || Long.MAX_VALUE == globalOffset) {
-            mappingItemList = 
TopicQueueMappingDetail.getMappingInfo(mappingDetail, globalId);
-            if (mappingItemList != null
+        LogicQueueMappingItem leaderItem = null;
+        mappingItemList = 
TopicQueueMappingDetail.getMappingInfo(mappingDetail, globalId);
+        if (mappingItemList != null
                 && mappingItemList.size() > 0) {
-                mappingItem = mappingItemList.get(mappingItemList.size() - 1);
-            }
-        } else {
-            mappingItemList = 
TopicQueueMappingDetail.getMappingInfo(mappingDetail, globalId);
-            mappingItem = 
TopicQueueMappingDetail.findLogicQueueMappingItem(mappingItemList, 
globalOffset);
+            leaderItem = mappingItemList.get(mappingItemList.size() - 1);
         }
-        return new TopicQueueMappingContext(requestHeader.getTopic(), 
globalId, globalOffset, mappingDetail, mappingItemList, mappingItem);
+        return new TopicQueueMappingContext(requestHeader.getTopic(), 
globalId, mappingDetail, mappingItemList, leaderItem);
     }
 
 
@@ -221,11 +213,10 @@ public class TopicQueueMappingManager extends 
ConfigManager {
                 return null;
             }
             TopicQueueMappingDetail mappingDetail = 
mappingContext.getMappingDetail();
-            LogicQueueMappingItem mappingItem = 
mappingContext.getMappingItem();
-            if (mappingItem == null
-                    || 
!mappingDetail.getBname().equals(mappingItem.getBname())) {
+            if (!mappingContext.isLeader()) {
                 return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, 
String.format("%s-%d does not exit in request process of current broker %s", 
requestHeader.getTopic(), requestHeader.getQueueId(), 
mappingDetail.getBname()));
             }
+            LogicQueueMappingItem mappingItem = mappingContext.getLeaderItem();
             requestHeader.setQueueId(mappingItem.getQueueId());
             return null;
         } catch (Throwable t) {
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
index da9c8b3..273add4 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
@@ -23,6 +23,8 @@ import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
+
+import com.alibaba.fastjson.JSON;
 import org.apache.rocketmq.client.consumer.PopCallback;
 import org.apache.rocketmq.common.protocol.header.PopMessageRequestHeader;
 import org.apache.rocketmq.client.consumer.PullCallback;
@@ -170,6 +172,7 @@ public class PullAPIWrapper {
                     this.recalculatePullFromWhichNode(mq), false);
         }
 
+
         if (findBrokerResult != null) {
             {
                 // check version
@@ -203,6 +206,7 @@ public class PullAPIWrapper {
                 brokerAddr = computePullFromWhichFilterServer(mq.getTopic(), 
brokerAddr);
             }
 
+
             PullResult pullResult = 
this.mQClientFactory.getMQClientAPIImpl().pullMessage(
                 brokerAddr,
                 requestHeader,
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java
 
b/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java
index 959207e..9f79e9d 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java
@@ -32,6 +32,10 @@ public class LogicQueueMappingItem extends 
RemotingSerializable {
     }
 
     public long computeStaticQueueOffsetUpToEnd(long physicalQueueOffset) {
+        //consider the newly mapped item
+        if (logicOffset < 0) {
+            return -1;
+        }
         if (physicalQueueOffset < startOffset) {
             return logicOffset;
         }
@@ -43,6 +47,9 @@ public class LogicQueueMappingItem extends 
RemotingSerializable {
     }
 
     public long computeStaticQueueOffset(long physicalQueueOffset) {
+        if (logicOffset < 0) {
+            return logicOffset;
+        }
         if (physicalQueueOffset < startOffset) {
             return logicOffset;
         }
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingContext.java
 
b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingContext.java
index 4a788ab..f705c43 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingContext.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingContext.java
@@ -23,18 +23,18 @@ import java.util.List;
 public class TopicQueueMappingContext  {
     private String topic;
     private Integer globalId;
-    private Long globalOffset;
     private TopicQueueMappingDetail mappingDetail;
     private List<LogicQueueMappingItem> mappingItemList;
-    private LogicQueueMappingItem mappingItem;
+    private LogicQueueMappingItem leaderItem;
 
-    public TopicQueueMappingContext(String topic, Integer globalId, Long 
globalOffset, TopicQueueMappingDetail mappingDetail, 
List<LogicQueueMappingItem> mappingItemList, LogicQueueMappingItem mappingItem) 
{
+    private LogicQueueMappingItem currentItem;
+
+    public TopicQueueMappingContext(String topic, Integer globalId, 
TopicQueueMappingDetail mappingDetail, List<LogicQueueMappingItem> 
mappingItemList, LogicQueueMappingItem leaderItem) {
         this.topic = topic;
         this.globalId = globalId;
-        this.globalOffset = globalOffset;
         this.mappingDetail = mappingDetail;
         this.mappingItemList = mappingItemList;
-        this.mappingItem = mappingItem;
+        this.leaderItem = leaderItem;
 
     }
 
@@ -45,33 +45,7 @@ public class TopicQueueMappingContext  {
     }
 
     public boolean isLeader() {
-        if (mappingDetail == null
-                || mappingItemList == null
-                || mappingItemList.isEmpty()) {
-            return false;
-        }
-        LogicQueueMappingItem mappingItem = 
mappingItemList.get(mappingItemList.size() - 1);
-        return mappingItem.getBname().equals(mappingDetail.getBname());
-    }
-
-    public LogicQueueMappingItem findNext() {
-        if (mappingDetail == null
-                || mappingItem == null
-                || mappingItemList == null
-                || mappingItemList.isEmpty()) {
-            return null;
-        }
-        for (int i = 0; i < mappingItemList.size(); i++) {
-            LogicQueueMappingItem item = mappingItemList.get(i);
-            if (item.getGen() == mappingItem.getGen()) {
-                if (i < mappingItemList.size() - 1) {
-                    return mappingItemList.get(i + 1);
-                } else {
-                    return null;
-                }
-            }
-        }
-        return null;
+        return leaderItem != null && 
leaderItem.getBname().equals(mappingDetail.getBname());
     }
 
 
@@ -91,13 +65,6 @@ public class TopicQueueMappingContext  {
         this.globalId = globalId;
     }
 
-    public Long getGlobalOffset() {
-        return globalOffset;
-    }
-
-    public void setGlobalOffset(Long globalOffset) {
-        this.globalOffset = globalOffset;
-    }
 
     public TopicQueueMappingDetail getMappingDetail() {
         return mappingDetail;
@@ -115,13 +82,23 @@ public class TopicQueueMappingContext  {
         this.mappingItemList = mappingItemList;
     }
 
-    public LogicQueueMappingItem getMappingItem() {
-        return mappingItem;
+    public LogicQueueMappingItem getLeaderItem() {
+        return leaderItem;
+    }
+
+    public void setLeaderItem(LogicQueueMappingItem leaderItem) {
+        this.leaderItem = leaderItem;
     }
 
-    public void setMappingItem(LogicQueueMappingItem mappingItem) {
-        this.mappingItem = mappingItem;
+    public LogicQueueMappingItem getCurrentItem() {
+        return currentItem;
     }
 
+    public void setCurrentItem(LogicQueueMappingItem currentItem) {
+        this.currentItem = currentItem;
+    }
 
+    public void setMappingItemList(List<LogicQueueMappingItem> 
mappingItemList) {
+        this.mappingItemList = mappingItemList;
+    }
 }
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java
 
b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java
index 0659572..1749b8e 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java
@@ -77,28 +77,6 @@ public class TopicQueueMappingDetail extends 
TopicQueueMappingInfo {
     }
 
 
-    public static LogicQueueMappingItem 
findLogicQueueMappingItem(List<LogicQueueMappingItem> mappingItems, long 
logicOffset) {
-        if (mappingItems == null
-                || mappingItems.isEmpty()) {
-            return null;
-        }
-        //Could use bi-search to polish performance
-        for (int i = mappingItems.size() - 1; i >= 0; i--) {
-            LogicQueueMappingItem item =  mappingItems.get(i);
-            if (item.getLogicOffset() >= 0
-                    && logicOffset >= item.getLogicOffset()) {
-                return item;
-            }
-        }
-        //if not found, maybe out of range, return the first one
-        for (int i = 0; i < mappingItems.size(); i++) {
-            if (!mappingItems.get(i).checkIfShouldDeleted()) {
-                return mappingItems.get(i);
-            }
-        }
-        return null;
-    }
-
     public static long computeMaxOffsetFromMapping(TopicQueueMappingDetail 
mappingDetail, Integer globalId) {
         List<LogicQueueMappingItem> mappingItems = 
getMappingInfo(mappingDetail, globalId);
         if (mappingItems == null
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
 
b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
index 5527974..975a5ba 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
@@ -618,4 +618,58 @@ public class TopicQueueMappingUtils {
         return new TopicRemappingDetailWrapper(topic, 
TopicRemappingDetailWrapper.TYPE_REMAPPING, newEpoch, brokerConfigMap, 
brokersToMapIn, brokersToMapOut);
     }
 
+    public static LogicQueueMappingItem 
findLogicQueueMappingItem(List<LogicQueueMappingItem> mappingItems, long 
logicOffset, boolean ignoreNegative) {
+        if (mappingItems == null
+                || mappingItems.isEmpty()) {
+            return null;
+        }
+        //Could use bi-search to polish performance
+        for (int i = mappingItems.size() - 1; i >= 0; i--) {
+            LogicQueueMappingItem item =  mappingItems.get(i);
+            if (ignoreNegative && item.getLogicOffset() < 0) {
+                continue;
+            }
+            if (logicOffset >= item.getLogicOffset()) {
+                return item;
+            }
+        }
+        //if not found, maybe out of range, return the first one
+        for (int i = 0; i < mappingItems.size(); i++) {
+            LogicQueueMappingItem item =  mappingItems.get(i);
+            if (ignoreNegative && item.getLogicOffset() < 0) {
+                continue;
+            }
+            if (!item.checkIfShouldDeleted()) {
+                return mappingItems.get(i);
+            }
+        }
+        return null;
+    }
+
+    public static LogicQueueMappingItem findNext(List<LogicQueueMappingItem> 
items, LogicQueueMappingItem currentItem, boolean ignoreNegative) {
+        if (items == null
+            || currentItem == null) {
+            return null;
+        }
+        for (int i = 0; i < items.size(); i++) {
+            LogicQueueMappingItem item = items.get(i);
+            if (ignoreNegative && item.getLogicOffset() < 0) {
+                continue;
+            }
+            if (item.getGen() == currentItem.getGen()) {
+                if (i < items.size() - 1) {
+                    item = items.get(i  + 1);
+                    if (ignoreNegative && item.getLogicOffset() < 0) {
+                        return null;
+                    } else {
+                        return item;
+                    }
+                } else {
+                    return null;
+                }
+            }
+        }
+        return null;
+    }
+
 }
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java 
b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java
index 03a92e8..abe1eee 100644
--- a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java
@@ -48,6 +48,7 @@ public class StaticTopicIT extends BaseConf {
 
     @Before
     public void setUp() throws Exception {
+        System.setProperty("rocketmq.client.rebalance.waitInterval", "500");
         defaultMQAdminExt = getAdmin(nsAddr);
         waitBrokerRegistered(nsAddr, clusterName);
         clientMetadata = new ClientMetadata();
@@ -173,7 +174,6 @@ public class StaticTopicIT extends BaseConf {
         String topic = "static" + MQRandomUtils.getRandomTopic();
         RMQNormalProducer producer = getProducer(nsAddr, topic);
         RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new 
RMQNormalListener());
-        producer.getProducer().setPollNameServerInterval(100);
 
         int queueNum = 10;
         int msgEachQueue = 100;
@@ -183,6 +183,9 @@ public class StaticTopicIT extends BaseConf {
             targetBrokers.add(broker1Name);
             createStaticTopic(topic, queueNum, targetBrokers);
         }
+        //System.out.printf("%s %s\n", broker1Name, 
clientMetadata.findMasterBrokerAddr(broker1Name));
+        //System.out.printf("%s %s\n", broker2Name, 
clientMetadata.findMasterBrokerAddr(broker2Name));
+
         //produce the messages
         {
             List<MessageQueue> messageQueueList = producer.getMessageQueue();
@@ -203,6 +206,11 @@ public class StaticTopicIT extends BaseConf {
             }
         }
 
+        consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 
3000);
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+                consumer.getListener().getAllMsgBody()))
+                .containsExactlyElementsIn(producer.getAllMsgBody());
+
         //remapping the static topic
         {
             Set<String> targetBrokers = new HashSet<>();
@@ -242,8 +250,7 @@ public class StaticTopicIT extends BaseConf {
             }
         }
         {
-            
consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 3000);
-            System.out.println("Consume: " + 
consumer.getListener().getAllMsgBody().size());
+            
consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 30000);
             assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
                     consumer.getListener().getAllMsgBody()))
                     .containsExactlyElementsIn(producer.getAllMsgBody());
@@ -258,7 +265,7 @@ public class StaticTopicIT extends BaseConf {
             Assert.assertEquals(queueNum, messagesByQueue.size());
             for (int i = 0; i < queueNum; i++) {
                 List<MessageExt> messageExts = messagesByQueue.get(i);
-                Assert.assertEquals(msgEachQueue, messageExts.size());
+                Assert.assertEquals(msgEachQueue * 2, messageExts.size());
                 Collections.sort(messageExts, new Comparator<MessageExt>() {
                     @Override
                     public int compare(MessageExt o1, MessageExt o2) {
@@ -268,12 +275,16 @@ public class StaticTopicIT extends BaseConf {
                 for (int j = 0; j < msgEachQueue; j++) {
                     Assert.assertEquals(j, 
messageExts.get(j).getQueueOffset());
                 }
+                for (int j = msgEachQueue; j < msgEachQueue * 2; j++) {
+                    Assert.assertEquals(j + 
TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE - msgEachQueue, 
messageExts.get(j).getQueueOffset());
+                }
             }
         }
     }
 
     @After
     public void tearDown() {
+        System.setProperty("rocketmq.client.rebalance.waitInterval", "20000");
         super.shutdown();
     }
 

Reply via email to