This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 449536bed0 Revert "[ISSUE# 9333] Use fastjson2 in broker module (#9334)" (#9387)
449536bed0 is described below

commit 449536bed023ef57713ecb097433bab544a3ae67
Author: lizhimins <707364...@qq.com>
AuthorDate: Tue May 6 17:55:53 2025 +0800

    Revert "[ISSUE# 9333] Use fastjson2 in broker module (#9334)" (#9387)
    
    This reverts commit b1daa3c0f0e8dd85f181331e4ec4ff276fb361c0.
---
 broker/BUILD.bazel                                 |   2 +
 .../rocketmq/broker/RocksDBConfigManager.java      |   7 +-
 .../broker/offset/ConsumerOrderInfoManager.java    |  19 +--
 .../rocketmq/broker/pop/PopConsumerRecord.java     |   8 +-
 .../rocketmq/broker/pop/PopConsumerService.java    |  35 ++--
 .../broker/processor/AckMessageProcessor.java      |   7 +-
 .../broker/processor/AdminBrokerProcessor.java     |  49 +++---
 .../processor/ChangeInvisibleTimeProcessor.java    |   9 +-
 .../broker/processor/PopBufferMergeService.java    |  19 +--
 .../broker/processor/PopMessageProcessor.java      |  31 ++--
 .../broker/processor/PopReviveService.java         |  25 ++-
 .../broker/topic/TopicQueueMappingManager.java     |  21 +--
 .../broker/transaction/TransactionMetrics.java     |  44 ++---
 .../rocketmq/broker/RocksDBConfigManagerTest.java  |  75 ---------
 .../broker/processor/AdminBrokerProcessorTest.java |  79 +--------
 .../ChangeInvisibleTimeProcessorTest.java          |  57 +------
 .../processor/PopBufferMergeServiceTest.java       | 182 +++++----------------
 .../broker/processor/PopMessageProcessorTest.java  |  50 +-----
 .../broker/processor/PopReviveServiceTest.java     |  83 ++--------
 .../broker/topic/TopicQueueMappingManagerTest.java |  68 ++------
 .../transaction/queue/TransactionMetricsTest.java  |  59 +------
 .../apache/rocketmq/store/pop/PopCheckPoint.java   |   5 +-
 22 files changed, 219 insertions(+), 715 deletions(-)

diff --git a/broker/BUILD.bazel b/broker/BUILD.bazel
index 6ee2c8635f..9d61c0a1ff 100644
--- a/broker/BUILD.bazel
+++ b/broker/BUILD.bazel
@@ -31,6 +31,7 @@ java_library(
         "//tieredstore",
         "@maven//:org_slf4j_slf4j_api",
         "@maven//:ch_qos_logback_logback_classic",
+        "@maven//:com_alibaba_fastjson",
         "@maven//:com_alibaba_fastjson2_fastjson2",
         "@maven//:com_github_luben_zstd_jni",
         "@maven//:com_google_guava_guava",
@@ -81,6 +82,7 @@ java_library(
         "//remoting",
         "//store",
         "//tieredstore",
+        "@maven//:com_alibaba_fastjson",
         "@maven//:com_alibaba_fastjson2_fastjson2",
         "@maven//:org_slf4j_slf4j_api",
         "@maven//:com_google_guava_guava",
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java
index c59c00c040..ee2d4e54a6 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java
@@ -16,7 +16,9 @@
  */
 package org.apache.rocketmq.broker;
 
-import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson.JSON;
+import java.nio.charset.StandardCharsets;
+import java.util.function.BiConsumer;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.config.ConfigRocksDBStorage;
 import org.apache.rocketmq.common.constant.LoggerName;
@@ -29,9 +31,6 @@ import org.rocksdb.RocksIterator;
 import org.rocksdb.Statistics;
 import org.rocksdb.WriteBatch;
 
-import java.nio.charset.StandardCharsets;
-import java.util.function.BiConsumer;
-
 public class RocksDBConfigManager {
     protected static final Logger BROKER_LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     public volatile boolean isStop = false;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
index 9f173daf46..120f5b104c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
@@ -16,9 +16,17 @@
  */
 package org.apache.rocketmq.broker.offset;
 
-import com.alibaba.fastjson2.annotation.JSONField;
+import com.alibaba.fastjson.annotation.JSONField;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.BrokerPathConfigHelper;
 import org.apache.rocketmq.common.ConfigManager;
@@ -29,15 +37,6 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
 public class ConsumerOrderInfoManager extends ConfigManager {
 
     private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRecord.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRecord.java
index 661ace9bcb..1ee01fea1c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRecord.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRecord.java
@@ -16,9 +16,9 @@
  */
 package org.apache.rocketmq.broker.pop;
 
-import com.alibaba.fastjson2.JSON;
-import com.alibaba.fastjson2.annotation.JSONField;
-
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.annotation.JSONField;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 
@@ -119,7 +119,7 @@ public class PopConsumerRecord {
     }
 
     public static PopConsumerRecord decode(byte[] body) {
-        return JSON.parseObject(body, PopConsumerRecord.class);
+        return JSONObject.parseObject(body, PopConsumerRecord.class);
     }
 
     public long getPopTime() {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
index a2198f2560..1138ff4afe 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
@@ -16,9 +16,25 @@
  */
 package org.apache.rocketmq.broker.pop;
 
-import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson.JSON;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Stopwatch;
+import java.nio.ByteBuffer;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Triple;
 import org.apache.rocketmq.broker.BrokerController;
@@ -49,23 +65,6 @@ import org.apache.rocketmq.store.pop.PopCheckPoint;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.nio.ByteBuffer;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Objects;
-import java.util.Queue;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
 public class PopConsumerService extends ServiceThread {
 
     private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
index 06a531552a..23a4f6167c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
@@ -16,9 +16,11 @@
  */
 package org.apache.rocketmq.broker.processor;
 
-import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson.JSON;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
+import java.util.BitSet;
+import java.nio.charset.StandardCharsets;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.metrics.PopMetricsManager;
 import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
@@ -50,9 +52,6 @@ import org.apache.rocketmq.store.exception.ConsumeQueueException;
 import org.apache.rocketmq.store.pop.AckMsg;
 import org.apache.rocketmq.store.pop.BatchAckMsg;
 
-import java.nio.charset.StandardCharsets;
-import java.util.BitSet;
-
 public class AckMessageProcessor implements NettyRequestProcessor {
 
     private static final Logger POP_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 4d45730a3c..812ca90e82 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -16,12 +16,33 @@
  */
 package org.apache.rocketmq.broker.processor;
 
-import com.alibaba.fastjson2.JSON;
-import com.alibaba.fastjson2.JSONObject;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
 import com.google.common.collect.Sets;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import io.opentelemetry.api.common.Attributes;
+import java.io.UnsupportedEncodingException;
+import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.auth.authentication.enums.UserType;
@@ -206,28 +227,6 @@ import org.apache.rocketmq.store.timer.TimerCheckpoint;
 import org.apache.rocketmq.store.timer.TimerMessageStore;
 import org.apache.rocketmq.store.util.LibC;
 
-import java.io.UnsupportedEncodingException;
-import java.net.UnknownHostException;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
 import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_INVOCATION_STATUS;
 import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
 import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse;
@@ -2733,7 +2732,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
             } else {
                 ConsumerFilterData filterData = this.brokerController.getConsumerFilterManager()
                     .get(requestHeader.getTopic(), requestHeader.getConsumerGroup());
-                body.setFilterData(JSON.toJSONString(filterData));
+                body.setFilterData(JSON.toJSONString(filterData, true));
 
                 messageFilter = new ExpressionMessageFilter(subscriptionData, filterData,
                     this.brokerController.getConsumerFilterManager());
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
index f288c001b8..de72ee7baf 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
@@ -16,9 +16,12 @@
  */
 package org.apache.rocketmq.broker.processor;
 
-import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson.JSON;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.nio.charset.StandardCharsets;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.metrics.PopMetricsManager;
 import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
@@ -47,10 +50,6 @@ import org.apache.rocketmq.store.exception.ConsumeQueueException;
 import org.apache.rocketmq.store.pop.AckMsg;
 import org.apache.rocketmq.store.pop.PopCheckPoint;
 
-import java.nio.charset.StandardCharsets;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
 public class ChangeInvisibleTimeProcessor implements NettyRequestProcessor {
     private static final Logger POP_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
     private final BrokerController brokerController;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
index 7c309ec5c4..820388b18d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
@@ -16,7 +16,15 @@
  */
 package org.apache.rocketmq.broker.processor;
 
-import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson.JSON;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.metrics.PopMetricsManager;
 import org.apache.rocketmq.common.KeyBuilder;
@@ -36,15 +44,6 @@ import org.apache.rocketmq.store.pop.AckMsg;
 import org.apache.rocketmq.store.pop.BatchAckMsg;
 import org.apache.rocketmq.store.pop.PopCheckPoint;
 
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.atomic.AtomicInteger;
-
 public class PopBufferMergeService extends ServiceThread {
     private static final Logger POP_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
     ConcurrentHashMap<String/*mergeKey*/, PopCheckPointWrapper>
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 9f55269f39..d73acc84df 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -16,13 +16,27 @@
  */
 package org.apache.rocketmq.broker.processor;
 
-import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson.JSON;
 import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.FileRegion;
 import io.opentelemetry.api.common.Attributes;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.filter.ConsumerFilterData;
 import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
@@ -77,21 +91,6 @@ import org.apache.rocketmq.store.pop.AckMsg;
 import org.apache.rocketmq.store.pop.BatchAckMsg;
 import org.apache.rocketmq.store.pop.PopCheckPoint;
 
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Objects;
-import java.util.Random;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
 import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
 import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_RETRY;
 import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
index dcffaf50cc..e1ead86169 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
@@ -16,8 +16,18 @@
  */
 package org.apache.rocketmq.broker.processor;
 
-import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson.JSON;
 import io.opentelemetry.api.common.Attributes;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Triple;
 import org.apache.rocketmq.broker.BrokerController;
@@ -27,12 +37,12 @@ import org.apache.rocketmq.client.consumer.PullResult;
 import org.apache.rocketmq.client.consumer.PullStatus;
 import org.apache.rocketmq.common.KeyBuilder;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.Pair;
 import org.apache.rocketmq.common.PopAckConstants;
 import org.apache.rocketmq.common.ServiceThread;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.TopicFilterType;
-import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.message.MessageAccessor;
 import org.apache.rocketmq.common.message.MessageConst;
@@ -51,17 +61,6 @@ import org.apache.rocketmq.store.pop.AckMsg;
 import org.apache.rocketmq.store.pop.BatchAckMsg;
 import org.apache.rocketmq.store.pop.PopCheckPoint;
 
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-import java.util.concurrent.CompletableFuture;
-
 import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
 import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
 import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
index dfbe5d347a..6b9cf15938 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
@@ -16,8 +16,13 @@
  */
 package org.apache.rocketmq.broker.topic;
 
-import com.alibaba.fastjson2.JSON;
-import com.alibaba.fastjson2.JSONWriter;
+import com.alibaba.fastjson.JSON;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.BrokerPathConfigHelper;
 import org.apache.rocketmq.common.ConfigManager;
@@ -35,13 +40,6 @@ import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingUtils;
 import org.apache.rocketmq.remoting.rpc.TopicQueueRequestHeader;
 import org.apache.rocketmq.remoting.rpc.TopicRequestHeader;
 
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
 import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse;
 
 public class TopicQueueMappingManager extends ConfigManager {
@@ -151,10 +149,7 @@ public class TopicQueueMappingManager extends ConfigManager {
         TopicQueueMappingSerializeWrapper wrapper = new TopicQueueMappingSerializeWrapper();
         wrapper.setTopicQueueMappingInfoMap(topicQueueMappingTable);
         wrapper.setDataVersion(this.dataVersion);
-        if (pretty) {
-            return JSON.toJSONString(wrapper, JSONWriter.Feature.PrettyFormat);
-        }
-        return JSON.toJSONString(wrapper);
+        return JSON.toJSONString(wrapper, pretty);
     }
 
     @Override
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetrics.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetrics.java
index d8dd811db2..ad30c73c60 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetrics.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetrics.java
@@ -16,21 +16,16 @@
  */
 package org.apache.rocketmq.broker.transaction;
 
-import com.alibaba.fastjson2.JSON;
-import com.alibaba.fastjson2.JSONWriter;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
 import com.google.common.io.Files;
-import org.apache.rocketmq.common.ConfigManager;
-import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.topic.TopicValidator;
-import org.apache.rocketmq.logging.org.slf4j.Logger;
-import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
-import org.apache.rocketmq.remoting.protocol.DataVersion;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
-
+import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.nio.charset.StandardCharsets;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
@@ -38,6 +33,14 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.rocketmq.common.ConfigManager;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.protocol.DataVersion;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
 public class TransactionMetrics extends ConfigManager {
     private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
 
@@ -87,11 +90,11 @@ public class TransactionMetrics extends ConfigManager {
         this.transactionCounts = transactionCounts;
     }
 
-    protected void write0(OutputStream out) {
+    protected void write0(Writer writer) {
         TransactionMetricsSerializeWrapper wrapper = new TransactionMetricsSerializeWrapper();
         wrapper.setTransactionCount(transactionCounts);
         wrapper.setDataVersion(dataVersion);
-        JSON.writeTo(out, wrapper, JSONWriter.Feature.BrowserCompatible);
+        JSON.writeJSONString(writer, wrapper, SerializerFeature.BrowserCompatible);
     }
 
     @Override
@@ -179,7 +182,7 @@ public class TransactionMetrics extends ConfigManager {
         String config = configFilePath();
         String temp = config + ".tmp";
         String backup = config + ".bak";
-        FileOutputStream outputStream = null;
+        BufferedWriter bufferedWriter = null;
         try {
             File tmpFile = new File(temp);
             File parentDirectory = tmpFile.getParentFile();
@@ -196,10 +199,11 @@ public class TransactionMetrics extends ConfigManager {
                     return;
                 }
             }
-            outputStream = new FileOutputStream(tmpFile, false);
-            write0(outputStream);
-            outputStream.flush();
-            outputStream.close();
+            bufferedWriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(tmpFile, false),
+                    StandardCharsets.UTF_8));
+            write0(bufferedWriter);
+            bufferedWriter.flush();
+            bufferedWriter.close();
             log.debug("Finished writing tmp file: {}", temp);
 
             File configFile = new File(config);
@@ -212,9 +216,9 @@ public class TransactionMetrics extends ConfigManager {
         } catch (IOException e) {
             log.error("Failed to persist {}", temp, e);
         } finally {
-            if (null != outputStream) {
+            if (null != bufferedWriter) {
                 try {
-                    outputStream.close();
+                    bufferedWriter.close();
                 } catch (IOException ignore) {
                 }
             }
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/RocksDBConfigManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/RocksDBConfigManagerTest.java
deleted file mode 100644
index d9feb6a782..0000000000
--- a/broker/src/test/java/org/apache/rocketmq/broker/RocksDBConfigManagerTest.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.broker;
-
-import com.alibaba.fastjson2.JSON;
-import org.apache.rocketmq.common.config.ConfigRocksDBStorage;
-import org.apache.rocketmq.remoting.protocol.DataVersion;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.nio.charset.StandardCharsets;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.powermock.api.mockito.PowerMockito.mock;
-
-public class RocksDBConfigManagerTest {
-
-    private ConfigRocksDBStorage configRocksDBStorage;
-
-    private RocksDBConfigManager rocksDBConfigManager;
-
-    @Before
-    public void setUp() throws IllegalAccessException {
-        configRocksDBStorage = mock(ConfigRocksDBStorage.class);
-        rocksDBConfigManager = spy(new RocksDBConfigManager("testPath", 1000L, null));
-        rocksDBConfigManager.configRocksDBStorage = configRocksDBStorage;
-    }
-
-    @Test
-    public void testLoadDataVersion() throws Exception {
-        DataVersion expected = new DataVersion();
-        expected.nextVersion();
-        String jsonData = JSON.toJSONString(expected);
-        byte[] mockDataVersion = jsonData.getBytes(StandardCharsets.UTF_8);
-
-        when(rocksDBConfigManager.configRocksDBStorage.getKvDataVersion()).thenReturn(mockDataVersion);
-
-        boolean result = rocksDBConfigManager.loadDataVersion();
-
-        assertTrue(result);
-        assertEquals(expected.getCounter().get(), rocksDBConfigManager.getKvDataVersion().getCounter().get());
-        assertEquals(expected.getTimestamp(), rocksDBConfigManager.getKvDataVersion().getTimestamp());
-    }
-
-    @Test
-    public void testUpdateKvDataVersion() throws Exception {
-        rocksDBConfigManager.updateKvDataVersion();
-
-        DataVersion expectedDataVersion = rocksDBConfigManager.getKvDataVersion();
-        verify(rocksDBConfigManager.configRocksDBStorage, times(1)).updateKvDataVersion(
-                eq(JSON.toJSONString(expectedDataVersion).getBytes(StandardCharsets.UTF_8))
-        );
-    }
-}
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
index 8418781b6b..a6bcca954d 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
@@ -16,8 +16,7 @@
  */
 package org.apache.rocketmq.broker.processor;
 
-import com.alibaba.fastjson2.JSON;
-import com.alibaba.fastjson2.JSONObject;
+import com.alibaba.fastjson.JSON;
 import com.google.common.collect.Sets;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
@@ -35,10 +34,10 @@ import org.apache.rocketmq.broker.client.ClientChannelInfo;
 import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
 import org.apache.rocketmq.broker.client.ConsumerManager;
 import org.apache.rocketmq.broker.client.net.Broker2Client;
-import org.apache.rocketmq.broker.config.v1.RocksDBSubscriptionGroupManager;
-import org.apache.rocketmq.broker.config.v1.RocksDBTopicConfigManager;
 import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
 import org.apache.rocketmq.broker.schedule.ScheduleMessageService;
+import org.apache.rocketmq.broker.config.v1.RocksDBSubscriptionGroupManager;
+import org.apache.rocketmq.broker.config.v1.RocksDBTopicConfigManager;
 import org.apache.rocketmq.broker.topic.TopicConfigManager;
 import org.apache.rocketmq.common.BoundaryType;
 import org.apache.rocketmq.common.BrokerConfig;
@@ -74,7 +73,6 @@ import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody;
 import org.apache.rocketmq.remoting.protocol.body.QueryCorrectionOffsetBody;
 import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody;
 import org.apache.rocketmq.remoting.protocol.body.UserInfo;
-import 
org.apache.rocketmq.remoting.protocol.header.CheckRocksdbCqWriteProgressRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.CreateAclRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.CreateTopicRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.CreateUserRequestHeader;
@@ -89,13 +87,11 @@ import org.apache.rocketmq.remoting.protocol.header.GetConsumerStatusRequestHead
 import 
org.apache.rocketmq.remoting.protocol.header.GetEarliestMsgStoretimeRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.GetMaxOffsetRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.GetMinOffsetRequestHeader;
-import 
org.apache.rocketmq.remoting.protocol.header.GetSubscriptionGroupConfigRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.GetTopicConfigRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.GetUserRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.ListAclsRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.ListUsersRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.NotifyMinBrokerIdChangeRequestHeader;
-import 
org.apache.rocketmq.remoting.protocol.header.QueryConsumeQueueRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.QueryCorrectionOffsetHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.QuerySubscriptionByConsumerRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.QueryTopicConsumeByWhoRequestHeader;
@@ -108,7 +104,6 @@ import org.apache.rocketmq.remoting.protocol.header.UpdateAclRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.UpdateUserRequestHeader;
 import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
 import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
-import 
org.apache.rocketmq.remoting.protocol.statictopic.TopicConfigAndQueueMapping;
 import 
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.store.CommitLog;
 import org.apache.rocketmq.store.DefaultMessageStore;
@@ -116,7 +111,6 @@ import org.apache.rocketmq.store.MessageStore;
 import org.apache.rocketmq.store.SelectMappedBufferResult;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.apache.rocketmq.store.logfile.DefaultMappedFile;
-import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
 import org.apache.rocketmq.store.stats.BrokerStats;
 import org.apache.rocketmq.store.timer.TimerCheckpoint;
 import org.apache.rocketmq.store.timer.TimerMessageStore;
@@ -151,8 +145,6 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.LongAdder;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
@@ -727,7 +719,7 @@ public class AdminBrokerProcessorTest {
         consumerOffsetManager = mock(ConsumerOffsetManager.class);
         
when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager);
         ConsumerOffsetManager consumerOffset = new ConsumerOffsetManager();
-        
when(consumerOffsetManager.encode()).thenReturn(JSON.toJSONString(consumerOffset));
+        
when(consumerOffsetManager.encode()).thenReturn(JSON.toJSONString(consumerOffset,
 false));
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_ALL_CONSUMER_OFFSET, null);
         RemotingCommand response = 
adminBrokerProcessor.processRequest(handlerContext, request);
         assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
@@ -1338,69 +1330,6 @@ public class AdminBrokerProcessorTest {
         assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
     }
 
-    @Test
-    public void testGetSubscriptionGroup() throws RemotingCommandException {
-        
brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().put("group",
 new SubscriptionGroupConfig());
-        GetSubscriptionGroupConfigRequestHeader requestHeader = new 
GetSubscriptionGroupConfigRequestHeader();
-        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_SUBSCRIPTIONGROUP_CONFIG, 
requestHeader);
-        requestHeader.setGroup("group");
-        request.makeCustomHeaderToNet();
-        RemotingCommand response = 
adminBrokerProcessor.processRequest(handlerContext, request);
-        assertEquals(ResponseCode.SUCCESS, response.getCode());
-    }
-
-    @Test
-    public void testCheckRocksdbCqWriteProgress() throws 
RemotingCommandException {
-        CheckRocksdbCqWriteProgressRequestHeader requestHeader = new 
CheckRocksdbCqWriteProgressRequestHeader();
-        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.CHECK_ROCKSDB_CQ_WRITE_PROGRESS,
 requestHeader);
-        requestHeader.setTopic("topic");
-        request.makeCustomHeaderToNet();
-        RemotingCommand response = 
adminBrokerProcessor.processRequest(handlerContext, request);
-        assertEquals(ResponseCode.SUCCESS, response.getCode());
-    }
-
-    @Test
-    public void testQueryConsumeQueue() throws RemotingCommandException {
-        messageStore = mock(MessageStore.class);
-        ConsumeQueueInterface consumeQueue = mock(ConsumeQueueInterface.class);
-        when(consumeQueue.getMinOffsetInQueue()).thenReturn(0L);
-        when(consumeQueue.getMaxOffsetInQueue()).thenReturn(1L);
-        when(messageStore.getConsumeQueue(anyString(), 
anyInt())).thenReturn(consumeQueue);
-        when(brokerController.getMessageStore()).thenReturn(messageStore);
-        QueryConsumeQueueRequestHeader requestHeader = new 
QueryConsumeQueueRequestHeader();
-        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUME_QUEUE, 
requestHeader);
-        requestHeader.setTopic("topic");
-        requestHeader.setQueueId(0);
-        request.makeCustomHeaderToNet();
-        RemotingCommand response = 
adminBrokerProcessor.processRequest(handlerContext, request);
-        assertEquals(ResponseCode.SUCCESS, response.getCode());
-    }
-
-    @Test
-    public void testProcessRequest_GetTopicConfig() throws Exception {
-        GetTopicConfigRequestHeader requestHeader = new 
GetTopicConfigRequestHeader();
-        requestHeader.setTopic("testTopic");
-
-        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_TOPIC_CONFIG, 
requestHeader);
-        request.makeCustomHeaderToNet();
-
-        TopicConfig topicConfig = new TopicConfig();
-        topicConfig.setTopicName("testTopic");
-        TopicConfigManager topicConfigManager = mock(TopicConfigManager.class);
-        
when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager);
-        when(topicConfigManager.selectTopicConfig("testTopic"))
-                .thenReturn(topicConfig);
-
-        RemotingCommand response = 
adminBrokerProcessor.processRequest(handlerContext, request);
-
-        assertNotNull(response);
-        assertEquals(ResponseCode.SUCCESS, response.getCode());
-
-        String responseBody = new String(response.getBody(), 
StandardCharsets.UTF_8);
-        TopicConfigAndQueueMapping result = 
JSONObject.parseObject(responseBody, TopicConfigAndQueueMapping.class);
-        assertEquals("testTopic", result.getTopicName());
-    }
-
     private ResetOffsetRequestHeader createRequestHeader(String topic,String 
group,long timestamp,boolean force,long offset,int queueId) {
         ResetOffsetRequestHeader requestHeader = new 
ResetOffsetRequestHeader();
         requestHeader.setTopic(topic);
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java
index 77490dbd69..e15d51b4a8 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java
@@ -18,11 +18,12 @@ package org.apache.rocketmq.broker.processor;
 
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
+import java.lang.reflect.Field;
+import java.util.concurrent.CompletableFuture;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.client.ClientChannelInfo;
 import org.apache.rocketmq.broker.client.net.Broker2Client;
 import org.apache.rocketmq.broker.failover.EscapeBridge;
-import org.apache.rocketmq.broker.topic.TopicConfigManager;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.message.MessageConst;
@@ -40,12 +41,10 @@ import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumerData;
 import org.apache.rocketmq.store.AppendMessageResult;
 import org.apache.rocketmq.store.AppendMessageStatus;
 import org.apache.rocketmq.store.DefaultMessageStore;
-import org.apache.rocketmq.store.MessageStore;
 import org.apache.rocketmq.store.PutMessageResult;
 import org.apache.rocketmq.store.PutMessageStatus;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.apache.rocketmq.store.exception.ConsumeQueueException;
-import org.apache.rocketmq.store.stats.BrokerStatsManager;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -53,13 +52,8 @@ import org.mockito.Mock;
 import org.mockito.Spy;
 import org.mockito.junit.MockitoJUnitRunner;
 
-import java.lang.reflect.Field;
-import java.util.concurrent.CompletableFuture;
-
 import static 
org.apache.rocketmq.broker.processor.PullMessageProcessorTest.createConsumerData;
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
@@ -168,51 +162,4 @@ public class ChangeInvisibleTimeProcessorTest {
         
assertThat(responseToReturn.getCode()).isEqualTo(ResponseCode.NO_MESSAGE);
         
assertThat(responseToReturn.getOpaque()).isEqualTo(request.getOpaque());
     }
-
-    @Test
-    public void testProcessRequestAsync_JsonParsing() throws Exception {
-        Channel mockChannel = mock(Channel.class);
-        RemotingCommand mockRequest = mock(RemotingCommand.class);
-        BrokerController mockBrokerController = mock(BrokerController.class);
-        TopicConfigManager mockTopicConfigManager = 
mock(TopicConfigManager.class);
-        MessageStore mockMessageStore = mock(MessageStore.class);
-        BrokerConfig mockBrokerConfig = mock(BrokerConfig.class);
-        BrokerStatsManager mockBrokerStatsManager = 
mock(BrokerStatsManager.class);
-        PopMessageProcessor mockPopMessageProcessor = 
mock(PopMessageProcessor.class);
-        PopBufferMergeService mockPopBufferMergeService = 
mock(PopBufferMergeService.class);
-
-        
when(mockBrokerController.getTopicConfigManager()).thenReturn(mockTopicConfigManager);
-        
when(mockBrokerController.getMessageStore()).thenReturn(mockMessageStore);
-        
when(mockBrokerController.getBrokerConfig()).thenReturn(mockBrokerConfig);
-        
when(mockBrokerController.getBrokerStatsManager()).thenReturn(mockBrokerStatsManager);
-        
when(mockBrokerController.getPopMessageProcessor()).thenReturn(mockPopMessageProcessor);
-        
when(mockPopMessageProcessor.getPopBufferMergeService()).thenReturn(mockPopBufferMergeService);
-        when(mockPopBufferMergeService.addAk(anyInt(), 
any())).thenReturn(false);
-        when(mockBrokerController.getEscapeBridge()).thenReturn(escapeBridge);
-        PutMessageResult mockPutMessageResult = new 
PutMessageResult(PutMessageStatus.PUT_OK, null, true);
-        
when(mockBrokerController.getEscapeBridge().asyncPutMessageToSpecificQueue(any(MessageExtBrokerInner.class))).thenReturn(CompletableFuture.completedFuture(mockPutMessageResult));
-
-        TopicConfig topicConfig = new TopicConfig();
-        topicConfig.setReadQueueNums(4);
-        
when(mockTopicConfigManager.selectTopicConfig(anyString())).thenReturn(topicConfig);
-        when(mockMessageStore.getMinOffsetInQueue(anyString(), 
anyInt())).thenReturn(0L);
-        when(mockMessageStore.getMaxOffsetInQueue(anyString(), 
anyInt())).thenReturn(10L);
-        
when(mockBrokerConfig.isPopConsumerKVServiceEnable()).thenReturn(false);
-
-        ChangeInvisibleTimeRequestHeader requestHeader = new 
ChangeInvisibleTimeRequestHeader();
-        requestHeader.setTopic("TestTopic");
-        requestHeader.setQueueId(1);
-        requestHeader.setOffset(5L);
-        requestHeader.setConsumerGroup("TestGroup");
-        requestHeader.setExtraInfo("0 10000 10000 0 TestBroker 1");
-        requestHeader.setInvisibleTime(60000L);
-        
when(mockRequest.decodeCommandCustomHeader(ChangeInvisibleTimeRequestHeader.class)).thenReturn(requestHeader);
-
-        ChangeInvisibleTimeProcessor processor = new 
ChangeInvisibleTimeProcessor(mockBrokerController);
-        CompletableFuture<RemotingCommand> futureResponse = 
processor.processRequestAsync(mockChannel, mockRequest, true);
-
-        RemotingCommand response = futureResponse.get();
-        assertNotNull(response);
-        assertEquals(ResponseCode.SUCCESS, response.getCode());
-    }
 }
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopBufferMergeServiceTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopBufferMergeServiceTest.java
index 33d6820a7e..acc7a3da74 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopBufferMergeServiceTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopBufferMergeServiceTest.java
@@ -16,20 +16,19 @@
  */
 package org.apache.rocketmq.broker.processor;
 
-import com.alibaba.fastjson2.JSON;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.broker.client.ConsumerManager;
-import org.apache.rocketmq.broker.failover.EscapeBridge;
+import org.apache.rocketmq.broker.client.ClientChannelInfo;
 import org.apache.rocketmq.broker.schedule.ScheduleMessageService;
-import org.apache.rocketmq.broker.topic.TopicConfigManager;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.TopicConfig;
-import org.apache.rocketmq.common.message.MessageExtBrokerInner;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumerData;
 import org.apache.rocketmq.store.DefaultMessageStore;
-import org.apache.rocketmq.store.PutMessageResult;
-import org.apache.rocketmq.store.PutMessageStatus;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.apache.rocketmq.store.pop.AckMsg;
 import org.apache.rocketmq.store.pop.PopCheckPoint;
@@ -38,82 +37,56 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
+import org.mockito.Spy;
 import org.mockito.junit.MockitoJUnitRunner;
 
-import java.lang.reflect.Method;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
+import static 
org.apache.rocketmq.broker.processor.PullMessageProcessorTest.createConsumerData;
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.Silent.class)
 public class PopBufferMergeServiceTest {
-
+    @Spy
+    private BrokerController brokerController = new BrokerController(new 
BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), new 
MessageStoreConfig());
     @Mock
-    private BrokerController brokerController;
-
     private PopMessageProcessor popMessageProcessor;
-
-    @Mock
-    private ScheduleMessageService scheduleMessageService;
-
     @Mock
-    private TopicConfigManager topicConfigManager;
-
-    @Mock
-    private ConsumerManager consumerManager;
-
+    private ChannelHandlerContext handlerContext;
     @Mock
     private DefaultMessageStore messageStore;
-
-    @Mock
-    private MessageStoreConfig messageStoreConfig;
-
-    private String defaultGroup = "defaultGroup";
-
-    private String defaultTopic = "defaultTopic";
-
-    private PopBufferMergeService popBufferMergeService;
-
-    @Mock
-    private BrokerConfig brokerConfig;
-
-    @Mock
-    private EscapeBridge escapeBridge;
+    private ScheduleMessageService scheduleMessageService;
+    private ClientChannelInfo clientChannelInfo;
+    private String group = "FooBarGroup";
+    private String topic = "FooBar";
 
     @Before
     public void init() throws Exception {
-        when(brokerConfig.getBrokerIP1()).thenReturn("127.0.0.1");
-        when(brokerConfig.isEnablePopBufferMerge()).thenReturn(true);
-        when(brokerConfig.getPopCkStayBufferTime()).thenReturn(10 * 1000);
-        when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
-        when(brokerController.getEscapeBridge()).thenReturn(escapeBridge);
-        when(brokerController.getMessageStore()).thenReturn(messageStore);
-        
when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager);
-        
when(brokerController.getScheduleMessageService()).thenReturn(scheduleMessageService);
-        
when(brokerController.getConsumerManager()).thenReturn(consumerManager);
-        
when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
+        FieldUtils.writeField(brokerController.getBrokerConfig(), 
"enablePopBufferMerge", true, true);
+        brokerController.setMessageStore(messageStore);
         popMessageProcessor = new PopMessageProcessor(brokerController);
-        popBufferMergeService = new PopBufferMergeService(brokerController, 
popMessageProcessor);
-        FieldUtils.writeDeclaredField(popBufferMergeService, 
"brokerController", brokerController, true);
-        ConcurrentMap<String, TopicConfig> topicConfigTable = new 
ConcurrentHashMap<>();
-        topicConfigTable.put(defaultTopic, new TopicConfig());
-        
when(topicConfigManager.getTopicConfigTable()).thenReturn(topicConfigTable);
+        scheduleMessageService = new ScheduleMessageService(brokerController);
+        scheduleMessageService.parseDelayLevel();
+        Channel mockChannel = mock(Channel.class);
+        
brokerController.getTopicConfigManager().getTopicConfigTable().put(topic, new 
TopicConfig());
+        clientChannelInfo = new ClientChannelInfo(mockChannel);
+        ConsumerData consumerData = createConsumerData(group, topic);
+        brokerController.getConsumerManager().registerConsumer(
+            consumerData.getGroupName(),
+            clientChannelInfo,
+            consumerData.getConsumeType(),
+            consumerData.getMessageModel(),
+            consumerData.getConsumeFromWhere(),
+            consumerData.getSubscriptionDataSet(),
+            false);
     }
 
-    @Test(timeout = 15_000)
+    @Test(timeout = 10_000)
     public void testBasic() throws Exception {
         // This test case fails on Windows in CI pipeline
         // Disable it for later fix
         Assume.assumeFalse(MixAll.isWindows());
+        PopBufferMergeService popBufferMergeService = new 
PopBufferMergeService(brokerController, popMessageProcessor);
+        popBufferMergeService.start();
         PopCheckPoint ck = new PopCheckPoint();
         ck.setBitMap(0);
         int msgCnt = 1;
@@ -124,8 +97,8 @@ public class PopBufferMergeServiceTest {
         ck.setInvisibleTime(invisibleTime);
         int offset = 100;
         ck.setStartOffset(offset);
-        ck.setCId(defaultGroup);
-        ck.setTopic(defaultTopic);
+        ck.setCId(group);
+        ck.setTopic(topic);
         int queueId = 0;
         ck.setQueueId(queueId);
 
@@ -135,93 +108,18 @@ public class PopBufferMergeServiceTest {
         AckMsg ackMsg = new AckMsg();
         ackMsg.setAckOffset(ackOffset);
         ackMsg.setStartOffset(offset);
-        ackMsg.setConsumerGroup(defaultGroup);
-        ackMsg.setTopic(defaultTopic);
+        ackMsg.setConsumerGroup(group);
+        ackMsg.setTopic(topic);
         ackMsg.setQueueId(queueId);
         ackMsg.setPopTime(popTime);
         try {
             assertThat(popBufferMergeService.addCk(ck, reviveQid, ackOffset, 
nextBeginOffset)).isTrue();
-            assertThat(popBufferMergeService.getLatestOffset(defaultTopic, 
defaultGroup, queueId)).isEqualTo(nextBeginOffset);
+            assertThat(popBufferMergeService.getLatestOffset(topic, group, 
queueId)).isEqualTo(nextBeginOffset);
             Thread.sleep(1000); // wait background threads of 
PopBufferMergeService run for some time
             assertThat(popBufferMergeService.addAk(reviveQid, 
ackMsg)).isTrue();
-            assertThat(popBufferMergeService.getLatestOffset(defaultTopic, 
defaultGroup, queueId)).isEqualTo(nextBeginOffset);
+            assertThat(popBufferMergeService.getLatestOffset(topic, group, 
queueId)).isEqualTo(nextBeginOffset);
         } finally {
             popBufferMergeService.shutdown(true);
         }
     }
-
-    @Test
-    public void testAddCkJustOffset_MergeKeyConflict() {
-        PopCheckPoint point = mock(PopCheckPoint.class);
-        String mergeKey = "testMergeKey";
-        when(point.getTopic()).thenReturn(mergeKey);
-        when(point.getCId()).thenReturn("");
-        when(point.getQueueId()).thenReturn(0);
-        when(point.getStartOffset()).thenReturn(0L);
-        when(point.getPopTime()).thenReturn(0L);
-        when(point.getBrokerName()).thenReturn("");
-        popBufferMergeService.buffer.put(mergeKey + "000", 
mock(PopBufferMergeService.PopCheckPointWrapper.class));
-
-        assertFalse(popBufferMergeService.addCkJustOffset(point, 0, 0, 0));
-    }
-
-    @Test
-    public void testAddCkMock() {
-        int queueId = 0;
-        long startOffset = 100L;
-        long invisibleTime = 30_000L;
-        long popTime = System.currentTimeMillis();
-        int reviveQueueId = 0;
-        long nextBeginOffset = 101L;
-        String brokerName = "brokerName";
-        popBufferMergeService.addCkMock(defaultGroup, defaultTopic, queueId, 
startOffset, invisibleTime, popTime, reviveQueueId, nextBeginOffset, 
brokerName);
-        verify(brokerConfig, times(1)).isEnablePopLog();
-    }
-
-    @Test
-    public void testPutAckToStore() throws Exception {
-        PopCheckPoint point = new PopCheckPoint();
-        point.setStartOffset(100L);
-        point.setCId("testGroup");
-        point.setTopic("testTopic");
-        point.setQueueId(1);
-        point.setPopTime(System.currentTimeMillis());
-        point.setBrokerName("testBroker");
-
-        PopBufferMergeService.PopCheckPointWrapper pointWrapper = 
mock(PopBufferMergeService.PopCheckPointWrapper.class);
-        when(pointWrapper.getCk()).thenReturn(point);
-        when(pointWrapper.getReviveQueueId()).thenReturn(0);
-
-        AtomicInteger toStoreBits = new AtomicInteger(0);
-        when(pointWrapper.getToStoreBits()).thenReturn(toStoreBits);
-
-        byte msgIndex = 0;
-        AtomicInteger count = new AtomicInteger(0);
-
-        EscapeBridge escapeBridge = mock(EscapeBridge.class);
-        when(brokerController.getEscapeBridge()).thenReturn(escapeBridge);
-        
when(brokerController.getBrokerConfig().isAppendAckAsync()).thenReturn(false);
-
-        
when(escapeBridge.putMessageToSpecificQueue(any())).thenAnswer(invocation -> {
-            MessageExtBrokerInner capturedMessage = invocation.getArgument(0);
-            AckMsg ackMsg = JSON.parseObject(capturedMessage.getBody(), 
AckMsg.class);
-
-            assertEquals(point.ackOffsetByIndex(msgIndex), 
ackMsg.getAckOffset());
-            assertEquals(point.getStartOffset(), ackMsg.getStartOffset());
-            assertEquals(point.getCId(), ackMsg.getConsumerGroup());
-            assertEquals(point.getTopic(), ackMsg.getTopic());
-            assertEquals(point.getQueueId(), ackMsg.getQueueId());
-            assertEquals(point.getPopTime(), ackMsg.getPopTime());
-            assertEquals(point.getBrokerName(), ackMsg.getBrokerName());
-
-            PutMessageResult result = mock(PutMessageResult.class);
-            
when(result.getPutMessageStatus()).thenReturn(PutMessageStatus.PUT_OK);
-            return result;
-        });
-
-        Method method = 
PopBufferMergeService.class.getDeclaredMethod("putAckToStore", 
PopBufferMergeService.PopCheckPointWrapper.class, byte.class, 
AtomicInteger.class);
-        method.setAccessible(true);
-        method.invoke(popBufferMergeService, pointWrapper, msgIndex, count);
-        verify(escapeBridge, 
times(1)).putMessageToSpecificQueue(any(MessageExtBrokerInner.class));
-    }
 }
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java
index 28476149ab..fdb0690e5d 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java
@@ -16,9 +16,10 @@
  */
 package org.apache.rocketmq.broker.processor;
 
-import com.alibaba.fastjson2.JSON;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.embedded.EmbeddedChannel;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.client.ClientChannelInfo;
 import org.apache.rocketmq.common.BrokerConfig;
@@ -26,7 +27,6 @@ import org.apache.rocketmq.common.KeyBuilder;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.constant.ConsumeInitMode;
 import org.apache.rocketmq.common.message.MessageDecoder;
-import org.apache.rocketmq.common.message.MessageExtBrokerInner;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
 import org.apache.rocketmq.remoting.netty.NettyServerConfig;
@@ -42,7 +42,7 @@ import org.apache.rocketmq.store.SelectMappedBufferResult;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.apache.rocketmq.store.exception.ConsumeQueueException;
 import org.apache.rocketmq.store.logfile.DefaultMappedFile;
-import org.apache.rocketmq.store.pop.PopCheckPoint;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -50,13 +50,8 @@ import org.mockito.Mock;
 import org.mockito.Spy;
 import org.mockito.junit.MockitoJUnitRunner;
 
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.concurrent.CompletableFuture;
-
 import static 
org.apache.rocketmq.broker.processor.PullMessageProcessorTest.createConsumerData;
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
@@ -173,17 +168,17 @@ public class PopMessageProcessorTest {
                 
.thenReturn(CompletableFuture.completedFuture(getMessageResult));
 
         long offset = 
brokerController.getConsumerOffsetManager().queryOffset(newGroup, retryTopic, 
0);
-        assertEquals(-1, offset);
+        Assert.assertEquals(-1, offset);
 
         RemotingCommand request = createPopMsgCommand(newGroup, topic, 0, 
ConsumeInitMode.MAX);
         popMessageProcessor.processRequest(handlerContext, request);
         offset = 
brokerController.getConsumerOffsetManager().queryOffset(newGroup, retryTopic, 
0);
-        assertEquals(minOffset, offset);
+        Assert.assertEquals(minOffset, offset);
 
         when(messageStore.getMinOffsetInQueue(retryTopic, 
0)).thenReturn(minOffset * 2);
         popMessageProcessor.processRequest(handlerContext, request);
         offset = 
brokerController.getConsumerOffsetManager().queryOffset(newGroup, retryTopic, 
0);
-        assertEquals(minOffset, offset); // will not entry getInitOffset() 
again
+        Assert.assertEquals(minOffset, offset); // will not entry 
getInitOffset() again
         messageStore.getMinOffsetInQueue(retryTopic, 0); // prevent 
UnnecessaryStubbingException
     }
 
@@ -198,17 +193,17 @@ public class PopMessageProcessorTest {
                 
.thenReturn(CompletableFuture.completedFuture(getMessageResult));
 
         long offset = 
brokerController.getConsumerOffsetManager().queryOffset(newGroup, topic, 0);
-        assertEquals(-1, offset);
+        Assert.assertEquals(-1, offset);
 
         RemotingCommand request = createPopMsgCommand(newGroup, topic, 0, 
ConsumeInitMode.MAX);
         popMessageProcessor.processRequest(handlerContext, request);
         offset = 
brokerController.getConsumerOffsetManager().queryOffset(newGroup, topic, 0);
-        assertEquals(maxOffset - 1, offset); // checkInMem return false
+        Assert.assertEquals(maxOffset - 1, offset); // checkInMem return false
 
         when(messageStore.getMaxOffsetInQueue(topic, 0)).thenReturn(maxOffset 
* 2);
         popMessageProcessor.processRequest(handlerContext, request);
         offset = 
brokerController.getConsumerOffsetManager().queryOffset(newGroup, topic, 0);
-        assertEquals(maxOffset - 1, offset); // will not entry getInitOffset() 
again
+        Assert.assertEquals(maxOffset - 1, offset); // will not entry 
getInitOffset() again
         messageStore.getMaxOffsetInQueue(topic, 0); // prevent 
UnnecessaryStubbingException
     }
 
@@ -245,31 +240,4 @@ public class PopMessageProcessorTest {
         }
         return getMessageResult;
     }
-
-    @Test
-    public void testBuildCkMsgJsonParsing() {
-        PopCheckPoint ck = new PopCheckPoint();
-        ck.setTopic("TestTopic");
-        ck.setQueueId(1);
-        ck.setStartOffset(100L);
-        ck.setCId("TestConsumer");
-        ck.setPopTime(System.currentTimeMillis());
-        ck.setBrokerName("TestBroker");
-
-        int reviveQid = 0;
-        PopMessageProcessor processor = new PopMessageProcessor(brokerController);
-
-        MessageExtBrokerInner result = processor.buildCkMsg(ck, reviveQid);
-
-        String jsonBody = new String(result.getBody(), StandardCharsets.UTF_8);
-        PopCheckPoint actual = JSON.parseObject(jsonBody, PopCheckPoint.class);
-
-        assertEquals(ck.getTopic(), actual.getTopic());
-        assertEquals(ck.getQueueId(), actual.getQueueId());
-        assertEquals(ck.getStartOffset(), actual.getStartOffset());
-        assertEquals(ck.getCId(), actual.getCId());
-        assertEquals(ck.getPopTime(), actual.getPopTime());
-        assertEquals(ck.getBrokerName(), actual.getBrokerName());
-        assertEquals(ck.getReviveTime(), actual.getReviveTime());
-    }
 }
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java
index e6a2cdb6cd..3010e83610 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java
@@ -16,7 +16,13 @@
  */
 package org.apache.rocketmq.broker.processor;
 
-import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson.JSON;
+import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.commons.lang3.tuple.Triple;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.failover.EscapeBridge;
@@ -34,13 +40,12 @@ import org.apache.rocketmq.common.message.MessageExtBrokerInner;
 import org.apache.rocketmq.common.utils.DataConverter;
 import org.apache.rocketmq.common.utils.NetworkUtil;
 import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
-import org.apache.rocketmq.store.AppendMessageResult;
-import org.apache.rocketmq.store.AppendMessageStatus;
 import org.apache.rocketmq.store.MessageStore;
 import org.apache.rocketmq.store.PutMessageResult;
 import org.apache.rocketmq.store.PutMessageStatus;
+import org.apache.rocketmq.store.AppendMessageResult;
+import org.apache.rocketmq.store.AppendMessageStatus;
 import org.apache.rocketmq.store.pop.AckMsg;
-import org.apache.rocketmq.store.pop.BatchAckMsg;
 import org.apache.rocketmq.store.pop.PopCheckPoint;
 import org.apache.rocketmq.store.timer.TimerMessageStore;
 import org.junit.Assert;
@@ -51,27 +56,18 @@ import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
-import java.net.SocketAddress;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
 import static org.junit.Assert.assertEquals;
-import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.times;
 
 @RunWith(MockitoJUnitRunner.Silent.class)
 public class PopReviveServiceTest {
@@ -409,59 +405,6 @@ public class PopReviveServiceTest {
         verify(messageStore, times(1)).putMessage(any(MessageExtBrokerInner.class)); // rewrite CK
     }
 
-    @Test
-    public void testReviveMsgFromBatchAck() throws Throwable {
-        brokerConfig.setEnableSkipLongAwaitingAck(true);
-        when(consumerOffsetManager.queryOffset(PopAckConstants.REVIVE_GROUP, REVIVE_TOPIC, REVIVE_QUEUE_ID)).thenReturn(0L);
-        List<MessageExt> reviveMessageExtList = new ArrayList<>();
-        long basePopTime = System.currentTimeMillis();
-        reviveMessageExtList.add(buildBatchAckMsg(buildBatchAckMsg(Arrays.asList(1L, 2L, 3L), basePopTime), 1, 1, basePopTime));
-        doReturn(reviveMessageExtList, new ArrayList<>()).when(popReviveService).getReviveMessage(anyLong(), anyInt());
-
-        PopReviveService.ConsumeReviveObj consumeReviveObj = new PopReviveService.ConsumeReviveObj();
-        popReviveService.consumeReviveMessage(consumeReviveObj);
-        assertEquals(1, consumeReviveObj.map.size());
-
-        ArgumentCaptor<Long> commitOffsetCaptor = ArgumentCaptor.forClass(Long.class);
-        doNothing().when(consumerOffsetManager).commitOffset(anyString(), anyString(), anyString(), anyInt(), commitOffsetCaptor.capture());
-        popReviveService.mergeAndRevive(consumeReviveObj);
-        assertEquals(1, commitOffsetCaptor.getValue().longValue());
-    }
-
-    public static MessageExtBrokerInner buildBatchAckMsg(BatchAckMsg batchAckMsg, long deliverMs, long reviveOffset, long deliverTime) {
-        MessageExtBrokerInner result = buildBatchAckInnerMessage(REVIVE_TOPIC, batchAckMsg, REVIVE_QUEUE_ID, STORE_HOST, deliverMs, PopMessageProcessor.genAckUniqueId(batchAckMsg));
-        result.setQueueOffset(reviveOffset);
-        result.setDeliverTimeMs(deliverMs);
-        result.setStoreTimestamp(deliverTime);
-        return result;
-    }
-
-    public static BatchAckMsg buildBatchAckMsg(Collection<Long> offsets, long popTime) {
-        BatchAckMsg result = new BatchAckMsg();
-        result.setConsumerGroup(GROUP);
-        result.setTopic(TOPIC);
-        result.setQueueId(0);
-        result.setPopTime(popTime);
-        result.setBrokerName("broker-a");
-        result.getAckOffsetList().addAll(offsets);
-        return result;
-    }
-
-    public static MessageExtBrokerInner buildBatchAckInnerMessage(String reviveTopic, AckMsg ackMsg, int reviveQid, SocketAddress host, long deliverMs, String ackUniqueId) {
-        MessageExtBrokerInner result = new MessageExtBrokerInner();
-        result.setTopic(reviveTopic);
-        result.setBody(JSON.toJSONString(ackMsg).getBytes(DataConverter.CHARSET_UTF8));
-        result.setQueueId(reviveQid);
-        result.setTags(PopAckConstants.BATCH_ACK_TAG);
-        result.setBornTimestamp(System.currentTimeMillis());
-        result.setBornHost(host);
-        result.setStoreHost(host);
-        result.setDeliverTimeMs(deliverMs);
-        result.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, ackUniqueId);
-        result.setPropertiesString(MessageDecoder.messageProperties2String(result.getProperties()));
-        return result;
-    }
-
     public static PopCheckPoint buildPopCheckPoint(long startOffset, long popTime, long reviveOffset) {
         PopCheckPoint ck = new PopCheckPoint();
         ck.setStartOffset(startOffset);
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManagerTest.java
index 9b25e0134c..b74e57ab93 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManagerTest.java
@@ -17,11 +17,15 @@
 
 package org.apache.rocketmq.broker.topic;
 
-import com.alibaba.fastjson2.JSON;
-import com.alibaba.fastjson2.JSONWriter;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.common.BrokerConfig;
-import org.apache.rocketmq.remoting.protocol.body.TopicQueueMappingSerializeWrapper;
 import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingDetail;
 import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingUtils;
 import org.apache.rocketmq.remoting.protocol.statictopic.TopicRemappingDetailWrapper;
@@ -33,16 +37,6 @@ import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-
-import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
@@ -85,9 +79,9 @@ public class TopicQueueMappingManagerTest {
                 String topic = UUID.randomUUID().toString();
                 int queueNum = 10;
                 TopicRemappingDetailWrapper topicRemappingDetailWrapper  = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, brokers, new HashMap<>());
-                assertEquals(1, topicRemappingDetailWrapper.getBrokerConfigMap().size());
+                Assert.assertEquals(1, topicRemappingDetailWrapper.getBrokerConfigMap().size());
                 TopicQueueMappingDetail topicQueueMappingDetail  = topicRemappingDetailWrapper.getBrokerConfigMap().values().iterator().next().getMappingDetail();
-                assertEquals(queueNum, topicQueueMappingDetail.getHostedQueues().size());
+                Assert.assertEquals(queueNum, topicQueueMappingDetail.getHostedQueues().size());
                 mappingDetailMap.put(topic, topicQueueMappingDetail);
             }
         }
@@ -95,7 +89,7 @@ public class TopicQueueMappingManagerTest {
         {
             topicQueueMappingManager = new TopicQueueMappingManager(brokerController);
             Assert.assertTrue(topicQueueMappingManager.load());
-            assertEquals(0, topicQueueMappingManager.getTopicQueueMappingTable().size());
+            Assert.assertEquals(0, topicQueueMappingManager.getTopicQueueMappingTable().size());
             for (TopicQueueMappingDetail mappingDetail : mappingDetailMap.values()) {
                 for (int i = 0; i < 10; i++) {
                     topicQueueMappingManager.updateTopicQueueMapping(mappingDetail, false, false, true);
@@ -107,49 +101,11 @@ public class TopicQueueMappingManagerTest {
         {
             topicQueueMappingManager = new TopicQueueMappingManager(brokerController);
             Assert.assertTrue(topicQueueMappingManager.load());
-            assertEquals(mappingDetailMap.size(), topicQueueMappingManager.getTopicQueueMappingTable().size());
+            Assert.assertEquals(mappingDetailMap.size(), topicQueueMappingManager.getTopicQueueMappingTable().size());
             for (TopicQueueMappingDetail topicQueueMappingDetail: topicQueueMappingManager.getTopicQueueMappingTable().values()) {
-                assertEquals(topicQueueMappingDetail, mappingDetailMap.get(topicQueueMappingDetail.getTopic()));
+                Assert.assertEquals(topicQueueMappingDetail, mappingDetailMap.get(topicQueueMappingDetail.getTopic()));
             }
         }
         delete(topicQueueMappingManager);
     }
-
-    @Test
-    public void testEncodePretty() {
-        TopicQueueMappingManager topicQueueMappingManager = new TopicQueueMappingManager(null);
-        TopicQueueMappingDetail detail = new TopicQueueMappingDetail();
-        detail.setTopic("testTopic");
-        detail.setBname("testBroker");
-
-        topicQueueMappingManager.getTopicQueueMappingTable().put("testTopic", detail);
-        topicQueueMappingManager.getDataVersion().nextVersion();
-
-        String actual = topicQueueMappingManager.encode(true);
-        TopicQueueMappingSerializeWrapper expectedWrapper = new TopicQueueMappingSerializeWrapper();
-        expectedWrapper.setTopicQueueMappingInfoMap(new ConcurrentHashMap<>(topicQueueMappingManager.getTopicQueueMappingTable()));
-        expectedWrapper.setDataVersion(topicQueueMappingManager.getDataVersion());
-        String expected = JSON.toJSONString(expectedWrapper, JSONWriter.Feature.PrettyFormat);
-
-        assertEquals(expected, actual);
-    }
-
-    @Test
-    public void testEncodeNonPretty() {
-        TopicQueueMappingManager topicQueueMappingManager = new TopicQueueMappingManager(null);
-        TopicQueueMappingDetail detail = new TopicQueueMappingDetail();
-        detail.setTopic("testTopic");
-        detail.setBname("testBroker");
-
-        topicQueueMappingManager.getTopicQueueMappingTable().put("testTopic", detail);
-        topicQueueMappingManager.getDataVersion().nextVersion();
-
-        String actual = topicQueueMappingManager.encode(false);
-        TopicQueueMappingSerializeWrapper expectedWrapper = new TopicQueueMappingSerializeWrapper();
-        expectedWrapper.setTopicQueueMappingInfoMap(new ConcurrentHashMap<>(topicQueueMappingManager.getTopicQueueMappingTable()));
-        expectedWrapper.setDataVersion(topicQueueMappingManager.getDataVersion());
-        String expected = JSON.toJSONString(expectedWrapper);
-
-        assertEquals(expected, actual);
-    }
 }
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionMetricsTest.java b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionMetricsTest.java
index 62a6ad8b5b..690b4eabb5 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionMetricsTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionMetricsTest.java
@@ -19,40 +19,23 @@ package org.apache.rocketmq.broker.transaction.queue;
 
 import org.apache.rocketmq.broker.transaction.TransactionMetrics;
 import org.apache.rocketmq.broker.transaction.TransactionMetrics.Metric;
-import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.junit.MockitoJUnitRunner;
 
-import java.io.File;
-import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.util.Collections;
-import java.util.UUID;
 
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
 
 @RunWith(MockitoJUnitRunner.class)
 public class TransactionMetricsTest {
     private TransactionMetrics transactionMetrics;
     private String configPath;
-    private Path path;
 
     @Before
-    public void before() throws Exception {
-        configPath = createBaseDir();
-        path = Paths.get(configPath);
-        transactionMetrics = spy(new TransactionMetrics(configPath));
-    }
-
-    @After
-    public void after() throws Exception {
-        deleteFile(configPath);
-        assertFalse(path.toFile().exists());
+    public void setUp() throws Exception {
+        configPath = "configPath";
+        transactionMetrics = new TransactionMetrics(configPath);
     }
 
     /**
@@ -97,40 +80,4 @@ public class TransactionMetricsTest {
         transactionMetrics.cleanMetrics(Collections.singleton(topic));
         assert transactionMetrics.getTransactionCount(topic) == 0;
     }
-
-    @Test
-    public void testPersist() {
-        assertFalse(path.toFile().exists());
-        transactionMetrics.persist();
-        assertTrue(path.toFile().exists());
-        verify(transactionMetrics).persist();
-    }
-
-    private String createBaseDir() {
-        String baseDir = System.getProperty("java.io.tmpdir") + File.separator + "unitteststore-" + UUID.randomUUID();
-        final File file = new File(baseDir);
-        if (file.exists()) {
-            System.exit(1);
-        }
-        return baseDir;
-    }
-
-    private void deleteFile(String fileName) {
-        deleteFile(new File(fileName));
-    }
-
-    private void deleteFile(File file) {
-        if (!file.exists()) {
-            return;
-        }
-        if (file.isFile()) {
-            file.delete();
-        } else if (file.isDirectory()) {
-            File[] files = file.listFiles();
-            for (File file1 : files) {
-                deleteFile(file1);
-            }
-            file.delete();
-        }
-    }
 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java b/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java
index 67cf045cfb..38e0a20752 100644
--- a/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java
+++ b/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java
@@ -16,7 +16,7 @@
  */
 package org.apache.rocketmq.store.pop;
 
-import com.alibaba.fastjson2.annotation.JSONField;
+import com.alibaba.fastjson.annotation.JSONField;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -35,6 +35,7 @@ public class PopCheckPoint implements Comparable<PopCheckPoint> {
     private int queueId;
     @JSONField(name = "t")
     private String topic;
+    @JSONField(name = "c")
     private String cid;
     @JSONField(name = "ro")
     private long reviveOffset;
@@ -113,12 +114,10 @@ public class PopCheckPoint implements Comparable<PopCheckPoint> {
         this.topic = topic;
     }
 
-    @JSONField(name = "c")
     public String getCId() {
         return cid;
     }
 
-    @JSONField(name = "c")
     public void setCId(String cid) {
         this.cid = cid;
     }

Reply via email to