This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 0f1ff25512 [ISSUE #6881] Fix scheduled messages are replayed bug 
(#6882)
0f1ff25512 is described below

commit 0f1ff255128f723402298560b757d92784921316
Author: gaoyf <[email protected]>
AuthorDate: Sun Jun 11 10:31:30 2023 +0800

    [ISSUE #6881] Fix scheduled messages are replayed bug (#6882)
    
    * fix scheduled messages are replayed bug
    
    * scheduledPersistService reset to final and constructed in the constructor
---
 .../broker/schedule/ScheduleMessageService.java         | 17 ++++-------------
 1 file changed, 4 insertions(+), 13 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
index 196b78f83c..2a4ace0985 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
@@ -92,14 +92,7 @@ public class ScheduleMessageService extends ConfigManager {
         this.brokerController = brokerController;
         this.enableAsyncDeliver = 
brokerController.getMessageStoreConfig().isEnableScheduleAsyncDeliver();
         scheduledPersistService = new ScheduledThreadPoolExecutor(1,
-            new ThreadFactoryImpl("ScheduleMessageServicePersistThread", true, 
brokerController.getBrokerConfig()));
-        scheduledPersistService.scheduleAtFixedRate(() -> {
-            try {
-                ScheduleMessageService.this.persist();
-            } catch (Throwable e) {
-                log.error("scheduleAtFixedRate flush exception", e);
-            }
-        }, 10000, 
this.brokerController.getMessageStoreConfig().getFlushDelayOffsetInterval(), 
TimeUnit.MILLISECONDS);
+                new ThreadFactoryImpl("ScheduleMessageServicePersistThread", 
true, brokerController.getBrokerConfig()));
     }
 
     public static int queueId2DelayLevel(final int queueId) {
@@ -161,15 +154,13 @@ public class ScheduleMessageService extends ConfigManager 
{
                 }
             }
 
-            this.deliverExecutorService.scheduleAtFixedRate(() -> {
+            scheduledPersistService.scheduleAtFixedRate(() -> {
                 try {
-                    if (started.get()) {
-                        ScheduleMessageService.this.persist();
-                    }
+                    ScheduleMessageService.this.persist();
                 } catch (Throwable e) {
                     log.error("scheduleAtFixedRate flush exception", e);
                 }
-            }, 10000, 
this.brokerController.getMessageStore().getMessageStoreConfig().getFlushDelayOffsetInterval(),
 TimeUnit.MILLISECONDS);
+            }, 10000, 
this.brokerController.getMessageStoreConfig().getFlushDelayOffsetInterval(), 
TimeUnit.MILLISECONDS);
         }
     }
 

Reply via email to