This is an automated email from the ASF dual-hosted git repository. lizhimin pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 449536bed0 Revert "[ISSUE# 9333] Use fastjson2 in broker module (#9334)" (#9387) 449536bed0 is described below commit 449536bed023ef57713ecb097433bab544a3ae67 Author: lizhimins <707364...@qq.com> AuthorDate: Tue May 6 17:55:53 2025 +0800 Revert "[ISSUE# 9333] Use fastjson2 in broker module (#9334)" (#9387) This reverts commit b1daa3c0f0e8dd85f181331e4ec4ff276fb361c0. --- broker/BUILD.bazel | 2 + .../rocketmq/broker/RocksDBConfigManager.java | 7 +- .../broker/offset/ConsumerOrderInfoManager.java | 19 +-- .../rocketmq/broker/pop/PopConsumerRecord.java | 8 +- .../rocketmq/broker/pop/PopConsumerService.java | 35 ++-- .../broker/processor/AckMessageProcessor.java | 7 +- .../broker/processor/AdminBrokerProcessor.java | 49 +++--- .../processor/ChangeInvisibleTimeProcessor.java | 9 +- .../broker/processor/PopBufferMergeService.java | 19 +-- .../broker/processor/PopMessageProcessor.java | 31 ++-- .../broker/processor/PopReviveService.java | 25 ++- .../broker/topic/TopicQueueMappingManager.java | 21 +-- .../broker/transaction/TransactionMetrics.java | 44 ++--- .../rocketmq/broker/RocksDBConfigManagerTest.java | 75 --------- .../broker/processor/AdminBrokerProcessorTest.java | 79 +-------- .../ChangeInvisibleTimeProcessorTest.java | 57 +------ .../processor/PopBufferMergeServiceTest.java | 182 +++++---------------- .../broker/processor/PopMessageProcessorTest.java | 50 +----- .../broker/processor/PopReviveServiceTest.java | 83 ++-------- .../broker/topic/TopicQueueMappingManagerTest.java | 68 ++------ .../transaction/queue/TransactionMetricsTest.java | 59 +------ .../apache/rocketmq/store/pop/PopCheckPoint.java | 5 +- 22 files changed, 219 insertions(+), 715 deletions(-) diff --git a/broker/BUILD.bazel b/broker/BUILD.bazel index 6ee2c8635f..9d61c0a1ff 100644 --- a/broker/BUILD.bazel +++ b/broker/BUILD.bazel @@ -31,6 +31,7 @@ java_library( "//tieredstore", "@maven//:org_slf4j_slf4j_api", "@maven//:ch_qos_logback_logback_classic", + "@maven//:com_alibaba_fastjson", "@maven//:com_alibaba_fastjson2_fastjson2", "@maven//:com_github_luben_zstd_jni", "@maven//:com_google_guava_guava", @@ -81,6 +82,7 @@ java_library( "//remoting", "//store", "//tieredstore", + "@maven//:com_alibaba_fastjson", "@maven//:com_alibaba_fastjson2_fastjson2", "@maven//:org_slf4j_slf4j_api", "@maven//:com_google_guava_guava", diff --git a/broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java index c59c00c040..ee2d4e54a6 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java @@ -16,7 +16,9 @@ */ package org.apache.rocketmq.broker; -import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson.JSON; +import java.nio.charset.StandardCharsets; +import java.util.function.BiConsumer; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.config.ConfigRocksDBStorage; import org.apache.rocketmq.common.constant.LoggerName; @@ -29,9 +31,6 @@ import org.rocksdb.RocksIterator; import org.rocksdb.Statistics; import org.rocksdb.WriteBatch; -import java.nio.charset.StandardCharsets; -import java.util.function.BiConsumer; - public class RocksDBConfigManager { protected static final Logger BROKER_LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); public volatile boolean isStop = false; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java index 9f173daf46..120f5b104c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java @@ -16,9 +16,17 @@ */ package org.apache.rocketmq.broker.offset; -import com.alibaba.fastjson2.annotation.JSONField; +import com.alibaba.fastjson.annotation.JSONField; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerPathConfigHelper; import org.apache.rocketmq.common.ConfigManager; @@ -29,15 +37,6 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - public class ConsumerOrderInfoManager extends ConfigManager { private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRecord.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRecord.java index 661ace9bcb..1ee01fea1c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRecord.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRecord.java @@ -16,9 +16,9 @@ */ package org.apache.rocketmq.broker.pop; -import com.alibaba.fastjson2.JSON; -import com.alibaba.fastjson2.annotation.JSONField; - +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.alibaba.fastjson.annotation.JSONField; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; @@ -119,7 +119,7 @@ public class PopConsumerRecord { } public static PopConsumerRecord decode(byte[] body) { - return JSON.parseObject(body, PopConsumerRecord.class); + return JSONObject.parseObject(body, PopConsumerRecord.class); } public long getPopTime() { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java index a2198f2560..1138ff4afe 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java @@ -16,9 +16,25 @@ */ package org.apache.rocketmq.broker.pop; -import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson.JSON; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; +import java.nio.ByteBuffer; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Objects; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Triple; import org.apache.rocketmq.broker.BrokerController; @@ -49,23 +65,6 @@ import org.apache.rocketmq.store.pop.PopCheckPoint; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.nio.ByteBuffer; -import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Objects; -import java.util.Queue; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; - public class PopConsumerService extends ServiceThread { private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java index 06a531552a..23a4f6167c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java @@ -16,9 +16,11 @@ */ package org.apache.rocketmq.broker.processor; -import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson.JSON; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; +import java.util.BitSet; +import java.nio.charset.StandardCharsets; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.metrics.PopMetricsManager; import org.apache.rocketmq.broker.offset.ConsumerOffsetManager; @@ -50,9 +52,6 @@ import org.apache.rocketmq.store.exception.ConsumeQueueException; import org.apache.rocketmq.store.pop.AckMsg; import org.apache.rocketmq.store.pop.BatchAckMsg; -import java.nio.charset.StandardCharsets; -import java.util.BitSet; - public class AckMessageProcessor implements NettyRequestProcessor { private static final Logger POP_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 4d45730a3c..812ca90e82 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -16,12 +16,33 @@ */ package org.apache.rocketmq.broker.processor; -import com.alibaba.fastjson2.JSON; -import com.alibaba.fastjson2.JSONObject; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; import com.google.common.collect.Sets; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.opentelemetry.api.common.Attributes; +import java.io.UnsupportedEncodingException; +import java.net.UnknownHostException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.auth.authentication.enums.UserType; @@ -206,28 +227,6 @@ import org.apache.rocketmq.store.timer.TimerCheckpoint; import org.apache.rocketmq.store.timer.TimerMessageStore; import org.apache.rocketmq.store.util.LibC; -import java.io.UnsupportedEncodingException; -import java.net.UnknownHostException; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_INVOCATION_STATUS; import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM; import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse; @@ -2733,7 +2732,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { } else { ConsumerFilterData filterData = this.brokerController.getConsumerFilterManager() .get(requestHeader.getTopic(), requestHeader.getConsumerGroup()); - body.setFilterData(JSON.toJSONString(filterData)); + body.setFilterData(JSON.toJSONString(filterData, true)); messageFilter = new ExpressionMessageFilter(subscriptionData, filterData, this.brokerController.getConsumerFilterManager()); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java index f288c001b8..de72ee7baf 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java @@ -16,9 +16,12 @@ */ package org.apache.rocketmq.broker.processor; -import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson.JSON; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.nio.charset.StandardCharsets; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.metrics.PopMetricsManager; import org.apache.rocketmq.broker.offset.ConsumerOffsetManager; @@ -47,10 +50,6 @@ import org.apache.rocketmq.store.exception.ConsumeQueueException; import org.apache.rocketmq.store.pop.AckMsg; import org.apache.rocketmq.store.pop.PopCheckPoint; -import java.nio.charset.StandardCharsets; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; - public class ChangeInvisibleTimeProcessor implements NettyRequestProcessor { private static final Logger POP_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME); private final BrokerController brokerController; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java index 7c309ec5c4..820388b18d 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java @@ -16,7 +16,15 @@ */ package org.apache.rocketmq.broker.processor; -import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson.JSON; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.metrics.PopMetricsManager; import org.apache.rocketmq.common.KeyBuilder; @@ -36,15 +44,6 @@ import org.apache.rocketmq.store.pop.AckMsg; import org.apache.rocketmq.store.pop.BatchAckMsg; import org.apache.rocketmq.store.pop.PopCheckPoint; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.atomic.AtomicInteger; - public class PopBufferMergeService extends ServiceThread { private static final Logger POP_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME); ConcurrentHashMap<String/*mergeKey*/, PopCheckPointWrapper> diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java index 9f55269f39..d73acc84df 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java @@ -16,13 +16,27 @@ */ package org.apache.rocketmq.broker.processor; -import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson.JSON; import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap; import io.netty.channel.Channel; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.FileRegion; import io.opentelemetry.api.common.Attributes; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.filter.ConsumerFilterData; import org.apache.rocketmq.broker.filter.ConsumerFilterManager; @@ -77,21 +91,6 @@ import org.apache.rocketmq.store.pop.AckMsg; import org.apache.rocketmq.store.pop.BatchAckMsg; import org.apache.rocketmq.store.pop.PopCheckPoint; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Objects; -import java.util.Random; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentSkipListSet; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; - import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP; import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_RETRY; import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java index dcffaf50cc..e1ead86169 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java @@ -16,8 +16,18 @@ */ package org.apache.rocketmq.broker.processor; -import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson.JSON; import io.opentelemetry.api.common.Attributes; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; +import java.util.concurrent.CompletableFuture; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Triple; import org.apache.rocketmq.broker.BrokerController; @@ -27,12 +37,12 @@ import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.client.consumer.PullStatus; import org.apache.rocketmq.common.KeyBuilder; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.PopAckConstants; import org.apache.rocketmq.common.ServiceThread; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicFilterType; -import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageConst; @@ -51,17 +61,6 @@ import org.apache.rocketmq.store.pop.AckMsg; import org.apache.rocketmq.store.pop.BatchAckMsg; import org.apache.rocketmq.store.pop.PopCheckPoint; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.TreeMap; -import java.util.concurrent.CompletableFuture; - import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP; import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM; import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java index dfbe5d347a..6b9cf15938 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java @@ -16,8 +16,13 @@ */ package org.apache.rocketmq.broker.topic; -import com.alibaba.fastjson2.JSON; -import com.alibaba.fastjson2.JSONWriter; +import com.alibaba.fastjson.JSON; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerPathConfigHelper; import org.apache.rocketmq.common.ConfigManager; @@ -35,13 +40,6 @@ import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingUtils; import org.apache.rocketmq.remoting.rpc.TopicQueueRequestHeader; import org.apache.rocketmq.remoting.rpc.TopicRequestHeader; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse; public class TopicQueueMappingManager extends ConfigManager { @@ -151,10 +149,7 @@ public class TopicQueueMappingManager extends ConfigManager { TopicQueueMappingSerializeWrapper wrapper = new TopicQueueMappingSerializeWrapper(); wrapper.setTopicQueueMappingInfoMap(topicQueueMappingTable); wrapper.setDataVersion(this.dataVersion); - if (pretty) { - return JSON.toJSONString(wrapper, JSONWriter.Feature.PrettyFormat); - } - return JSON.toJSONString(wrapper); + return JSON.toJSONString(wrapper, pretty); } @Override diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetrics.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetrics.java index d8dd811db2..ad30c73c60 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetrics.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetrics.java @@ -16,21 +16,16 @@ */ package org.apache.rocketmq.broker.transaction; -import com.alibaba.fastjson2.JSON; -import com.alibaba.fastjson2.JSONWriter; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.serializer.SerializerFeature; import com.google.common.io.Files; -import org.apache.rocketmq.common.ConfigManager; -import org.apache.rocketmq.common.constant.LoggerName; -import org.apache.rocketmq.common.topic.TopicValidator; -import org.apache.rocketmq.logging.org.slf4j.Logger; -import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; -import org.apache.rocketmq.remoting.protocol.DataVersion; -import org.apache.rocketmq.remoting.protocol.RemotingSerializable; - +import java.io.BufferedWriter; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; -import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.nio.charset.StandardCharsets; import java.util.Iterator; import java.util.Map; import java.util.Set; @@ -38,6 +33,14 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; +import org.apache.rocketmq.common.ConfigManager; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.topic.TopicValidator; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +import org.apache.rocketmq.remoting.protocol.DataVersion; +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; + public class TransactionMetrics extends ConfigManager { private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); @@ -87,11 +90,11 @@ public class TransactionMetrics extends ConfigManager { this.transactionCounts = transactionCounts; } - protected void write0(OutputStream out) { + protected void write0(Writer writer) { TransactionMetricsSerializeWrapper wrapper = new TransactionMetricsSerializeWrapper(); wrapper.setTransactionCount(transactionCounts); wrapper.setDataVersion(dataVersion); - JSON.writeTo(out, wrapper, JSONWriter.Feature.BrowserCompatible); + JSON.writeJSONString(writer, wrapper, SerializerFeature.BrowserCompatible); } @Override @@ -179,7 +182,7 @@ public class TransactionMetrics extends ConfigManager { String config = configFilePath(); String temp = config + ".tmp"; String backup = config + ".bak"; - FileOutputStream outputStream = null; + BufferedWriter bufferedWriter = null; try { File tmpFile = new File(temp); File parentDirectory = tmpFile.getParentFile(); @@ -196,10 +199,11 @@ public class TransactionMetrics extends ConfigManager { return; } } - outputStream = new FileOutputStream(tmpFile, false); - write0(outputStream); - outputStream.flush(); - outputStream.close(); + bufferedWriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(tmpFile, false), + StandardCharsets.UTF_8)); + write0(bufferedWriter); + bufferedWriter.flush(); + bufferedWriter.close(); log.debug("Finished writing tmp file: {}", temp); File configFile = new File(config); @@ -212,9 +216,9 @@ public class TransactionMetrics extends ConfigManager { } catch (IOException e) { log.error("Failed to persist {}", temp, e); } finally { - if (null != outputStream) { + if (null != bufferedWriter) { try { - outputStream.close(); + bufferedWriter.close(); } catch (IOException ignore) { } } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/RocksDBConfigManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/RocksDBConfigManagerTest.java deleted file mode 100644 index d9feb6a782..0000000000 --- a/broker/src/test/java/org/apache/rocketmq/broker/RocksDBConfigManagerTest.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.broker; - -import com.alibaba.fastjson2.JSON; -import org.apache.rocketmq.common.config.ConfigRocksDBStorage; -import org.apache.rocketmq.remoting.protocol.DataVersion; -import org.junit.Before; -import org.junit.Test; - -import java.nio.charset.StandardCharsets; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import static org.powermock.api.mockito.PowerMockito.mock; - -public class RocksDBConfigManagerTest { - - private ConfigRocksDBStorage configRocksDBStorage; - - private RocksDBConfigManager rocksDBConfigManager; - - @Before - public void setUp() throws IllegalAccessException { - configRocksDBStorage = mock(ConfigRocksDBStorage.class); - rocksDBConfigManager = spy(new RocksDBConfigManager("testPath", 1000L, null)); - rocksDBConfigManager.configRocksDBStorage = configRocksDBStorage; - } - - @Test - public void testLoadDataVersion() throws Exception { - DataVersion expected = new DataVersion(); - expected.nextVersion(); - String jsonData = JSON.toJSONString(expected); - byte[] mockDataVersion = jsonData.getBytes(StandardCharsets.UTF_8); - - when(rocksDBConfigManager.configRocksDBStorage.getKvDataVersion()).thenReturn(mockDataVersion); - - boolean result = rocksDBConfigManager.loadDataVersion(); - - assertTrue(result); - assertEquals(expected.getCounter().get(), rocksDBConfigManager.getKvDataVersion().getCounter().get()); - assertEquals(expected.getTimestamp(), rocksDBConfigManager.getKvDataVersion().getTimestamp()); - } - - @Test - public void testUpdateKvDataVersion() throws Exception { - rocksDBConfigManager.updateKvDataVersion(); - - DataVersion expectedDataVersion = rocksDBConfigManager.getKvDataVersion(); - verify(rocksDBConfigManager.configRocksDBStorage, times(1)).updateKvDataVersion( - eq(JSON.toJSONString(expectedDataVersion).getBytes(StandardCharsets.UTF_8)) - ); - } -} diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java index 8418781b6b..a6bcca954d 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java @@ -16,8 +16,7 @@ */ package org.apache.rocketmq.broker.processor; -import com.alibaba.fastjson2.JSON; -import com.alibaba.fastjson2.JSONObject; +import com.alibaba.fastjson.JSON; import com.google.common.collect.Sets; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; @@ -35,10 +34,10 @@ import org.apache.rocketmq.broker.client.ClientChannelInfo; import org.apache.rocketmq.broker.client.ConsumerGroupInfo; import org.apache.rocketmq.broker.client.ConsumerManager; import org.apache.rocketmq.broker.client.net.Broker2Client; -import org.apache.rocketmq.broker.config.v1.RocksDBSubscriptionGroupManager; -import org.apache.rocketmq.broker.config.v1.RocksDBTopicConfigManager; import org.apache.rocketmq.broker.offset.ConsumerOffsetManager; import org.apache.rocketmq.broker.schedule.ScheduleMessageService; +import org.apache.rocketmq.broker.config.v1.RocksDBSubscriptionGroupManager; +import org.apache.rocketmq.broker.config.v1.RocksDBTopicConfigManager; import org.apache.rocketmq.broker.topic.TopicConfigManager; import org.apache.rocketmq.common.BoundaryType; import org.apache.rocketmq.common.BrokerConfig; @@ -74,7 +73,6 @@ import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody; import org.apache.rocketmq.remoting.protocol.body.QueryCorrectionOffsetBody; import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody; import org.apache.rocketmq.remoting.protocol.body.UserInfo; -import org.apache.rocketmq.remoting.protocol.header.CheckRocksdbCqWriteProgressRequestHeader; import org.apache.rocketmq.remoting.protocol.header.CreateAclRequestHeader; import org.apache.rocketmq.remoting.protocol.header.CreateTopicRequestHeader; import org.apache.rocketmq.remoting.protocol.header.CreateUserRequestHeader; @@ -89,13 +87,11 @@ import org.apache.rocketmq.remoting.protocol.header.GetConsumerStatusRequestHead import org.apache.rocketmq.remoting.protocol.header.GetEarliestMsgStoretimeRequestHeader; import org.apache.rocketmq.remoting.protocol.header.GetMaxOffsetRequestHeader; import org.apache.rocketmq.remoting.protocol.header.GetMinOffsetRequestHeader; -import org.apache.rocketmq.remoting.protocol.header.GetSubscriptionGroupConfigRequestHeader; import org.apache.rocketmq.remoting.protocol.header.GetTopicConfigRequestHeader; import org.apache.rocketmq.remoting.protocol.header.GetUserRequestHeader; import org.apache.rocketmq.remoting.protocol.header.ListAclsRequestHeader; import org.apache.rocketmq.remoting.protocol.header.ListUsersRequestHeader; import org.apache.rocketmq.remoting.protocol.header.NotifyMinBrokerIdChangeRequestHeader; -import org.apache.rocketmq.remoting.protocol.header.QueryConsumeQueueRequestHeader; import org.apache.rocketmq.remoting.protocol.header.QueryCorrectionOffsetHeader; import org.apache.rocketmq.remoting.protocol.header.QuerySubscriptionByConsumerRequestHeader; import org.apache.rocketmq.remoting.protocol.header.QueryTopicConsumeByWhoRequestHeader; @@ -108,7 +104,6 @@ import org.apache.rocketmq.remoting.protocol.header.UpdateAclRequestHeader; import org.apache.rocketmq.remoting.protocol.header.UpdateUserRequestHeader; import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; -import org.apache.rocketmq.remoting.protocol.statictopic.TopicConfigAndQueueMapping; import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.store.CommitLog; import org.apache.rocketmq.store.DefaultMessageStore; @@ -116,7 +111,6 @@ import org.apache.rocketmq.store.MessageStore; import org.apache.rocketmq.store.SelectMappedBufferResult; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.logfile.DefaultMappedFile; -import org.apache.rocketmq.store.queue.ConsumeQueueInterface; import org.apache.rocketmq.store.stats.BrokerStats; import org.apache.rocketmq.store.timer.TimerCheckpoint; import org.apache.rocketmq.store.timer.TimerMessageStore; @@ -151,8 +145,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.LongAdder; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; @@ -727,7 +719,7 @@ public class AdminBrokerProcessorTest { consumerOffsetManager = mock(ConsumerOffsetManager.class); when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager); ConsumerOffsetManager consumerOffset = new ConsumerOffsetManager(); - when(consumerOffsetManager.encode()).thenReturn(JSON.toJSONString(consumerOffset)); + when(consumerOffsetManager.encode()).thenReturn(JSON.toJSONString(consumerOffset, false)); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_CONSUMER_OFFSET, null); RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); @@ -1338,69 +1330,6 @@ public class AdminBrokerProcessorTest { assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); } - @Test - public void testGetSubscriptionGroup() throws RemotingCommandException { - brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().put("group", new SubscriptionGroupConfig()); - GetSubscriptionGroupConfigRequestHeader requestHeader = new GetSubscriptionGroupConfigRequestHeader(); - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_SUBSCRIPTIONGROUP_CONFIG, requestHeader); - requestHeader.setGroup("group"); - request.makeCustomHeaderToNet(); - RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); - assertEquals(ResponseCode.SUCCESS, response.getCode()); - } - - @Test - public void testCheckRocksdbCqWriteProgress() throws RemotingCommandException { - CheckRocksdbCqWriteProgressRequestHeader requestHeader = new CheckRocksdbCqWriteProgressRequestHeader(); - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CHECK_ROCKSDB_CQ_WRITE_PROGRESS, requestHeader); - requestHeader.setTopic("topic"); - request.makeCustomHeaderToNet(); - RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); - assertEquals(ResponseCode.SUCCESS, response.getCode()); - } - - @Test - public void testQueryConsumeQueue() throws RemotingCommandException { - messageStore = mock(MessageStore.class); - ConsumeQueueInterface consumeQueue = mock(ConsumeQueueInterface.class); - when(consumeQueue.getMinOffsetInQueue()).thenReturn(0L); - when(consumeQueue.getMaxOffsetInQueue()).thenReturn(1L); - when(messageStore.getConsumeQueue(anyString(), anyInt())).thenReturn(consumeQueue); - when(brokerController.getMessageStore()).thenReturn(messageStore); - QueryConsumeQueueRequestHeader requestHeader = new QueryConsumeQueueRequestHeader(); - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUME_QUEUE, requestHeader); - requestHeader.setTopic("topic"); - requestHeader.setQueueId(0); - request.makeCustomHeaderToNet(); - RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); - assertEquals(ResponseCode.SUCCESS, response.getCode()); - } - - @Test - public void testProcessRequest_GetTopicConfig() throws Exception { - GetTopicConfigRequestHeader requestHeader = new GetTopicConfigRequestHeader(); - requestHeader.setTopic("testTopic"); - - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_TOPIC_CONFIG, requestHeader); - request.makeCustomHeaderToNet(); - - TopicConfig topicConfig = new TopicConfig(); - topicConfig.setTopicName("testTopic"); - TopicConfigManager topicConfigManager = mock(TopicConfigManager.class); - when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager); - when(topicConfigManager.selectTopicConfig("testTopic")) - .thenReturn(topicConfig); - - RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); - - assertNotNull(response); - assertEquals(ResponseCode.SUCCESS, response.getCode()); - - String responseBody = new String(response.getBody(), StandardCharsets.UTF_8); - TopicConfigAndQueueMapping result = JSONObject.parseObject(responseBody, TopicConfigAndQueueMapping.class); - assertEquals("testTopic", result.getTopicName()); - } - private ResetOffsetRequestHeader createRequestHeader(String topic,String group,long timestamp,boolean force,long offset,int queueId) { ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader(); requestHeader.setTopic(topic); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java index 77490dbd69..e15d51b4a8 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java @@ -18,11 +18,12 @@ package org.apache.rocketmq.broker.processor; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; +import java.lang.reflect.Field; +import java.util.concurrent.CompletableFuture; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.client.ClientChannelInfo; import org.apache.rocketmq.broker.client.net.Broker2Client; import org.apache.rocketmq.broker.failover.EscapeBridge; -import org.apache.rocketmq.broker.topic.TopicConfigManager; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.message.MessageConst; @@ -40,12 +41,10 @@ import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumerData; import org.apache.rocketmq.store.AppendMessageResult; import org.apache.rocketmq.store.AppendMessageStatus; import org.apache.rocketmq.store.DefaultMessageStore; -import org.apache.rocketmq.store.MessageStore; import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.PutMessageStatus; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.exception.ConsumeQueueException; -import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -53,13 +52,8 @@ import org.mockito.Mock; import org.mockito.Spy; import org.mockito.junit.MockitoJUnitRunner; -import java.lang.reflect.Field; -import java.util.concurrent.CompletableFuture; - import static org.apache.rocketmq.broker.processor.PullMessageProcessorTest.createConsumerData; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; @@ -168,51 +162,4 @@ public class ChangeInvisibleTimeProcessorTest { assertThat(responseToReturn.getCode()).isEqualTo(ResponseCode.NO_MESSAGE); assertThat(responseToReturn.getOpaque()).isEqualTo(request.getOpaque()); } - - @Test - public void testProcessRequestAsync_JsonParsing() throws Exception { - Channel mockChannel = mock(Channel.class); - RemotingCommand mockRequest = mock(RemotingCommand.class); - BrokerController mockBrokerController = mock(BrokerController.class); - TopicConfigManager mockTopicConfigManager = mock(TopicConfigManager.class); - MessageStore mockMessageStore = mock(MessageStore.class); - BrokerConfig mockBrokerConfig = mock(BrokerConfig.class); - BrokerStatsManager mockBrokerStatsManager = mock(BrokerStatsManager.class); - PopMessageProcessor mockPopMessageProcessor = mock(PopMessageProcessor.class); - PopBufferMergeService mockPopBufferMergeService = mock(PopBufferMergeService.class); - - when(mockBrokerController.getTopicConfigManager()).thenReturn(mockTopicConfigManager); - when(mockBrokerController.getMessageStore()).thenReturn(mockMessageStore); - when(mockBrokerController.getBrokerConfig()).thenReturn(mockBrokerConfig); - when(mockBrokerController.getBrokerStatsManager()).thenReturn(mockBrokerStatsManager); - when(mockBrokerController.getPopMessageProcessor()).thenReturn(mockPopMessageProcessor); - when(mockPopMessageProcessor.getPopBufferMergeService()).thenReturn(mockPopBufferMergeService); - when(mockPopBufferMergeService.addAk(anyInt(), any())).thenReturn(false); - when(mockBrokerController.getEscapeBridge()).thenReturn(escapeBridge); - PutMessageResult mockPutMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, null, true); - when(mockBrokerController.getEscapeBridge().asyncPutMessageToSpecificQueue(any(MessageExtBrokerInner.class))).thenReturn(CompletableFuture.completedFuture(mockPutMessageResult)); - - TopicConfig topicConfig = new TopicConfig(); - topicConfig.setReadQueueNums(4); - when(mockTopicConfigManager.selectTopicConfig(anyString())).thenReturn(topicConfig); - when(mockMessageStore.getMinOffsetInQueue(anyString(), anyInt())).thenReturn(0L); - when(mockMessageStore.getMaxOffsetInQueue(anyString(), anyInt())).thenReturn(10L); - when(mockBrokerConfig.isPopConsumerKVServiceEnable()).thenReturn(false); - - ChangeInvisibleTimeRequestHeader requestHeader = new ChangeInvisibleTimeRequestHeader(); - requestHeader.setTopic("TestTopic"); - requestHeader.setQueueId(1); - requestHeader.setOffset(5L); - requestHeader.setConsumerGroup("TestGroup"); - requestHeader.setExtraInfo("0 10000 10000 0 TestBroker 1"); - requestHeader.setInvisibleTime(60000L); - when(mockRequest.decodeCommandCustomHeader(ChangeInvisibleTimeRequestHeader.class)).thenReturn(requestHeader); - - ChangeInvisibleTimeProcessor processor = new ChangeInvisibleTimeProcessor(mockBrokerController); - CompletableFuture<RemotingCommand> futureResponse = processor.processRequestAsync(mockChannel, mockRequest, true); - - RemotingCommand response = futureResponse.get(); - assertNotNull(response); - assertEquals(ResponseCode.SUCCESS, response.getCode()); - } } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopBufferMergeServiceTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopBufferMergeServiceTest.java index 33d6820a7e..acc7a3da74 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopBufferMergeServiceTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopBufferMergeServiceTest.java @@ -16,20 +16,19 @@ */ package org.apache.rocketmq.broker.processor; -import com.alibaba.fastjson2.JSON; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.rocketmq.broker.BrokerController; -import org.apache.rocketmq.broker.client.ConsumerManager; -import org.apache.rocketmq.broker.failover.EscapeBridge; +import org.apache.rocketmq.broker.client.ClientChannelInfo; import org.apache.rocketmq.broker.schedule.ScheduleMessageService; -import org.apache.rocketmq.broker.topic.TopicConfigManager; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; -import org.apache.rocketmq.common.message.MessageExtBrokerInner; +import org.apache.rocketmq.remoting.netty.NettyClientConfig; +import org.apache.rocketmq.remoting.netty.NettyServerConfig; +import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumerData; import org.apache.rocketmq.store.DefaultMessageStore; -import org.apache.rocketmq.store.PutMessageResult; -import org.apache.rocketmq.store.PutMessageStatus; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.pop.AckMsg; import org.apache.rocketmq.store.pop.PopCheckPoint; @@ -38,82 +37,56 @@ import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; +import org.mockito.Spy; import org.mockito.junit.MockitoJUnitRunner; -import java.lang.reflect.Method; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicInteger; - +import static org.apache.rocketmq.broker.processor.PullMessageProcessorTest.createConsumerData; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.Silent.class) public class PopBufferMergeServiceTest { - + @Spy + private BrokerController brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), new MessageStoreConfig()); @Mock - private BrokerController brokerController; - private PopMessageProcessor popMessageProcessor; - - @Mock - private ScheduleMessageService scheduleMessageService; - @Mock - private TopicConfigManager topicConfigManager; - - @Mock - private ConsumerManager consumerManager; - + private ChannelHandlerContext handlerContext; @Mock private DefaultMessageStore messageStore; - - @Mock - private MessageStoreConfig messageStoreConfig; - - private String defaultGroup = "defaultGroup"; - - private String defaultTopic = "defaultTopic"; - - private PopBufferMergeService popBufferMergeService; - - @Mock - private BrokerConfig brokerConfig; - - @Mock - private EscapeBridge escapeBridge; + private ScheduleMessageService scheduleMessageService; + private ClientChannelInfo clientChannelInfo; + private String group = "FooBarGroup"; + private String topic = "FooBar"; @Before public void init() throws Exception { - when(brokerConfig.getBrokerIP1()).thenReturn("127.0.0.1"); - when(brokerConfig.isEnablePopBufferMerge()).thenReturn(true); - when(brokerConfig.getPopCkStayBufferTime()).thenReturn(10 * 1000); - when(brokerController.getBrokerConfig()).thenReturn(brokerConfig); - when(brokerController.getEscapeBridge()).thenReturn(escapeBridge); - when(brokerController.getMessageStore()).thenReturn(messageStore); - when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager); - when(brokerController.getScheduleMessageService()).thenReturn(scheduleMessageService); - when(brokerController.getConsumerManager()).thenReturn(consumerManager); - when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig); + FieldUtils.writeField(brokerController.getBrokerConfig(), "enablePopBufferMerge", true, true); + brokerController.setMessageStore(messageStore); popMessageProcessor = new PopMessageProcessor(brokerController); - popBufferMergeService = new PopBufferMergeService(brokerController, popMessageProcessor); - FieldUtils.writeDeclaredField(popBufferMergeService, "brokerController", brokerController, true); - ConcurrentMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>(); - topicConfigTable.put(defaultTopic, new TopicConfig()); - when(topicConfigManager.getTopicConfigTable()).thenReturn(topicConfigTable); + scheduleMessageService = new ScheduleMessageService(brokerController); + scheduleMessageService.parseDelayLevel(); + Channel mockChannel = mock(Channel.class); + brokerController.getTopicConfigManager().getTopicConfigTable().put(topic, new TopicConfig()); + clientChannelInfo = new ClientChannelInfo(mockChannel); + ConsumerData consumerData = createConsumerData(group, topic); + brokerController.getConsumerManager().registerConsumer( + consumerData.getGroupName(), + clientChannelInfo, + consumerData.getConsumeType(), + consumerData.getMessageModel(), + consumerData.getConsumeFromWhere(), + consumerData.getSubscriptionDataSet(), + false); } - @Test(timeout = 15_000) + @Test(timeout = 10_000) public void testBasic() throws Exception { // This test case fails on Windows in CI pipeline // Disable it for later fix Assume.assumeFalse(MixAll.isWindows()); + PopBufferMergeService popBufferMergeService = new PopBufferMergeService(brokerController, popMessageProcessor); + popBufferMergeService.start(); PopCheckPoint ck = new PopCheckPoint(); ck.setBitMap(0); int msgCnt = 1; @@ -124,8 +97,8 @@ public class PopBufferMergeServiceTest { ck.setInvisibleTime(invisibleTime); int offset = 100; ck.setStartOffset(offset); - ck.setCId(defaultGroup); - ck.setTopic(defaultTopic); + ck.setCId(group); + ck.setTopic(topic); int queueId = 0; ck.setQueueId(queueId); @@ -135,93 +108,18 @@ public class PopBufferMergeServiceTest { AckMsg ackMsg = new AckMsg(); ackMsg.setAckOffset(ackOffset); ackMsg.setStartOffset(offset); - ackMsg.setConsumerGroup(defaultGroup); - ackMsg.setTopic(defaultTopic); + ackMsg.setConsumerGroup(group); + ackMsg.setTopic(topic); ackMsg.setQueueId(queueId); ackMsg.setPopTime(popTime); try { assertThat(popBufferMergeService.addCk(ck, reviveQid, ackOffset, nextBeginOffset)).isTrue(); - assertThat(popBufferMergeService.getLatestOffset(defaultTopic, defaultGroup, queueId)).isEqualTo(nextBeginOffset); + assertThat(popBufferMergeService.getLatestOffset(topic, group, queueId)).isEqualTo(nextBeginOffset); Thread.sleep(1000); // wait background threads of PopBufferMergeService run for some time assertThat(popBufferMergeService.addAk(reviveQid, ackMsg)).isTrue(); - assertThat(popBufferMergeService.getLatestOffset(defaultTopic, defaultGroup, queueId)).isEqualTo(nextBeginOffset); + assertThat(popBufferMergeService.getLatestOffset(topic, group, queueId)).isEqualTo(nextBeginOffset); } finally { popBufferMergeService.shutdown(true); } } - - @Test - public void testAddCkJustOffset_MergeKeyConflict() { - PopCheckPoint point = mock(PopCheckPoint.class); - String mergeKey = "testMergeKey"; - when(point.getTopic()).thenReturn(mergeKey); - when(point.getCId()).thenReturn(""); - when(point.getQueueId()).thenReturn(0); - when(point.getStartOffset()).thenReturn(0L); - when(point.getPopTime()).thenReturn(0L); - when(point.getBrokerName()).thenReturn(""); - popBufferMergeService.buffer.put(mergeKey + "000", mock(PopBufferMergeService.PopCheckPointWrapper.class)); - - assertFalse(popBufferMergeService.addCkJustOffset(point, 0, 0, 0)); - } - - @Test - public void testAddCkMock() { - int queueId = 0; - long startOffset = 100L; - long invisibleTime = 30_000L; - long popTime = System.currentTimeMillis(); - int reviveQueueId = 0; - long nextBeginOffset = 101L; - String brokerName = "brokerName"; - popBufferMergeService.addCkMock(defaultGroup, defaultTopic, queueId, startOffset, invisibleTime, popTime, reviveQueueId, nextBeginOffset, brokerName); - verify(brokerConfig, times(1)).isEnablePopLog(); - } - - @Test - public void testPutAckToStore() throws Exception { - PopCheckPoint point = new PopCheckPoint(); - point.setStartOffset(100L); - point.setCId("testGroup"); - point.setTopic("testTopic"); - point.setQueueId(1); - point.setPopTime(System.currentTimeMillis()); - point.setBrokerName("testBroker"); - - PopBufferMergeService.PopCheckPointWrapper pointWrapper = mock(PopBufferMergeService.PopCheckPointWrapper.class); - when(pointWrapper.getCk()).thenReturn(point); - when(pointWrapper.getReviveQueueId()).thenReturn(0); - - AtomicInteger toStoreBits = new AtomicInteger(0); - when(pointWrapper.getToStoreBits()).thenReturn(toStoreBits); - - byte msgIndex = 0; - AtomicInteger count = new AtomicInteger(0); - - EscapeBridge escapeBridge = mock(EscapeBridge.class); - when(brokerController.getEscapeBridge()).thenReturn(escapeBridge); - when(brokerController.getBrokerConfig().isAppendAckAsync()).thenReturn(false); - - when(escapeBridge.putMessageToSpecificQueue(any())).thenAnswer(invocation -> { - MessageExtBrokerInner capturedMessage = invocation.getArgument(0); - AckMsg ackMsg = JSON.parseObject(capturedMessage.getBody(), AckMsg.class); - - assertEquals(point.ackOffsetByIndex(msgIndex), ackMsg.getAckOffset()); - assertEquals(point.getStartOffset(), ackMsg.getStartOffset()); - assertEquals(point.getCId(), ackMsg.getConsumerGroup()); - assertEquals(point.getTopic(), ackMsg.getTopic()); - assertEquals(point.getQueueId(), ackMsg.getQueueId()); - assertEquals(point.getPopTime(), ackMsg.getPopTime()); - assertEquals(point.getBrokerName(), ackMsg.getBrokerName()); - - PutMessageResult result = mock(PutMessageResult.class); - when(result.getPutMessageStatus()).thenReturn(PutMessageStatus.PUT_OK); - return result; - }); - - Method method = PopBufferMergeService.class.getDeclaredMethod("putAckToStore", PopBufferMergeService.PopCheckPointWrapper.class, byte.class, AtomicInteger.class); - method.setAccessible(true); - method.invoke(popBufferMergeService, pointWrapper, msgIndex, count); - verify(escapeBridge, times(1)).putMessageToSpecificQueue(any(MessageExtBrokerInner.class)); - } } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java index 28476149ab..fdb0690e5d 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java @@ -16,9 +16,10 @@ */ package org.apache.rocketmq.broker.processor; -import com.alibaba.fastjson2.JSON; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.embedded.EmbeddedChannel; +import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.client.ClientChannelInfo; import org.apache.rocketmq.common.BrokerConfig; @@ -26,7 +27,6 @@ import org.apache.rocketmq.common.KeyBuilder; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.constant.ConsumeInitMode; import org.apache.rocketmq.common.message.MessageDecoder; -import org.apache.rocketmq.common.message.MessageExtBrokerInner; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.netty.NettyClientConfig; import org.apache.rocketmq.remoting.netty.NettyServerConfig; @@ -42,7 +42,7 @@ import org.apache.rocketmq.store.SelectMappedBufferResult; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.exception.ConsumeQueueException; import org.apache.rocketmq.store.logfile.DefaultMappedFile; -import org.apache.rocketmq.store.pop.PopCheckPoint; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -50,13 +50,8 @@ import org.mockito.Mock; import org.mockito.Spy; import org.mockito.junit.MockitoJUnitRunner; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.util.concurrent.CompletableFuture; - import static org.apache.rocketmq.broker.processor.PullMessageProcessorTest.createConsumerData; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; @@ -173,17 +168,17 @@ public class PopMessageProcessorTest { .thenReturn(CompletableFuture.completedFuture(getMessageResult)); long offset = brokerController.getConsumerOffsetManager().queryOffset(newGroup, retryTopic, 0); - assertEquals(-1, offset); + Assert.assertEquals(-1, offset); RemotingCommand request = createPopMsgCommand(newGroup, topic, 0, ConsumeInitMode.MAX); popMessageProcessor.processRequest(handlerContext, request); offset = brokerController.getConsumerOffsetManager().queryOffset(newGroup, retryTopic, 0); - assertEquals(minOffset, offset); + Assert.assertEquals(minOffset, offset); when(messageStore.getMinOffsetInQueue(retryTopic, 0)).thenReturn(minOffset * 2); popMessageProcessor.processRequest(handlerContext, request); offset = brokerController.getConsumerOffsetManager().queryOffset(newGroup, retryTopic, 0); - assertEquals(minOffset, offset); // will not entry getInitOffset() again + Assert.assertEquals(minOffset, offset); // will not entry getInitOffset() again messageStore.getMinOffsetInQueue(retryTopic, 0); // prevent UnnecessaryStubbingException } @@ -198,17 +193,17 @@ public class PopMessageProcessorTest { .thenReturn(CompletableFuture.completedFuture(getMessageResult)); long offset = brokerController.getConsumerOffsetManager().queryOffset(newGroup, topic, 0); - assertEquals(-1, offset); + Assert.assertEquals(-1, offset); RemotingCommand request = createPopMsgCommand(newGroup, topic, 0, ConsumeInitMode.MAX); popMessageProcessor.processRequest(handlerContext, request); offset = brokerController.getConsumerOffsetManager().queryOffset(newGroup, topic, 0); - assertEquals(maxOffset - 1, offset); // checkInMem return false + Assert.assertEquals(maxOffset - 1, offset); // checkInMem return false when(messageStore.getMaxOffsetInQueue(topic, 0)).thenReturn(maxOffset * 2); popMessageProcessor.processRequest(handlerContext, request); offset = brokerController.getConsumerOffsetManager().queryOffset(newGroup, topic, 0); - assertEquals(maxOffset - 1, offset); // will not entry getInitOffset() again + Assert.assertEquals(maxOffset - 1, offset); // will not entry getInitOffset() again messageStore.getMaxOffsetInQueue(topic, 0); // prevent UnnecessaryStubbingException } @@ -245,31 +240,4 @@ public class PopMessageProcessorTest { } return getMessageResult; } - - @Test - public void testBuildCkMsgJsonParsing() { - PopCheckPoint ck = new PopCheckPoint(); - ck.setTopic("TestTopic"); - ck.setQueueId(1); - ck.setStartOffset(100L); - ck.setCId("TestConsumer"); - ck.setPopTime(System.currentTimeMillis()); - ck.setBrokerName("TestBroker"); - - int reviveQid = 0; - PopMessageProcessor processor = new PopMessageProcessor(brokerController); - - MessageExtBrokerInner result = processor.buildCkMsg(ck, reviveQid); - - String jsonBody = new String(result.getBody(), StandardCharsets.UTF_8); - PopCheckPoint actual = JSON.parseObject(jsonBody, PopCheckPoint.class); - - assertEquals(ck.getTopic(), actual.getTopic()); - assertEquals(ck.getQueueId(), actual.getQueueId()); - assertEquals(ck.getStartOffset(), actual.getStartOffset()); - assertEquals(ck.getCId(), actual.getCId()); - assertEquals(ck.getPopTime(), actual.getPopTime()); - assertEquals(ck.getBrokerName(), actual.getBrokerName()); - assertEquals(ck.getReviveTime(), actual.getReviveTime()); - } } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java index e6a2cdb6cd..3010e83610 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java @@ -16,7 +16,13 @@ */ package org.apache.rocketmq.broker.processor; -import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson.JSON; +import java.net.SocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.lang3.tuple.Triple; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.failover.EscapeBridge; @@ -34,13 +40,12 @@ import org.apache.rocketmq.common.message.MessageExtBrokerInner; import org.apache.rocketmq.common.utils.DataConverter; import org.apache.rocketmq.common.utils.NetworkUtil; import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; -import org.apache.rocketmq.store.AppendMessageResult; -import org.apache.rocketmq.store.AppendMessageStatus; import org.apache.rocketmq.store.MessageStore; import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.PutMessageStatus; +import org.apache.rocketmq.store.AppendMessageResult; +import org.apache.rocketmq.store.AppendMessageStatus; import org.apache.rocketmq.store.pop.AckMsg; -import org.apache.rocketmq.store.pop.BatchAckMsg; import org.apache.rocketmq.store.pop.PopCheckPoint; import org.apache.rocketmq.store.timer.TimerMessageStore; import org.junit.Assert; @@ -51,27 +56,18 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; -import java.net.SocketAddress; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - import static org.junit.Assert.assertEquals; -import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.times; @RunWith(MockitoJUnitRunner.Silent.class) public class PopReviveServiceTest { @@ -409,59 +405,6 @@ public class PopReviveServiceTest { verify(messageStore, times(1)).putMessage(any(MessageExtBrokerInner.class)); // rewrite CK } - @Test - public void testReviveMsgFromBatchAck() throws Throwable { - brokerConfig.setEnableSkipLongAwaitingAck(true); - when(consumerOffsetManager.queryOffset(PopAckConstants.REVIVE_GROUP, REVIVE_TOPIC, REVIVE_QUEUE_ID)).thenReturn(0L); - List<MessageExt> reviveMessageExtList = new ArrayList<>(); - long basePopTime = System.currentTimeMillis(); - reviveMessageExtList.add(buildBatchAckMsg(buildBatchAckMsg(Arrays.asList(1L, 2L, 3L), basePopTime), 1, 1, basePopTime)); - doReturn(reviveMessageExtList, new ArrayList<>()).when(popReviveService).getReviveMessage(anyLong(), anyInt()); - - PopReviveService.ConsumeReviveObj consumeReviveObj = new PopReviveService.ConsumeReviveObj(); - popReviveService.consumeReviveMessage(consumeReviveObj); - assertEquals(1, consumeReviveObj.map.size()); - - ArgumentCaptor<Long> commitOffsetCaptor = ArgumentCaptor.forClass(Long.class); - doNothing().when(consumerOffsetManager).commitOffset(anyString(), anyString(), anyString(), anyInt(), commitOffsetCaptor.capture()); - popReviveService.mergeAndRevive(consumeReviveObj); - assertEquals(1, commitOffsetCaptor.getValue().longValue()); - } - - public static MessageExtBrokerInner buildBatchAckMsg(BatchAckMsg batchAckMsg, long deliverMs, long reviveOffset, long deliverTime) { - MessageExtBrokerInner result = buildBatchAckInnerMessage(REVIVE_TOPIC, batchAckMsg, REVIVE_QUEUE_ID, STORE_HOST, deliverMs, PopMessageProcessor.genAckUniqueId(batchAckMsg)); - result.setQueueOffset(reviveOffset); - result.setDeliverTimeMs(deliverMs); - result.setStoreTimestamp(deliverTime); - return result; - } - - public static BatchAckMsg buildBatchAckMsg(Collection<Long> offsets, long popTime) { - BatchAckMsg result = new BatchAckMsg(); - result.setConsumerGroup(GROUP); - result.setTopic(TOPIC); - result.setQueueId(0); - result.setPopTime(popTime); - result.setBrokerName("broker-a"); - result.getAckOffsetList().addAll(offsets); - return result; - } - - public static MessageExtBrokerInner buildBatchAckInnerMessage(String reviveTopic, AckMsg ackMsg, int reviveQid, SocketAddress host, long deliverMs, String ackUniqueId) { - MessageExtBrokerInner result = new MessageExtBrokerInner(); - result.setTopic(reviveTopic); - result.setBody(JSON.toJSONString(ackMsg).getBytes(DataConverter.CHARSET_UTF8)); - result.setQueueId(reviveQid); - result.setTags(PopAckConstants.BATCH_ACK_TAG); - result.setBornTimestamp(System.currentTimeMillis()); - result.setBornHost(host); - result.setStoreHost(host); - result.setDeliverTimeMs(deliverMs); - result.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, ackUniqueId); - result.setPropertiesString(MessageDecoder.messageProperties2String(result.getProperties())); - return result; - } - public static PopCheckPoint buildPopCheckPoint(long startOffset, long popTime, long reviveOffset) { PopCheckPoint ck = new PopCheckPoint(); ck.setStartOffset(startOffset); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManagerTest.java index 9b25e0134c..b74e57ab93 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManagerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManagerTest.java @@ -17,11 +17,15 @@ package org.apache.rocketmq.broker.topic; -import com.alibaba.fastjson2.JSON; -import com.alibaba.fastjson2.JSONWriter; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.BrokerConfig; -import org.apache.rocketmq.remoting.protocol.body.TopicQueueMappingSerializeWrapper; import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingDetail; import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingUtils; import org.apache.rocketmq.remoting.protocol.statictopic.TopicRemappingDetailWrapper; @@ -33,16 +37,6 @@ import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; - -import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) @@ -85,9 +79,9 @@ public class TopicQueueMappingManagerTest { String topic = UUID.randomUUID().toString(); int queueNum = 10; TopicRemappingDetailWrapper topicRemappingDetailWrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, brokers, new HashMap<>()); - assertEquals(1, topicRemappingDetailWrapper.getBrokerConfigMap().size()); + Assert.assertEquals(1, topicRemappingDetailWrapper.getBrokerConfigMap().size()); TopicQueueMappingDetail topicQueueMappingDetail = topicRemappingDetailWrapper.getBrokerConfigMap().values().iterator().next().getMappingDetail(); - assertEquals(queueNum, topicQueueMappingDetail.getHostedQueues().size()); + Assert.assertEquals(queueNum, topicQueueMappingDetail.getHostedQueues().size()); mappingDetailMap.put(topic, topicQueueMappingDetail); } } @@ -95,7 +89,7 @@ public class TopicQueueMappingManagerTest { { topicQueueMappingManager = new TopicQueueMappingManager(brokerController); Assert.assertTrue(topicQueueMappingManager.load()); - assertEquals(0, topicQueueMappingManager.getTopicQueueMappingTable().size()); + Assert.assertEquals(0, topicQueueMappingManager.getTopicQueueMappingTable().size()); for (TopicQueueMappingDetail mappingDetail : mappingDetailMap.values()) { for (int i = 0; i < 10; i++) { topicQueueMappingManager.updateTopicQueueMapping(mappingDetail, false, false, true); @@ -107,49 +101,11 @@ public class TopicQueueMappingManagerTest { { topicQueueMappingManager = new TopicQueueMappingManager(brokerController); Assert.assertTrue(topicQueueMappingManager.load()); - assertEquals(mappingDetailMap.size(), topicQueueMappingManager.getTopicQueueMappingTable().size()); + Assert.assertEquals(mappingDetailMap.size(), topicQueueMappingManager.getTopicQueueMappingTable().size()); for (TopicQueueMappingDetail topicQueueMappingDetail: topicQueueMappingManager.getTopicQueueMappingTable().values()) { - assertEquals(topicQueueMappingDetail, mappingDetailMap.get(topicQueueMappingDetail.getTopic())); + Assert.assertEquals(topicQueueMappingDetail, mappingDetailMap.get(topicQueueMappingDetail.getTopic())); } } delete(topicQueueMappingManager); } - - @Test - public void testEncodePretty() { - TopicQueueMappingManager topicQueueMappingManager = new TopicQueueMappingManager(null); - TopicQueueMappingDetail detail = new TopicQueueMappingDetail(); - detail.setTopic("testTopic"); - detail.setBname("testBroker"); - - topicQueueMappingManager.getTopicQueueMappingTable().put("testTopic", detail); - topicQueueMappingManager.getDataVersion().nextVersion(); - - String actual = topicQueueMappingManager.encode(true); - TopicQueueMappingSerializeWrapper expectedWrapper = new TopicQueueMappingSerializeWrapper(); - expectedWrapper.setTopicQueueMappingInfoMap(new ConcurrentHashMap<>(topicQueueMappingManager.getTopicQueueMappingTable())); - expectedWrapper.setDataVersion(topicQueueMappingManager.getDataVersion()); - String expected = JSON.toJSONString(expectedWrapper, JSONWriter.Feature.PrettyFormat); - - assertEquals(expected, actual); - } - - @Test - public void testEncodeNonPretty() { - TopicQueueMappingManager topicQueueMappingManager = new TopicQueueMappingManager(null); - TopicQueueMappingDetail detail = new TopicQueueMappingDetail(); - detail.setTopic("testTopic"); - detail.setBname("testBroker"); - - topicQueueMappingManager.getTopicQueueMappingTable().put("testTopic", detail); - topicQueueMappingManager.getDataVersion().nextVersion(); - - String actual = topicQueueMappingManager.encode(false); - TopicQueueMappingSerializeWrapper expectedWrapper = new TopicQueueMappingSerializeWrapper(); - expectedWrapper.setTopicQueueMappingInfoMap(new ConcurrentHashMap<>(topicQueueMappingManager.getTopicQueueMappingTable())); - expectedWrapper.setDataVersion(topicQueueMappingManager.getDataVersion()); - String expected = JSON.toJSONString(expectedWrapper); - - assertEquals(expected, actual); - } } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionMetricsTest.java b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionMetricsTest.java index 62a6ad8b5b..690b4eabb5 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionMetricsTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionMetricsTest.java @@ -19,40 +19,23 @@ package org.apache.rocketmq.broker.transaction.queue; import org.apache.rocketmq.broker.transaction.TransactionMetrics; import org.apache.rocketmq.broker.transaction.TransactionMetrics.Metric; -import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.junit.MockitoJUnitRunner; -import java.io.File; -import java.nio.file.Path; -import java.nio.file.Paths; import java.util.Collections; -import java.util.UUID; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.verify; @RunWith(MockitoJUnitRunner.class) public class TransactionMetricsTest { private TransactionMetrics transactionMetrics; private String configPath; - private Path path; @Before - public void before() throws Exception { - configPath = createBaseDir(); - path = Paths.get(configPath); - transactionMetrics = spy(new TransactionMetrics(configPath)); - } - - @After - public void after() throws Exception { - deleteFile(configPath); - assertFalse(path.toFile().exists()); + public void setUp() throws Exception { + configPath = "configPath"; + transactionMetrics = new TransactionMetrics(configPath); } /** @@ -97,40 +80,4 @@ public class TransactionMetricsTest { transactionMetrics.cleanMetrics(Collections.singleton(topic)); assert transactionMetrics.getTransactionCount(topic) == 0; } - - @Test - public void testPersist() { - assertFalse(path.toFile().exists()); - transactionMetrics.persist(); - assertTrue(path.toFile().exists()); - verify(transactionMetrics).persist(); - } - - private String createBaseDir() { - String baseDir = System.getProperty("java.io.tmpdir") + File.separator + "unitteststore-" + UUID.randomUUID(); - final File file = new File(baseDir); - if (file.exists()) { - System.exit(1); - } - return baseDir; - } - - private void deleteFile(String fileName) { - deleteFile(new File(fileName)); - } - - private void deleteFile(File file) { - if (!file.exists()) { - return; - } - if (file.isFile()) { - file.delete(); - } else if (file.isDirectory()) { - File[] files = file.listFiles(); - for (File file1 : files) { - deleteFile(file1); - } - file.delete(); - } - } } diff --git a/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java b/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java index 67cf045cfb..38e0a20752 100644 --- a/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java +++ b/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java @@ -16,7 +16,7 @@ */ package org.apache.rocketmq.store.pop; -import com.alibaba.fastjson2.annotation.JSONField; +import com.alibaba.fastjson.annotation.JSONField; import java.util.ArrayList; import java.util.List; @@ -35,6 +35,7 @@ public class PopCheckPoint implements Comparable<PopCheckPoint> { private int queueId; @JSONField(name = "t") private String topic; + @JSONField(name = "c") private String cid; @JSONField(name = "ro") private long reviveOffset; @@ -113,12 +114,10 @@ public class PopCheckPoint implements Comparable<PopCheckPoint> { this.topic = topic; } - @JSONField(name = "c") public String getCId() { return cid; } - @JSONField(name = "c") public void setCId(String cid) { this.cid = cid; }