This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 9dec4cf5fe [ISSUE #7669] map variable delayLevelTable changed to ConcurrentSkipListMap from ConcurrentHashMap (#7675)
9dec4cf5fe is described below

commit 9dec4cf5fea916cd64fc47f6a3f036e5017b6622
Author: YASH PATEL <121890726+yp969...@users.noreply.github.com>
AuthorDate: Tue Dec 19 17:17:13 2023 +0530

    [ISSUE #7669] map variable delayLevelTable changed to ConcurrentSkipListMap from ConcurrentHashMap (#7675)
---
 .../apache/rocketmq/broker/schedule/ScheduleMessageService.java   | 5 +++--
 .../main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java   | 8 ++++----
 .../main/java/org/apache/rocketmq/store/DefaultMessageStore.java  | 5 +++--
 3 files changed, 10 insertions(+), 8 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
index 0c2e6507bd..ef7e4f6789 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
@@ -23,6 +23,7 @@ import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
@@ -70,8 +71,8 @@ public class ScheduleMessageService extends ConfigManager {
     private static final long WAIT_FOR_SHUTDOWN = 5000L;
     private static final long DELAY_FOR_A_SLEEP = 10L;
 
-    private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
-        new ConcurrentHashMap<>(32);
+    private final ConcurrentSkipListMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
+        new ConcurrentSkipListMap<>();
 
     private final ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable =
         new ConcurrentHashMap<>(32);
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
index c0d00d8640..e907a1ccc3 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
@@ -24,7 +24,7 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
@@ -199,7 +199,7 @@ public class ProxyConfig implements ConfigFile {
 
     private boolean useDelayLevel = false;
     private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
-    private transient Map<Integer /* level */, Long/* delay timeMillis */> delayLevelTable = new ConcurrentHashMap<>();
+    private transient ConcurrentSkipListMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable = new ConcurrentSkipListMap<>();
 
     private String metricCollectorMode = MetricCollectorMode.OFF.getModeString();
     // Example address: 127.0.0.1:1234
@@ -291,7 +291,7 @@ public class ProxyConfig implements ConfigFile {
     }
 
     public void parseDelayLevel() {
-        this.delayLevelTable = new ConcurrentHashMap<>();
+        this.delayLevelTable = new ConcurrentSkipListMap<>();
         Map<String, Long> timeUnitTable = new HashMap<>();
         timeUnitTable.put("s", 1000L);
         timeUnitTable.put("m", 1000L * 60);
@@ -1124,7 +1124,7 @@ public class ProxyConfig implements ConfigFile {
         this.messageDelayLevel = messageDelayLevel;
     }
 
-    public Map<Integer, Long> getDelayLevelTable() {
+    public ConcurrentSkipListMap<Integer, Long> getDelayLevelTable() {
         return delayLevelTable;
     }
 
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index dc5f312e5a..aa72b1617d 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -46,6 +46,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -190,8 +191,8 @@ public class DefaultMessageStore implements MessageStore {
 
     private SendMessageBackHook sendMessageBackHook;
 
-    private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
-        new ConcurrentHashMap<>(32);
+    private final ConcurrentSkipListMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
+        new ConcurrentSkipListMap<>();
 
     private int maxDelayLevel;
 

Reply via email to