This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 04b8400eb997ce7f144aff661a8febdac0ebd4bf
Author: nowinkey <[email protected]>
AuthorDate: Wed Feb 8 21:07:49 2023 +0800

    Refactor MainBatchDispatchRequestService and DispatchService service object locations
---
 .../apache/rocketmq/store/DefaultMessageStore.java | 37 ++++++++++++----------
 1 file changed, 20 insertions(+), 17 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index baa71e1ba..48146c638 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -135,10 +135,6 @@ public class DefaultMessageStore implements MessageStore {
 
     private ReputMessageService reputMessageService;
 
-    private MainBatchDispatchRequestService mainBatchDispatchRequestService;
-
-    private DispatchService dispatchService;
-
     private HAService haService;
 
     // CompactionLog
@@ -242,8 +238,6 @@ public class DefaultMessageStore implements MessageStore {
             this.reputMessageService = new ReputMessageService();
         } else {
             this.reputMessageService = new ConcurrentReputMessageService();
-            this.mainBatchDispatchRequestService = new MainBatchDispatchRequestService();
-            this.dispatchService = new DispatchService();
         }
 
         this.transientStorePool = new TransientStorePool(this);
@@ -380,11 +374,6 @@ public class DefaultMessageStore implements MessageStore {
         
this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset());
         this.reputMessageService.start();
 
-        if (messageStoreConfig.isEnableBuildConsumeQueueConcurrently()) {
-            this.mainBatchDispatchRequestService.start();
-            this.dispatchService.start();
-        }
-
         // Checking is not necessary, as long as the dLedger's implementation exactly follows the definition of Recover,
         // which is eliminating the dispatch inconsistency between the commitLog and consumeQueue at the end of recovery.
         this.doRecheckReputOffsetFromCq();
@@ -482,12 +471,7 @@ public class DefaultMessageStore implements MessageStore {
             }
             this.commitLog.shutdown();
             this.reputMessageService.shutdown();
-            if (mainBatchDispatchRequestService != null) {
-                mainBatchDispatchRequestService.shutdown();
-            }
-            if (dispatchService != null) {
-                dispatchService.shutdown();
-            }
+
             this.flushConsumeQueueService.shutdown();
             this.allocateMappedFileService.shutdown();
             this.storeCheckpoint.flush();
@@ -2983,6 +2967,16 @@ public class DefaultMessageStore implements MessageStore {
 
         private long batchId = 0;
 
+        private MainBatchDispatchRequestService mainBatchDispatchRequestService;
+
+        private DispatchService dispatchService;
+
+        public ConcurrentReputMessageService(){
+            super();
+            this.mainBatchDispatchRequestService = new MainBatchDispatchRequestService();
+            this.dispatchService = new DispatchService();
+        }
+
         public void createBatchDispatchRequest(ByteBuffer byteBuffer, int position, int size) {
             if (position < 0) {
                 return;
@@ -2992,6 +2986,13 @@ public class DefaultMessageStore implements MessageStore {
             batchDispatchRequestQueue.offer(task);
         }
 
+        @Override
+        public void start() {
+            super.start();
+            this.mainBatchDispatchRequestService.start();
+            this.dispatchService.start();
+        }
+
         @Override
         public void doReput() {
             if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
@@ -3092,6 +3093,8 @@ public class DefaultMessageStore implements MessageStore {
                         this.reputFromOffset);
             }
 
+            this.mainBatchDispatchRequestService.shutdown();
+            this.dispatchService.shutdown();
             super.shutdown();
         }
 

Reply via email to