This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by
this push:
new e952dd6 Finish the produce and consume test for remapped static topic
e952dd6 is described below
commit e952dd67f19a6b849d5d980fd24f9ebcd64fd356
Author: dongeforever <[email protected]>
AuthorDate: Wed Dec 1 16:36:07 2021 +0800
Finish the produce and consume test for remapped static topic
---
.../broker/processor/AdminBrokerProcessor.java | 40 +++----
.../broker/processor/PullMessageProcessor.java | 125 +++++++++++++++------
.../broker/processor/SendMessageProcessor.java | 4 +-
.../broker/topic/TopicQueueMappingManager.java | 35 +++---
.../client/impl/consumer/PullAPIWrapper.java | 4 +
.../common/statictopic/LogicQueueMappingItem.java | 7 ++
.../statictopic/TopicQueueMappingContext.java | 63 ++++-------
.../statictopic/TopicQueueMappingDetail.java | 22 ----
.../common/statictopic/TopicQueueMappingUtils.java | 54 +++++++++
.../apache/rocketmq/test/smoke/StaticTopicIT.java | 19 +++-
10 files changed, 219 insertions(+), 154 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index d9684e7..16bf677 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -109,6 +109,7 @@ import
org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
+import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
import org.apache.rocketmq.common.stats.StatsItem;
import org.apache.rocketmq.common.stats.StatsSnapshot;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
@@ -621,14 +622,10 @@ public class AdminBrokerProcessor extends
AsyncNettyRequestProcessor implements
if (mappingContext.getMappingDetail() == null) {
return null;
}
- if (requestHeader.getPhysical() != null &&
requestHeader.getPhysical()) {
- return null;
- }
TopicQueueMappingDetail mappingDetail =
mappingContext.getMappingDetail();
List<LogicQueueMappingItem> mappingItems =
mappingContext.getMappingItemList();
- if (mappingItems == null
- || mappingItems.isEmpty()) {
+ if (!mappingContext.isLeader()) {
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE,
String.format("%s-%d does not exit in request process of current broker %s",
mappingContext.getTopic(), mappingContext.getGlobalId(),
mappingDetail.getBname()));
}
//TODO should make sure the timestampOfOffset is equal or bigger
than the searched timestamp
@@ -702,14 +699,10 @@ public class AdminBrokerProcessor extends
AsyncNettyRequestProcessor implements
if (mappingContext.getMappingDetail() == null) {
return null;
}
- if (requestHeader.getPhysical() != null &&
requestHeader.getPhysical()) {
- return null;
- }
TopicQueueMappingDetail mappingDetail =
mappingContext.getMappingDetail();
- LogicQueueMappingItem mappingItem = mappingContext.getMappingItem();
- if (mappingItem == null
- || !mappingDetail.getBname().equals(mappingItem.getBname())) {
+ LogicQueueMappingItem mappingItem = mappingContext.getLeaderItem();
+ if (!mappingContext.isLeader()) {
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE,
String.format("%s-%d does not exit in request process of current broker %s",
mappingContext.getTopic(), mappingContext.getGlobalId(),
mappingDetail.getBname()));
}
long offset =
this.brokerController.getMessageStore().getMaxOffsetInQueue(mappingContext.getTopic(),
mappingItem.getQueueId());
@@ -750,16 +743,15 @@ public class AdminBrokerProcessor extends
AsyncNettyRequestProcessor implements
if (mappingContext.getMappingDetail() == null) {
return null;
}
- if (requestHeader.getPhysical() != null &&
requestHeader.getPhysical()) {
- return null;
- }
TopicQueueMappingDetail mappingDetail =
mappingContext.getMappingDetail();
- LogicQueueMappingItem mappingItem = mappingContext.getMappingItem();
- if (mappingItem == null) {
+ if (!mappingContext.isLeader()) {
//this may not
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE,
String.format("%s-%d does not exit in request process of current broker %s",
mappingContext.getTopic(), mappingContext.getGlobalId(),
mappingDetail.getBname()));
};
+
+ LogicQueueMappingItem mappingItem =
TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(),
0L, true);
+ assert mappingItem != null;
try {
requestHeader.setBname(mappingItem.getBname());
requestHeader.setPhysical(true);
@@ -786,7 +778,6 @@ public class AdminBrokerProcessor extends
AsyncNettyRequestProcessor implements
response.setRemark(null);
return response;
} catch (Throwable t) {
- t.printStackTrace();
log.error("rewriteRequestForStaticTopic failed", t);
return buildErrorResponse(ResponseCode.SYSTEM_ERROR,
t.getMessage());
}
@@ -800,7 +791,7 @@ public class AdminBrokerProcessor extends
AsyncNettyRequestProcessor implements
(GetMinOffsetRequestHeader)
request.decodeCommandCustomHeader(GetMinOffsetRequestHeader.class);
- TopicQueueMappingContext mappingContext =
this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader,
false, 0L);
+ TopicQueueMappingContext mappingContext =
this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader,
false);
RemotingCommand rewriteResult =
rewriteRequestForStaticTopic(requestHeader, mappingContext);
if (rewriteResult != null) {
return rewriteResult;
@@ -818,19 +809,18 @@ public class AdminBrokerProcessor extends
AsyncNettyRequestProcessor implements
if (mappingContext.getMappingDetail() == null) {
return null;
}
- if (requestHeader.getPhysical() != null &&
requestHeader.getPhysical()) {
- return null;
- }
+
TopicQueueMappingDetail mappingDetail =
mappingContext.getMappingDetail();
- LogicQueueMappingItem mappingItem = mappingContext.getMappingItem();
- if (mappingItem == null) {
+ if (!mappingContext.isLeader()) {
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE,
String.format("%s-%d does not exit in request process of current broker %s",
mappingContext.getTopic(), mappingContext.getGlobalId(),
mappingDetail.getBname()));
};
+ LogicQueueMappingItem mappingItem =
TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(),
0L, true);
+ assert mappingItem != null;
try {
requestHeader.setBname(mappingItem.getBname());
requestHeader.setPhysical(true);
RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_MIN_OFFSET,
requestHeader, null);
- //TODO check if it is leader
+ //TODO check if it is in current broker
RpcResponse rpcResponse =
this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest,
this.brokerController.getBrokerConfig().getForwardTimeout()).get();
if (rpcResponse.getException() != null) {
throw rpcResponse.getException();
@@ -855,7 +845,7 @@ public class AdminBrokerProcessor extends
AsyncNettyRequestProcessor implements
final GetEarliestMsgStoretimeRequestHeader requestHeader =
(GetEarliestMsgStoretimeRequestHeader)
request.decodeCommandCustomHeader(GetEarliestMsgStoretimeRequestHeader.class);
- TopicQueueMappingContext mappingContext =
this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader,
false, 0L);
+ TopicQueueMappingContext mappingContext =
this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader,
false);
RemotingCommand rewriteResult =
rewriteRequestForStaticTopic(requestHeader, mappingContext);
if (rewriteResult != null) {
return rewriteResult;
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index b98295c..aa6879b 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -54,6 +54,7 @@ import org.apache.rocketmq.common.rpc.RpcResponse;
import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
+import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
@@ -68,7 +69,6 @@ import
org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.RequestTask;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.GetMessageResult;
-import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.MessageFilter;
import org.apache.rocketmq.store.PutMessageResult;
@@ -79,7 +79,6 @@ import java.nio.ByteBuffer;
import java.util.List;
import static
org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse;
-import static org.apache.rocketmq.remoting.protocol.RemotingCommand.decode;
public class PullMessageProcessor extends AsyncNettyRequestProcessor
implements NettyRequestProcessor {
private static final InternalLogger log =
InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
@@ -111,15 +110,14 @@ public class PullMessageProcessor extends
AsyncNettyRequestProcessor implements
TopicQueueMappingDetail mappingDetail =
mappingContext.getMappingDetail();
String topic = mappingContext.getTopic();
Integer globalId = mappingContext.getGlobalId();
- Long globalOffset = mappingContext.getGlobalOffset();
-
- LogicQueueMappingItem mappingItem =
mappingContext.getMappingItem();
- if (mappingItem == null) {
+ // if the leader? consider the order consumer, which will lock the
mq
+ if (!mappingContext.isLeader()) {
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE,
String.format("%s-%d cannot find mapping item in request process of current
broker %s", topic, globalId, mappingDetail.getBname()));
}
+ Long globalOffset = requestHeader.getQueueOffset();
- //TODO Check if the leader? consider the order consumer, which
will lock the mq
- //
+ LogicQueueMappingItem mappingItem =
TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(),
globalOffset, true);
+ mappingContext.setCurrentItem(mappingItem);
if (globalOffset < mappingItem.getLogicOffset()) {
//handleOffsetMoved
@@ -174,68 +172,122 @@ public class PullMessageProcessor extends
AsyncNettyRequestProcessor implements
private RemotingCommand
rewriteResponseForStaticTopic(PullMessageRequestHeader requestHeader,
PullMessageResponseHeader responseHeader,
TopicQueueMappingContext mappingContext, final int code) {
try {
- if (mappingContext == null) {
+ if (mappingContext.getMappingDetail() == null) {
return null;
}
TopicQueueMappingDetail mappingDetail =
mappingContext.getMappingDetail();
- LogicQueueMappingItem mappingItem =
mappingContext.getMappingItem();
+ LogicQueueMappingItem leaderItem = mappingContext.getLeaderItem();
+
+ LogicQueueMappingItem currentItem =
mappingContext.getCurrentItem();
+
+ LogicQueueMappingItem earlistItem =
TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(),
0L, true);
+
+ assert currentItem.getLogicOffset() >= 0;
long requestOffset = requestHeader.getQueueOffset();
long nextBeginOffset = responseHeader.getNextBeginOffset();
long minOffset = responseHeader.getMinOffset();
long maxOffset = responseHeader.getMaxOffset();
int responseCode = code;
- if (responseCode != ResponseCode.SUCCESS
- && responseCode != ResponseCode.PULL_RETRY_IMMEDIATELY) {
- if (mappingContext.isLeader()) {
+ //consider the following situations
+ // 1. read from slave, currently not supported
+ // 2. the middle queue is truncated because of deleting commitlog
+ if (code != ResponseCode.SUCCESS) {
+ //note the currentItem maybe both the leader and the earliest
+ if (leaderItem.getGen() == currentItem.getGen()) {
+ //read the leader
+ if (requestOffset > maxOffset) {
+ //actually, we need do nothing, but keep the code
structure here
+ if (code == ResponseCode.PULL_OFFSET_MOVED) {
+ responseCode = ResponseCode.PULL_OFFSET_MOVED;
+ } else {
+ //maybe current broker is the slave
+ responseCode = code;
+ }
+ } else if (requestOffset < minOffset) {
+ nextBeginOffset = minOffset;
+ responseCode = ResponseCode.PULL_RETRY_IMMEDIATELY;
+ } else {
+ responseCode = code;
+ }
+ }
+ //note the currentItem maybe both the leader and the earliest
+ if (earlistItem.getGen() == currentItem.getGen()) {
+ //read the earliest one
if (requestOffset < minOffset) {
+ if (code == ResponseCode.PULL_OFFSET_MOVED) {
+ responseCode = ResponseCode.PULL_OFFSET_MOVED;
+ } else {
+ //maybe read from slave, but we still set it to
moved
+ responseCode = ResponseCode.PULL_OFFSET_MOVED;
+ }
nextBeginOffset = minOffset;
- responseCode = ResponseCode.PULL_NOT_FOUND;
- } else if (requestOffset > maxOffset) {
- responseCode = ResponseCode.PULL_OFFSET_MOVED;
- } else if (requestOffset == maxOffset) {
- responseCode = ResponseCode.PULL_NOT_FOUND;
+ } else if (requestOffset >= maxOffset) {
+ //just move to another item
+ LogicQueueMappingItem nextItem =
TopicQueueMappingUtils.findNext(mappingContext.getMappingItemList(),
currentItem, true);
+ if (nextItem != null) {
+ currentItem = nextItem;
+ nextBeginOffset = currentItem.getStartOffset();
+ minOffset = currentItem.getStartOffset();
+ maxOffset = minOffset;
+ responseCode = ResponseCode.PULL_RETRY_IMMEDIATELY;
+ } else {
+ //maybe the next one's logic offset is -1
+ responseCode = ResponseCode.PULL_NOT_FOUND;
+ }
} else {
//let it go
+ responseCode = code;
}
- } else {
+ }
+
+ //read from the middle item, ignore the PULL_OFFSET_MOVED
+ if (leaderItem.getGen() != currentItem.getGen()
+ && earlistItem.getGen() != currentItem.getGen()) {
if (requestOffset < minOffset) {
nextBeginOffset = minOffset;
- responseCode = ResponseCode.PULL_NOT_FOUND;
+ responseCode = ResponseCode.PULL_RETRY_IMMEDIATELY;
} else if (requestOffset >= maxOffset) {
- responseCode = ResponseCode.PULL_NOT_FOUND;
//just move to another item
- mappingItem = mappingContext.findNext();
- assert mappingItem != null;
- nextBeginOffset = mappingItem.getStartOffset();
- minOffset = mappingItem.getStartOffset();
- maxOffset = minOffset;
+ LogicQueueMappingItem nextItem =
TopicQueueMappingUtils.findNext(mappingContext.getMappingItemList(),
currentItem, true);
+ if (nextItem != null) {
+ currentItem = nextItem;
+ nextBeginOffset = currentItem.getStartOffset();
+ minOffset = currentItem.getStartOffset();
+ maxOffset = minOffset;
+ responseCode = ResponseCode.PULL_RETRY_IMMEDIATELY;
+ } else {
+ //maybe the next one's logic offset is -1
+ responseCode = ResponseCode.PULL_NOT_FOUND;
+ }
+ } else {
+ responseCode = code;
}
}
}
//handle nextBeginOffset
//the next begin offset should no more than the end offset
- if (mappingItem.checkIfEndOffsetDecided()
- && nextBeginOffset >= mappingItem.getEndOffset()) {
- nextBeginOffset = mappingItem.getEndOffset();
+ if (currentItem.checkIfEndOffsetDecided()
+ && nextBeginOffset >= currentItem.getEndOffset()) {
+ nextBeginOffset = currentItem.getEndOffset();
}
-
responseHeader.setNextBeginOffset(mappingItem.computeStaticQueueOffsetUpToEnd(nextBeginOffset));
+
responseHeader.setNextBeginOffset(currentItem.computeStaticQueueOffsetUpToEnd(nextBeginOffset));
//handle min offset
-
responseHeader.setMinOffset(mappingItem.computeStaticQueueOffsetUpToEnd(Math.max(mappingItem.getStartOffset(),
minOffset)));
+
responseHeader.setMinOffset(currentItem.computeStaticQueueOffsetUpToEnd(Math.max(currentItem.getStartOffset(),
minOffset)));
//handle max offset
-
responseHeader.setMaxOffset(Math.max(mappingItem.computeStaticQueueOffsetUpToEnd(maxOffset),
+
responseHeader.setMaxOffset(Math.max(currentItem.computeStaticQueueOffsetUpToEnd(maxOffset),
TopicQueueMappingDetail.computeMaxOffsetFromMapping(mappingDetail,
mappingContext.getGlobalId())));
//set the offsetDelta
- responseHeader.setOffsetDelta(mappingItem.computeOffsetDelta());
+ responseHeader.setOffsetDelta(currentItem.computeOffsetDelta());
- if (code != ResponseCode.SUCCESS
- && code != ResponseCode.PULL_RETRY_IMMEDIATELY) {
+ if (code != ResponseCode.SUCCESS) {
return
RemotingCommand.createResponseCommandWithHeader(responseCode, responseHeader);
} else {
return null;
}
} catch (Throwable t) {
+ t.printStackTrace();
return buildErrorResponse(ResponseCode.SYSTEM_ERROR,
t.getMessage());
}
}
@@ -292,7 +344,8 @@ public class PullMessageProcessor extends
AsyncNettyRequestProcessor implements
return response;
}
- TopicQueueMappingContext mappingContext =
this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader,
false, requestHeader.getQueueOffset());
+
+ TopicQueueMappingContext mappingContext =
this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader,
false);
{
RemotingCommand rewriteResult =
rewriteRequestForStaticTopic(requestHeader, mappingContext);
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index d254b8a..03a5dba 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -101,7 +101,7 @@ public class SendMessageProcessor extends
AbstractSendMessageProcessor implement
if (requestHeader == null) {
return CompletableFuture.completedFuture(null);
}
- TopicQueueMappingContext mappingContext =
this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader,
true, Long.MAX_VALUE);
+ TopicQueueMappingContext mappingContext =
this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader,
true);
RemotingCommand rewriteResult =
this.brokerController.getTopicQueueMappingManager().rewriteRequestForStaticTopic(requestHeader,
mappingContext);
if (rewriteResult != null) {
return CompletableFuture.completedFuture(rewriteResult);
@@ -130,7 +130,7 @@ public class SendMessageProcessor extends
AbstractSendMessageProcessor implement
}
TopicQueueMappingDetail mappingDetail =
mappingContext.getMappingDetail();
- LogicQueueMappingItem mappingItem =
mappingContext.getMappingItem();
+ LogicQueueMappingItem mappingItem = mappingContext.getLeaderItem();
if (mappingItem == null) {
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE,
String.format("%s-%d does not exit in request process of current broker %s",
mappingContext.getTopic(), mappingContext.getGlobalId(),
mappingDetail.getBname()));
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
index 5fa3a31..1dd9cbf 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
@@ -17,7 +17,6 @@
package org.apache.rocketmq.broker.topic;
import com.alibaba.fastjson.JSON;
-import com.google.common.collect.ImmutableList;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
import org.apache.rocketmq.common.ConfigManager;
@@ -164,24 +163,24 @@ public class TopicQueueMappingManager extends
ConfigManager {
}
public TopicQueueMappingContext
buildTopicQueueMappingContext(TopicQueueRequestHeader requestHeader) {
- return buildTopicQueueMappingContext(requestHeader, false,
Long.MAX_VALUE);
+ return buildTopicQueueMappingContext(requestHeader, false);
}
//Do not return a null context
- public TopicQueueMappingContext
buildTopicQueueMappingContext(TopicQueueRequestHeader requestHeader, boolean
selectOneWhenMiss, Long globalOffset) {
+ public TopicQueueMappingContext
buildTopicQueueMappingContext(TopicQueueRequestHeader requestHeader, boolean
selectOneWhenMiss) {
if (requestHeader.getPhysical() != null
&& Boolean.TRUE.equals(requestHeader.getPhysical())) {
- return new TopicQueueMappingContext(requestHeader.getTopic(),
requestHeader.getQueueId(), null, null, null, null);
+ return new TopicQueueMappingContext(requestHeader.getTopic(),
requestHeader.getQueueId(), null, null, null);
}
TopicQueueMappingDetail mappingDetail =
getTopicQueueMapping(requestHeader.getTopic());
if (mappingDetail == null) {
//it is not static topic
- return new TopicQueueMappingContext(requestHeader.getTopic(),
requestHeader.getQueueId(), null, null, null, null);
+ return new TopicQueueMappingContext(requestHeader.getTopic(),
requestHeader.getQueueId(), null, null, null);
}
//If not find mappingItem, it encounters some errors
Integer globalId = requestHeader.getQueueId();
if (globalId < 0 && !selectOneWhenMiss) {
- return new TopicQueueMappingContext(requestHeader.getTopic(),
globalId, globalOffset, mappingDetail, null, null);
+ return new TopicQueueMappingContext(requestHeader.getTopic(),
globalId, mappingDetail, null, null);
}
if (globalId < 0) {
@@ -194,24 +193,17 @@ public class TopicQueueMappingManager extends
ConfigManager {
}
}
if (globalId < 0) {
- return new TopicQueueMappingContext(requestHeader.getTopic(),
globalId, globalOffset, mappingDetail, null, null);
+ return new TopicQueueMappingContext(requestHeader.getTopic(),
globalId, mappingDetail, null, null);
}
List<LogicQueueMappingItem> mappingItemList = null;
- LogicQueueMappingItem mappingItem = null;
-
- if (globalOffset == null
- || Long.MAX_VALUE == globalOffset) {
- mappingItemList =
TopicQueueMappingDetail.getMappingInfo(mappingDetail, globalId);
- if (mappingItemList != null
+ LogicQueueMappingItem leaderItem = null;
+ mappingItemList =
TopicQueueMappingDetail.getMappingInfo(mappingDetail, globalId);
+ if (mappingItemList != null
&& mappingItemList.size() > 0) {
- mappingItem = mappingItemList.get(mappingItemList.size() - 1);
- }
- } else {
- mappingItemList =
TopicQueueMappingDetail.getMappingInfo(mappingDetail, globalId);
- mappingItem =
TopicQueueMappingDetail.findLogicQueueMappingItem(mappingItemList,
globalOffset);
+ leaderItem = mappingItemList.get(mappingItemList.size() - 1);
}
- return new TopicQueueMappingContext(requestHeader.getTopic(),
globalId, globalOffset, mappingDetail, mappingItemList, mappingItem);
+ return new TopicQueueMappingContext(requestHeader.getTopic(),
globalId, mappingDetail, mappingItemList, leaderItem);
}
@@ -221,11 +213,10 @@ public class TopicQueueMappingManager extends
ConfigManager {
return null;
}
TopicQueueMappingDetail mappingDetail =
mappingContext.getMappingDetail();
- LogicQueueMappingItem mappingItem =
mappingContext.getMappingItem();
- if (mappingItem == null
- ||
!mappingDetail.getBname().equals(mappingItem.getBname())) {
+ if (!mappingContext.isLeader()) {
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE,
String.format("%s-%d does not exit in request process of current broker %s",
requestHeader.getTopic(), requestHeader.getQueueId(),
mappingDetail.getBname()));
}
+ LogicQueueMappingItem mappingItem = mappingContext.getLeaderItem();
requestHeader.setQueueId(mappingItem.getQueueId());
return null;
} catch (Throwable t) {
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
index da9c8b3..273add4 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
@@ -23,6 +23,8 @@ import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
+
+import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.consumer.PopCallback;
import org.apache.rocketmq.common.protocol.header.PopMessageRequestHeader;
import org.apache.rocketmq.client.consumer.PullCallback;
@@ -170,6 +172,7 @@ public class PullAPIWrapper {
this.recalculatePullFromWhichNode(mq), false);
}
+
if (findBrokerResult != null) {
{
// check version
@@ -203,6 +206,7 @@ public class PullAPIWrapper {
brokerAddr = computePullFromWhichFilterServer(mq.getTopic(),
brokerAddr);
}
+
PullResult pullResult =
this.mQClientFactory.getMQClientAPIImpl().pullMessage(
brokerAddr,
requestHeader,
diff --git
a/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java
b/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java
index 959207e..9f79e9d 100644
---
a/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java
+++
b/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java
@@ -32,6 +32,10 @@ public class LogicQueueMappingItem extends
RemotingSerializable {
}
public long computeStaticQueueOffsetUpToEnd(long physicalQueueOffset) {
+ //consider the newly mapped item
+ if (logicOffset < 0) {
+ return -1;
+ }
if (physicalQueueOffset < startOffset) {
return logicOffset;
}
@@ -43,6 +47,9 @@ public class LogicQueueMappingItem extends
RemotingSerializable {
}
public long computeStaticQueueOffset(long physicalQueueOffset) {
+ if (logicOffset < 0) {
+ return logicOffset;
+ }
if (physicalQueueOffset < startOffset) {
return logicOffset;
}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingContext.java
b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingContext.java
index 4a788ab..f705c43 100644
---
a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingContext.java
+++
b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingContext.java
@@ -23,18 +23,18 @@ import java.util.List;
public class TopicQueueMappingContext {
private String topic;
private Integer globalId;
- private Long globalOffset;
private TopicQueueMappingDetail mappingDetail;
private List<LogicQueueMappingItem> mappingItemList;
- private LogicQueueMappingItem mappingItem;
+ private LogicQueueMappingItem leaderItem;
- public TopicQueueMappingContext(String topic, Integer globalId, Long
globalOffset, TopicQueueMappingDetail mappingDetail,
List<LogicQueueMappingItem> mappingItemList, LogicQueueMappingItem mappingItem)
{
+ private LogicQueueMappingItem currentItem;
+
+ public TopicQueueMappingContext(String topic, Integer globalId,
TopicQueueMappingDetail mappingDetail, List<LogicQueueMappingItem>
mappingItemList, LogicQueueMappingItem leaderItem) {
this.topic = topic;
this.globalId = globalId;
- this.globalOffset = globalOffset;
this.mappingDetail = mappingDetail;
this.mappingItemList = mappingItemList;
- this.mappingItem = mappingItem;
+ this.leaderItem = leaderItem;
}
@@ -45,33 +45,7 @@ public class TopicQueueMappingContext {
}
public boolean isLeader() {
- if (mappingDetail == null
- || mappingItemList == null
- || mappingItemList.isEmpty()) {
- return false;
- }
- LogicQueueMappingItem mappingItem =
mappingItemList.get(mappingItemList.size() - 1);
- return mappingItem.getBname().equals(mappingDetail.getBname());
- }
-
- public LogicQueueMappingItem findNext() {
- if (mappingDetail == null
- || mappingItem == null
- || mappingItemList == null
- || mappingItemList.isEmpty()) {
- return null;
- }
- for (int i = 0; i < mappingItemList.size(); i++) {
- LogicQueueMappingItem item = mappingItemList.get(i);
- if (item.getGen() == mappingItem.getGen()) {
- if (i < mappingItemList.size() - 1) {
- return mappingItemList.get(i + 1);
- } else {
- return null;
- }
- }
- }
- return null;
+ return leaderItem != null &&
leaderItem.getBname().equals(mappingDetail.getBname());
}
@@ -91,13 +65,6 @@ public class TopicQueueMappingContext {
this.globalId = globalId;
}
- public Long getGlobalOffset() {
- return globalOffset;
- }
-
- public void setGlobalOffset(Long globalOffset) {
- this.globalOffset = globalOffset;
- }
public TopicQueueMappingDetail getMappingDetail() {
return mappingDetail;
@@ -115,13 +82,23 @@ public class TopicQueueMappingContext {
this.mappingItemList = mappingItemList;
}
- public LogicQueueMappingItem getMappingItem() {
- return mappingItem;
+ public LogicQueueMappingItem getLeaderItem() {
+ return leaderItem;
+ }
+
+ public void setLeaderItem(LogicQueueMappingItem leaderItem) {
+ this.leaderItem = leaderItem;
}
- public void setMappingItem(LogicQueueMappingItem mappingItem) {
- this.mappingItem = mappingItem;
+ public LogicQueueMappingItem getCurrentItem() {
+ return currentItem;
}
+ public void setCurrentItem(LogicQueueMappingItem currentItem) {
+ this.currentItem = currentItem;
+ }
+ public void setMappingItemList(List<LogicQueueMappingItem>
mappingItemList) {
+ this.mappingItemList = mappingItemList;
+ }
}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java
b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java
index 0659572..1749b8e 100644
---
a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java
+++
b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingDetail.java
@@ -77,28 +77,6 @@ public class TopicQueueMappingDetail extends
TopicQueueMappingInfo {
}
- public static LogicQueueMappingItem
findLogicQueueMappingItem(List<LogicQueueMappingItem> mappingItems, long
logicOffset) {
- if (mappingItems == null
- || mappingItems.isEmpty()) {
- return null;
- }
- //Could use bi-search to polish performance
- for (int i = mappingItems.size() - 1; i >= 0; i--) {
- LogicQueueMappingItem item = mappingItems.get(i);
- if (item.getLogicOffset() >= 0
- && logicOffset >= item.getLogicOffset()) {
- return item;
- }
- }
- //if not found, maybe out of range, return the first one
- for (int i = 0; i < mappingItems.size(); i++) {
- if (!mappingItems.get(i).checkIfShouldDeleted()) {
- return mappingItems.get(i);
- }
- }
- return null;
- }
-
public static long computeMaxOffsetFromMapping(TopicQueueMappingDetail
mappingDetail, Integer globalId) {
List<LogicQueueMappingItem> mappingItems =
getMappingInfo(mappingDetail, globalId);
if (mappingItems == null
diff --git
a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
index 5527974..975a5ba 100644
---
a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
+++
b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
@@ -618,4 +618,58 @@ public class TopicQueueMappingUtils {
return new TopicRemappingDetailWrapper(topic,
TopicRemappingDetailWrapper.TYPE_REMAPPING, newEpoch, brokerConfigMap,
brokersToMapIn, brokersToMapOut);
}
+ public static LogicQueueMappingItem
findLogicQueueMappingItem(List<LogicQueueMappingItem> mappingItems, long
logicOffset, boolean ignoreNegative) {
+ if (mappingItems == null
+ || mappingItems.isEmpty()) {
+ return null;
+ }
+ //Could use bi-search to polish performance
+ for (int i = mappingItems.size() - 1; i >= 0; i--) {
+ LogicQueueMappingItem item = mappingItems.get(i);
+ if (ignoreNegative && item.getLogicOffset() < 0) {
+ continue;
+ }
+ if (logicOffset >= item.getLogicOffset()) {
+ return item;
+ }
+ }
+ //if not found, maybe out of range, return the first one
+ for (int i = 0; i < mappingItems.size(); i++) {
+ LogicQueueMappingItem item = mappingItems.get(i);
+ if (ignoreNegative && item.getLogicOffset() < 0) {
+ continue;
+ }
+ if (!item.checkIfShouldDeleted()) {
+ return mappingItems.get(i);
+ }
+ }
+ return null;
+ }
+
+ public static LogicQueueMappingItem findNext(List<LogicQueueMappingItem>
items, LogicQueueMappingItem currentItem, boolean ignoreNegative) {
+ if (items == null
+ || currentItem == null) {
+ return null;
+ }
+ for (int i = 0; i < items.size(); i++) {
+ LogicQueueMappingItem item = items.get(i);
+ if (ignoreNegative && item.getLogicOffset() < 0) {
+ continue;
+ }
+ if (item.getGen() == currentItem.getGen()) {
+ if (i < items.size() - 1) {
+ item = items.get(i + 1);
+ if (ignoreNegative && item.getLogicOffset() < 0) {
+ return null;
+ } else {
+ return item;
+ }
+ } else {
+ return null;
+ }
+ }
+ }
+ return null;
+ }
+
}
diff --git
a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java
b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java
index 03a92e8..abe1eee 100644
--- a/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/smoke/StaticTopicIT.java
@@ -48,6 +48,7 @@ public class StaticTopicIT extends BaseConf {
@Before
public void setUp() throws Exception {
+ System.setProperty("rocketmq.client.rebalance.waitInterval", "500");
defaultMQAdminExt = getAdmin(nsAddr);
waitBrokerRegistered(nsAddr, clusterName);
clientMetadata = new ClientMetadata();
@@ -173,7 +174,6 @@ public class StaticTopicIT extends BaseConf {
String topic = "static" + MQRandomUtils.getRandomTopic();
RMQNormalProducer producer = getProducer(nsAddr, topic);
RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new
RMQNormalListener());
- producer.getProducer().setPollNameServerInterval(100);
int queueNum = 10;
int msgEachQueue = 100;
@@ -183,6 +183,9 @@ public class StaticTopicIT extends BaseConf {
targetBrokers.add(broker1Name);
createStaticTopic(topic, queueNum, targetBrokers);
}
+ //System.out.printf("%s %s\n", broker1Name,
clientMetadata.findMasterBrokerAddr(broker1Name));
+ //System.out.printf("%s %s\n", broker2Name,
clientMetadata.findMasterBrokerAddr(broker2Name));
+
//produce the messages
{
List<MessageQueue> messageQueueList = producer.getMessageQueue();
@@ -203,6 +206,11 @@ public class StaticTopicIT extends BaseConf {
}
}
+ consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(),
3000);
+ assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+ consumer.getListener().getAllMsgBody()))
+ .containsExactlyElementsIn(producer.getAllMsgBody());
+
//remapping the static topic
{
Set<String> targetBrokers = new HashSet<>();
@@ -242,8 +250,7 @@ public class StaticTopicIT extends BaseConf {
}
}
{
-
consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 3000);
- System.out.println("Consume: " +
consumer.getListener().getAllMsgBody().size());
+
consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), 30000);
assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListener().getAllMsgBody()))
.containsExactlyElementsIn(producer.getAllMsgBody());
@@ -258,7 +265,7 @@ public class StaticTopicIT extends BaseConf {
Assert.assertEquals(queueNum, messagesByQueue.size());
for (int i = 0; i < queueNum; i++) {
List<MessageExt> messageExts = messagesByQueue.get(i);
- Assert.assertEquals(msgEachQueue, messageExts.size());
+ Assert.assertEquals(msgEachQueue * 2, messageExts.size());
Collections.sort(messageExts, new Comparator<MessageExt>() {
@Override
public int compare(MessageExt o1, MessageExt o2) {
@@ -268,12 +275,16 @@ public class StaticTopicIT extends BaseConf {
for (int j = 0; j < msgEachQueue; j++) {
Assert.assertEquals(j,
messageExts.get(j).getQueueOffset());
}
+ for (int j = msgEachQueue; j < msgEachQueue * 2; j++) {
+ Assert.assertEquals(j +
TopicQueueMappingUtils.DEFAULT_BLOCK_SEQ_SIZE - msgEachQueue,
messageExts.get(j).getQueueOffset());
+ }
}
}
}
@After
public void tearDown() {
+ System.setProperty("rocketmq.client.rebalance.waitInterval", "20000");
super.shutdown();
}