This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new b9bbd5206 [ISSUE #6230] Optimizes ScheduleMessageService code and 
logic (#6231)
b9bbd5206 is described below

commit b9bbd520603814de6bcfb76a811fc6dac4afc21b
Author: Lobo Xu <[email protected]>
AuthorDate: Mon Mar 6 10:22:29 2023 +0800

    [ISSUE #6230] Optimizes ScheduleMessageService code and logic (#6231)
    
    Co-authored-by: loboxu <[email protected]>
---
 .../broker/schedule/ScheduleMessageService.java    | 72 +++++++---------------
 1 file changed, 22 insertions(+), 50 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
index bdc9c5672..e91c32b55 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
@@ -17,10 +17,7 @@
 package org.apache.rocketmq.broker.schedule;
 
 import io.opentelemetry.api.common.Attributes;
-import java.util.Comparator;
 import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
@@ -32,8 +29,8 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
 import org.apache.rocketmq.common.ConfigManager;
@@ -96,14 +93,11 @@ public class ScheduleMessageService extends ConfigManager {
         this.enableAsyncDeliver = 
brokerController.getMessageStoreConfig().isEnableScheduleAsyncDeliver();
         scheduledPersistService = new ScheduledThreadPoolExecutor(1,
             new ThreadFactoryImpl("ScheduleMessageServicePersistThread", true, 
brokerController.getBrokerConfig()));
-        scheduledPersistService.scheduleAtFixedRate(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    ScheduleMessageService.this.persist();
-                } catch (Throwable e) {
-                    log.error("scheduleAtFixedRate flush exception", e);
-                }
+        scheduledPersistService.scheduleAtFixedRate(() -> {
+            try {
+                ScheduleMessageService.this.persist();
+            } catch (Throwable e) {
+                log.error("scheduleAtFixedRate flush exception", e);
             }
         }, 10000, 
this.brokerController.getMessageStoreConfig().getFlushDelayOffsetInterval(), 
TimeUnit.MILLISECONDS);
     }
@@ -117,9 +111,7 @@ public class ScheduleMessageService extends ConfigManager {
     }
 
     public void buildRunningStats(HashMap<String, String> stats) {
-        Iterator<Map.Entry<Integer, Long>> it = 
this.offsetTable.entrySet().iterator();
-        while (it.hasNext()) {
-            Map.Entry<Integer, Long> next = it.next();
+        for (Map.Entry<Integer, Long> next : this.offsetTable.entrySet()) {
             int queueId = delayLevel2QueueId(next.getKey());
             long delayOffset = next.getValue();
             long maxOffset = 
this.brokerController.getMessageStore().getMaxOffsetInQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
 queueId);
@@ -169,17 +161,13 @@ public class ScheduleMessageService extends ConfigManager 
{
                 }
             }
 
-            this.deliverExecutorService.scheduleAtFixedRate(new Runnable() {
-
-                @Override
-                public void run() {
-                    try {
-                        if (started.get()) {
-                            ScheduleMessageService.this.persist();
-                        }
-                    } catch (Throwable e) {
-                        log.error("scheduleAtFixedRate flush exception", e);
+            this.deliverExecutorService.scheduleAtFixedRate(() -> {
+                try {
+                    if (started.get()) {
+                        ScheduleMessageService.this.persist();
                     }
+                } catch (Throwable e) {
+                    log.error("scheduleAtFixedRate flush exception", e);
                 }
             }, 10000, 
this.brokerController.getMessageStore().getMessageStoreConfig().getFlushDelayOffsetInterval(),
 TimeUnit.MILLISECONDS);
         }
@@ -208,10 +196,8 @@ public class ScheduleMessageService extends ConfigManager {
                 }
             }
 
-            if (this.deliverPendingTable != null) {
-                for (int i = 1; i <= this.deliverPendingTable.size(); i++) {
-                    log.warn("deliverPendingTable level: {}, size: {}", i, 
this.deliverPendingTable.get(i).size());
-                }
+            for (int i = 1; i <= this.deliverPendingTable.size(); i++) {
+                log.warn("deliverPendingTable level: {}, size: {}", i, 
this.deliverPendingTable.get(i).size());
             }
 
             this.persist();
@@ -378,17 +364,6 @@ public class ScheduleMessageService extends ConfigManager {
         return msgInner;
     }
 
-    public int computeDelayLevel(long timeMillis) {
-        long intervalMillis = timeMillis - System.currentTimeMillis();
-        List<Map.Entry<Integer, Long>> sortedLevels = 
delayLevelTable.entrySet().stream().sorted(Comparator.comparingLong(Map.Entry::getValue)).collect(Collectors.toList());
-        for (Map.Entry<Integer, Long> entry : sortedLevels) {
-            if (entry.getValue() > intervalMillis) {
-                return entry.getKey();
-            }
-        }
-        return sortedLevels.get(sortedLevels.size() - 1).getKey();
-    }
-
     class DeliverDelayedMessageTimerTask implements Runnable {
         private final int delayLevel;
         private final long offset;
@@ -402,7 +377,7 @@ public class ScheduleMessageService extends ConfigManager {
         public void run() {
             try {
                 if (isStarted()) {
-                    this.executeOnTimeup();
+                    this.executeOnTimeUp();
                 }
             } catch (Exception e) {
                 // XXX: warn and notify me
@@ -411,9 +386,6 @@ public class ScheduleMessageService extends ConfigManager {
             }
         }
 
-        /**
-         * @return
-         */
         private long correctDeliverTimestamp(final long now, final long 
deliverTimestamp) {
 
             long result = deliverTimestamp;
@@ -426,7 +398,7 @@ public class ScheduleMessageService extends ConfigManager {
             return result;
         }
 
-        public void executeOnTimeup() {
+        public void executeOnTimeUp() {
             ConsumeQueueInterface cq =
                 
ScheduleMessageService.this.brokerController.getMessageStore().getConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
                     delayLevel2QueueId(delayLevel));
@@ -633,7 +605,7 @@ public class ScheduleMessageService extends ConfigManager {
         private boolean autoResend = false;
         private CompletableFuture<PutMessageResult> future;
 
-        private volatile int resendCount = 0;
+        private volatile AtomicInteger resendCount = new AtomicInteger(0);
         private volatile ProcessStatus status = ProcessStatus.RUNNING;
 
         public PutResultProcess setTopic(String topic) {
@@ -712,7 +684,7 @@ public class ScheduleMessageService extends ConfigManager {
             return future;
         }
 
-        public int getResendCount() {
+        public AtomicInteger getResendCount() {
             return resendCount;
         }
 
@@ -795,7 +767,7 @@ public class ScheduleMessageService extends ConfigManager {
 
             // Gradually increase the resend interval.
             try {
-                Thread.sleep(Math.min(this.resendCount++ * 100, 60 * 1000));
+                Thread.sleep(Math.min(this.resendCount.incrementAndGet() * 
100, 60 * 1000));
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
@@ -823,13 +795,13 @@ public class ScheduleMessageService extends ConfigManager 
{
         public boolean need2Blocked() {
             int maxResendNum2Blocked = 
ScheduleMessageService.this.brokerController.getMessageStore().getMessageStoreConfig()
                 .getScheduleAsyncDeliverMaxResendNum2Blocked();
-            return this.resendCount > maxResendNum2Blocked;
+            return this.resendCount.get() > maxResendNum2Blocked;
         }
 
         public boolean need2Skip() {
             int maxResendNum2Blocked = 
ScheduleMessageService.this.brokerController.getMessageStore().getMessageStoreConfig()
                 .getScheduleAsyncDeliverMaxResendNum2Blocked();
-            return this.resendCount > maxResendNum2Blocked * 2;
+            return this.resendCount.get() > maxResendNum2Blocked * 2;
         }
 
         @Override

Reply via email to