This is an automated email from the ASF dual-hosted git repository. huangli pushed a commit to branch 4.9.2_dev_community in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit f9eb9207f1398fbb25275fa81cb6330761b46464 Author: coding <[email protected]> AuthorDate: Fri Nov 5 11:49:11 2021 +0800 混合 topic 的 Batch 生产,一次batch可以发送到多个topic和queue,用于外挂式延迟消息服务 --- .reviewboardrc | 4 + .../processor/AbstractSendMessageProcessor.java | 48 +++++++- .../broker/processor/SendMessageProcessor.java | 32 +++++- .../org/apache/rocketmq/client/Validators.java | 9 +- .../rocketmq/client/common/ClientErrorCode.java | 1 + .../client/impl/factory/MQClientInstance.java | 2 + .../impl/producer/DefaultMQProducerImpl.java | 104 ++++++++++++++++- .../client/impl/producer/TopicPublishInfo.java | 33 +++++- .../client/producer/DefaultMQProducer.java | 16 ++- .../rocketmq/client/producer/MQProducer.java | 4 + .../java/org/apache/rocketmq/common/MixAll.java | 2 + .../rocketmq/common/message/MessageBatch.java | 74 +++++++++++- .../rocketmq/common/message/MessageDecoder.java | 24 +++- .../rocketmq/common/message/MessageExtBatch.java | 30 +++++ .../protocol/header/SendMessageRequestHeader.java | 20 ++++ .../header/SendMessageRequestHeaderV2.java | 36 ++++++ .../apache/rocketmq/common/MessageBatchTest.java | 29 ++++- .../rocketmq/common/MessageEncodeDecodeTest.java | 2 +- .../java/org/apache/rocketmq/store/CommitLog.java | 125 +++++++++++++++++++-- .../apache/rocketmq/store/DefaultMessageStore.java | 15 ++- 20 files changed, 570 insertions(+), 40 deletions(-) diff --git a/.reviewboardrc b/.reviewboardrc new file mode 100644 index 0000000..16bd045 --- /dev/null +++ b/.reviewboardrc @@ -0,0 +1,4 @@ +REVIEWBOARD_URL = "http://rb.corp.kuaishou.com/reviewboard/" +REPOSITORY = "[email protected]:infra/rocketmq.git" +BRANCH = "master" +LAND_DEST_BRANCH = "master" diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java index 66480ad..85cb705 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java @@ -16,10 +16,8 @@ */ package org.apache.rocketmq.broker.processor; -import io.netty.channel.ChannelHandlerContext; import java.net.InetSocketAddress; import java.net.SocketAddress; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; @@ -27,7 +25,6 @@ import java.util.concurrent.ThreadLocalRandom; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.mqtrace.SendMessageContext; import org.apache.rocketmq.broker.mqtrace.SendMessageHook; -import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicFilterType; @@ -46,6 +43,7 @@ import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2; import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader; import org.apache.rocketmq.common.sysflag.MessageSysFlag; import org.apache.rocketmq.common.sysflag.TopicSysFlag; +import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.common.utils.ChannelUtil; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; @@ -56,6 +54,8 @@ import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.store.MessageExtBrokerInner; +import io.netty.channel.ChannelHandlerContext; + public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor { protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); @@ -163,8 +163,50 @@ public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProc return response; } + protected RemotingCommand msgCheck(final ChannelHandlerContext ctx, RemotingCommand response, String[] topics, int[] queueIds) { + for (int i = 0; i < topics.length; i++) { + String topic = topics[i]; + int queueId = queueIds[i]; + if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission()) + && this.brokerController.getTopicConfigManager().isOrderTopic(topic)) { + response.setCode(ResponseCode.NO_PERMISSION); + response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden"); + return response; + } + + TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic); + + if (null == topicConfig) { + response.setCode(ResponseCode.TOPIC_NOT_EXIST); + response.setRemark("topic[" + topic + "] not exist, apply first please!" + + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)); + return response; + } + + int idValid = Math.max(topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums()); + if (queueId >= idValid) { + String errorInfo = String.format("request queueId[%d] is illegal, %s Producer: %s", + queueId, + topicConfig, + RemotingHelper.parseChannelRemoteAddr(ctx.channel())); + + log.warn(errorInfo); + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark(errorInfo); + + return response; + } + } + + return response; + } + protected RemotingCommand msgCheck(final ChannelHandlerContext ctx, final SendMessageRequestHeader requestHeader, final RemotingCommand response) { + if (requestHeader.isMultiTopic()) { + return response; + } + if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission()) && this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic())) { response.setCode(ResponseCode.NO_PERMISSION); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index b31c71e..994d596 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -17,13 +17,13 @@ package org.apache.rocketmq.broker.processor; import java.net.SocketAddress; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadLocalRandom; -import io.netty.channel.ChannelHandlerContext; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext; import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook; @@ -62,6 +62,8 @@ import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.config.StorePathConfigHelper; import org.apache.rocketmq.store.stats.BrokerStatsManager; +import io.netty.channel.ChannelHandlerContext; + public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor { private List<ConsumeMessageHook> consumeMessageHookList; @@ -584,6 +586,27 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement return CompletableFuture.completedFuture(response); } + String[] topics = null; + int[] queueIds = null; + if (requestHeader.isMultiTopic()) { + topics = requestHeader.getTopic().split(MixAll.BATCH_TOPIC_SPLITTER); // decode topics + if (requestHeader.getQueueIds() == null) { + queueIds = new int[topics.length]; + for (int i = 0; i < topics.length; i++) { + TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topics[i]); + queueIds[i] = randomQueueId(topicConfig.getWriteQueueNums()); + } + } else { + // decode queueIds + queueIds = Arrays.stream(requestHeader.getQueueIds().split(MixAll.BATCH_QUEUE_ID_SPLITTER)).mapToInt(Integer::parseInt).toArray(); + } + msgCheck(ctx, response, topics, queueIds); + if (response.getCode() != -1) { + return CompletableFuture.completedFuture(response); + } + requestHeader.setTopic(topics[0]); + } + int queueIdInt = requestHeader.getQueueId(); TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); @@ -600,6 +623,11 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement MessageExtBatch messageExtBatch = new MessageExtBatch(); messageExtBatch.setTopic(requestHeader.getTopic()); messageExtBatch.setQueueId(queueIdInt); + if (requestHeader.isMultiTopic()) { + messageExtBatch.setMultiTopic(true); + messageExtBatch.setTopics(topics); + messageExtBatch.setQueueIds(queueIds); + } int sysFlag = requestHeader.getSysFlag(); if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) { @@ -621,8 +649,6 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement return handlePutMessageResultFuture(putMessageResult, response, request, messageExtBatch, responseHeader, mqtraceContext, ctx, queueIdInt); } - - public boolean hasConsumeMessageHook() { return consumeMessageHookList != null && !this.consumeMessageHookList.isEmpty(); } diff --git a/client/src/main/java/org/apache/rocketmq/client/Validators.java b/client/src/main/java/org/apache/rocketmq/client/Validators.java index e712e2f..cf5f078 100644 --- a/client/src/main/java/org/apache/rocketmq/client/Validators.java +++ b/client/src/main/java/org/apache/rocketmq/client/Validators.java @@ -23,6 +23,7 @@ import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageBatch; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.topic.TopicValidator; @@ -84,8 +85,12 @@ public class Validators { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null"); } // topic - Validators.checkTopic(msg.getTopic()); - Validators.isNotAllowedSendTopic(msg.getTopic()); + if (msg instanceof MessageBatch && ((MessageBatch) msg).isMultiTopic()) { + // ignore check + } else { + Validators.checkTopic(msg.getTopic()); + Validators.isNotAllowedSendTopic(msg.getTopic()); + } // body if (null == msg.getBody()) { diff --git a/client/src/main/java/org/apache/rocketmq/client/common/ClientErrorCode.java b/client/src/main/java/org/apache/rocketmq/client/common/ClientErrorCode.java index bc03b14..8aa4856 100644 --- a/client/src/main/java/org/apache/rocketmq/client/common/ClientErrorCode.java +++ b/client/src/main/java/org/apache/rocketmq/client/common/ClientErrorCode.java @@ -25,4 +25,5 @@ public class ClientErrorCode { public static final int NOT_FOUND_TOPIC_EXCEPTION = 10005; public static final int REQUEST_TIMEOUT_EXCEPTION = 10006; public static final int CREATE_REPLY_MESSAGE_EXCEPTION = 10007; + public static final int NOT_FOUND_MULTI_TOPIC_EXCEPTION = 10008; } \ No newline at end of file diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index 9651943..7db4ccf 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -168,6 +168,7 @@ public class MQClientInstance { for (int i = 0; i < nums; i++) { MessageQueue mq = new MessageQueue(topic, item[0], i); info.getMessageQueueList().add(mq); + info.getBrokers().add(mq.getBrokerName()); } } @@ -196,6 +197,7 @@ public class MQClientInstance { for (int i = 0; i < qd.getWriteQueueNums(); i++) { MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i); info.getMessageQueueList().add(mq); + info.getBrokers().add(mq.getBrokerName()); } } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index 2a784b5..117a8c6 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -16,12 +16,15 @@ */ package org.apache.rocketmq.client.impl.producer; + import java.io.IOException; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.Set; import java.util.concurrent.BlockingQueue; @@ -36,9 +39,11 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; + import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.common.ClientErrorCode; +import org.apache.rocketmq.client.common.ThreadLocalIndex; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.RequestTimeoutException; @@ -98,6 +103,7 @@ import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; public class DefaultMQProducerImpl implements MQProducerInner { private final InternalLogger log = ClientLogger.getLog(); private final Random random = new Random(); + private final ThreadLocalIndex sendWhichBroker = new ThreadLocalIndex(); private final DefaultMQProducer defaultMQProducer; private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable = new ConcurrentHashMap<String, TopicPublishInfo>(); @@ -547,6 +553,52 @@ public class DefaultMQProducerImpl implements MQProducerInner { } + public MessageQueue selectOneMessageQueue(MessageBatch message, final Map<String, TopicPublishInfo> tpInfoMap, final String lastBrokerName) + throws MQClientException { + // select intersection brokerName first. + Set<String> sharedBrokers = null; + for (TopicPublishInfo tpi : tpInfoMap.values()) { + Set<String> brokers = tpi.getBrokers(); + if (sharedBrokers == null) { + sharedBrokers = brokers; + } else { + sharedBrokers.retainAll(brokers); + } + } + + if (sharedBrokers == null || sharedBrokers.isEmpty()) { + throw new MQClientException(ClientErrorCode.NOT_FOUND_MULTI_TOPIC_EXCEPTION, "multi topic batch route not found, sharedBroker empty"); + } + + List<String> brokers = new ArrayList<>(sharedBrokers); + int index = Math.abs(sendWhichBroker.incrementAndGet()) % brokers.size(); + if (index < 0) index = 0; + String brokerName = brokers.get(index); + if (lastBrokerName != null && brokers.size() != 1 && brokerName.equals(lastBrokerName)) { + index++; + if (index == brokers.size()) { + index = 0; + } + brokerName = brokers.get(index); + } + + Map<String, Integer> queueIdMap = new HashMap<>(tpInfoMap.size()); + String firstTopic = null; + for (Map.Entry<String, TopicPublishInfo> entry : tpInfoMap.entrySet()) { + MessageQueue mq = entry.getValue().selectOneMessageQueueByBrokerName(brokerName); + if (mq == null) { + throw new MQClientException(ClientErrorCode.NOT_FOUND_MULTI_TOPIC_EXCEPTION, "multi topic batch route not found"); + } + queueIdMap.put(entry.getKey(), mq.getQueueId()); + if (firstTopic == null) { + firstTopic = entry.getKey(); + } + } + + message.setQueueIdMap(queueIdMap); + return new MessageQueue(firstTopic, brokerName, 0); // only brokerName matters. + } + public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName); } @@ -576,8 +628,19 @@ public class DefaultMQProducerImpl implements MQProducerInner { long beginTimestampFirst = System.currentTimeMillis(); long beginTimestampPrev = beginTimestampFirst; long endTimestamp = beginTimestampFirst; - TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); - if (topicPublishInfo != null && topicPublishInfo.ok()) { + + Map<String, TopicPublishInfo> topicPublishInfoMap = null; + TopicPublishInfo topicPublishInfo = null; + boolean multiTopic = false; + if (msg instanceof MessageBatch && ((MessageBatch) msg).isMultiTopic()) { + multiTopic = true; + MessageBatch messageBatch = (MessageBatch) msg; + topicPublishInfoMap = this.tryToFindTopicPublishInfo(messageBatch.getTopicIndexMap().keySet()); + } else { + topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); + } + + if (multiTopic || (topicPublishInfo != null && topicPublishInfo.ok())) { boolean callTimeout = false; MessageQueue mq = null; Exception exception = null; @@ -587,7 +650,12 @@ public class DefaultMQProducerImpl implements MQProducerInner { String[] brokersSent = new String[timesTotal]; for (; times < timesTotal; times++) { String lastBrokerName = null == mq ? null : mq.getBrokerName(); - MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); + MessageQueue mqSelected; + if (multiTopic) { + mqSelected = this.selectOneMessageQueue((MessageBatch) msg, topicPublishInfoMap, lastBrokerName); + } else { + mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); + } if (mqSelected != null) { mq = mqSelected; brokersSent[times] = mq.getBrokerName(); @@ -695,6 +763,19 @@ public class DefaultMQProducerImpl implements MQProducerInner { null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION); } + public Map<String, TopicPublishInfo> tryToFindTopicPublishInfo(final Set<String> topics) throws MQClientException { + Map<String, TopicPublishInfo> topicPublishInfoMap = new HashMap<>(topics.size()); + for (String topic : topics) { + TopicPublishInfo topicPublishInfo = tryToFindTopicPublishInfo(topic); + if (topicPublishInfo == null || !topicPublishInfo.ok()) { + throw new MQClientException("No route info of this topic: " + topic + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO), + null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION); + } + topicPublishInfoMap.put(topic, topicPublishInfo); + } + return topicPublishInfoMap; + } + private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); if (null == topicPublishInfo || !topicPublishInfo.ok()) { @@ -800,6 +881,23 @@ public class DefaultMQProducerImpl implements MQProducerInner { requestHeader.setReconsumeTimes(0); requestHeader.setUnitMode(this.isUnitMode()); requestHeader.setBatch(msg instanceof MessageBatch); + + if (msg instanceof MessageBatch && ((MessageBatch) msg).isMultiTopic()) { + Map<String, Integer> queueIdMap = ((MessageBatch) msg).getQueueIdMap(); + String topic = msg.getTopic(); + StringBuilder sb = new StringBuilder(); + int idx = 0; + for (String s : topic.split(MixAll.BATCH_TOPIC_SPLITTER)) { + if (idx != 0) { + sb.append(MixAll.BATCH_QUEUE_ID_SPLITTER); + } + sb.append(queueIdMap.get(s)); + idx++; + } + requestHeader.setMultiTopic(true); + requestHeader.setQueueIds(sb.toString()); + } + if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { String reconsumeTimes = MessageAccessor.getReconsumeTime(msg); if (reconsumeTimes != null) { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java index 2f8337e..29c2b12 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java @@ -17,7 +17,11 @@ package org.apache.rocketmq.client.impl.producer; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + import org.apache.rocketmq.client.common.ThreadLocalIndex; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.route.QueueData; @@ -27,6 +31,7 @@ public class TopicPublishInfo { private boolean orderTopic = false; private boolean haveTopicRouterInfo = false; private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>(); + private Set<String> brokers = new HashSet<>(); private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(); private TopicRouteData topicRouteData; @@ -42,6 +47,14 @@ public class TopicPublishInfo { return null != this.messageQueueList && !this.messageQueueList.isEmpty(); } + public Set<String> getBrokers() { + return brokers; + } + + public void setBrokers(Set<String> brokers) { + this.brokers = brokers; + } + public List<MessageQueue> getMessageQueueList() { return messageQueueList; } @@ -84,6 +97,24 @@ public class TopicPublishInfo { } } + public MessageQueue selectOneMessageQueueByBrokerName(final String brokerName) { + if (this.messageQueueList == null) { + return null; + } + List<MessageQueue> messageQueues = this.messageQueueList.stream() + .filter(mq -> mq.getBrokerName().equals(brokerName)) + .collect(Collectors.toList()); + if (messageQueues.isEmpty()) { + return null; + } + + int index = this.sendWhichQueue.incrementAndGet(); + int pos = Math.abs(index) % messageQueues.size(); + if (pos < 0) + pos = 0; + return messageQueues.get(pos); + } + public MessageQueue selectOneMessageQueue() { int index = this.sendWhichQueue.incrementAndGet(); int pos = Math.abs(index) % this.messageQueueList.size(); @@ -106,7 +137,7 @@ public class TopicPublishInfo { @Override public String toString() { return "TopicPublishInfo [orderTopic=" + orderTopic + ", messageQueueList=" + messageQueueList - + ", sendWhichQueue=" + sendWhichQueue + ", haveTopicRouterInfo=" + haveTopicRouterInfo + "]"; + + ", sendWhichQueue=" + sendWhichQueue + ", haveTopicRouterInfo=" + haveTopicRouterInfo + ", brokers=" + brokers + "]"; } public TopicRouteData getTopicRouteData() { diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index 9f91b41..92e5a41 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ExecutorService; + import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.Validators; @@ -910,6 +911,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { } @Override + public SendResult sendMultiTopicBatch(Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + return this.defaultMQProducerImpl.send(multiTopicBatch(msgs)); + } + + @Override public SendResult send(Collection<Message> msgs, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.defaultMQProducerImpl.send(batch(msgs), timeout); @@ -980,9 +986,13 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { } private MessageBatch batch(Collection<Message> msgs) throws MQClientException { + return batch(msgs, false); + } + + private MessageBatch batch(Collection<Message> msgs, boolean allowMultiTopic) throws MQClientException { MessageBatch msgBatch; try { - msgBatch = MessageBatch.generateFromList(msgs); + msgBatch = MessageBatch.generateFromList(msgs, allowMultiTopic); for (Message message : msgBatch) { Validators.checkMessage(message, this); MessageClientIDSetter.setUniqID(message); @@ -996,6 +1006,10 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { return msgBatch; } + private MessageBatch multiTopicBatch(Collection<Message> msgs) throws MQClientException { + return batch(msgs, true); + } + public String getProducerGroup() { return producerGroup; } diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java index f70ddb2..9325a62 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java @@ -91,6 +91,10 @@ public interface MQProducer extends MQAdmin { SendResult send(final Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException; + // for batch msgs with multi topics. + SendResult sendMultiTopicBatch(final Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException, + InterruptedException; + SendResult send(final Collection<Message> msgs, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException; diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java index ec1e1f0..1ab67cb 100644 --- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java +++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java @@ -83,6 +83,8 @@ public class MixAll { public static final String CID_SYS_RMQ_TRANS = "CID_RMQ_SYS_TRANS"; public static final String ACL_CONF_TOOLS_FILE = "/conf/tools.yml"; public static final String REPLY_MESSAGE_FLAG = "reply"; + public static final String BATCH_TOPIC_SPLITTER = "%%"; + public static final String BATCH_QUEUE_ID_SPLITTER = ","; private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME); public static String getWSAddr() { diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java index a6b801e..9d1e340 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageBatch.java @@ -18,8 +18,13 @@ package org.apache.rocketmq.common.message; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.Set; + import org.apache.rocketmq.common.MixAll; public class MessageBatch extends Message implements Iterable<Message> { @@ -27,23 +32,59 @@ public class MessageBatch extends Message implements Iterable<Message> { private static final long serialVersionUID = 621335151046335557L; private final List<Message> messages; + private boolean multiTopic = false; + + private Map<String/*topic*/, Integer/*index*/> topicIndexMap; + private Map<String/*topic*/, Integer/*queueId*/> queueIdMap; + private MessageBatch(List<Message> messages) { this.messages = messages; } + private MessageBatch(List<Message> messages, boolean multiTopic) { + this.messages = messages; + this.multiTopic = multiTopic; + } + + public boolean isMultiTopic() { + return multiTopic; + } + + public void setTopicIndexMap(Map<String, Integer> topicIndexMap) { + this.topicIndexMap = topicIndexMap; + } + + public Map<String, Integer> getTopicIndexMap() { + return topicIndexMap; + } + + public Map<String, Integer> getQueueIdMap() { + return queueIdMap; + } + + public void setQueueIdMap(Map<String, Integer> queueIdMap) { + this.queueIdMap = queueIdMap; + } + public byte[] encode() { - return MessageDecoder.encodeMessages(messages); + if (multiTopic) { + return MessageDecoder.encodeMultiTopicMessages(messages, topicIndexMap); + } else { + return MessageDecoder.encodeMessages(messages); + } } public Iterator<Message> iterator() { return messages.iterator(); } - public static MessageBatch generateFromList(Collection<Message> messages) { + public static MessageBatch generateFromList(Collection<Message> messages, boolean allowMultiTopic) { assert messages != null; assert messages.size() > 0; List<Message> messageList = new ArrayList<Message>(messages.size()); Message first = null; + boolean multiTopic = false; + Set<String> topics = new HashSet<>(); for (Message message : messages) { if (message.getDelayTimeLevel() > 0) { throw new UnsupportedOperationException("TimeDelayLevel is not supported for batching"); @@ -55,19 +96,40 @@ public class MessageBatch extends Message implements Iterable<Message> { first = message; } else { if (!first.getTopic().equals(message.getTopic())) { - throw new UnsupportedOperationException("The topic of the messages in one batch should be the same"); + if (!allowMultiTopic) { + throw new UnsupportedOperationException("The topic of the messages in one batch should be the same"); + } + multiTopic = true; } if (first.isWaitStoreMsgOK() != message.isWaitStoreMsgOK()) { throw new UnsupportedOperationException("The waitStoreMsgOK of the messages in one batch should the same"); } } messageList.add(message); + topics.add(message.getTopic()); } - MessageBatch messageBatch = new MessageBatch(messageList); - messageBatch.setTopic(first.getTopic()); + MessageBatch messageBatch = new MessageBatch(messageList, multiTopic); + + if (multiTopic) { + Map<String, Integer> topicIndexMap = new HashMap<>(topics.size()); + int index = 0; + StringBuilder sb = new StringBuilder(); + for (String topic : topics) { + if (index != 0) { + sb.append(MixAll.BATCH_TOPIC_SPLITTER); + } + sb.append(topic); + topicIndexMap.put(topic, index); + index++; + } + messageBatch.setTopic(sb.toString()); + messageBatch.setTopicIndexMap(topicIndexMap); + } else { + messageBatch.setTopic(first.getTopic()); + } messageBatch.setWaitStoreMsgOK(first.isWaitStoreMsgOK()); + return messageBatch; } - } diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java index c94700e..f6a9bf9 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java @@ -27,6 +27,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; + import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.sysflag.MessageSysFlag; @@ -471,7 +472,7 @@ public class MessageDecoder { return map; } - public static byte[] encodeMessage(Message message) { + public static byte[] encodeMessage(Message message, boolean multiTopic, Map<String, Integer> topicIndex) { //only need flag, body, properties byte[] body = message.getBody(); int bodyLen = body.length; @@ -484,8 +485,9 @@ public class MessageDecoder { + 4 // 2 MAGICCOD + 4 // 3 BODYCRC + 4 // 4 FLAG - + 4 + bodyLen // 4 BODY - + 2 + propertiesLength; + + 4 + bodyLen // 5 BODY + + 2 + propertiesLength // 6 PROPERTY + + (multiTopic ? 4 : 0); // 7 TOPIC_INDEX ByteBuffer byteBuffer = ByteBuffer.allocate(storeSize); // 1 TOTALSIZE byteBuffer.putInt(storeSize); @@ -508,6 +510,10 @@ public class MessageDecoder { byteBuffer.putShort(propertiesLength); byteBuffer.put(propertiesBytes); + if (multiTopic) { + // 7. topic_index. + byteBuffer.putInt(topicIndex.get(message.getTopic())); + } return byteBuffer.array(); } @@ -542,12 +548,16 @@ public class MessageDecoder { return message; } - public static byte[] encodeMessages(List<Message> messages) { + public static byte[] encodeMultiTopicMessages(List<Message> messages, Map<String, Integer> map) { + return doEncodeMessages(messages, true, map); + } + + private static byte[] doEncodeMessages(List<Message> messages, boolean multiTopic, Map<String, Integer> topicIndexMap) { //TO DO refactor, accumulate in one buffer, avoid copies List<byte[]> encodedMessages = new ArrayList<byte[]>(messages.size()); int allSize = 0; for (Message message : messages) { - byte[] tmp = encodeMessage(message); + byte[] tmp = encodeMessage(message, multiTopic, topicIndexMap); encodedMessages.add(tmp); allSize += tmp.length; } @@ -560,6 +570,10 @@ public class MessageDecoder { return allBytes; } + public static byte[] encodeMessages(List<Message> messages) { + return doEncodeMessages(messages, false, null); + } + public static List<Message> decodeMessages(ByteBuffer byteBuffer) throws Exception { //TO DO add a callback for processing, avoid creating lists List<Message> msgs = new ArrayList<Message>(); diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBatch.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBatch.java index a2713cb..21fa505 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBatch.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBatch.java @@ -30,6 +30,12 @@ public class MessageExtBatch extends MessageExt { private ByteBuffer encodedBuff; + private boolean multiTopic; + + private String[] topics; + + private int[] queueIds; + public ByteBuffer getEncodedBuff() { return encodedBuff; } @@ -37,4 +43,28 @@ public class MessageExtBatch extends MessageExt { public void setEncodedBuff(ByteBuffer encodedBuff) { this.encodedBuff = encodedBuff; } + + public boolean isMultiTopic() { + return multiTopic; + } + + public void setMultiTopic(boolean multiTopic) { + this.multiTopic = multiTopic; + } + + public String[] getTopics() { + return topics; + } + + public void setTopics(String[] topics) { + this.topics = topics; + } + + public int[] getQueueIds() { + return queueIds; + } + + public void setQueueIds(int[] queueIds) { + this.queueIds = queueIds; + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java index 2df31e6..fc828d9 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java @@ -50,6 +50,10 @@ public class SendMessageRequestHeader implements CommandCustomHeader { private boolean unitMode = false; @CFNullable private boolean batch = false; + @CFNullable + private boolean multiTopic = false; + @CFNullable + private String queueIds; private Integer maxReconsumeTimes; @Override @@ -159,4 +163,20 @@ public class SendMessageRequestHeader implements CommandCustomHeader { public void setBatch(boolean batch) { this.batch = batch; } + + public boolean isMultiTopic() { + return multiTopic; + } + + public void setMultiTopic(boolean multiTopic) { + this.multiTopic = multiTopic; + } + + public String getQueueIds() { + return queueIds; + } + + public void setQueueIds(String queueIds) { + this.queueIds = queueIds; + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java index 498a7fa..de26947 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java @@ -57,6 +57,12 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader, FastCode @CFNullable private boolean m; //batch + @CFNullable + private boolean n; //multi topic + + @CFNullable + private String o; // queueIds + public static SendMessageRequestHeader createSendMessageRequestHeaderV1(final SendMessageRequestHeaderV2 v2) { SendMessageRequestHeader v1 = new SendMessageRequestHeader(); v1.setProducerGroup(v2.a); @@ -72,6 +78,8 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader, FastCode v1.setUnitMode(v2.k); v1.setMaxReconsumeTimes(v2.l); v1.setBatch(v2.m); + v1.setMultiTopic(v2.n); + v1.setQueueIds(v2.o); return v1; } @@ -90,6 +98,8 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader, FastCode v2.k = v1.isUnitMode(); v2.l = v1.getMaxReconsumeTimes(); v2.m = v1.isBatch(); + v2.n = v1.isMultiTopic(); + v2.o = v1.getQueueIds(); return v2; } @@ -156,6 +166,16 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader, FastCode if (str != null) { m = Boolean.parseBoolean(str); } + + str = fields.get("n"); + if (str != null) { + n = Boolean.parseBoolean(str); + } + + str = fields.get("o"); + if (str != null) { + o = str; + } } public String getA() { @@ -261,4 +281,20 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader, FastCode public void setM(boolean m) { this.m = m; } + + public boolean isN() { + return n; + } + + public void setN(boolean n) { + this.n = n; + } + + public String getO() { + return o; + } + + public void setO(String o) { + this.o = o; + } } \ No newline at end of file diff --git a/common/src/test/java/org/apache/rocketmq/common/MessageBatchTest.java b/common/src/test/java/org/apache/rocketmq/common/MessageBatchTest.java index f264420..363f576 100644 --- a/common/src/test/java/org/apache/rocketmq/common/MessageBatchTest.java +++ b/common/src/test/java/org/apache/rocketmq/common/MessageBatchTest.java @@ -18,9 +18,11 @@ package org.apache.rocketmq.common; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageBatch; +import org.junit.Assert; import org.junit.Test; public class MessageBatchTest { @@ -38,34 +40,51 @@ public class MessageBatchTest { @Test public void testGenerate_OK() throws Exception { List<Message> messages = generateMessages(); - MessageBatch.generateFromList(messages); + MessageBatch.generateFromList(messages, false); } @Test(expected = UnsupportedOperationException.class) public void testGenerate_DiffTopic() throws Exception { List<Message> messages = generateMessages(); messages.get(1).setTopic("topic2"); - MessageBatch.generateFromList(messages); + MessageBatch.generateFromList(messages, false); } @Test(expected = UnsupportedOperationException.class) public void testGenerate_DiffWaitOK() throws Exception { List<Message> messages = generateMessages(); messages.get(1).setWaitStoreMsgOK(false); - MessageBatch.generateFromList(messages); + MessageBatch.generateFromList(messages, false); } @Test(expected = UnsupportedOperationException.class) public void testGenerate_Delay() throws Exception { List<Message> messages = generateMessages(); messages.get(1).setDelayTimeLevel(1); - MessageBatch.generateFromList(messages); + MessageBatch.generateFromList(messages, false); } @Test(expected = UnsupportedOperationException.class) public void testGenerate_Retry() throws Exception { List<Message> messages = generateMessages(); messages.get(1).setTopic(MixAll.RETRY_GROUP_TOPIC_PREFIX + "topic"); - MessageBatch.generateFromList(messages); + MessageBatch.generateFromList(messages, false); + } + + @Test + public void testGenerate_MultiTopic() { + List<Message> messages = Arrays.asList( + new Message("topicA", "bodyA1".getBytes()), + new Message("topicB", "bodyB1".getBytes()), + new Message("topicA", "bodyA2".getBytes()), + new Message("topicB", "bodyB2".getBytes()) + ); + + MessageBatch messageBatch = MessageBatch.generateFromList(messages, true); + Assert.assertEquals(messageBatch.getTopic(), "topicA%%topicB"); + String[] topics = messageBatch.getTopic().split(MixAll.BATCH_TOPIC_SPLITTER); + for (int i = 0; i < topics.length; i++) { + Assert.assertEquals((int)messageBatch.getTopicIndexMap().get(topics[i]), i); + } } } diff --git a/common/src/test/java/org/apache/rocketmq/common/MessageEncodeDecodeTest.java b/common/src/test/java/org/apache/rocketmq/common/MessageEncodeDecodeTest.java index 42d3909..fe7a939 100644 --- a/common/src/test/java/org/apache/rocketmq/common/MessageEncodeDecodeTest.java +++ b/common/src/test/java/org/apache/rocketmq/common/MessageEncodeDecodeTest.java @@ -33,7 +33,7 @@ public class MessageEncodeDecodeTest { Message message = new Message("topic", "body".getBytes()); message.setFlag(12); message.putUserProperty("key", "value"); - byte[] bytes = MessageDecoder.encodeMessage(message); + byte[] bytes = MessageDecoder.encodeMessage(message, false, null); ByteBuffer buffer = ByteBuffer.allocate(bytes.length); buffer.put(bytes); buffer.flip(); diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 36db2f5..49ad725 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -1259,16 +1259,13 @@ public class CommitLog { class DefaultAppendMessageCallback implements AppendMessageCallback { // File at the end of the minimum fixed length empty private static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4; - private final ByteBuffer msgIdMemory; - private final ByteBuffer msgIdV6Memory; // Store the message content private final ByteBuffer msgStoreItemMemory; // The maximum length of the message private final int maxMessageSize; + private final StringBuilder keyBuilder = new StringBuilder(); DefaultAppendMessageCallback(final int size) { - this.msgIdMemory = ByteBuffer.allocate(4 + 4 + 8); - this.msgIdV6Memory = ByteBuffer.allocate(16 + 4 + 8); this.msgStoreItemMemory = ByteBuffer.allocate(END_FILE_MIN_BLANK_LENGTH); this.maxMessageSize = size; } @@ -1370,6 +1367,9 @@ public class CommitLog { public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, final MessageExtBatch messageExtBatch, PutMessageContext putMessageContext) { + if (messageExtBatch.isMultiTopic()) { + return doAppendMultiTopic(fileFromOffset, byteBuffer, maxBlank, messageExtBatch, putMessageContext); + } byteBuffer.mark(); //physical offset long wroteOffset = fileFromOffset + byteBuffer.position(); @@ -1468,6 +1468,105 @@ public class CommitLog { return result; } + private AppendMessageResult doAppendMultiTopic(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, + final MessageExtBatch messageExtBatch, PutMessageContext putMessageContext) { + //physical offset + long wroteOffset = fileFromOffset + byteBuffer.position(); + + int totalMsgLen = 0; + int msgNum = 0; + byteBuffer.mark(); + + final long beginTimeMills = CommitLog.this.defaultMessageStore.now(); + ByteBuffer messageByteBuff = messageExtBatch.getEncodedBuff(); + messageByteBuff.mark(); + + int sysFlag = messageExtBatch.getSysFlag(); + int bornHostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4; + int storeHostLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4; + Supplier<String> msgIdSupplier = () -> { + int msgIdLen = storeHostLength + 8; + int batchCount = putMessageContext.getBatchSize(); + long[] phyPosArray = putMessageContext.getPhyPos(); + ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen); + MessageExt.socketAddress2ByteBuffer(messageExtBatch.getStoreHost(), msgIdBuffer); + msgIdBuffer.clear(); + StringBuilder buffer = new StringBuilder(batchCount * msgIdLen * 2 + batchCount - 1); + for (int i = 0; i < phyPosArray.length; i++) { + msgIdBuffer.putLong(msgIdLen - 8, phyPosArray[i]); + String msgId = UtilAll.bytes2string(msgIdBuffer.array()); + if (i != 0) { + buffer.append(','); + } + buffer.append(msgId); + } + return buffer.toString(); + }; + + if (messageExtBatch.getStoreSize() + END_FILE_MIN_BLANK_LENGTH > maxBlank) { + this.msgStoreItemMemory.clear(); + this.msgStoreItemMemory.putInt(maxBlank); + this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE); + messageByteBuff.reset(); + byteBuffer.reset(); + byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8); + return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgIdSupplier, messageExtBatch.getStoreTimestamp(), + 0, CommitLog.this.defaultMessageStore.now() - beginTimeMills); + } + + int index = 0; + while (messageByteBuff.hasRemaining()) { + final int msgPos = messageByteBuff.position(); + final int msgLen = messageByteBuff.getInt(); + totalMsgLen += msgLen; + + messageByteBuff.position(msgPos + 12); // move to queueId + int queueId = messageByteBuff.getInt(); + messageByteBuff.position(msgPos + 20); // move to topic_index(queueOffset) + int topicIndex = (int)messageByteBuff.getLong(); + String topic = messageExtBatch.getTopics()[topicIndex]; + + // move to add queue offset and commitlog offset + int pos = msgPos + 20; + messageByteBuff.putLong(pos, getQueueOffset(topic, queueId)); // update queueOffset + pos += 8; + messageByteBuff.putLong(pos, wroteOffset + totalMsgLen - msgLen); // update commitLog offset + // 8 SYSFLAG, 9 BORNTIMESTAMP, 10 BORNHOST, 11 STORETIMESTAMP + // refresh store time stamp in lock + pos += 8 + 4 + 8 + bornHostLength; + messageByteBuff.putLong(pos, messageExtBatch.getStoreTimestamp()); // update store timestamp + + putMessageContext.getPhyPos()[index++] = wroteOffset + totalMsgLen - msgLen; + msgNum++; + messageByteBuff.position(msgPos + msgLen); + } + + messageByteBuff.position(0); + messageByteBuff.limit(totalMsgLen); + byteBuffer.put(messageByteBuff); + messageExtBatch.setEncodedBuff(null); + AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, totalMsgLen, + msgIdSupplier, messageExtBatch.getStoreTimestamp(), 0, + CommitLog.this.defaultMessageStore.now() - beginTimeMills); + result.setMsgNum(msgNum); + return result; + } + + private long getQueueOffset(String topic, int queueId) { + keyBuilder.setLength(0); + keyBuilder.append(topic); + keyBuilder.append('-'); + keyBuilder.append(queueId); + String key = keyBuilder.toString(); + Long queueOffset = CommitLog.this.topicQueueTable.get(key); + if (null == queueOffset) { + queueOffset = 0L; + } + + CommitLog.this.topicQueueTable.put(key, queueOffset + 1); + return queueOffset; + } + private void resetByteBuffer(final ByteBuffer byteBuffer, final int limit) { byteBuffer.flip(); byteBuffer.limit(limit); @@ -1608,7 +1707,18 @@ public class CommitLog { int propertiesPos = messagesByteBuff.position(); messagesByteBuff.position(propertiesPos + propertiesLen); - final byte[] topicData = messageExtBatch.getTopic().getBytes(MessageDecoder.CHARSET_UTF8); + byte[] topicData; + int queueId; + int index = 0; + if (messageExtBatch.isMultiTopic()) { + // 7. index + index = messagesByteBuff.getInt(); + topicData = messageExtBatch.getTopics()[index].getBytes(MessageDecoder.CHARSET_UTF8); + queueId = messageExtBatch.getQueueIds()[index]; + } else { + topicData = messageExtBatch.getTopic().getBytes(MessageDecoder.CHARSET_UTF8); + queueId = messageExtBatch.getQueueId(); + } final int topicLength = topicData.length; @@ -1635,11 +1745,11 @@ public class CommitLog { // 3 BODYCRC this.encoderBuffer.putInt(bodyCrc); // 4 QUEUEID - this.encoderBuffer.putInt(messageExtBatch.getQueueId()); + this.encoderBuffer.putInt(queueId); // 5 FLAG this.encoderBuffer.putInt(flag); // 6 QUEUEOFFSET - this.encoderBuffer.putLong(0); + this.encoderBuffer.putLong(index); // 7 PHYSICALOFFSET this.encoderBuffer.putLong(0); // 8 SYSFLAG @@ -1677,6 +1787,7 @@ public class CommitLog { putMessageContext.setBatchSize(batchSize); putMessageContext.setPhyPos(new long[batchSize]); encoderBuffer.flip(); + messageExtBatch.setStoreSize(maxMessageSize); return encoderBuffer; } diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 7f4fcc8..3b788f0 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -375,9 +375,18 @@ public class DefaultMessageStore implements MessageStore { } private PutMessageStatus checkMessages(MessageExtBatch messageExtBatch) { - if (messageExtBatch.getTopic().length() > Byte.MAX_VALUE) { - log.warn("putMessage message topic length too long " + messageExtBatch.getTopic().length()); - return PutMessageStatus.MESSAGE_ILLEGAL; + if (messageExtBatch.isMultiTopic()) { + for (String topic : messageExtBatch.getTopics()) { + if (topic.length() > Byte.MAX_VALUE) { + log.warn("putMessage message topic length too long " + topic.length()); + return PutMessageStatus.MESSAGE_ILLEGAL; + } + } + } else { + if (messageExtBatch.getTopic().length() > Byte.MAX_VALUE) { + log.warn("putMessage message topic length too long " + messageExtBatch.getTopic().length()); + return PutMessageStatus.MESSAGE_ILLEGAL; + } } if (messageExtBatch.getBody().length > messageStoreConfig.getMaxMessageSize()) {
