This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
     new 3df1b9232 [ISSUE #7201] Remove the DefaultMessageStore.class dependency in TransientStorePool
3df1b9232 is described below

commit 3df1b9232af99944cb3d4d4d2d00c5a85cd3b57d
Author: guyinyou <36399867+guyin...@users.noreply.github.com>
AuthorDate: Thu Aug 17 13:59:04 2023 +0800

    [ISSUE #7201] Remove the DefaultMessageStore.class dependency in TransientStorePool

    Co-authored-by: guyinyou <guyinyou....@alibaba-inc.com>
---
 .../apache/rocketmq/store/AllocateMappedFileService.java    |  6 +++---
 .../java/org/apache/rocketmq/store/DefaultMessageStore.java |  7 +++++--
 .../java/org/apache/rocketmq/store/TransientStorePool.java  | 13 ++++---------
 3 files changed, 12 insertions(+), 14 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
index dca7d5325..c8420fea1 100644
--- a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
@@ -55,7 +55,7 @@ public class AllocateMappedFileService extends ServiceThread {
         if (this.messageStore.isTransientStorePoolEnable()) {
             if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()
                 && BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) { //if broker is slave, don't fast fail even no buffer in pool
-                canSubmitRequests = this.messageStore.getTransientStorePool().availableBufferNums() - this.requestQueue.size();
+                canSubmitRequests = this.messageStore.remainTransientStoreBufferNumbs() - this.requestQueue.size();
             }
         }
 
@@ -65,7 +65,7 @@ public class AllocateMappedFileService extends ServiceThread {
         if (nextPutOK) {
             if (canSubmitRequests <= 0) {
                 log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, " +
-                    "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().availableBufferNums());
+                    "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.remainTransientStoreBufferNumbs());
                 this.requestTable.remove(nextFilePath);
                 return null;
             }
@@ -81,7 +81,7 @@ public class AllocateMappedFileService extends ServiceThread {
         if (nextNextPutOK) {
             if (canSubmitRequests <= 0) {
                 log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, " +
-                    "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().availableBufferNums());
+                    "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.remainTransientStoreBufferNumbs());
                 this.requestTable.remove(nextNextFilePath);
             } else {
                 boolean offerOK = this.requestQueue.offer(nextNextReq);
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 6115ead59..f2a54ddf6 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -250,7 +250,7 @@ public class DefaultMessageStore implements MessageStore {
             this.reputMessageService = new ConcurrentReputMessageService();
         }
 
-        this.transientStorePool = new TransientStorePool(this);
+        this.transientStorePool = new TransientStorePool(messageStoreConfig.getTransientStorePoolSize(), messageStoreConfig.getMappedFileSizeCommitLog());
 
         this.scheduledExecutorService =
             Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread", getBrokerIdentity()));
@@ -1983,7 +1983,10 @@ public class DefaultMessageStore implements MessageStore {
     }
 
     public int remainTransientStoreBufferNumbs() {
-        return this.transientStorePool.availableBufferNums();
+        if (this.isTransientStorePoolEnable()) {
+            return this.transientStorePool.availableBufferNums();
+        }
+        return Integer.MAX_VALUE;
     }
 
     @Override
diff --git a/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java b/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java
index 8c1a5338b..0d42ee69e 100644
--- a/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java
+++ b/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java
@@ -33,13 +33,11 @@ public class TransientStorePool {
     private final int poolSize;
     private final int fileSize;
     private final Deque<ByteBuffer> availableBuffers;
-    private final DefaultMessageStore messageStore;
     private volatile boolean isRealCommit = true;
 
-    public TransientStorePool(final DefaultMessageStore messageStore) {
-        this.messageStore = messageStore;
-        this.poolSize = messageStore.getMessageStoreConfig().getTransientStorePoolSize();
-        this.fileSize = messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
+    public TransientStorePool(final int poolSize, final int fileSize) {
+        this.poolSize = poolSize;
+        this.fileSize = fileSize;
         this.availableBuffers = new ConcurrentLinkedDeque<>();
     }
 
@@ -81,10 +79,7 @@ public class TransientStorePool {
     }
 
     public int availableBufferNums() {
-        if (messageStore.isTransientStorePoolEnable()) {
-            return availableBuffers.size();
-        }
-        return Integer.MAX_VALUE;
+        return availableBuffers.size();
     }
 
     public boolean isRealCommit() {