This is an automated email from the ASF dual-hosted git repository. healchow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new abac81a62 [INLONG-6096][TubeMQ] Optimize some code styles under the nodeconsumer module (#6097) abac81a62 is described below commit abac81a628bcaa1e6258b1327928be8e99c89168 Author: Goson Zhang <4675...@qq.com> AuthorDate: Sun Oct 9 10:00:32 2022 +0800 [INLONG-6096][TubeMQ] Optimize some code styles under the nodeconsumer module (#6097) --- .../apache/inlong/tubemq/server/master/TMaster.java | 2 +- .../nodemanage/nodeconsumer/ConsumeGroupInfo.java | 19 +++++++++++-------- .../nodemanage/nodeconsumer/ConsumerEventManager.java | 8 +++----- 3 files changed, 15 insertions(+), 14 deletions(-) diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java index 08f3f01f8..c18c54b22 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java @@ -817,7 +817,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable { strBuffer.delete(0, strBuffer.length()); try { consumeGroupInfo.settAllocated(); - consumerEventManager.removeFirst(clientId); + consumerEventManager.removeFirst(clientId, strBuffer); } catch (Throwable e) { logger.warn("Unknown exception for remove first event:", e); } diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeGroupInfo.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeGroupInfo.java index 65b828189..13a1bba85 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeGroupInfo.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeGroupInfo.java @@ -96,7 +96,7 @@ public class ConsumeGroupInfo { new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED); /** - * Initial a Consume group node information. + * Initial a consumer group node information. * * @param consumer the consumer of consume group. */ @@ -298,7 +298,7 @@ public class ConsumeGroupInfo { return; } String newConfig; - String curCOnfig; + String curConfig; boolean isChanged = false; Set<String> newTopics = newMetaInfoMap.keySet(); Set<String> curTopics = topicMetaInfoMap.keySet(); @@ -308,11 +308,11 @@ public class ConsumeGroupInfo { } else { for (String topicKey : newTopics) { newConfig = newMetaInfoMap.get(topicKey); - curCOnfig = topicMetaInfoMap.get(topicKey); + curConfig = topicMetaInfoMap.get(topicKey); if (newConfig == null) { continue; } - if (!newConfig.equals(curCOnfig)) { + if (!newConfig.equals(curConfig)) { isChanged = true; break; } @@ -724,7 +724,8 @@ public class ConsumeGroupInfo { logger.warn(sBuffer.toString()); return false; } - boolean foundEqual = false; + boolean foundOccupied = false; + int occupiedNodeId = -1; String occupiedConsumerId = null; for (ConsumerInfo consumerInfo : consumerInfoMap.values()) { if (consumerInfo == null) { @@ -732,16 +733,18 @@ public class ConsumeGroupInfo { } if (consumerInfo.getNodeId() == inConsumer.getNodeId() && !consumerInfo.getConsumerId().equals(inConsumer.getConsumerId())) { - foundEqual = true; + foundOccupied = true; + occupiedNodeId = consumerInfo.getNodeId(); occupiedConsumerId = consumerInfo.getConsumerId(); break; } } - if (foundEqual) { + if (foundOccupied) { sBuffer.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId()) .append("'s nodeId value(").append(inConsumer.getNodeId()) .append(") is occupied by ").append(occupiedConsumerId) - .append(" in the group!"); + .append("'s nodeId value(").append(occupiedNodeId) + .append(") in the group!"); result.setCheckResult(false, TErrCodeConstants.CLIENT_DUPLICATE_INDEXID, sBuffer.toString()); logger.warn(sBuffer.toString()); diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManager.java index a669754fa..6a35d2f1b 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManager.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManager.java @@ -119,9 +119,8 @@ public class ConsumerEventManager { * disconnect event have priority over connect event * * @param consumerId the consumer id that need removed - * @return the first consumer removed from the event map */ - public ConsumerEvent removeFirst(String consumerId) { + public void removeFirst(String consumerId, StringBuilder strBuffer) { ConsumerEvent event = null; String group = consumerHolder.getGroupName(consumerId); boolean selDisConnMap = hasDisconnectEvent(group); @@ -144,12 +143,11 @@ public class ConsumerEventManager { } } if (event != null) { - logger.info(new StringBuilder(512) - .append("[Event Removed] rebalanceId=") + logger.info(strBuffer.append("[Event Removed] rebalanceId=") .append(event.getRebalanceId()).append(",clientId=") .append(consumerId).toString()); + strBuffer.delete(0, strBuffer.length()); } - return event; } public int getUnfinishedCount(String groupName) {