This is an automated email from the ASF dual-hosted git repository. zhouxzhan pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit b92a29e135c1a1b58351761d9621d5c3524f1ec7 Author: nowinkey <[email protected]> AuthorDate: Fri Jan 20 02:03:24 2023 +0800 Create ConcurrentReputMessageService and replace native thread with ServiceThread --- .../org/apache/rocketmq/common/BrokerConfig.java | 10 + .../apache/rocketmq/store/DefaultMessageStore.java | 409 +++++++++++++-------- 2 files changed, 259 insertions(+), 160 deletions(-) diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index 23307ab03..9bf615f61 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -394,6 +394,8 @@ public class BrokerConfig extends BrokerIdentity { private long channelExpiredTimeout = 1000 * 120; private long subscriptionExpiredTimeout = 1000 * 60 * 10; + private int batchDispatchRequestThreadPoolNums = 16; + /** * Estimate accumulation or not when subscription filter type is tag and is not SUB_ALL. */ @@ -1646,4 +1648,12 @@ public class BrokerConfig extends BrokerIdentity { public void setEstimateAccumulation(boolean estimateAccumulation) { this.estimateAccumulation = estimateAccumulation; } + + public int getBatchDispatchRequestThreadPoolNums() { + return batchDispatchRequestThreadPoolNums; + } + + public void setBatchDispatchRequestThreadPoolNums(int batchDispatchRequestThreadPoolNums) { + this.batchDispatchRequestThreadPoolNums = batchDispatchRequestThreadPoolNums; + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index a4af44222..10f54a36f 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -42,6 +42,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.ExecutionException; @@ -134,6 +135,10 @@ public class DefaultMessageStore implements MessageStore { private ReputMessageService reputMessageService; + private MainBatchDispatchRequestService mainBatchDispatchRequestService; + + private DispatchService dispatchService; + private HAService haService; // CompactionLog @@ -189,6 +194,14 @@ public class DefaultMessageStore implements MessageStore { private int maxDelayLevel; + private final AtomicInteger mappedPageHoldCount = new AtomicInteger(0); + + private final ConcurrentLinkedQueue<BatchDispatchRequest> batchDispatchRequestQueue = new ConcurrentLinkedQueue<>(); + + private int dispatchRequestOrderlyQueueSize = 16; + + private final DispatchRequestOrderlyQueue dispatchRequestOrderlyQueue = new DispatchRequestOrderlyQueue(dispatchRequestOrderlyQueueSize); + public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager, final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException { this.messageArrivingListener = messageArrivingListener; @@ -225,7 +238,13 @@ public class DefaultMessageStore implements MessageStore { } } - this.reputMessageService = new ReputMessageService(); + if (!messageStoreConfig.isEnableBuildConsumeQueueConcurrently()) { + this.reputMessageService = new ReputMessageService(); + } else { + this.reputMessageService = new ConcurrentReputMessageService(); + this.mainBatchDispatchRequestService = new MainBatchDispatchRequestService(); + this.dispatchService = new DispatchService(); + } this.transientStorePool = new TransientStorePool(this); @@ -361,6 +380,11 @@ public class DefaultMessageStore implements MessageStore { this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset()); this.reputMessageService.start(); + if (messageStoreConfig.isEnableBuildConsumeQueueConcurrently()) { + this.mainBatchDispatchRequestService.start(); + this.dispatchService.start(); + } + // Checking is not necessary, as long as the dLedger's implementation exactly follows the definition of Recover, // which is eliminating the dispatch inconsistency between the commitLog and consumeQueue at the end of recovery. this.doRecheckReputOffsetFromCq(); @@ -458,6 +482,12 @@ public class DefaultMessageStore implements MessageStore { } this.commitLog.shutdown(); this.reputMessageService.shutdown(); + if (mainBatchDispatchRequestService != null) { + mainBatchDispatchRequestService.shutdown(); + } + if (dispatchService != null) { + dispatchService.shutdown(); + } this.flushConsumeQueueService.shutdown(); this.allocateMappedFileService.shutdown(); this.storeCheckpoint.flush(); @@ -675,7 +705,12 @@ public class DefaultMessageStore implements MessageStore { this.recoverTopicQueueTable(); - this.reputMessageService = new ReputMessageService(); + if (!messageStoreConfig.isEnableBuildConsumeQueueConcurrently()) { + this.reputMessageService = new ReputMessageService(); + } else { + this.reputMessageService = new ConcurrentReputMessageService(); + } + this.reputMessageService.setReputFromOffset(Math.min(oldReputFromOffset, offsetToTruncate)); this.reputMessageService.start(); } @@ -2646,110 +2681,7 @@ public class DefaultMessageStore implements MessageStore { class ReputMessageService extends ServiceThread { - private volatile long reputFromOffset = 0; - - private int batchId = 0; - - private final AtomicInteger mappedPageHoldCount = new AtomicInteger(0); - - private static final int BATCH_SIZE = 1024 * 1024 * 4; - - private final ConcurrentLinkedQueue<BatchDispatchRequest> batchDispatchRequestQueue = new ConcurrentLinkedQueue<>(); - - private int dispatchRequestOrderlyQueueSize = 16; - - private final DispatchRequestOrderlyQueue dispatchRequestOrderlyQueue = new DispatchRequestOrderlyQueue(dispatchRequestOrderlyQueueSize); - - private int batchDispatchRequestThreadPoolNums = 16; - - private ExecutorService batchDispatchRequestExecutor; - - public ReputMessageService() { - if (messageStoreConfig.isEnableBuildConsumeQueueConcurrently()) { - initExecutorService(); - startBatchDispatchRequestService(); - } - } - - private void initExecutorService() { - batchDispatchRequestExecutor = new ThreadPoolExecutor( - this.batchDispatchRequestThreadPoolNums, - this.batchDispatchRequestThreadPoolNums, - 1000 * 60, - TimeUnit.MICROSECONDS, - new LinkedBlockingDeque<>(), - new ThreadFactoryImpl("BatchDispatchRequestServiceThread_")); - } - - private void startBatchDispatchRequestService() { - new Thread(() -> { - while (true) { - if (!batchDispatchRequestQueue.isEmpty()) { - BatchDispatchRequest task = batchDispatchRequestQueue.poll(); - batchDispatchRequestExecutor.execute(() -> { - ByteBuffer tmpByteBuffer = task.byteBuffer.duplicate(); - tmpByteBuffer.position(task.position); - tmpByteBuffer.limit(task.position + task.size); - List<DispatchRequest> dispatchRequestList = new ArrayList<>(); - while (tmpByteBuffer.hasRemaining()) { - DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(tmpByteBuffer, false, false, false); - if (dispatchRequest.isSuccess()) { - dispatchRequestList.add(dispatchRequest); - } else { - LOGGER.error("[BUG]read total count not equals msg total size."); - } - } - this.dispatchRequestOrderlyQueue.put(task.id, dispatchRequestList.toArray(new DispatchRequest[dispatchRequestList.size()])); - mappedPageHoldCount.getAndDecrement(); - }); - } else { - try { - Thread.sleep(1); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - } - }, "MainBatchDispatchRequestServiceThread").start(); - - new Thread(() -> { - List<DispatchRequest[]> dispatchRequestsList = new ArrayList<>(); - while (true) { - dispatchRequestsList.clear(); - dispatchRequestOrderlyQueue.get(dispatchRequestsList); - if (!dispatchRequestsList.isEmpty()) { - for (DispatchRequest[] dispatchRequests : dispatchRequestsList) { - for (DispatchRequest dispatchRequest : dispatchRequests) { - DefaultMessageStore.this.doDispatch(dispatchRequest); - // wake up long-polling - if (DefaultMessageStore.this.brokerConfig.isLongPollingEnable() - && DefaultMessageStore.this.messageArrivingListener != null) { - DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(), - dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1, - dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(), - dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap()); - notifyMessageArrive4MultiQueue(dispatchRequest); - } - if (!DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() && - DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) { - DefaultMessageStore.this.storeStatsService - .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).add(1); - DefaultMessageStore.this.storeStatsService - .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic()) - .add(dispatchRequest.getMsgSize()); - } - } - } - } else { - try { - Thread.sleep(1); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - } - }, "DispatchServiceThread").start(); - } + public volatile long reputFromOffset = 0; public long getReputFromOffset() { return reputFromOffset; @@ -2781,14 +2713,14 @@ public class DefaultMessageStore implements MessageStore { return DefaultMessageStore.this.getConfirmOffset() - this.reputFromOffset; } - private boolean isCommitLogAvailable() { + public boolean isCommitLogAvailable() { if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()) { return this.reputFromOffset <= DefaultMessageStore.this.commitLog.getConfirmOffset(); } return this.reputFromOffset < DefaultMessageStore.this.getConfirmOffset(); } - private void doReput() { + public void doReput() { if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) { LOGGER.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.", this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset()); @@ -2865,7 +2797,187 @@ public class DefaultMessageStore implements MessageStore { } } - private void createBatchDispatchRequest(ByteBuffer byteBuffer, int position, int size) { + private void notifyMessageArrive4MultiQueue(DispatchRequest dispatchRequest) { + Map<String, String> prop = dispatchRequest.getPropertiesMap(); + if (prop == null) { + return; + } + String multiDispatchQueue = prop.get(MessageConst.PROPERTY_INNER_MULTI_DISPATCH); + String multiQueueOffset = prop.get(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET); + if (StringUtils.isBlank(multiDispatchQueue) || StringUtils.isBlank(multiQueueOffset)) { + return; + } + String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER); + String[] queueOffsets = multiQueueOffset.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER); + if (queues.length != queueOffsets.length) { + return; + } + for (int i = 0; i < queues.length; i++) { + String queueName = queues[i]; + long queueOffset = Long.parseLong(queueOffsets[i]); + int queueId = dispatchRequest.getQueueId(); + if (DefaultMessageStore.this.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(queueName)) { + queueId = 0; + } + DefaultMessageStore.this.messageArrivingListener.arriving( + queueName, queueId, queueOffset + 1, dispatchRequest.getTagsCode(), + dispatchRequest.getStoreTimestamp(), dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap()); + } + } + + @Override + public void run() { + DefaultMessageStore.LOGGER.info(this.getServiceName() + " service started"); + + while (!this.isStopped()) { + try { + TimeUnit.MILLISECONDS.sleep(1); + this.doReput(); + } catch (Exception e) { + DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e); + } + } + + DefaultMessageStore.LOGGER.info(this.getServiceName() + " service end"); + } + + @Override + public String getServiceName() { + if (DefaultMessageStore.this.getBrokerConfig().isInBrokerContainer()) { + return DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + ReputMessageService.class.getSimpleName(); + } + return ReputMessageService.class.getSimpleName(); + } + + } + + class MainBatchDispatchRequestService extends ServiceThread { + + private final ExecutorService batchDispatchRequestExecutor; + + public MainBatchDispatchRequestService() { + batchDispatchRequestExecutor = new ThreadPoolExecutor( + DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(), + DefaultMessageStore.this.getBrokerConfig().getBatchDispatchRequestThreadPoolNums(), + 1000 * 60, + TimeUnit.MICROSECONDS, + new LinkedBlockingQueue<>(1024), + new ThreadFactoryImpl("BatchDispatchRequestServiceThread_")); + } + + private void pollBatchDispatchRequest() { + if (!batchDispatchRequestQueue.isEmpty()) { + BatchDispatchRequest task = batchDispatchRequestQueue.poll(); + batchDispatchRequestExecutor.execute(() -> { + ByteBuffer tmpByteBuffer = task.byteBuffer.duplicate(); + tmpByteBuffer.position(task.position); + tmpByteBuffer.limit(task.position + task.size); + List<DispatchRequest> dispatchRequestList = new ArrayList<>(); + while (tmpByteBuffer.hasRemaining()) { + DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(tmpByteBuffer, false, false, false); + if (dispatchRequest.isSuccess()) { + dispatchRequestList.add(dispatchRequest); + } else { + LOGGER.error("[BUG]read total count not equals msg total size."); + } + } + dispatchRequestOrderlyQueue.put(task.id, dispatchRequestList.toArray(new DispatchRequest[dispatchRequestList.size()])); + mappedPageHoldCount.getAndDecrement(); + }); + } + } + + @Override + public void run() { + DefaultMessageStore.LOGGER.info(this.getServiceName() + " service started"); + + while (!this.isStopped()) { + try { + TimeUnit.MILLISECONDS.sleep(1); + pollBatchDispatchRequest(); + } catch (Exception e) { + DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e); + } + } + + DefaultMessageStore.LOGGER.info(this.getServiceName() + " service end"); + } + + @Override + public String getServiceName() { + if (DefaultMessageStore.this.getBrokerConfig().isInBrokerContainer()) { + return DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + MainBatchDispatchRequestService.class.getSimpleName(); + } + return MainBatchDispatchRequestService.class.getSimpleName(); + } + + } + + class DispatchService extends ServiceThread { + + private final List<DispatchRequest[]> dispatchRequestsList = new ArrayList<>(); + + private void dispatch() { + dispatchRequestsList.clear(); + dispatchRequestOrderlyQueue.get(dispatchRequestsList); + if (!dispatchRequestsList.isEmpty()) { + for (DispatchRequest[] dispatchRequests : dispatchRequestsList) { + for (DispatchRequest dispatchRequest : dispatchRequests) { + DefaultMessageStore.this.doDispatch(dispatchRequest); + // wake up long-polling + if (DefaultMessageStore.this.brokerConfig.isLongPollingEnable() + && DefaultMessageStore.this.messageArrivingListener != null) { + DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(), + dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1, + dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(), + dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap()); + DefaultMessageStore.this.reputMessageService.notifyMessageArrive4MultiQueue(dispatchRequest); + } + if (!DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() && + DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) { + DefaultMessageStore.this.storeStatsService + .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).add(1); + DefaultMessageStore.this.storeStatsService + .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic()) + .add(dispatchRequest.getMsgSize()); + } + } + } + } + } + + @Override + public void run() { + DefaultMessageStore.LOGGER.info(this.getServiceName() + " service started"); + + while (!this.isStopped()) { + try { + TimeUnit.MILLISECONDS.sleep(1); + dispatch(); + } catch (Exception e) { + DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e); + } + } + + DefaultMessageStore.LOGGER.info(this.getServiceName() + " service end"); + } + + @Override + public String getServiceName() { + if (DefaultMessageStore.this.getBrokerConfig().isInBrokerContainer()) { + return DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + DispatchService.class.getSimpleName(); + } + return DispatchService.class.getSimpleName(); + } + } + + class ConcurrentReputMessageService extends ReputMessageService { + + private static final int BATCH_SIZE = 1024 * 1024 * 4; + + private int batchId = 0; + + public void createBatchDispatchRequest(ByteBuffer byteBuffer, int position, int size) { if (position < 0) { return; } @@ -2874,7 +2986,8 @@ public class DefaultMessageStore implements MessageStore { batchDispatchRequestQueue.offer(task); } - private void doReputConcurrently() { + @Override + public void doReput() { if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) { LOGGER.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.", this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset()); @@ -2896,6 +3009,8 @@ public class DefaultMessageStore implements MessageStore { for (int readSize = 0; readSize < result.getSize() && reputFromOffset < DefaultMessageStore.this.getConfirmOffset() && doNext; ) { ByteBuffer byteBuffer = result.getByteBuffer(); + byteBuffer.mark(); + int totalSize = byteBuffer.getInt(); if (reputFromOffset + totalSize > DefaultMessageStore.this.getConfirmOffset()) { doNext = false; @@ -2914,9 +3029,11 @@ public class DefaultMessageStore implements MessageStore { doNext = false; } + byteBuffer.reset(); + if (totalSize > 0) { if (batchDispatchRequestStart == -1) { - batchDispatchRequestStart = byteBuffer.position() - 8; + batchDispatchRequestStart = byteBuffer.position(); batchDispatchRequestSize = 0; } batchDispatchRequestSize += totalSize; @@ -2925,7 +3042,7 @@ public class DefaultMessageStore implements MessageStore { batchDispatchRequestStart = -1; batchDispatchRequestSize = -1; } - byteBuffer.position(byteBuffer.position() + totalSize - 8); + byteBuffer.position(byteBuffer.position() + totalSize); this.reputFromOffset += totalSize; readSize += totalSize; } else { @@ -2938,80 +3055,52 @@ public class DefaultMessageStore implements MessageStore { batchDispatchRequestSize = -1; } } - } catch (Throwable e) { - throw e; } finally { this.createBatchDispatchRequest(result.getByteBuffer(), batchDispatchRequestStart, batchDispatchRequestSize); - boolean over = this.mappedPageHoldCount.get() == 0; + boolean over = mappedPageHoldCount.get() == 0; while (!over) { try { - Thread.sleep(1); + TimeUnit.MILLISECONDS.sleep(1); } catch (Exception e) { e.printStackTrace(); } - over = this.mappedPageHoldCount.get() == 0; + over = mappedPageHoldCount.get() == 0; } result.release(); } } } - private void notifyMessageArrive4MultiQueue(DispatchRequest dispatchRequest) { - Map<String, String> prop = dispatchRequest.getPropertiesMap(); - if (prop == null) { - return; - } - String multiDispatchQueue = prop.get(MessageConst.PROPERTY_INNER_MULTI_DISPATCH); - String multiQueueOffset = prop.get(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET); - if (StringUtils.isBlank(multiDispatchQueue) || StringUtils.isBlank(multiQueueOffset)) { - return; - } - String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER); - String[] queueOffsets = multiQueueOffset.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER); - if (queues.length != queueOffsets.length) { - return; - } - for (int i = 0; i < queues.length; i++) { - String queueName = queues[i]; - long queueOffset = Long.parseLong(queueOffsets[i]); - int queueId = dispatchRequest.getQueueId(); - if (DefaultMessageStore.this.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(queueName)) { - queueId = 0; + @Override + public void shutdown() { + for (int i = 0; i < 50 && this.isCommitLogAvailable(); i++) { + try { + TimeUnit.MILLISECONDS.sleep(100); + } catch (InterruptedException ignored) { } - DefaultMessageStore.this.messageArrivingListener.arriving( - queueName, queueId, queueOffset + 1, dispatchRequest.getTagsCode(), - dispatchRequest.getStoreTimestamp(), dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap()); } + + if (this.isCommitLogAvailable()) { + LOGGER.warn("shutdown concurrentReputMessageService, but CommitLog have not finish to be dispatched, CommitLog max" + + " offset={}, reputFromOffset={}", DefaultMessageStore.this.commitLog.getMaxOffset(), + this.reputFromOffset); + } + + this.shutdown(); } @Override public void run() { - DefaultMessageStore.LOGGER.info(this.getServiceName() + " service started"); - - while (!this.isStopped()) { - try { - Thread.sleep(1); - if (!messageStoreConfig.isEnableBuildConsumeQueueConcurrently()) { - this.doReput(); - } else { - doReputConcurrently(); - } - } catch (Exception e) { - DefaultMessageStore.LOGGER.warn(this.getServiceName() + " service has exception. ", e); - } - } - - DefaultMessageStore.LOGGER.info(this.getServiceName() + " service end"); + super.run(); } @Override public String getServiceName() { if (DefaultMessageStore.this.getBrokerConfig().isInBrokerContainer()) { - return DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + ReputMessageService.class.getSimpleName(); + return DefaultMessageStore.this.getBrokerIdentity().getIdentifier() + ConcurrentReputMessageService.class.getSimpleName(); } - return ReputMessageService.class.getSimpleName(); + return ConcurrentReputMessageService.class.getSimpleName(); } - } @Override
