This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 3df1b9232  [ISSUE #7201] Remove the DefaultMessageStore.class 
dependency in TransientStorePool
3df1b9232 is described below

commit 3df1b9232af99944cb3d4d4d2d00c5a85cd3b57d
Author: guyinyou <36399867+guyin...@users.noreply.github.com>
AuthorDate: Thu Aug 17 13:59:04 2023 +0800

     [ISSUE #7201] Remove the DefaultMessageStore.class dependency in 
TransientStorePool
    
    Co-authored-by: guyinyou <guyinyou....@alibaba-inc.com>
---
 .../apache/rocketmq/store/AllocateMappedFileService.java    |  6 +++---
 .../java/org/apache/rocketmq/store/DefaultMessageStore.java |  7 +++++--
 .../java/org/apache/rocketmq/store/TransientStorePool.java  | 13 ++++---------
 3 files changed, 12 insertions(+), 14 deletions(-)

diff --git 
a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java 
b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
index dca7d5325..c8420fea1 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
@@ -55,7 +55,7 @@ public class AllocateMappedFileService extends ServiceThread {
         if (this.messageStore.isTransientStorePoolEnable()) {
             if 
(this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()
                 && BrokerRole.SLAVE != 
this.messageStore.getMessageStoreConfig().getBrokerRole()) { //if broker is 
slave, don't fast fail even no buffer in pool
-                canSubmitRequests = 
this.messageStore.getTransientStorePool().availableBufferNums() - 
this.requestQueue.size();
+                canSubmitRequests = 
this.messageStore.remainTransientStoreBufferNumbs() - this.requestQueue.size();
             }
         }
 
@@ -65,7 +65,7 @@ public class AllocateMappedFileService extends ServiceThread {
         if (nextPutOK) {
             if (canSubmitRequests <= 0) {
                 log.warn("[NOTIFYME]TransientStorePool is not enough, so 
create mapped file error, " +
-                    "RequestQueueSize : {}, StorePoolSize: {}", 
this.requestQueue.size(), 
this.messageStore.getTransientStorePool().availableBufferNums());
+                    "RequestQueueSize : {}, StorePoolSize: {}", 
this.requestQueue.size(), this.messageStore.remainTransientStoreBufferNumbs());
                 this.requestTable.remove(nextFilePath);
                 return null;
             }
@@ -81,7 +81,7 @@ public class AllocateMappedFileService extends ServiceThread {
         if (nextNextPutOK) {
             if (canSubmitRequests <= 0) {
                 log.warn("[NOTIFYME]TransientStorePool is not enough, so skip 
preallocate mapped file, " +
-                    "RequestQueueSize : {}, StorePoolSize: {}", 
this.requestQueue.size(), 
this.messageStore.getTransientStorePool().availableBufferNums());
+                    "RequestQueueSize : {}, StorePoolSize: {}", 
this.requestQueue.size(), this.messageStore.remainTransientStoreBufferNumbs());
                 this.requestTable.remove(nextNextFilePath);
             } else {
                 boolean offerOK = this.requestQueue.offer(nextNextReq);
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 6115ead59..f2a54ddf6 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -250,7 +250,7 @@ public class DefaultMessageStore implements MessageStore {
             this.reputMessageService = new ConcurrentReputMessageService();
         }
 
-        this.transientStorePool = new TransientStorePool(this);
+        this.transientStorePool = new 
TransientStorePool(messageStoreConfig.getTransientStorePoolSize(), 
messageStoreConfig.getMappedFileSizeCommitLog());
 
         this.scheduledExecutorService =
             Executors.newSingleThreadScheduledExecutor(new 
ThreadFactoryImpl("StoreScheduledThread", getBrokerIdentity()));
@@ -1983,7 +1983,10 @@ public class DefaultMessageStore implements MessageStore 
{
     }
 
     public int remainTransientStoreBufferNumbs() {
-        return this.transientStorePool.availableBufferNums();
+        if (this.isTransientStorePoolEnable()) {
+            return this.transientStorePool.availableBufferNums();
+        }
+        return Integer.MAX_VALUE;
     }
 
     @Override
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java 
b/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java
index 8c1a5338b..0d42ee69e 100644
--- a/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java
+++ b/store/src/main/java/org/apache/rocketmq/store/TransientStorePool.java
@@ -33,13 +33,11 @@ public class TransientStorePool {
     private final int poolSize;
     private final int fileSize;
     private final Deque<ByteBuffer> availableBuffers;
-    private final DefaultMessageStore messageStore;
     private volatile boolean isRealCommit = true;
 
-    public TransientStorePool(final DefaultMessageStore messageStore) {
-        this.messageStore = messageStore;
-        this.poolSize = 
messageStore.getMessageStoreConfig().getTransientStorePoolSize();
-        this.fileSize = 
messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
+    public TransientStorePool(final int poolSize, final int fileSize) {
+        this.poolSize = poolSize;
+        this.fileSize = fileSize;
         this.availableBuffers = new ConcurrentLinkedDeque<>();
     }
 
@@ -81,10 +79,7 @@ public class TransientStorePool {
     }
 
     public int availableBufferNums() {
-        if (messageStore.isTransientStorePoolEnable()) {
-            return availableBuffers.size();
-        }
-        return Integer.MAX_VALUE;
+        return availableBuffers.size();
     }
 
     public boolean isRealCommit() {

Reply via email to