This is an automated email from the ASF dual-hosted git repository.
zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new aaa4a4b5fc Add attribute for SubscriptionGroupConfig (#6891)
aaa4a4b5fc is described below
commit aaa4a4b5fcc1f3b395264375b9067303820cd38c
Author: Zhouxiang Zhan <[email protected]>
AuthorDate: Wed Jun 14 11:46:46 2023 +0800
Add attribute for SubscriptionGroupConfig (#6891)
---
.../subscription/SubscriptionGroupManager.java | 36 +++++-
.../rocketmq/broker/topic/TopicConfigManager.java | 106 +----------------
.../subscription/SubscriptionGroupManagerTest.java | 77 ++++++++++++
.../broker/topic/TopicConfigManagerTest.java | 17 ++-
.../common/SubscriptionGroupAttributes.java | 29 +++++
.../rocketmq/common/attribute/AttributeUtil.java | 132 +++++++++++++++++++++
.../subscription/SubscriptionGroupConfig.java | 18 ++-
.../command/consumer/UpdateSubGroupSubCommand.java | 12 ++
8 files changed, 312 insertions(+), 115 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
index 808f370588..db8c8b6f23 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
@@ -16,7 +16,10 @@
*/
package org.apache.rocketmq.broker.subscription;
+import com.google.common.collect.ImmutableMap;
+import java.util.HashMap;
import java.util.Iterator;
+import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -26,11 +29,13 @@ import org.apache.rocketmq.broker.BrokerPathConfigHelper;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.common.ConfigManager;
import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.SubscriptionGroupAttributes;
+import org.apache.rocketmq.common.attribute.AttributeUtil;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.DataVersion;
-import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
@@ -111,6 +116,17 @@ public class SubscriptionGroupManager extends
ConfigManager {
}
public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig
config) {
+ Map<String, String> newAttributes = request(config);
+ Map<String, String> currentAttributes = current(config.getGroupName());
+
+ Map<String, String> finalAttributes =
AttributeUtil.alterCurrentAttributes(
+ this.subscriptionGroupTable.get(config.getGroupName()) == null,
+ SubscriptionGroupAttributes.ALL,
+ ImmutableMap.copyOf(currentAttributes),
+ ImmutableMap.copyOf(newAttributes));
+
+ config.setAttributes(finalAttributes);
+
SubscriptionGroupConfig old =
this.subscriptionGroupTable.put(config.getGroupName(), config);
if (old != null) {
log.info("update subscription group config, old: {} new: {}", old,
config);
@@ -315,4 +331,22 @@ public class SubscriptionGroupManager extends
ConfigManager {
return subscriptionGroupTable.containsKey(group);
}
+
+ private Map<String, String> request(SubscriptionGroupConfig
subscriptionGroupConfig) {
+ return subscriptionGroupConfig.getAttributes() == null ? new
HashMap<>() : subscriptionGroupConfig.getAttributes();
+ }
+
+ private Map<String, String> current(String groupName) {
+ SubscriptionGroupConfig subscriptionGroupConfig =
this.subscriptionGroupTable.get(groupName);
+ if (subscriptionGroupConfig == null) {
+ return new HashMap<>();
+ } else {
+ Map<String, String> attributes =
subscriptionGroupConfig.getAttributes();
+ if (attributes == null) {
+ return new HashMap<>();
+ } else {
+ return attributes;
+ }
+ }
+ }
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index 16140d4cd8..e5fdd8675f 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -16,10 +16,8 @@
*/
package org.apache.rocketmq.broker.topic;
-import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
@@ -38,6 +36,7 @@ import org.apache.rocketmq.common.PopAckConstants;
import org.apache.rocketmq.common.TopicAttributes;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.attribute.Attribute;
+import org.apache.rocketmq.common.attribute.AttributeUtil;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
@@ -466,8 +465,9 @@ public class TopicConfigManager extends ConfigManager {
Map<String, String> newAttributes = request(topicConfig);
Map<String, String> currentAttributes =
current(topicConfig.getTopicName());
- Map<String, String> finalAttributes = alterCurrentAttributes(
+ Map<String, String> finalAttributes =
AttributeUtil.alterCurrentAttributes(
this.topicConfigTable.get(topicConfig.getTopicName()) == null,
+ TopicAttributes.ALL,
ImmutableMap.copyOf(currentAttributes),
ImmutableMap.copyOf(newAttributes));
@@ -628,106 +628,6 @@ public class TopicConfigManager extends ConfigManager {
}
}
- private Map<String, String> alterCurrentAttributes(boolean create,
ImmutableMap<String, String> currentAttributes,
- ImmutableMap<String, String> newAttributes) {
- Map<String, String> init = new HashMap<>();
- Map<String, String> add = new HashMap<>();
- Map<String, String> update = new HashMap<>();
- Map<String, String> delete = new HashMap<>();
- Set<String> keys = new HashSet<>();
-
- for (Entry<String, String> attribute : newAttributes.entrySet()) {
- String key = attribute.getKey();
- String realKey = realKey(key);
- String value = attribute.getValue();
-
- validate(realKey);
- duplicationCheck(keys, realKey);
-
- if (create) {
- if (key.startsWith("+")) {
- init.put(realKey, value);
- } else {
- throw new RuntimeException("only add attribute is
supported while creating topic. key: " + realKey);
- }
- } else {
- if (key.startsWith("+")) {
- if (!currentAttributes.containsKey(realKey)) {
- add.put(realKey, value);
- } else {
- update.put(realKey, value);
- }
- } else if (key.startsWith("-")) {
- if (!currentAttributes.containsKey(realKey)) {
- throw new RuntimeException("attempt to delete a
nonexistent key: " + realKey);
- }
- delete.put(realKey, value);
- } else {
- throw new RuntimeException("wrong format key: " + realKey);
- }
- }
- }
-
- validateAlter(init, true, false);
- validateAlter(add, false, false);
- validateAlter(update, false, false);
- validateAlter(delete, false, true);
-
- log.info("add: {}, update: {}, delete: {}", add, update, delete);
- HashMap<String, String> finalAttributes = new
HashMap<>(currentAttributes);
- finalAttributes.putAll(init);
- finalAttributes.putAll(add);
- finalAttributes.putAll(update);
- for (String s : delete.keySet()) {
- finalAttributes.remove(s);
- }
- return finalAttributes;
- }
-
- private void duplicationCheck(Set<String> keys, String key) {
- boolean notExist = keys.add(key);
- if (!notExist) {
- throw new RuntimeException("alter duplication key. key: " + key);
- }
- }
-
- private void validate(String kvAttribute) {
- if (Strings.isNullOrEmpty(kvAttribute)) {
- throw new RuntimeException("kv string format wrong.");
- }
-
- if (kvAttribute.contains("+")) {
- throw new RuntimeException("kv string format wrong.");
- }
-
- if (kvAttribute.contains("-")) {
- throw new RuntimeException("kv string format wrong.");
- }
- }
-
- private void validateAlter(Map<String, String> alter, boolean init,
boolean delete) {
- for (Entry<String, String> entry : alter.entrySet()) {
- String key = entry.getKey();
- String value = entry.getValue();
-
- Attribute attribute = allAttributes().get(key);
- if (attribute == null) {
- throw new RuntimeException("unsupported key: " + key);
- }
- if (!init && !attribute.isChangeable()) {
- throw new RuntimeException("attempt to update an unchangeable
attribute. key: " + key);
- }
-
- if (!delete) {
- attribute.verify(value);
- }
- }
- }
-
- private String realKey(String key) {
- return key.substring(1);
- }
-
public boolean containsTopic(String topic) {
return topicConfigTable.containsKey(topic);
}
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManagerTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManagerTest.java
new file mode 100644
index 0000000000..6337c69ea7
--- /dev/null
+++
b/broker/src/test/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManagerTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.subscription;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.SubscriptionGroupAttributes;
+import org.apache.rocketmq.common.attribute.BooleanAttribute;
+import
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class SubscriptionGroupManagerTest {
+ private String group = "group";
+ @Mock
+ private BrokerController brokerControllerMock;
+ private SubscriptionGroupManager subscriptionGroupManager;
+
+ @Before
+ public void before() {
+ SubscriptionGroupAttributes.ALL.put("test", new BooleanAttribute(
+ "test",
+ false,
+ false
+ ));
+ subscriptionGroupManager = spy(new
SubscriptionGroupManager(brokerControllerMock));
+ when(brokerControllerMock.getMessageStore()).thenReturn(null);
+ doNothing().when(subscriptionGroupManager).persist();
+ }
+
+ @Test
+ public void updateSubscriptionGroupConfig() {
+ SubscriptionGroupConfig subscriptionGroupConfig = new
SubscriptionGroupConfig();
+ subscriptionGroupConfig.setGroupName(group);
+ Map<String, String> attr = ImmutableMap.of("+test", "true");
+ subscriptionGroupConfig.setAttributes(attr);
+
subscriptionGroupManager.updateSubscriptionGroupConfig(subscriptionGroupConfig);
+ SubscriptionGroupConfig result =
subscriptionGroupManager.getSubscriptionGroupTable().get(group);
+ assertThat(result).isNotNull();
+ assertThat(result.getGroupName()).isEqualTo(group);
+ assertThat(result.getAttributes().get("test")).isEqualTo("true");
+
+
+ SubscriptionGroupConfig subscriptionGroupConfig1 = new
SubscriptionGroupConfig();
+ subscriptionGroupConfig1.setGroupName(group);
+ Map<String, String> attrRemove = ImmutableMap.of("-test", "");
+ subscriptionGroupConfig1.setAttributes(attrRemove);
+ assertThatThrownBy(() ->
subscriptionGroupManager.updateSubscriptionGroupConfig(subscriptionGroupConfig1))
+ .isInstanceOf(RuntimeException.class).hasMessage("attempt to
update an unchangeable attribute. key: test");
+ }
+}
\ No newline at end of file
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
index b77c44961a..6052a79d41 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
@@ -16,18 +16,22 @@
*/
package org.apache.rocketmq.broker.topic;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.common.attribute.Attribute;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.TopicAttributes;
import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.attribute.Attribute;
import org.apache.rocketmq.common.attribute.BooleanAttribute;
+import org.apache.rocketmq.common.attribute.CQType;
import org.apache.rocketmq.common.attribute.EnumAttribute;
import org.apache.rocketmq.common.attribute.LongRangeAttribute;
import org.apache.rocketmq.common.utils.QueueTypeUtils;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.config.MessageStoreConfig;
-import org.apache.rocketmq.common.attribute.CQType;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -35,14 +39,8 @@ import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
import static com.google.common.collect.Sets.newHashSet;
import static java.util.Arrays.asList;
-import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
@@ -318,7 +316,6 @@ public class TopicConfigManagerTest {
supportedAttributes.put(supportAttribute.getName(),
supportAttribute);
}
- topicConfigManager = spy(topicConfigManager);
-
when(topicConfigManager.allAttributes()).thenReturn(supportedAttributes);
+ TopicAttributes.ALL.putAll(supportedAttributes);
}
}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/SubscriptionGroupAttributes.java
b/common/src/main/java/org/apache/rocketmq/common/SubscriptionGroupAttributes.java
new file mode 100644
index 0000000000..5b0072401c
--- /dev/null
+++
b/common/src/main/java/org/apache/rocketmq/common/SubscriptionGroupAttributes.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.rocketmq.common.attribute.Attribute;
+
+public class SubscriptionGroupAttributes {
+ public static final Map<String, Attribute> ALL;
+
+ static {
+ ALL = new HashMap<>();
+ }
+}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/attribute/AttributeUtil.java
b/common/src/main/java/org/apache/rocketmq/common/attribute/AttributeUtil.java
new file mode 100644
index 0000000000..a3646988c5
--- /dev/null
+++
b/common/src/main/java/org/apache/rocketmq/common/attribute/AttributeUtil.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.attribute;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+
+public class AttributeUtil {
+ private static final Logger log =
LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
+ public static Map<String, String> alterCurrentAttributes(boolean create,
Map<String, Attribute> all,
+ ImmutableMap<String, String> currentAttributes, ImmutableMap<String,
String> newAttributes) {
+
+ Map<String, String> init = new HashMap<>();
+ Map<String, String> add = new HashMap<>();
+ Map<String, String> update = new HashMap<>();
+ Map<String, String> delete = new HashMap<>();
+ Set<String> keys = new HashSet<>();
+
+ for (Map.Entry<String, String> attribute : newAttributes.entrySet()) {
+ String key = attribute.getKey();
+ String realKey = realKey(key);
+ String value = attribute.getValue();
+
+ validate(realKey);
+ duplicationCheck(keys, realKey);
+
+ if (create) {
+ if (key.startsWith("+")) {
+ init.put(realKey, value);
+ } else {
+ throw new RuntimeException("only add attribute is
supported while creating topic. key: " + realKey);
+ }
+ } else {
+ if (key.startsWith("+")) {
+ if (!currentAttributes.containsKey(realKey)) {
+ add.put(realKey, value);
+ } else {
+ update.put(realKey, value);
+ }
+ } else if (key.startsWith("-")) {
+ if (!currentAttributes.containsKey(realKey)) {
+ throw new RuntimeException("attempt to delete a
nonexistent key: " + realKey);
+ }
+ delete.put(realKey, value);
+ } else {
+ throw new RuntimeException("wrong format key: " + realKey);
+ }
+ }
+ }
+
+ validateAlter(all, init, true, false);
+ validateAlter(all, add, false, false);
+ validateAlter(all, update, false, false);
+ validateAlter(all, delete, false, true);
+
+ log.info("add: {}, update: {}, delete: {}", add, update, delete);
+ HashMap<String, String> finalAttributes = new
HashMap<>(currentAttributes);
+ finalAttributes.putAll(init);
+ finalAttributes.putAll(add);
+ finalAttributes.putAll(update);
+ for (String s : delete.keySet()) {
+ finalAttributes.remove(s);
+ }
+ return finalAttributes;
+ }
+
+ private static void duplicationCheck(Set<String> keys, String key) {
+ boolean notExist = keys.add(key);
+ if (!notExist) {
+ throw new RuntimeException("alter duplication key. key: " + key);
+ }
+ }
+
+ private static void validate(String kvAttribute) {
+ if (Strings.isNullOrEmpty(kvAttribute)) {
+ throw new RuntimeException("kv string format wrong.");
+ }
+
+ if (kvAttribute.contains("+")) {
+ throw new RuntimeException("kv string format wrong.");
+ }
+
+ if (kvAttribute.contains("-")) {
+ throw new RuntimeException("kv string format wrong.");
+ }
+ }
+
+ private static void validateAlter(Map<String, Attribute> all, Map<String,
String> alter, boolean init, boolean delete) {
+ for (Map.Entry<String, String> entry : alter.entrySet()) {
+ String key = entry.getKey();
+ String value = entry.getValue();
+
+ Attribute attribute = all.get(key);
+ if (attribute == null) {
+ throw new RuntimeException("unsupported key: " + key);
+ }
+ if (!init && !attribute.isChangeable()) {
+ throw new RuntimeException("attempt to update an unchangeable
attribute. key: " + key);
+ }
+
+ if (!delete) {
+ attribute.verify(value);
+ }
+ }
+ }
+
+ private static String realKey(String key) {
+ return key.substring(1);
+ }
+}
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/subscription/SubscriptionGroupConfig.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/subscription/SubscriptionGroupConfig.java
index 799c7492e9..5522059aaf 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/subscription/SubscriptionGroupConfig.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/subscription/SubscriptionGroupConfig.java
@@ -18,6 +18,8 @@
package org.apache.rocketmq.remoting.protocol.subscription;
import com.google.common.base.MoreObjects;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.rocketmq.common.MixAll;
@@ -49,6 +51,8 @@ public class SubscriptionGroupConfig {
private Set<SimpleSubscriptionData> subscriptionDataSet;
+ private Map<String, String> attributes = new HashMap<>();
+
public String getGroupName() {
return groupName;
}
@@ -161,6 +165,14 @@ public class SubscriptionGroupConfig {
this.subscriptionDataSet = subscriptionDataSet;
}
+ public Map<String, String> getAttributes() {
+ return attributes;
+ }
+
+ public void setAttributes(Map<String, String> attributes) {
+ this.attributes = attributes;
+ }
+
@Override
public int hashCode() {
final int prime = 31;
@@ -178,6 +190,7 @@ public class SubscriptionGroupConfig {
result = prime * result + groupSysFlag;
result = prime * result + consumeTimeoutMinute;
result = prime * result + subscriptionDataSet.hashCode();
+ result = prime * result + attributes.hashCode();
return result;
}
@@ -202,6 +215,7 @@ public class SubscriptionGroupConfig {
.append(groupSysFlag, other.groupSysFlag)
.append(consumeTimeoutMinute, other.consumeTimeoutMinute)
.append(subscriptionDataSet, other.subscriptionDataSet)
+ .append(attributes, other.attributes)
.isEquals();
}
@@ -216,11 +230,13 @@ public class SubscriptionGroupConfig {
.add("retryQueueNums", retryQueueNums)
.add("retryMaxTimes", retryMaxTimes)
.add("groupRetryPolicy", groupRetryPolicy)
+ .add("brokerId", brokerId)
.add("whichBrokerWhenConsumeSlowly", whichBrokerWhenConsumeSlowly)
.add("notifyConsumerIdsChangedEnable",
notifyConsumerIdsChangedEnable)
.add("groupSysFlag", groupSysFlag)
.add("consumeTimeoutMinute", consumeTimeoutMinute)
- .add("subscriptionTopicSet", subscriptionDataSet)
+ .add("subscriptionDataSet", subscriptionDataSet)
+ .add("attributes", attributes)
.toString();
}
}
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupSubCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupSubCommand.java
index fddf6015de..f87bafc930 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupSubCommand.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupSubCommand.java
@@ -17,10 +17,12 @@
package org.apache.rocketmq.tools.command.consumer;
import com.alibaba.fastjson.JSON;
+import java.util.Map;
import java.util.Set;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
+import org.apache.rocketmq.common.attribute.AttributeParser;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.protocol.subscription.GroupRetryPolicy;
import
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
@@ -99,6 +101,10 @@ public class UpdateSubGroupSubCommand implements SubCommand
{
opt.setRequired(false);
options.addOption(opt);
+ opt = new Option(null, "attributes", true, "attribute(+a=b,+c=d,-e)");
+ opt.setRequired(false);
+ options.addOption(opt);
+
return options;
}
@@ -177,6 +183,12 @@ public class UpdateSubGroupSubCommand implements
SubCommand {
.getOptionValue('a').trim()));
}
+ if (commandLine.hasOption("attributes")) {
+ String attributesModification =
commandLine.getOptionValue("attributes").trim();
+ Map<String, String> attributes =
AttributeParser.parseToMap(attributesModification);
+ subscriptionGroupConfig.setAttributes(attributes);
+ }
+
if (commandLine.hasOption('b')) {
String addr = commandLine.getOptionValue('b').trim();