This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 9dec4cf5fe [ISSUE #7669] map variable delayLevelTable changed to ConcurrentSkipListMap from ConcurrentHashMap (#7675)
9dec4cf5fe is described below

commit 9dec4cf5fea916cd64fc47f6a3f036e5017b6622
Author: YASH PATEL <121890726+yp969...@users.noreply.github.com>
AuthorDate: Tue Dec 19 17:17:13 2023 +0530

    [ISSUE #7669] map variable delayLevelTable changed to ConcurrentSkipListMap from ConcurrentHashMap (#7675)
---
 .../apache/rocketmq/broker/schedule/ScheduleMessageService.java | 5 +++--
 .../main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java | 8 ++++----
 .../main/java/org/apache/rocketmq/store/DefaultMessageStore.java | 5 +++--
 3 files changed, 10 insertions(+), 8 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
index 0c2e6507bd..ef7e4f6789 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
@@ -23,6 +23,7 @@ import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
@@ -70,8 +71,8 @@ public class ScheduleMessageService extends ConfigManager {
     private static final long WAIT_FOR_SHUTDOWN = 5000L;
     private static final long DELAY_FOR_A_SLEEP = 10L;
 
-    private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
-        new ConcurrentHashMap<>(32);
+    private final ConcurrentSkipListMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
+        new ConcurrentSkipListMap<>();
 
     private final ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable =
         new ConcurrentHashMap<>(32);
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
index c0d00d8640..e907a1ccc3 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
@@ -24,7 +24,7 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
@@ -199,7 +199,7 @@ public class ProxyConfig implements ConfigFile {
 
     private boolean useDelayLevel = false;
     private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
-    private transient Map<Integer /* level */, Long/* delay timeMillis */> delayLevelTable = new ConcurrentHashMap<>();
+    private transient ConcurrentSkipListMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable = new ConcurrentSkipListMap<>();
 
     private String metricCollectorMode = MetricCollectorMode.OFF.getModeString();
     // Example address: 127.0.0.1:1234
@@ -291,7 +291,7 @@ public class ProxyConfig implements ConfigFile {
     }
 
     public void parseDelayLevel() {
-        this.delayLevelTable = new ConcurrentHashMap<>();
+        this.delayLevelTable = new ConcurrentSkipListMap<>();
         Map<String, Long> timeUnitTable = new HashMap<>();
         timeUnitTable.put("s", 1000L);
         timeUnitTable.put("m", 1000L * 60);
@@ -1124,7 +1124,7 @@ public class ProxyConfig implements ConfigFile {
         this.messageDelayLevel = messageDelayLevel;
     }
 
-    public Map<Integer, Long> getDelayLevelTable() {
+    public ConcurrentSkipListMap<Integer, Long> getDelayLevelTable() {
         return delayLevelTable;
     }
 
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index dc5f312e5a..aa72b1617d 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -46,6 +46,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -190,8 +191,8 @@ public class DefaultMessageStore implements MessageStore {
 
     private SendMessageBackHook sendMessageBackHook;
 
-    private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
-        new ConcurrentHashMap<>(32);
+    private final ConcurrentSkipListMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
+        new ConcurrentSkipListMap<>();
 
     private int maxDelayLevel;