This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new dd62ed0f3b [ISSUE #8892] Add test cases to config manager v2 (#8873)
dd62ed0f3b is described below

commit dd62ed0f3b16919adec5d5eece21a1050dc9c5a0
Author: Zhanhui Li <lizhan...@apache.org>
AuthorDate: Tue Oct 29 20:07:49 2024 +0800

    [ISSUE #8892] Add test cases to config manager v2 (#8873)
    
    * fix: add unit test for TopicConfigManagerV2
    
    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
    
    * fix: add unit test cases for config manager v2
    
    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
    
    * chore: add copyright header
    
    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
    
    ---------
    
    Signed-off-by: Li Zhanhui <lizhan...@gmail.com>
---
 .../rocketmq/broker/config/v2/ConfigStorage.java   |  14 +-
 .../broker/config/v2/ConsumerOffsetManagerV2.java  |   2 +
 .../config/v2/SubscriptionGroupManagerV2.java      |   2 +
 .../config/v2/ConsumerOffsetManagerV2Test.java     | 193 +++++++++++++++++++++
 .../config/v2/SubscriptionGroupManagerV2Test.java  | 141 +++++++++++++++
 .../broker/config/v2/TopicConfigManagerV2Test.java | 123 +++++++++++++
 6 files changed, 471 insertions(+), 4 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigStorage.java 
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigStorage.java
index af259aaa37..a31b573daa 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigStorage.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigStorage.java
@@ -27,11 +27,11 @@ import 
org.apache.rocketmq.common.config.AbstractRocksDBStorage;
 import org.apache.rocketmq.common.config.ConfigHelper;
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyOptions;
-import org.rocksdb.DirectSlice;
 import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
 import org.rocksdb.WriteBatch;
 import org.rocksdb.WriteOptions;
 
@@ -112,10 +112,16 @@ public class ConfigStorage extends AbstractRocksDBStorage 
{
             readOptions.setTotalOrderSeek(true);
             readOptions.setTailing(false);
             readOptions.setAutoPrefixMode(true);
-            readOptions.setIterateLowerBound(new DirectSlice(beginKey));
-            readOptions.setIterateUpperBound(new DirectSlice(endKey));
+            // Use Slice rather than DirectSlice until this issue is fixed:
+            // https://github.com/facebook/rocksdb/issues/13098
+            //
+            // readOptions.setIterateUpperBound(new DirectSlice(endKey));
+            byte[] buf = new byte[endKey.remaining()];
+            endKey.slice().get(buf);
+            readOptions.setIterateUpperBound(new Slice(buf));
+
             RocksIterator iterator = db.newIterator(defaultCFHandle, 
readOptions);
-            iterator.seekToFirst();
+            iterator.seek(beginKey.slice());
             return iterator;
         }
     }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java
index 5b0885c491..2c5d3677d8 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java
@@ -390,10 +390,12 @@ public class ConsumerOffsetManagerV2 extends 
ConsumerOffsetManager {
 
         ByteBuf keyBuf = keyOfPullOffset(group, topic, queueId);
         ByteBuf valueBuf = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(8);
+        valueBuf.writeLong(offset);
         try (WriteBatch writeBatch = new WriteBatch()) {
             writeBatch.put(keyBuf.nioBuffer(), valueBuf.nioBuffer());
             long stateMachineVersion = brokerController.getMessageStore() != 
null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
             ConfigHelper.stampDataVersion(writeBatch, dataVersion, 
stateMachineVersion);
+            configStorage.write(writeBatch);
         } catch (RocksDBException e) {
             LOG.error("Failed to commit pull offset. group={}, topic={}, 
queueId={}, offset={}",
                 group, topic, queueId, offset);
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2.java
index 8da6f9d2bc..f535fa195a 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2.java
@@ -74,6 +74,7 @@ public class SubscriptionGroupManagerV2 extends 
SubscriptionGroupManager {
                 if (null != subscriptionGroupConfig) {
                     
super.updateSubscriptionGroupConfigWithoutPersist(subscriptionGroupConfig);
                 }
+                iterator.next();
             }
         } finally {
             beginKey.release();
@@ -163,6 +164,7 @@ public class SubscriptionGroupManagerV2 extends 
SubscriptionGroupManager {
             writeBatch.delete(ConfigHelper.readBytes(keyBuf));
             long stateMachineVersion = 
brokerController.getMessageStore().getStateMachineVersion();
             ConfigHelper.stampDataVersion(writeBatch, dataVersion, 
stateMachineVersion);
+            configStorage.write(writeBatch);
         } catch (RocksDBException e) {
             log.error("Failed to remove subscription group config by 
group-name={}", groupName, e);
         }
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2Test.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2Test.java
new file mode 100644
index 0000000000..d7f46855e1
--- /dev/null
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2Test.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.config.v2;
+
+import java.io.File;
+import java.io.IOException;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MixAll;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ConsumerOffsetManagerV2Test {
+
+    private ConfigStorage configStorage;
+
+    private ConsumerOffsetManagerV2 consumerOffsetManagerV2;
+
+    @Mock
+    private BrokerController controller;
+
+    @Rule
+    public TemporaryFolder tf = new TemporaryFolder();
+
+    @After
+    public void cleanUp() {
+        if (null != configStorage) {
+            configStorage.shutdown();
+        }
+    }
+
+    @Before
+    public void setUp() throws IOException {
+        BrokerConfig brokerConfig = new BrokerConfig();
+        Mockito.doReturn(brokerConfig).when(controller).getBrokerConfig();
+
+        File configStoreDir = tf.newFolder();
+        configStorage = new ConfigStorage(configStoreDir.getAbsolutePath());
+        configStorage.start();
+        consumerOffsetManagerV2 = new ConsumerOffsetManagerV2(controller, 
configStorage);
+    }
+
+    /**
+     * Verify consumer offset can survive restarts
+     */
+    @Test
+    public void testCommitOffset_Standard() {
+        Assert.assertTrue(consumerOffsetManagerV2.load());
+
+        String clientHost = "localhost";
+        String topic = "T1";
+        String group = "G0";
+        int queueId = 1;
+        long queueOffset = 100;
+        consumerOffsetManagerV2.commitOffset(clientHost, group, topic, 
queueId, queueOffset);
+        Assert.assertEquals(queueOffset, 
consumerOffsetManagerV2.queryOffset(group, topic, queueId));
+
+        configStorage.shutdown();
+        consumerOffsetManagerV2.getOffsetTable().clear();
+        Assert.assertEquals(-1L, consumerOffsetManagerV2.queryOffset(group, 
topic, queueId));
+
+        configStorage.start();
+        consumerOffsetManagerV2.load();
+        Assert.assertEquals(queueOffset, 
consumerOffsetManagerV2.queryOffset(group, topic, queueId));
+    }
+
+    /**
+     * Verify commit offset can survive config store restart
+     */
+    @Test
+    public void testCommitOffset_LMQ() {
+        Assert.assertTrue(consumerOffsetManagerV2.load());
+
+        String clientHost = "localhost";
+        String topic = MixAll.LMQ_PREFIX + "T1";
+        String group = "G0";
+        int queueId = 1;
+        long queueOffset = 100;
+        consumerOffsetManagerV2.commitOffset(clientHost, group, topic, 
queueId, queueOffset);
+        Assert.assertEquals(queueOffset, 
consumerOffsetManagerV2.queryOffset(group, topic, queueId));
+
+        configStorage.shutdown();
+
+        configStorage.start();
+        consumerOffsetManagerV2.load();
+        Assert.assertEquals(queueOffset, 
consumerOffsetManagerV2.queryOffset(group, topic, queueId));
+    }
+
+
+    /**
+     * Verify committed pull offset can survive config store restart
+     */
+    @Test
+    public void testCommitPullOffset_LMQ() {
+        Assert.assertTrue(consumerOffsetManagerV2.load());
+
+        String clientHost = "localhost";
+        String topic = MixAll.LMQ_PREFIX + "T1";
+        String group = "G0";
+        int queueId = 1;
+        long queueOffset = 100;
+        consumerOffsetManagerV2.commitPullOffset(clientHost, group, topic, 
queueId, queueOffset);
+        Assert.assertEquals(queueOffset, 
consumerOffsetManagerV2.queryPullOffset(group, topic, queueId));
+
+        configStorage.shutdown();
+
+        configStorage.start();
+        consumerOffsetManagerV2.load();
+        Assert.assertEquals(queueOffset, 
consumerOffsetManagerV2.queryPullOffset(group, topic, queueId));
+    }
+
+    /**
+     * Verify removal by topic@group survives restart and keeps other topics
+     */
+    @Test
+    public void testRemoveByTopicAtGroup() {
+        Assert.assertTrue(consumerOffsetManagerV2.load());
+
+        String clientHost = "localhost";
+        String topic = MixAll.LMQ_PREFIX + "T1";
+        String topic2 = MixAll.LMQ_PREFIX + "T2";
+        String group = "G0";
+        int queueId = 1;
+        long queueOffset = 100;
+        consumerOffsetManagerV2.commitOffset(clientHost, group, topic, 
queueId, queueOffset);
+        consumerOffsetManagerV2.commitOffset(clientHost, group, topic2, 
queueId, queueOffset);
+        Assert.assertEquals(queueOffset, 
consumerOffsetManagerV2.queryOffset(group, topic, queueId));
+        Assert.assertEquals(queueOffset, 
consumerOffsetManagerV2.queryOffset(group, topic2, queueId));
+
+        consumerOffsetManagerV2.removeConsumerOffset(topic + 
ConsumerOffsetManager.TOPIC_GROUP_SEPARATOR + group);
+        Assert.assertEquals(-1L, consumerOffsetManagerV2.queryOffset(group, 
topic, queueId));
+        Assert.assertEquals(queueOffset, 
consumerOffsetManagerV2.queryOffset(group, topic2, queueId));
+
+        configStorage.shutdown();
+        configStorage.start();
+        consumerOffsetManagerV2.load();
+        Assert.assertEquals(-1L, consumerOffsetManagerV2.queryOffset(group, 
topic, queueId));
+        Assert.assertEquals(queueOffset, 
consumerOffsetManagerV2.queryOffset(group, topic2, queueId));
+    }
+
+    /**
+     * Verify offset removal by group survives config store restart
+     */
+    @Test
+    public void testRemoveByGroup() {
+        Assert.assertTrue(consumerOffsetManagerV2.load());
+
+        String clientHost = "localhost";
+        String topic = MixAll.LMQ_PREFIX + "T1";
+        String topic2 = MixAll.LMQ_PREFIX + "T2";
+        String group = "G0";
+        int queueId = 1;
+        long queueOffset = 100;
+        consumerOffsetManagerV2.commitOffset(clientHost, group, topic, 
queueId, queueOffset);
+        consumerOffsetManagerV2.commitOffset(clientHost, group, topic2, 
queueId, queueOffset);
+        Assert.assertEquals(queueOffset, 
consumerOffsetManagerV2.queryOffset(group, topic, queueId));
+        consumerOffsetManagerV2.removeOffset(group);
+        Assert.assertEquals(-1L, consumerOffsetManagerV2.queryOffset(group, 
topic, queueId));
+        Assert.assertEquals(-1L, consumerOffsetManagerV2.queryOffset(group, 
topic2, queueId));
+
+        configStorage.shutdown();
+        configStorage.start();
+        consumerOffsetManagerV2.load();
+        Assert.assertEquals(-1L, consumerOffsetManagerV2.queryOffset(group, 
topic, queueId));
+        Assert.assertEquals(-1L, consumerOffsetManagerV2.queryOffset(group, 
topic2, queueId));
+    }
+
+}
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2Test.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2Test.java
new file mode 100644
index 0000000000..6d436a7c4d
--- /dev/null
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2Test.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.config.v2;
+
+import java.io.File;
+import java.io.IOException;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.remoting.protocol.subscription.GroupRetryPolicy;
+import org.apache.rocketmq.remoting.protocol.subscription.GroupRetryPolicyType;
+import 
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.store.MessageStore;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class SubscriptionGroupManagerV2Test {
+    private ConfigStorage configStorage;
+
+    private SubscriptionGroupManagerV2 subscriptionGroupManagerV2;
+
+    @Mock
+    private BrokerController controller;
+
+    @Mock
+    private MessageStore messageStore;
+
+    @Rule
+    public TemporaryFolder tf = new TemporaryFolder();
+
+    @After
+    public void cleanUp() {
+        if (null != configStorage) {
+            configStorage.shutdown();
+        }
+    }
+
+    @Before
+    public void setUp() throws IOException {
+        BrokerConfig brokerConfig = new BrokerConfig();
+        brokerConfig.setAutoCreateSubscriptionGroup(false);
+        Mockito.doReturn(brokerConfig).when(controller).getBrokerConfig();
+
+        Mockito.doReturn(messageStore).when(controller).getMessageStore();
+        Mockito.doReturn(1L).when(messageStore).getStateMachineVersion();
+
+        File configStoreDir = tf.newFolder();
+        configStorage = new ConfigStorage(configStoreDir.getAbsolutePath());
+        configStorage.start();
+        subscriptionGroupManagerV2 = new 
SubscriptionGroupManagerV2(controller, configStorage);
+    }
+
+
+    @Test
+    public void testUpdateSubscriptionGroupConfig() {
+        SubscriptionGroupConfig subscriptionGroupConfig = new 
SubscriptionGroupConfig();
+        subscriptionGroupConfig.setGroupName("G1");
+        subscriptionGroupConfig.setConsumeEnable(true);
+        subscriptionGroupConfig.setRetryMaxTimes(16);
+        subscriptionGroupConfig.setGroupSysFlag(1);
+        GroupRetryPolicy retryPolicy = new GroupRetryPolicy();
+        retryPolicy.setType(GroupRetryPolicyType.EXPONENTIAL);
+        subscriptionGroupConfig.setGroupRetryPolicy(retryPolicy);
+        subscriptionGroupConfig.setBrokerId(1);
+        subscriptionGroupConfig.setConsumeBroadcastEnable(true);
+        subscriptionGroupConfig.setConsumeMessageOrderly(true);
+        subscriptionGroupConfig.setConsumeTimeoutMinute(30);
+        subscriptionGroupConfig.setConsumeFromMinEnable(true);
+        subscriptionGroupConfig.setWhichBrokerWhenConsumeSlowly(1);
+        subscriptionGroupConfig.setNotifyConsumerIdsChangedEnable(true);
+        
subscriptionGroupManagerV2.updateSubscriptionGroupConfig(subscriptionGroupConfig);
+
+        SubscriptionGroupConfig found = 
subscriptionGroupManagerV2.findSubscriptionGroupConfig(subscriptionGroupConfig.getGroupName());
+        Assert.assertEquals(subscriptionGroupConfig, found);
+
+        subscriptionGroupManagerV2.getSubscriptionGroupTable().clear();
+        configStorage.shutdown();
+        configStorage.start();
+        subscriptionGroupManagerV2.load();
+        found = 
subscriptionGroupManagerV2.findSubscriptionGroupConfig(subscriptionGroupConfig.getGroupName());
+        Assert.assertEquals(subscriptionGroupConfig, found);
+    }
+
+
+    @Test
+    public void testDeleteSubscriptionGroupConfig() {
+        SubscriptionGroupConfig subscriptionGroupConfig = new 
SubscriptionGroupConfig();
+        subscriptionGroupConfig.setGroupName("G1");
+        subscriptionGroupConfig.setConsumeEnable(true);
+        subscriptionGroupConfig.setRetryMaxTimes(16);
+        subscriptionGroupConfig.setGroupSysFlag(1);
+        GroupRetryPolicy retryPolicy = new GroupRetryPolicy();
+        retryPolicy.setType(GroupRetryPolicyType.EXPONENTIAL);
+        subscriptionGroupConfig.setGroupRetryPolicy(retryPolicy);
+        subscriptionGroupConfig.setBrokerId(1);
+        subscriptionGroupConfig.setConsumeBroadcastEnable(true);
+        subscriptionGroupConfig.setConsumeMessageOrderly(true);
+        subscriptionGroupConfig.setConsumeTimeoutMinute(30);
+        subscriptionGroupConfig.setConsumeFromMinEnable(true);
+        subscriptionGroupConfig.setWhichBrokerWhenConsumeSlowly(1);
+        subscriptionGroupConfig.setNotifyConsumerIdsChangedEnable(true);
+        
subscriptionGroupManagerV2.updateSubscriptionGroupConfig(subscriptionGroupConfig);
+
+        SubscriptionGroupConfig found = 
subscriptionGroupManagerV2.findSubscriptionGroupConfig(subscriptionGroupConfig.getGroupName());
+        Assert.assertEquals(subscriptionGroupConfig, found);
+        
subscriptionGroupManagerV2.removeSubscriptionGroupConfig(subscriptionGroupConfig.getGroupName());
+
+        found = 
subscriptionGroupManagerV2.findSubscriptionGroupConfig(subscriptionGroupConfig.getGroupName());
+        Assert.assertNull(found);
+
+        configStorage.shutdown();
+        configStorage.start();
+        subscriptionGroupManagerV2.load();
+        found = 
subscriptionGroupManagerV2.findSubscriptionGroupConfig(subscriptionGroupConfig.getGroupName());
+        Assert.assertNull(found);
+    }
+
+}
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2Test.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2Test.java
new file mode 100644
index 0000000000..92c936b110
--- /dev/null
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2Test.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.config.v2;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+
+@RunWith(value = MockitoJUnitRunner.class)
+public class TopicConfigManagerV2Test {
+
+    private ConfigStorage configStorage;
+
+    private TopicConfigManagerV2 topicConfigManagerV2;
+
+    @Mock
+    private BrokerController controller;
+
+    @Rule
+    public TemporaryFolder tf = new TemporaryFolder();
+
+    @After
+    public void cleanUp() {
+        if (null != configStorage) {
+            configStorage.shutdown();
+        }
+    }
+
+    @Before
+    public void setUp() throws IOException {
+        BrokerConfig brokerConfig = new BrokerConfig();
+        Mockito.doReturn(brokerConfig).when(controller).getBrokerConfig();
+
+        MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+        
Mockito.doReturn(messageStoreConfig).when(controller).getMessageStoreConfig();
+
+        File configStoreDir = tf.newFolder();
+        configStorage = new ConfigStorage(configStoreDir.getAbsolutePath());
+        configStorage.start();
+        topicConfigManagerV2 = new TopicConfigManagerV2(controller, 
configStorage);
+    }
+
+    @Test
+    public void testUpdateTopicConfig() {
+        TopicConfig topicConfig = new TopicConfig();
+        String topicName = "T1";
+        topicConfig.setTopicName(topicName);
+        topicConfig.setPerm(6);
+        topicConfig.setReadQueueNums(8);
+        topicConfig.setWriteQueueNums(4);
+        topicConfig.setOrder(true);
+        topicConfig.setTopicSysFlag(4);
+        topicConfigManagerV2.updateTopicConfig(topicConfig);
+
+        Assert.assertTrue(configStorage.shutdown());
+
+        topicConfigManagerV2.getTopicConfigTable().clear();
+
+        Assert.assertTrue(configStorage.start());
+        Assert.assertTrue(topicConfigManagerV2.load());
+
+        TopicConfig loaded = topicConfigManagerV2.selectTopicConfig(topicName);
+        Assert.assertNotNull(loaded);
+        Assert.assertEquals(topicName, loaded.getTopicName());
+        Assert.assertEquals(6, loaded.getPerm());
+        Assert.assertEquals(8, loaded.getReadQueueNums());
+        Assert.assertEquals(4, loaded.getWriteQueueNums());
+        Assert.assertTrue(loaded.isOrder());
+        Assert.assertEquals(4, loaded.getTopicSysFlag());
+
+        Assert.assertTrue(topicConfigManagerV2.containsTopic(topicName));
+    }
+
+    @Test
+    public void testRemoveTopicConfig() {
+        TopicConfig topicConfig = new TopicConfig();
+        String topicName = "T1";
+        topicConfig.setTopicName(topicName);
+        topicConfig.setPerm(6);
+        topicConfig.setReadQueueNums(8);
+        topicConfig.setWriteQueueNums(4);
+        topicConfig.setOrder(true);
+        topicConfig.setTopicSysFlag(4);
+        topicConfigManagerV2.updateTopicConfig(topicConfig);
+        topicConfigManagerV2.removeTopicConfig(topicName);
+        Assert.assertFalse(topicConfigManagerV2.containsTopic(topicName));
+        Assert.assertTrue(configStorage.shutdown());
+
+        Assert.assertTrue(configStorage.start());
+        Assert.assertTrue(topicConfigManagerV2.load());
+        Assert.assertFalse(topicConfigManagerV2.containsTopic(topicName));
+    }
+}

Reply via email to