This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch branch-1.5 in repository https://gitbox.apache.org/repos/asf/inlong.git
commit f06a60a40c43ab8e9dbc4b6def0b89089a708b41 Author: Goson Zhang <4675...@qq.com> AuthorDate: Sat Jan 7 23:05:16 2023 +0800 [INLONG-7182][TubeMQ] Replace ParamCheckResult with ProcessResult (#7183) --- .../tubemq/server/broker/BrokerServiceServer.java | 28 +- .../server/common/paramcheck/PBParameterUtils.java | 226 +++++----- .../server/common/paramcheck/ParamCheckResult.java | 60 --- .../inlong/tubemq/server/master/TMaster.java | 455 ++++++++++----------- .../nodemanage/nodeconsumer/ConsumeGroupInfo.java | 105 +++-- .../nodeconsumer/ConsumerInfoHolder.java | 14 +- .../tubemq/server/common/PBParameterTest.java | 45 +- 7 files changed, 391 insertions(+), 542 deletions(-) diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java index 658277519..03c661370 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java @@ -74,7 +74,6 @@ import org.apache.inlong.tubemq.server.common.TStatusConstants; import org.apache.inlong.tubemq.server.common.aaaserver.CertificateBrokerHandler; import org.apache.inlong.tubemq.server.common.aaaserver.CertifiedResult; import org.apache.inlong.tubemq.server.common.exception.HeartbeatException; -import org.apache.inlong.tubemq.server.common.fielddef.WebFieldDef; import org.apache.inlong.tubemq.server.common.heartbeat.HeartbeatManager; import org.apache.inlong.tubemq.server.common.heartbeat.TimeoutInfo; import org.apache.inlong.tubemq.server.common.heartbeat.TimeoutListener; @@ -295,16 +294,14 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic ProcessResult result = new ProcessResult(); StringBuilder strBuffer = new StringBuilder(512); // get and check clientId field - if (!PBParameterUtils.getStringParameter(WebFieldDef.CLIENTID, - request.getClientId(), strBuffer, result)) { + if (!PBParameterUtils.checkClientId(request.getClientId(), strBuffer, result)) { builder.setErrCode(result.getErrCode()); builder.setErrMsg(result.getErrMsg()); return builder.build(); } final String clientId = (String) result.getRetData(); // get and check groupName field - if (!PBParameterUtils.getStringParameter(WebFieldDef.GROUPNAME, - request.getGroupName(), strBuffer, result)) { + if (!PBParameterUtils.checkGroupName(request.getGroupName(), strBuffer, result)) { builder.setErrCode(result.getErrCode()); builder.setErrMsg(result.getErrMsg()); return builder.build(); @@ -618,8 +615,7 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic return builder.build(); } // get and check clientId field - if (!PBParameterUtils.getStringParameter(WebFieldDef.CLIENTID, - request.getClientId(), strBuffer, result)) { + if (!PBParameterUtils.checkClientId(request.getClientId(), strBuffer, result)) { builder.setErrCode(result.getErrCode()); builder.setErrMsg(result.getErrMsg()); return builder.build(); @@ -832,8 +828,7 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic ProcessResult result = new ProcessResult(); final StringBuilder strBuffer = new StringBuilder(512); // get and check clientId field - if (!PBParameterUtils.getStringParameter(WebFieldDef.CLIENTID, - request.getClientId(), strBuffer, result)) { + if (!PBParameterUtils.checkClientId(request.getClientId(), strBuffer, result)) { builder.setErrCode(result.getErrCode()); builder.setErrMsg(result.getErrMsg()); return builder.build(); @@ -849,8 +844,7 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic // get consumer info final String topicName = (String) result.getRetData(); // get and check groupName field - if (!PBParameterUtils.getStringParameter(WebFieldDef.GROUPNAME, - request.getGroupName(), strBuffer, result)) { + if (!PBParameterUtils.checkGroupName(request.getGroupName(), strBuffer, result)) { builder.setErrCode(result.getErrCode()); builder.setErrMsg(result.getErrMsg()); return builder.build(); @@ -1112,16 +1106,14 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic return builder.build(); } // get and check clientId field - if (!PBParameterUtils.getStringParameter(WebFieldDef.CLIENTID, - request.getClientId(), strBuffer, result)) { + if (!PBParameterUtils.checkClientId(request.getClientId(), strBuffer, result)) { builder.setErrCode(result.getErrCode()); builder.setErrMsg(result.getErrMsg()); return builder.build(); } final String clientId = (String) result.getRetData(); // get and check groupName field - if (!PBParameterUtils.getStringParameter(WebFieldDef.GROUPNAME, - request.getGroupName(), strBuffer, result)) { + if (!PBParameterUtils.checkGroupName(request.getGroupName(), strBuffer, result)) { builder.setErrCode(result.getErrCode()); builder.setErrMsg(result.getErrMsg()); return builder.build(); @@ -1229,16 +1221,14 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic ProcessResult result = new ProcessResult(); StringBuilder strBuffer = new StringBuilder(512); // get and check clientId field - if (!PBParameterUtils.getStringParameter(WebFieldDef.CLIENTID, - request.getClientId(), strBuffer, result)) { + if (!PBParameterUtils.checkClientId(request.getClientId(), strBuffer, result)) { builder.setErrCode(result.getErrCode()); builder.setErrMsg(result.getErrMsg()); return builder.build(); } final String clientId = (String) result.getRetData(); // get and check groupName field - if (!PBParameterUtils.getStringParameter(WebFieldDef.GROUPNAME, - request.getGroupName(), strBuffer, result)) { + if (!PBParameterUtils.checkGroupName(request.getGroupName(), strBuffer, result)) { builder.setErrCode(result.getErrCode()); builder.setErrMsg(result.getErrMsg()); return builder.build(); diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/paramcheck/PBParameterUtils.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/paramcheck/PBParameterUtils.java index 2e32e4f5d..718b7f649 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/paramcheck/PBParameterUtils.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/paramcheck/PBParameterUtils.java @@ -48,17 +48,16 @@ public class PBParameterUtils { * Check request topic list of producer * * @param reqTopicLst the topic list to be checked. - * @param strBuffer a string buffer used to construct the result - * @return the check result + * @param strBuff a string buffer used to construct the result + * @param result the process result + * @return success or failure */ - public static ParamCheckResult checkProducerTopicList(final List<String> reqTopicLst, - final StringBuilder strBuffer) { - ParamCheckResult retResult = new ParamCheckResult(); + public static boolean checkProducerTopicList(List<String> reqTopicLst, + StringBuilder strBuff, ProcessResult result) { if (reqTopicLst == null) { - retResult.setCheckResult(false, - TErrCodeConstants.BAD_REQUEST, + result.setFailResult(TErrCodeConstants.BAD_REQUEST, "Request miss necessary topic field info!"); - return retResult; + return result.isSuccess(); } Set<String> transTopicList = new HashSet<>(); if (!reqTopicLst.isEmpty()) { @@ -69,27 +68,25 @@ public class PBParameterUtils { topic = topic.trim(); // filter system topic OFFSET_HISTORY_NAME if (topic.equals(TServerConstants.OFFSET_HISTORY_NAME)) { - retResult.setCheckResult(false, - TErrCodeConstants.BAD_REQUEST, - strBuffer.append("System Topic ") + result.setFailResult(TErrCodeConstants.BAD_REQUEST, + strBuff.append("System Topic ") .append(TServerConstants.OFFSET_HISTORY_NAME) .append(" does not allow client produce data!").toString()); - strBuffer.delete(0, strBuffer.length()); - return retResult; + strBuff.delete(0, strBuff.length()); + return result.isSuccess(); } transTopicList.add(topic); } } if (transTopicList.size() > TBaseConstants.META_MAX_BOOKED_TOPIC_COUNT) { - retResult.setCheckResult(false, - TErrCodeConstants.BAD_REQUEST, - strBuffer.append("Booked topic's count over max value, required max count is ") + result.setFailResult(TErrCodeConstants.BAD_REQUEST, + strBuff.append("Booked topic's count over max value, required max count is ") .append(TBaseConstants.META_MAX_BOOKED_TOPIC_COUNT).toString()); - strBuffer.delete(0, strBuffer.length()); - return retResult; + strBuff.delete(0, strBuff.length()); + return result.isSuccess(); } - retResult.setCheckData(transTopicList); - return retResult; + result.setSuccResult(transTopicList); + return result.isSuccess(); } /** @@ -98,12 +95,11 @@ public class PBParameterUtils { * @param depTopicSet the deployed topic set * @param reqTopicLst the topic list to be checked. * @param strBuff a string buffer used to construct the result + * @param result process result * @return the check result */ public static boolean checkConsumerTopicList(Set<String> depTopicSet, - List<String> reqTopicLst, - ProcessResult result, - StringBuilder strBuff) { + List<String> reqTopicLst, StringBuilder strBuff, ProcessResult result) { if ((reqTopicLst == null) || (reqTopicLst.isEmpty())) { result.setFailResult(TErrCodeConstants.BAD_REQUEST, "Request miss necessary subscribed topicList data!"); @@ -154,61 +150,54 @@ public class PBParameterUtils { * @param csmType the topic list to be checked. * @param reqTopicSet the subscribed topic set * @param requiredParts the specified partitionKey-bootstrap offset map - * @param strBuffer the string buffer used to construct the result + * @param strBuff the string buffer used to construct the result * @return the check result */ - public static ParamCheckResult checkConsumerOffsetSetInfo(ConsumeType csmType, - final Set<String> reqTopicSet, - final String requiredParts, - final StringBuilder strBuffer) { + public static boolean checkConsumerOffsetSetInfo(ConsumeType csmType, Set<String> reqTopicSet, + String requiredParts, StringBuilder strBuff, ProcessResult result) { Map<String, Long> requiredPartMap = new HashMap<>(); - ParamCheckResult retResult = new ParamCheckResult(); if (csmType != ConsumeType.CONSUME_BAND) { - retResult.setCheckData(requiredPartMap); - return retResult; + result.setSuccResult(requiredPartMap); + return result.isSuccess(); } if (TStringUtils.isBlank(requiredParts)) { - retResult.setCheckData(requiredPartMap); - return retResult; + result.setSuccResult(requiredPartMap); + return result.isSuccess(); } String[] partOffsetItems = requiredParts.trim().split(TokenConstants.ARRAY_SEP); for (String partOffset : partOffsetItems) { String[] partKeyVal = partOffset.split(TokenConstants.EQ); if (partKeyVal.length == 1) { - retResult.setCheckResult(false, - TErrCodeConstants.BAD_REQUEST, - strBuffer.append("[Parameter error] unformatted Partition-Offset value : ") + result.setFailResult(TErrCodeConstants.BAD_REQUEST, + strBuff.append("[Parameter error] unformatted Partition-Offset value : ") .append(partOffset).append(" must be aa:bbb:ccc=val1,ddd:eee:ff=val2").toString()); - return retResult; + return result.isSuccess(); } String[] partKeyItems = partKeyVal[0].trim().split(TokenConstants.ATTR_SEP); if (partKeyItems.length != 3) { - retResult.setCheckResult(false, - TErrCodeConstants.BAD_REQUEST, - strBuffer.append("[Parameter error] unformatted Partition-Offset value : ") + result.setFailResult(TErrCodeConstants.BAD_REQUEST, + strBuff.append("[Parameter error] unformatted Partition-Offset value : ") .append(partOffset).append(" must be aa:bbb:ccc=val1,ddd:eee:ff=val2").toString()); - return retResult; + return result.isSuccess(); } if (!reqTopicSet.contains(partKeyItems[1].trim())) { - retResult.setCheckResult(false, - TErrCodeConstants.BAD_REQUEST, - strBuffer.append("[Parameter error] wrong offset reset for unsubscribed topic: reset item is ") + result.setFailResult(TErrCodeConstants.BAD_REQUEST, + strBuff.append("[Parameter error] wrong offset reset for unsubscribed topic: reset item is ") .append(partOffset).append(", request topicList are ") - .append(reqTopicSet.toString()).toString()); - return retResult; + .append(reqTopicSet).toString()); + return result.isSuccess(); } try { requiredPartMap.put(partKeyVal[0].trim(), Long.parseLong(partKeyVal[1].trim())); } catch (Throwable ex) { - retResult.setCheckResult(false, - TErrCodeConstants.BAD_REQUEST, - strBuffer.append("[Parameter error] required long type value of ") + result.setFailResult(TErrCodeConstants.BAD_REQUEST, + strBuff.append("[Parameter error] required long type value of ") .append(partOffset).append("' Offset!").toString()); - return retResult; + return result.isSuccess(); } } - retResult.setCheckData(requiredPartMap); - return retResult; + result.setSuccResult(requiredPartMap); + return result.isSuccess(); } /** @@ -219,43 +208,41 @@ public class PBParameterUtils { * @param masterConfig the master configure * @param defMetaDataService the cluster meta information * @param brokerRunManager the broker running information - * @param strBuffer the string buffer used to construct the result + * @param strBuff the string buffer used to construct the result + * @param result the process result * @return the check result */ - public static ParamCheckResult checkConsumerInputInfo(ConsumerInfo inConsumerInfo, + public static boolean checkConsumerInputInfo(ConsumerInfo inConsumerInfo, MasterConfig masterConfig, MetaDataService defMetaDataService, BrokerRunManager brokerRunManager, - StringBuilder strBuffer) throws Exception { - ParamCheckResult retResult = new ParamCheckResult(); + StringBuilder strBuff, + ProcessResult result) throws Exception { if (!inConsumerInfo.isRequireBound()) { - retResult.setCheckData(inConsumerInfo); - return retResult; + result.setSuccResult(inConsumerInfo); + return result.isSuccess(); } if (TStringUtils.isBlank(inConsumerInfo.getSessionKey())) { - retResult.setCheckResult(false, - TErrCodeConstants.BAD_REQUEST, + result.setFailResult(TErrCodeConstants.BAD_REQUEST, "[Parameter error] blank value of sessionKey!"); - return retResult; + return result.isSuccess(); } inConsumerInfo.setSessionKey(inConsumerInfo.getSessionKey().trim()); if (inConsumerInfo.getSourceCount() <= 0) { - retResult.setCheckResult(false, - TErrCodeConstants.BAD_REQUEST, + result.setFailResult(TErrCodeConstants.BAD_REQUEST, "[Parameter error] totalSourceCount must over zero!"); - return retResult; + return result.isSuccess(); } GroupResCtrlEntity offsetResetGroupEntity = defMetaDataService.getGroupCtrlConf(inConsumerInfo.getGroupName()); if (masterConfig.isStartOffsetResetCheck()) { if (offsetResetGroupEntity == null) { - retResult.setCheckResult(false, - TErrCodeConstants.BAD_REQUEST, - strBuffer.append("[unauthorized subscribe] ConsumeGroup must be ") + result.setFailResult(TErrCodeConstants.BAD_REQUEST, + strBuff.append("[unauthorized subscribe] ConsumeGroup must be ") .append("authorized by administrator before using bound subscribe") .append(", please contact to administrator!").toString()); - strBuffer.delete(0, strBuffer.length()); - return retResult; + strBuff.delete(0, strBuff.length()); + return result.isSuccess(); } } int allowRate = (offsetResetGroupEntity != null @@ -270,107 +257,82 @@ public class PBParameterUtils { if (maxBrokerCount % allowRate != 0) { minClientCnt += 1; } - retResult.setCheckResult(false, - TErrCodeConstants.BAD_REQUEST, - strBuffer.append("[Parameter error] System requires at least ") + result.setFailResult(TErrCodeConstants.BAD_REQUEST, + strBuff.append("[Parameter error] System requires at least ") .append(minClientCnt).append(" clients to consume data together, ") .append("please add client resources!").toString()); - return retResult; + return result.isSuccess(); } - retResult.setCheckData(inConsumerInfo); - return retResult; + result.setSuccResult(inConsumerInfo); + return result.isSuccess(); } /** * Check the id of broker * * @param brokerId the id of broker to be checked - * @param strBuffer the string buffer used to construct check result + * @param strBuff the string buffer used to construct check result + * @param result the process result * @return the check result */ - public static ParamCheckResult checkBrokerId(final String brokerId, - final StringBuilder strBuffer) { - ParamCheckResult retResult = new ParamCheckResult(); + public static boolean checkBrokerId(String brokerId, + StringBuilder strBuff, ProcessResult result) { if (TStringUtils.isBlank(brokerId)) { - retResult.setCheckResult(false, - TErrCodeConstants.BAD_REQUEST, + result.setFailResult(TErrCodeConstants.BAD_REQUEST, "Request miss necessary brokerId data"); - return retResult; + return result.isSuccess(); } String tmpValue = brokerId.trim(); try { - retResult.setCheckData(Integer.parseInt(tmpValue)); + result.setSuccResult(Integer.parseInt(tmpValue)); } catch (Throwable e) { - retResult.setCheckResult(false, - TErrCodeConstants.BAD_REQUEST, - strBuffer.append("Parse brokerId to int failure ").append(e.getMessage()).toString()); - strBuffer.delete(0, strBuffer.length()); - return retResult; + result.setFailResult(TErrCodeConstants.BAD_REQUEST, + strBuff.append("Parse brokerId to int failure ").append(e.getMessage()).toString()); + strBuff.delete(0, strBuff.length()); } - return retResult; + return result.isSuccess(); } /** * Check the clientID. * * @param clientId the client id to be checked - * @param strBuffer the string used to construct the result + * @param strBuff the string used to construct the result + * @param result process result * @return the check result */ - public static ParamCheckResult checkClientId(final String clientId, final StringBuilder strBuffer) { - return validStringParameter("clientId", - clientId, TBaseConstants.META_MAX_CLIENT_ID_LENGTH, strBuffer); + public static boolean checkClientId(String clientId, + StringBuilder strBuff, ProcessResult result) { + return PBParameterUtils.getStringParameter( + WebFieldDef.CLIENTID, clientId, strBuff, result); } /** * Check the hostname. * * @param hostName the hostname to be checked. - * @param strBuffer the string used to construct the result + * @param strBuff the string used to construct the result + * @param result the process result * @return the check result */ - public static ParamCheckResult checkHostName(final String hostName, final StringBuilder strBuffer) { - return validStringParameter("hostName", - hostName, TBaseConstants.META_MAX_CLIENT_HOSTNAME_LENGTH, strBuffer); + public static boolean checkHostName(String hostName, + StringBuilder strBuff, ProcessResult result) { + return PBParameterUtils.getStringParameter( + WebFieldDef.HOSTNAME, hostName, strBuff, result); } /** * Check the group name * * @param groupName the group name to be checked - * @param strBuffer the string used to construct the result + * @param strBuff the string used to construct the result + * @param result the process result * @return the check result */ - public static ParamCheckResult checkGroupName(final String groupName, final StringBuilder strBuffer) { - return validStringParameter("groupName", - groupName, TBaseConstants.META_MAX_GROUPNAME_LENGTH, strBuffer); - } - - private static ParamCheckResult validStringParameter(final String paramName, - final String paramValue, - int paramMaxLen, - final StringBuilder strBuffer) { - ParamCheckResult retResult = new ParamCheckResult(); - if (TStringUtils.isBlank(paramValue)) { - retResult.setCheckResult(false, - TErrCodeConstants.BAD_REQUEST, - strBuffer.append("Request miss necessary ") - .append(paramName).append(" data!").toString()); - strBuffer.delete(0, strBuffer.length()); - return retResult; - } - String tmpValue = paramValue.trim(); - if (tmpValue.length() > paramMaxLen) { - retResult.setCheckResult(false, - TErrCodeConstants.BAD_REQUEST, - strBuffer.append(paramName) - .append("'s length over max value, required max length is ") - .append(paramMaxLen).toString()); - strBuffer.delete(0, strBuffer.length()); - return retResult; - } - retResult.setCheckData(tmpValue); - return retResult; + public static boolean checkGroupName(String groupName, + StringBuilder strBuff, ProcessResult result) { + return PBParameterUtils.getStringParameter( + WebFieldDef.GROUPNAME, groupName, strBuff, result); } /** @@ -383,9 +345,7 @@ public class PBParameterUtils { * @return result success or failure */ public static boolean getStringParameter(WebFieldDef fieldDef, - String paramValue, - StringBuilder strBuffer, - ProcessResult result) { + String paramValue, StringBuilder strBuffer, ProcessResult result) { if (TStringUtils.isBlank(paramValue)) { result.setFailResult(strBuffer.append("Request miss necessary ") .append(fieldDef.name).append(" data!").toString()); @@ -414,9 +374,7 @@ public class PBParameterUtils { * @return the check result */ public static boolean getTopicNameParameter(String topicName, - MetadataManager metadataManager, - StringBuilder strBuffer, - ProcessResult result) { + MetadataManager metadataManager, StringBuilder strBuffer, ProcessResult result) { if (!getStringParameter(WebFieldDef.TOPICNAME, topicName, strBuffer, result)) { return result.isSuccess(); diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/paramcheck/ParamCheckResult.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/paramcheck/ParamCheckResult.java deleted file mode 100644 index 8da6562d6..000000000 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/paramcheck/ParamCheckResult.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.tubemq.server.common.paramcheck; - -import org.apache.inlong.tubemq.corebase.TErrCodeConstants; - -public class ParamCheckResult { - - public boolean result; - public int errCode; - public String errMsg; - public Object checkData; - - public ParamCheckResult() { - this.result = false; - this.errCode = TErrCodeConstants.BAD_REQUEST; - this.errMsg = "Unset object!"; - } - - public ParamCheckResult(boolean result, int errCode, final String errMsg) { - this.result = result; - this.errCode = errCode; - this.errMsg = errMsg; - } - - public ParamCheckResult(Object checkData) { - this.result = true; - this.errCode = TErrCodeConstants.SUCCESS; - this.errMsg = "Ok"; - this.checkData = checkData; - } - - public void setCheckResult(boolean result, int errCode, final String errMsg) { - this.result = result; - this.errCode = errCode; - this.errMsg = errMsg; - } - - public void setCheckData(Object checkData) { - this.result = true; - this.errCode = TErrCodeConstants.SUCCESS; - this.errMsg = "Ok"; - this.checkData = checkData; - } -} diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java index 5b119dd3a..2e1a67075 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java @@ -97,7 +97,6 @@ import org.apache.inlong.tubemq.server.common.heartbeat.HeartbeatManager; import org.apache.inlong.tubemq.server.common.heartbeat.TimeoutInfo; import org.apache.inlong.tubemq.server.common.heartbeat.TimeoutListener; import org.apache.inlong.tubemq.server.common.paramcheck.PBParameterUtils; -import org.apache.inlong.tubemq.server.common.paramcheck.ParamCheckResult; import org.apache.inlong.tubemq.server.common.utils.ClientSyncInfo; import org.apache.inlong.tubemq.server.common.utils.HasThread; import org.apache.inlong.tubemq.server.common.utils.RowLock; @@ -310,7 +309,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable { public RegisterResponseM2P producerRegisterP2M(RegisterRequestP2M request, final String rmtAddress, boolean overtls) throws Exception { - final StringBuilder strBuffer = new StringBuilder(512); + ProcessResult result = new ProcessResult(); + final StringBuilder strBuff = new StringBuilder(512); RegisterResponseM2P.Builder builder = RegisterResponseM2P.newBuilder(); builder.setSuccess(false); builder.setBrokerCheckSum(-1); @@ -321,35 +321,30 @@ public class TMaster extends HasThread implements MasterService, Stoppable { builder.setErrMsg(certResult.errInfo); return builder.build(); } - ParamCheckResult paramCheckResult = - PBParameterUtils.checkClientId(request.getClientId(), strBuffer); - if (!paramCheckResult.result) { - builder.setErrCode(paramCheckResult.errCode); - builder.setErrMsg(paramCheckResult.errMsg); + if (!PBParameterUtils.checkClientId(request.getClientId(), strBuff, result)) { + builder.setErrCode(result.getErrCode()); + builder.setErrMsg(result.getErrMsg()); return builder.build(); } - final String producerId = (String) paramCheckResult.checkData; - paramCheckResult = PBParameterUtils.checkHostName(request.getHostName(), strBuffer); - if (!paramCheckResult.result) { - builder.setErrCode(paramCheckResult.errCode); - builder.setErrMsg(paramCheckResult.errMsg); + final String producerId = (String) result.getRetData(); + if (!PBParameterUtils.checkHostName(request.getHostName(), strBuff, result)) { + builder.setErrCode(result.getErrCode()); + builder.setErrMsg(result.getErrMsg()); return builder.build(); } - final String hostName = (String) paramCheckResult.checkData; - paramCheckResult = - PBParameterUtils.checkProducerTopicList(request.getTopicListList(), strBuffer); - if (!paramCheckResult.result) { - builder.setErrCode(paramCheckResult.errCode); - builder.setErrMsg(paramCheckResult.errMsg); + final String hostName = (String) result.getRetData(); + if (!PBParameterUtils.checkProducerTopicList(request.getTopicListList(), strBuff, result)) { + builder.setErrCode(result.getErrCode()); + builder.setErrMsg(result.getErrMsg()); return builder.build(); } - final Set<String> transTopicSet = (Set<String>) paramCheckResult.checkData; + final Set<String> transTopicSet = (Set<String>) result.getRetData(); if (!request.hasBrokerCheckSum()) { builder.setErrCode(TErrCodeConstants.BAD_REQUEST); builder.setErrMsg("Request miss necessary brokerCheckSum field!"); return builder.build(); } - checkNodeStatus(producerId, strBuffer); + checkNodeStatus(producerId, strBuff); CertifiedResult authorizeResult = serverAuthHandler.validProducerAuthorizeInfo( certResult.userName, transTopicSet, rmtAddress); @@ -376,7 +371,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable { if (clientConfigBuilder != null) { builder.setAppdConfig(clientConfigBuilder); } - logger.info(strBuffer.append("[Producer Register] ").append(producerId) + logger.info(strBuff.append("[Producer Register] ").append(producerId) .append(", isOverTLS=").append(overtls) .append(", clientJDKVer=").append(clientJdkVer).toString()); builder.setSuccess(true); @@ -398,7 +393,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable { public HeartResponseM2P producerHeartbeatP2M(HeartRequestP2M request, final String rmtAddress, boolean overtls) throws Exception { - final StringBuilder strBuffer = new StringBuilder(512); + ProcessResult result = new ProcessResult(); + final StringBuilder strBuff = new StringBuilder(512); HeartResponseM2P.Builder builder = HeartResponseM2P.newBuilder(); builder.setSuccess(false); builder.setBrokerCheckSum(-1); @@ -409,36 +405,31 @@ public class TMaster extends HasThread implements MasterService, Stoppable { builder.setErrMsg(certResult.errInfo); return builder.build(); } - ParamCheckResult paramCheckResult = - PBParameterUtils.checkClientId(request.getClientId(), strBuffer); - if (!paramCheckResult.result) { - builder.setErrCode(paramCheckResult.errCode); - builder.setErrMsg(paramCheckResult.errMsg); + if (!PBParameterUtils.checkClientId(request.getClientId(), strBuff, result)) { + builder.setErrCode(result.getErrCode()); + builder.setErrMsg(result.getErrMsg()); return builder.build(); } - final String producerId = (String) paramCheckResult.checkData; - paramCheckResult = PBParameterUtils.checkHostName(request.getHostName(), strBuffer); - if (!paramCheckResult.result) { - builder.setErrCode(paramCheckResult.errCode); - builder.setErrMsg(paramCheckResult.errMsg); + final String producerId = (String) result.getRetData(); + if (!PBParameterUtils.checkHostName(request.getHostName(), strBuff, result)) { + builder.setErrCode(result.getErrCode()); + builder.setErrMsg(result.getErrMsg()); return builder.build(); } - final String hostName = (String) paramCheckResult.checkData; - paramCheckResult = - PBParameterUtils.checkProducerTopicList(request.getTopicListList(), strBuffer); - if (!paramCheckResult.result) { - builder.setErrCode(paramCheckResult.errCode); - builder.setErrMsg(paramCheckResult.errMsg); + final String hostName = (String) result.getRetData(); + if (!PBParameterUtils.checkProducerTopicList(request.getTopicListList(), strBuff, result)) { + builder.setErrCode(result.getErrCode()); + builder.setErrMsg(result.getErrMsg()); return builder.build(); } - final Set<String> transTopicSet = (Set<String>) paramCheckResult.checkData; + final Set<String> transTopicSet = (Set<String>) result.getRetData(); if (!request.hasBrokerCheckSum()) { builder.setErrCode(TErrCodeConstants.BAD_REQUEST); builder.setErrMsg("Request miss necessary brokerCheckSum field!"); return builder.build(); } final long inBrokerCheckSum = request.getBrokerCheckSum(); - checkNodeStatus(producerId, strBuffer); + checkNodeStatus(producerId, strBuff); CertifiedResult authorizeResult = serverAuthHandler.validProducerAuthorizeInfo( certResult.userName, transTopicSet, rmtAddress); @@ -477,7 +468,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable { builder.setAppdConfig(clientConfigBuilder); } if (logger.isDebugEnabled()) { - logger.debug(strBuffer.append("[Push Producer's available topic count:]") + logger.debug(strBuff.append("[Push Producer's available topic count:]") .append(producerId).append(TokenConstants.LOG_SEG_SEP) .append((prodTopicConfigTuple.getF2() == null) ? 0 @@ -503,7 +494,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable { public CloseResponseM2P producerCloseClientP2M(CloseRequestP2M request, final String rmtAddress, boolean overtls) throws Exception { - final StringBuilder strBuffer = new StringBuilder(512); + ProcessResult result = new ProcessResult(); + final StringBuilder strBuff = new StringBuilder(512); CloseResponseM2P.Builder builder = CloseResponseM2P.newBuilder(); builder.setSuccess(false); CertifiedResult certResult = @@ -513,18 +505,16 @@ public class TMaster extends HasThread implements MasterService, Stoppable { builder.setErrMsg(certResult.errInfo); return builder.build(); } - ParamCheckResult paramCheckResult = - PBParameterUtils.checkClientId(request.getClientId(), strBuffer); - if (!paramCheckResult.result) { - builder.setErrCode(paramCheckResult.errCode); - builder.setErrMsg(paramCheckResult.errMsg); + if (!PBParameterUtils.checkClientId(request.getClientId(), strBuff, result)) { + builder.setErrCode(result.getErrCode()); + builder.setErrMsg(result.getErrMsg()); return builder.build(); } - final String producerId = (String) paramCheckResult.checkData; - checkNodeStatus(producerId, strBuffer); + final String producerId = (String) result.getRetData(); + checkNodeStatus(producerId, strBuff); new ReleaseProducer().run(producerId, false); heartbeatManager.unRegProducerNode(producerId); - logger.info(strBuffer.append("[Producer Closed] ") + logger.info(strBuff.append("[Producer Closed] ") .append(producerId).append(", isOverTLS=").append(overtls).toString()); builder.setSuccess(true); builder.setErrCode(TErrCodeConstants.SUCCESS); @@ -547,7 +537,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable { boolean overtls) throws Exception { // #lizard forgives ProcessResult result = new ProcessResult(); - final StringBuilder strBuffer = new StringBuilder(512); + final StringBuilder strBuff = new StringBuilder(512); RegisterResponseM2C.Builder builder = RegisterResponseM2C.newBuilder(); builder.setSuccess(false); CertifiedResult certResult = @@ -557,31 +547,27 @@ public class TMaster extends HasThread implements MasterService, Stoppable { builder.setErrMsg(certResult.errInfo); return builder.build(); } - ParamCheckResult paramCheckResult = - PBParameterUtils.checkClientId(request.getClientId(), strBuffer); - if (!paramCheckResult.result) { - builder.setErrCode(paramCheckResult.errCode); - builder.setErrMsg(paramCheckResult.errMsg); + if (!PBParameterUtils.checkClientId(request.getClientId(), strBuff, result)) { + builder.setErrCode(result.getErrCode()); + builder.setErrMsg(result.getErrMsg()); return builder.build(); } - final String consumerId = (String) paramCheckResult.checkData; - paramCheckResult = PBParameterUtils.checkHostName(request.getHostName(), strBuffer); - if (!paramCheckResult.result) { - builder.setErrCode(paramCheckResult.errCode); - builder.setErrMsg(paramCheckResult.errMsg); + final String consumerId = (String) result.getRetData(); + if (!PBParameterUtils.checkHostName(request.getHostName(), strBuff, result)) { + builder.setErrCode(result.getErrCode()); + builder.setErrMsg(result.getErrMsg()); return builder.build(); } - // final String hostName = (String) paramCheckResult.checkData; - paramCheckResult = PBParameterUtils.checkGroupName(request.getGroupName(), strBuffer); - if (!paramCheckResult.result) { - builder.setErrCode(paramCheckResult.errCode); - builder.setErrMsg(paramCheckResult.errMsg); + // final String hostName = (String) result.getRetData(); + if (!PBParameterUtils.checkGroupName(request.getGroupName(), strBuff, result)) { + builder.setErrCode(result.getErrCode()); + builder.setErrMsg(result.getErrMsg()); return builder.build(); } - final String groupName = (String) paramCheckResult.checkData; - checkNodeStatus(consumerId, strBuffer); + final String groupName = (String) result.getRetData(); + checkNodeStatus(consumerId, strBuff); if (!PBParameterUtils.checkConsumerTopicList(defMetaDataService.getDeployedTopicSet(), - request.getTopicListList(), result, strBuffer)) { + request.getTopicListList(), strBuff, result)) { builder.setErrCode(result.getErrCode()); builder.setErrMsg(result.getErrMsg()); return builder.build(); @@ -594,14 +580,13 @@ public class TMaster extends HasThread implements MasterService, Stoppable { ? ConsumeType.CONSUME_BAND : ConsumeType.CONSUME_NORMAL; final String clientJdkVer = request.hasJdkVersion() ? request.getJdkVersion() : ""; - paramCheckResult = PBParameterUtils.checkConsumerOffsetSetInfo(csmType, - reqTopicSet, requiredParts, strBuffer); - if (!paramCheckResult.result) { - builder.setErrCode(paramCheckResult.errCode); - builder.setErrMsg(paramCheckResult.errMsg); + if (!PBParameterUtils.checkConsumerOffsetSetInfo( + csmType, reqTopicSet, requiredParts, strBuff, result)) { + builder.setErrCode(result.getErrCode()); + builder.setErrMsg(result.getErrMsg()); return builder.build(); } - Map<String, Long> requiredPartMap = (Map<String, Long>) paramCheckResult.checkData; + Map<String, Long> requiredPartMap = (Map<String, Long>) result.getRetData(); String sessionKey = request.hasSessionKey() ? request.getSessionKey() : ""; long sessionTime = request.hasSessionTime() ? request.getSessionTime() @@ -626,15 +611,13 @@ public class TMaster extends HasThread implements MasterService, Stoppable { reqTopicSet, reqTopicConditions, csmType, sessionKey, sessionTime, sourceCount, isSelectBig, requiredPartMap, rmtAddress); - paramCheckResult = - PBParameterUtils.checkConsumerInputInfo(inConsumerInfo, - masterConfig, defMetaDataService, brokerRunManager, strBuffer); - if (!paramCheckResult.result) { - builder.setErrCode(paramCheckResult.errCode); - builder.setErrMsg(paramCheckResult.errMsg); + if (!PBParameterUtils.checkConsumerInputInfo(inConsumerInfo, + masterConfig, defMetaDataService, brokerRunManager, strBuff, result)) { + builder.setErrCode(result.getErrCode()); + builder.setErrMsg(result.getErrMsg()); return builder.build(); } - ConsumerInfo inConsumerInfo2 = (ConsumerInfo) paramCheckResult.checkData; + ConsumerInfo inConsumerInfo2 = (ConsumerInfo) result.getRetData(); CertifiedResult authorizeResult = serverAuthHandler.validConsumerAuthorizeInfo(certResult.userName, groupName, reqTopicSet, reqTopicConditions, rmtAddress); @@ -646,9 +629,9 @@ public class TMaster extends HasThread implements MasterService, Stoppable { // need removed for authorize center begin if (!this.defMetaDataService .isConsumeTargetAuthorized(consumerId, groupName, - reqTopicSet, reqTopicConditions, strBuffer, result)) { - if (strBuffer.length() > 0) { - logger.warn(strBuffer.toString()); + reqTopicSet, reqTopicConditions, strBuff, result)) { + if (strBuff.length() > 0) { + logger.warn(strBuff.toString()); } builder.setErrCode(result.getErrCode()); builder.setErrMsg(result.getErrMsg()); @@ -660,32 +643,32 @@ public class TMaster extends HasThread implements MasterService, Stoppable { try { lid = masterRowLock.getLock(null, StringUtils.getBytesUtf8(consumerId), true); if (!consumerHolder.addConsumer(inConsumerInfo2, - isNotAllocated, strBuffer, paramCheckResult)) { - builder.setErrCode(paramCheckResult.errCode); - builder.setErrMsg(paramCheckResult.errMsg); + isNotAllocated, strBuff, result)) { + builder.setErrCode(result.getErrCode()); + builder.setErrMsg(result.getErrMsg()); return builder.build(); } - consumeGroupInfo = (ConsumeGroupInfo) paramCheckResult.checkData; + consumeGroupInfo = (ConsumeGroupInfo) result.getRetData(); topicPSInfoManager.addGroupSubTopicInfo(groupName, reqTopicSet); if (CollectionUtils.isNotEmpty(subscribeList)) { int reportCnt = 0; Map<String, Partition> partMap; Map<String, Map<String, Partition>> topicPartSubMap = new HashMap<>(); currentSubInfo.put(consumerId, topicPartSubMap); - strBuffer.append("[SubInfo Report] client=").append(consumerId) + strBuff.append("[SubInfo Report] client=").append(consumerId) .append(", subscribed partitions=["); for (SubscribeInfo info : subscribeList) { partMap = topicPartSubMap.computeIfAbsent( info.getTopic(), k -> new HashMap<>()); partMap.put(info.getPartition().getPartitionKey(), info.getPartition()); if (reportCnt++ > 0) { - strBuffer.append(","); + strBuff.append(","); } - strBuffer.append(info.getPartitionStr()); + strBuff.append(info.getPartitionStr()); } - strBuffer.append("]"); - logger.info(strBuffer.toString()); - strBuffer.delete(0, strBuffer.length()); + strBuff.append("]"); + logger.info(strBuff.toString()); + strBuff.delete(0, strBuff.length()); } heartbeatManager.regConsumerNode(getConsumerKey(groupName, consumerId)); } catch (IOException e) { @@ -695,10 +678,10 @@ public class TMaster extends HasThread implements MasterService, Stoppable { this.masterRowLock.releaseRowLock(lid); } } - logger.info(strBuffer.append("[Consumer Register] ") + logger.info(strBuff.append("[Consumer Register] ") .append(consumerId).append(", isOverTLS=").append(overtls) .append(", clientJDKVer=").append(clientJdkVer).toString()); - strBuffer.delete(0, strBuffer.length()); + strBuff.delete(0, strBuff.length()); if (request.hasDefFlowCheckId() || request.hasGroupFlowCheckId()) { builder.setSsdStoreId(TBaseConstants.META_VALUE_UNDEFINED); builder.setDefFlowCheckId(TBaseConstants.META_VALUE_UNDEFINED); @@ -747,7 +730,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable { final String rmtAddress, boolean overtls) throws Throwable { // #lizard forgives - final StringBuilder strBuffer = new StringBuilder(512); + ProcessResult result = new ProcessResult(); + final StringBuilder strBuff = new StringBuilder(512); // response HeartResponseM2C.Builder builder = HeartResponseM2C.newBuilder(); builder.setSuccess(false); @@ -759,26 +743,23 @@ public class TMaster extends HasThread implements MasterService, Stoppable { builder.setErrMsg(certResult.errInfo); return builder.build(); } - ParamCheckResult paramCheckResult = - PBParameterUtils.checkClientId(request.getClientId(), strBuffer); - if (!paramCheckResult.result) { - builder.setErrCode(paramCheckResult.errCode); - builder.setErrMsg(paramCheckResult.errMsg); + if (!PBParameterUtils.checkClientId(request.getClientId(), strBuff, result)) { + builder.setErrCode(result.getErrCode()); + builder.setErrMsg(result.getErrMsg()); return builder.build(); } - final String clientId = (String) paramCheckResult.checkData; - paramCheckResult = PBParameterUtils.checkGroupName(request.getGroupName(), strBuffer); - if (!paramCheckResult.result) { - builder.setErrCode(paramCheckResult.errCode); - builder.setErrMsg(paramCheckResult.errMsg); + final String clientId = (String) result.getRetData(); + if (!PBParameterUtils.checkGroupName(request.getGroupName(), strBuff, result)) { + builder.setErrCode(result.getErrCode()); + builder.setErrMsg(result.getErrMsg()); return builder.build(); } - final String groupName = (String) paramCheckResult.checkData; - checkNodeStatus(clientId, strBuffer); + final String groupName = (String) result.getRetData(); + checkNodeStatus(clientId, strBuff); ConsumeGroupInfo consumeGroupInfo = consumerHolder.getConsumeGroupInfo(groupName); if (consumeGroupInfo == null) { builder.setErrCode(TErrCodeConstants.HB_NO_NODE); - builder.setErrMsg(strBuffer.append("Not found groupName ") + builder.setErrMsg(strBuff.append("Not found groupName ") .append(groupName).append(" in holder!").toString()); return builder.build(); } @@ -797,7 +778,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable { heartbeatManager.updConsumerNode(getConsumerKey(groupName, clientId)); } catch (HeartbeatException e) { builder.setErrCode(TErrCodeConstants.HB_NO_NODE); - builder.setErrMsg(strBuffer + builder.setErrMsg(strBuff .append("Update consumer node exception:") .append(e.getMessage()).toString()); return builder.build(); @@ -834,21 +815,21 @@ public class TMaster extends HasThread implements MasterService, Stoppable { partMap.put(regPart.getPartitionKey(), regPart); } if (rebalanceId <= 0) { - logger.warn(strBuffer.append("[Consistent Warn]").append(clientId) + logger.warn(strBuff.append("[Consistent Warn]").append(clientId) .append(" sub info is not consistent with master.").toString()); - strBuffer.delete(0, strBuffer.length()); + strBuff.delete(0, strBuff.length()); } } } // if (rebalanceId > 0) { - logger.info(strBuffer.append("[Event Processed] rebalanceId=") + logger.info(strBuff.append("[Event Processed] rebalanceId=") .append(request.getEvent().getRebalanceId()) .append(", clientId=").append(clientId).toString()); - strBuffer.delete(0, strBuffer.length()); + strBuff.delete(0, strBuff.length()); try { consumeGroupInfo.settAllocated(); - consumerEventManager.removeFirst(clientId, strBuffer); + consumerEventManager.removeFirst(clientId, strBuff); } catch (Throwable e) { logger.warn("Unknown exception for remove first event:", e); } @@ -859,9 +840,9 @@ public class TMaster extends HasThread implements MasterService, Stoppable { if (event != null && event.getStatus() != EventStatus.PROCESSING) { event.setStatus(EventStatus.PROCESSING); - strBuffer.append("[Push Consumer Event]"); - logger.info(event.toStrBuilder(clientId, strBuffer).toString()); - strBuffer.delete(0, strBuffer.length()); + strBuff.append("[Push Consumer Event]"); + logger.info(event.toStrBuilder(clientId, strBuff).toString()); + strBuff.delete(0, strBuff.length()); EventProto.Builder eventProtoBuilder = EventProto.newBuilder(); eventProtoBuilder.setRebalanceId(event.getRebalanceId()); @@ -919,7 +900,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable { public CloseResponseM2C consumerCloseClientC2M(CloseRequestC2M request, final String rmtAddress, boolean overtls) throws Exception { - StringBuilder strBuffer = new StringBuilder(512); + ProcessResult result = new ProcessResult(); + StringBuilder strBuff = new StringBuilder(512); CloseResponseM2C.Builder builder = CloseResponseM2C.newBuilder(); builder.setSuccess(false); CertifiedResult certResult = @@ -929,24 +911,21 @@ public class TMaster extends HasThread implements MasterService, Stoppable { builder.setErrMsg(certResult.errInfo); return builder.build(); } - ParamCheckResult paramCheckResult = - PBParameterUtils.checkClientId(request.getClientId(), strBuffer); - if (!paramCheckResult.result) { - builder.setErrCode(paramCheckResult.errCode); - builder.setErrMsg(paramCheckResult.errMsg); + if (!PBParameterUtils.checkClientId(request.getClientId(), strBuff, result)) { + builder.setErrCode(result.getErrCode()); + builder.setErrMsg(result.getErrMsg()); return builder.build(); } - final String clientId = (String) paramCheckResult.checkData; - paramCheckResult = PBParameterUtils.checkGroupName(request.getGroupName(), strBuffer); - if (!paramCheckResult.result) { - builder.setErrCode(paramCheckResult.errCode); - builder.setErrMsg(paramCheckResult.errMsg); + final String clientId = (String) result.getRetData(); + if (!PBParameterUtils.checkGroupName(request.getGroupName(), strBuff, result)) { + builder.setErrCode(result.getErrCode()); + builder.setErrMsg(result.getErrMsg()); return builder.build(); } - final String groupName = (String) paramCheckResult.checkData; - checkNodeStatus(clientId, strBuffer); + final String groupName = (String) result.getRetData(); + checkNodeStatus(clientId, strBuff); String nodeId = getConsumerKey(groupName, clientId); - logger.info(strBuffer.append("[Consumer Closed]").append(nodeId) + logger.info(strBuff.append("[Consumer Closed]").append(nodeId) .append(", isOverTLS=").append(overtls).toString()); new ReleaseConsumer().run(nodeId, false); heartbeatManager.unRegConsumerNode(nodeId); @@ -984,18 +963,16 @@ public class TMaster extends HasThread implements MasterService, Stoppable { return builder.build(); } ProcessResult result = new ProcessResult(); - final StringBuilder strBuffer = new StringBuilder(512); + final StringBuilder strBuff = new StringBuilder(512); // get clientId and check valid - ParamCheckResult paramCheckResult = - PBParameterUtils.checkClientId(request.getClientId(), strBuffer); - if (!paramCheckResult.result) { - builder.setErrCode(paramCheckResult.errCode); - builder.setErrMsg(paramCheckResult.errMsg); + if (!PBParameterUtils.checkClientId(request.getClientId(), strBuff, result)) { + builder.setErrCode(result.getErrCode()); + builder.setErrMsg(result.getErrMsg()); return builder.build(); } - final String clientId = (String) paramCheckResult.checkData; + final String clientId = (String) result.getRetData(); // check authority - checkNodeStatus(clientId, strBuffer); + checkNodeStatus(clientId, strBuff); // get optional filed ClusterSettingEntity defSetting = defMetaDataService.getClusterDefSetting(false); @@ -1016,13 +993,13 @@ public class TMaster extends HasThread implements MasterService, Stoppable { request.getCurBrokerConfId(), request.getConfCheckSumId(), true, request.getBrokerDefaultConfInfo(), request.getBrokerTopicSetConfInfoList(), request.getBrokerOnline(), - overtls, strBuffer, result)) { + overtls, strBuff, result)) { builder.setErrCode(result.getErrCode()); builder.setErrMsg(result.getErrMsg()); return builder.build(); } // print broker register log - logger.info(strBuffer.append("[Broker Register] ").append(clientId) + logger.info(strBuff.append("[Broker Register] ").append(clientId) .append(" report, configureId=").append(request.getCurBrokerConfId()) .append(",readStatusRpt=").append(request.getReadStatusRpt()) .append(",writeStatusRpt=").append(request.getWriteStatusRpt()) @@ -1031,7 +1008,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable { .append(",FlowCtrlId=").append(reFlowCtrlId) .append(",qryPriorityId=").append(qryPriorityId) .append(",checksumId=").append(request.getConfCheckSumId()).toString()); - strBuffer.delete(0, strBuffer.length()); + strBuff.delete(0, strBuff.length()); // response builder.setSuccess(true); builder.setErrCode(TErrCodeConstants.SUCCESS); @@ -1047,7 +1024,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable { enableInfo.setEnableConsumeAuthenticate(masterConfig.isStartConsumeAuthenticate()); enableInfo.setEnableConsumeAuthorize(masterConfig.isStartConsumeAuthorize()); builder.setEnableBrokerInfo(enableInfo); - brokerRunManager.setRegisterDownConfInfo(brokerInfo.getBrokerId(), strBuffer, builder); + brokerRunManager.setRegisterDownConfInfo(brokerInfo.getBrokerId(), strBuff, builder); builder.setSsdStoreId(TBaseConstants.META_VALUE_UNDEFINED); ClientMaster.ClusterConfig.Builder clusterConfigBuilder = buildClusterConfig(request.getClsConfig()); @@ -1065,7 +1042,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable { } } } - logger.info(strBuffer.append("[Broker Register] ").append(clientId) + logger.info(strBuff.append("[Broker Register] ").append(clientId) .append(", isOverTLS=").append(overtls).toString()); return builder.build(); } @@ -1105,35 +1082,33 @@ public class TMaster extends HasThread implements MasterService, Stoppable { return builder.build(); } ProcessResult result = new ProcessResult(); - final StringBuilder strBuffer = new StringBuilder(512); - ParamCheckResult paramCheckResult = - PBParameterUtils.checkBrokerId(request.getBrokerId(), strBuffer); - if (!paramCheckResult.result) { - builder.setErrCode(paramCheckResult.errCode); - builder.setErrMsg(paramCheckResult.errMsg); + final StringBuilder strBuff = new StringBuilder(512); + if (!PBParameterUtils.checkBrokerId(request.getBrokerId(), strBuff, result)) { + builder.setErrCode(result.getErrCode()); + builder.setErrMsg(result.getErrMsg()); return builder.build(); } - int brokerId = (int) paramCheckResult.checkData; + int brokerId = (int) result.getRetData(); long reFlowCtrlId = request.hasFlowCheckId() ? request.getFlowCheckId() : TBaseConstants.META_VALUE_UNDEFINED; int qryPriorityId = request.hasQryPriorityId() ? request.getQryPriorityId() : TBaseConstants.META_VALUE_UNDEFINED; - checkNodeStatus(String.valueOf(brokerId), strBuffer); + checkNodeStatus(String.valueOf(brokerId), strBuff); if (!brokerRunManager.brokerHeartBeat2M(brokerId, request.getCurBrokerConfId(), request.getConfCheckSumId(), request.getTakeConfInfo(), request.getBrokerDefaultConfInfo(), request.getBrokerTopicSetConfInfoList(), request.getTakeRemovedTopicInfo(), request.getRemovedTopicsInfoList(), request.getReadStatusRpt(), request.getWriteStatusRpt(), request.getBrokerOnline(), - strBuffer, result)) { + strBuff, result)) { builder.setErrCode(result.getErrCode()); builder.setErrMsg(result.getErrMsg()); return builder.build(); } if (request.getTakeConfInfo()) { - strBuffer.append("[Broker Report] heartbeat report: brokerId=") + strBuff.append("[Broker Report] heartbeat report: brokerId=") .append(request.getBrokerId()).append(", configureId=") .append(request.getCurBrokerConfId()) .append(",readStatusRpt=").append(request.getReadStatusRpt()) @@ -1145,15 +1120,15 @@ public class TMaster extends HasThread implements MasterService, Stoppable { .append(",brokerOnline=").append(request.getBrokerOnline()) .append(",default broker configure is ").append(request.getBrokerDefaultConfInfo()) .append(",broker topic configure is ").append(request.getBrokerTopicSetConfInfoList()); - strBuffer.delete(0, strBuffer.length()); + strBuff.delete(0, strBuff.length()); } // create response - brokerRunManager.setHeatBeatDownConfInfo(brokerId, strBuffer, builder); + brokerRunManager.setHeatBeatDownConfInfo(brokerId, strBuff, builder); BrokerConfEntity brokerConfEntity = defMetaDataService.getBrokerConfByBrokerId(brokerId); builder.setTakeRemoveTopicInfo(true); builder.addAllRemoveTopicConfInfo(defMetaDataService - .getBrokerRemovedTopicStrConfigInfo(brokerConfEntity, strBuffer).values()); + .getBrokerRemovedTopicStrConfigInfo(brokerConfEntity, strBuff).values()); builder.setSsdStoreId(TBaseConstants.META_VALUE_UNDEFINED); if (request.hasFlowCheckId()) { ClusterSettingEntity defSetting = @@ -1197,7 +1172,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable { final String rmtAddress, boolean overtls) throws Throwable { ProcessResult result = new ProcessResult(); - StringBuilder strBuffer = new StringBuilder(512); + StringBuilder strBuff = new StringBuilder(512); CloseResponseM2B.Builder builder = CloseResponseM2B.newBuilder(); builder.setSuccess(false); CertifiedResult cfResult = @@ -1207,16 +1182,14 @@ public class TMaster extends HasThread implements MasterService, Stoppable { builder.setErrMsg(cfResult.errInfo); return builder.build(); } - ParamCheckResult paramCheckResult = - PBParameterUtils.checkBrokerId(request.getBrokerId(), strBuffer); - if (!paramCheckResult.result) { - builder.setErrCode(paramCheckResult.errCode); - builder.setErrMsg(paramCheckResult.errMsg); + if (!PBParameterUtils.checkBrokerId(request.getBrokerId(), strBuff, result)) { + builder.setErrCode(result.getErrCode()); + builder.setErrMsg(result.getErrMsg()); return builder.build(); } - final int brokerId = (int) paramCheckResult.checkData; - checkNodeStatus(String.valueOf(brokerId), strBuffer); - if (!brokerRunManager.brokerClose2M(brokerId, strBuffer, result)) { + final int brokerId = (int) result.getRetData(); + checkNodeStatus(String.valueOf(brokerId), strBuff); + if (!brokerRunManager.brokerClose2M(brokerId, strBuff, result)) { builder.setErrCode(result.getErrCode()); builder.setErrMsg(result.getErrMsg()); return builder.build(); @@ -1241,7 +1214,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable { String rmtAddress, boolean overtls) throws Throwable { ProcessResult result = new ProcessResult(); - final StringBuilder sBuffer = new StringBuilder(512); + final StringBuilder strBuff = new StringBuilder(512); RegisterResponseM2CV2.Builder builder = RegisterResponseM2CV2.newBuilder(); CertifiedResult certResult = serverAuthHandler.identityValidUserInfo(request.getAuthInfo(), false); @@ -1250,32 +1223,28 @@ public class TMaster extends HasThread implements MasterService, Stoppable { builder.setErrMsg(certResult.errInfo); return builder.build(); } - ParamCheckResult paramCheckResult = - PBParameterUtils.checkClientId(request.getClientId(), sBuffer); - if (!paramCheckResult.result) { - builder.setErrCode(paramCheckResult.errCode); - builder.setErrMsg(paramCheckResult.errMsg); + if (!PBParameterUtils.checkClientId(request.getClientId(), strBuff, result)) { + builder.setErrCode(result.getErrCode()); + builder.setErrMsg(result.getErrMsg()); return builder.build(); } - final String consumerId = (String) paramCheckResult.checkData; - paramCheckResult = PBParameterUtils.checkHostName(request.getHostName(), sBuffer); - if (!paramCheckResult.result) { - builder.setErrCode(paramCheckResult.errCode); - builder.setErrMsg(paramCheckResult.errMsg); + final String consumerId = (String) result.getRetData(); + if (!PBParameterUtils.checkHostName(request.getHostName(), strBuff, result)) { + builder.setErrCode(result.getErrCode()); + builder.setErrMsg(result.getErrMsg()); return builder.build(); } - // final String hostName = (String) paramCheckResult.checkData; - paramCheckResult = PBParameterUtils.checkGroupName(request.getGroupName(), sBuffer); - if (!paramCheckResult.result) { - builder.setErrCode(paramCheckResult.errCode); - builder.setErrMsg(paramCheckResult.errMsg); + // final String hostName = (String) result.getRetData(); + if (!PBParameterUtils.checkGroupName(request.getGroupName(), strBuff, result)) { + builder.setErrCode(result.getErrCode()); + builder.setErrMsg(result.getErrMsg()); return builder.build(); } - final String groupName = (String) paramCheckResult.checkData; + final String groupName = (String) result.getRetData(); // check master current status - checkNodeStatus(consumerId, sBuffer); + checkNodeStatus(consumerId, strBuff); if (!PBParameterUtils.checkConsumerTopicList(defMetaDataService.getDeployedTopicSet(), - request.getTopicListList(), result, sBuffer)) { + request.getTopicListList(), strBuff, result)) { builder.setErrCode(result.getErrCode()); builder.setErrMsg(result.getErrMsg()); return builder.build(); @@ -1313,9 +1282,9 @@ public class TMaster extends HasThread implements MasterService, Stoppable { // need removed for authorize center begin if (!this.defMetaDataService .isConsumeTargetAuthorized(consumerId, groupName, - reqTopicSet, reqTopicConditions, sBuffer, result)) { - if (sBuffer.length() > 0) { - logger.warn(sBuffer.toString()); + reqTopicSet, reqTopicConditions, strBuff, result)) { + if (strBuff.length() > 0) { + logger.warn(strBuff.toString()); } builder.setErrCode(result.getErrCode()); builder.setErrMsg(result.getErrMsg()); @@ -1323,12 +1292,10 @@ public class TMaster extends HasThread implements MasterService, Stoppable { } // need removed for authorize center end // check resource require - paramCheckResult = - PBParameterUtils.checkConsumerInputInfo(inConsumerInfo, - masterConfig, defMetaDataService, brokerRunManager, sBuffer); - if (!paramCheckResult.result) { - builder.setErrCode(paramCheckResult.errCode); - builder.setErrMsg(paramCheckResult.errMsg); + if (!PBParameterUtils.checkConsumerInputInfo(inConsumerInfo, + masterConfig, defMetaDataService, brokerRunManager, strBuff, result)) { + builder.setErrCode(result.getErrCode()); + builder.setErrMsg(result.getErrMsg()); return builder.build(); } CertifiedResult authorizeResult = @@ -1343,9 +1310,9 @@ public class TMaster extends HasThread implements MasterService, Stoppable { try { lid = masterRowLock.getLock(null, StringUtils.getBytesUtf8(consumerId), true); - if (!consumerHolder.addConsumer(inConsumerInfo, false, sBuffer, paramCheckResult)) { - builder.setErrCode(paramCheckResult.errCode); - builder.setErrMsg(paramCheckResult.errMsg); + if (!consumerHolder.addConsumer(inConsumerInfo, false, strBuff, result)) { + builder.setErrCode(result.getErrCode()); + builder.setErrMsg(result.getErrMsg()); return builder.build(); } topicPSInfoManager.addGroupSubTopicInfo(groupName, reqTopicSet); @@ -1360,21 +1327,21 @@ public class TMaster extends HasThread implements MasterService, Stoppable { ConsumeGroupInfo consumeGroupInfo = consumerHolder.getConsumeGroupInfo(groupName); if (consumeGroupInfo == null) { - logger.warn(sBuffer.append("[Illegal Process] ").append(consumerId) + logger.warn(strBuff.append("[Illegal Process] ").append(consumerId) .append(" visit consume group(").append(groupName) .append(" info failure, null information").toString()); builder.setErrCode(TErrCodeConstants.INTERNAL_SERVER_ERROR); - builder.setErrMsg(sBuffer.toString()); - sBuffer.delete(0, sBuffer.length()); + builder.setErrMsg(strBuff.toString()); + strBuff.delete(0, strBuff.length()); return builder.build(); } inConsumerInfo = consumeGroupInfo.getConsumerInfo(consumerId); if (inConsumerInfo == null) { - logger.warn(sBuffer.append("[Illegal Process] ").append(consumerId) + logger.warn(strBuff.append("[Illegal Process] ").append(consumerId) .append(" visit consume info failure, null information").toString()); builder.setErrCode(TErrCodeConstants.INTERNAL_SERVER_ERROR); - builder.setErrMsg(sBuffer.toString()); - sBuffer.delete(0, sBuffer.length()); + builder.setErrMsg(strBuff.toString()); + strBuff.delete(0, strBuff.length()); return builder.build(); } Map<String, Map<String, Partition>> topicPartSubMap = new HashMap<>(); @@ -1386,12 +1353,12 @@ public class TMaster extends HasThread implements MasterService, Stoppable { topicPartSubMap.computeIfAbsent(info.getTopic(), k -> new HashMap<>()); partMap.put(info.getPartitionKey(), info); } - printReportInfo(consumerId, null, topicPartSubMap, sBuffer); + printReportInfo(consumerId, null, topicPartSubMap, strBuff); } - logger.info(sBuffer.append("[Consumer Register] ") + logger.info(strBuff.append("[Consumer Register] ") .append(consumerId).append(", isOverTLS=").append(overtls) .append(", clientJDKVer=").append(clientJdkVer).toString()); - sBuffer.delete(0, sBuffer.length()); + strBuff.delete(0, strBuff.length()); Tuple2<Long, Map<Integer, String>> brokerStaticInfo = brokerRunManager.getBrokerStaticInfo(overtls); builder.setBrokerConfigId(brokerStaticInfo.getF0()); @@ -1418,7 +1385,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable { public HeartResponseM2CV2 consumerHeartbeatC2MV2(HeartRequestC2MV2 request, String rmtAddress, boolean overtls) throws Throwable { - final StringBuilder strBuffer = new StringBuilder(512); + ProcessResult result = new ProcessResult(); + final StringBuilder strBuff = new StringBuilder(512); // response HeartResponseM2CV2.Builder builder = HeartResponseM2CV2.newBuilder(); // identity valid @@ -1429,27 +1397,24 @@ public class TMaster extends HasThread implements MasterService, Stoppable { builder.setErrMsg(certResult.errInfo); return builder.build(); } - ParamCheckResult paramCheckResult = - PBParameterUtils.checkClientId(request.getClientId(), strBuffer); - if (!paramCheckResult.result) { - builder.setErrCode(paramCheckResult.errCode); - builder.setErrMsg(paramCheckResult.errMsg); + if (!PBParameterUtils.checkClientId(request.getClientId(), strBuff, result)) { + builder.setErrCode(result.getErrCode()); + builder.setErrMsg(result.getErrMsg()); return builder.build(); } - final String clientId = (String) paramCheckResult.checkData; - paramCheckResult = PBParameterUtils.checkGroupName(request.getGroupName(), strBuffer); - if (!paramCheckResult.result) { - builder.setErrCode(paramCheckResult.errCode); - builder.setErrMsg(paramCheckResult.errMsg); + final String clientId = (String) result.getRetData(); + if (!PBParameterUtils.checkGroupName(request.getGroupName(), strBuff, result)) { + builder.setErrCode(result.getErrCode()); + builder.setErrMsg(result.getErrMsg()); return builder.build(); } - final String groupName = (String) paramCheckResult.checkData; + final String groupName = (String) result.getRetData(); OpsSyncInfo opsTaskInfo = new OpsSyncInfo(); if (request.hasOpsTaskInfo()) { opsTaskInfo.updOpsSyncInfo(request.getOpsTaskInfo()); } // check master current status - checkNodeStatus(clientId, strBuffer); + checkNodeStatus(clientId, strBuff); ClientSyncInfo clientSyncInfo = new ClientSyncInfo(); if (request.hasSubRepInfo()) { clientSyncInfo.updSubRepInfo(brokerRunManager, request.getSubRepInfo()); @@ -1457,7 +1422,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable { ConsumeGroupInfo consumeGroupInfo = consumerHolder.getConsumeGroupInfo(groupName); if (consumeGroupInfo == null) { builder.setErrCode(TErrCodeConstants.HB_NO_NODE); - builder.setErrMsg(strBuffer.append("Not found groupName ") + builder.setErrMsg(strBuff.append("Not found groupName ") .append(groupName).append(" in holder!").toString()); return builder.build(); } @@ -1465,9 +1430,9 @@ public class TMaster extends HasThread implements MasterService, Stoppable { consumeGroupInfo.getConsumerInfo(clientId); if (inConsumerInfo == null) { builder.setErrCode(TErrCodeConstants.HB_NO_NODE); - builder.setErrMsg(strBuffer.append("Not found client ").append(clientId) + builder.setErrMsg(strBuff.append("Not found client ").append(clientId) .append(" in group(").append(groupName).append(")").toString()); - strBuffer.delete(0, strBuffer.length()); + strBuff.delete(0, strBuff.length()); return builder.build(); } // authorize check @@ -1485,7 +1450,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable { heartbeatManager.updConsumerNode(getConsumerKey(groupName, clientId)); } catch (HeartbeatException e) { builder.setErrCode(TErrCodeConstants.HB_NO_NODE); - builder.setErrMsg(strBuffer + builder.setErrMsg(strBuff .append("Update consumer node exception:") .append(e.getMessage()).toString()); return builder.build(); @@ -1502,7 +1467,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable { newPartSubMap.computeIfAbsent(info.getTopic(), k -> new HashMap<>()); partMap.put(info.getPartitionKey(), info); } - printReportInfo(clientId, curPartSubMap, newPartSubMap, strBuffer); + printReportInfo(clientId, curPartSubMap, newPartSubMap, strBuff); currentSubInfo.put(clientId, newPartSubMap); } Tuple2<Long, Map<Integer, String>> brokerStaticInfo = @@ -1542,7 +1507,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable { public GetPartMetaResponseM2C consumerGetPartMetaInfoC2M(GetPartMetaRequestC2M request, String rmtAddress, boolean overtls) throws Throwable { - StringBuilder strBuffer = new StringBuilder(512); + ProcessResult reslut = new ProcessResult(); + StringBuilder strBuff = new StringBuilder(512); GetPartMetaResponseM2C.Builder builder = GetPartMetaResponseM2C.newBuilder(); CertifiedResult certResult = serverAuthHandler.identityValidUserInfo(request.getAuthInfo(), false); @@ -1551,41 +1517,38 @@ public class TMaster extends HasThread implements MasterService, Stoppable { builder.setErrMsg(certResult.errInfo); return builder.build(); } - ParamCheckResult paramCheckResult = - PBParameterUtils.checkClientId(request.getClientId(), strBuffer); - if (!paramCheckResult.result) { - builder.setErrCode(paramCheckResult.errCode); - builder.setErrMsg(paramCheckResult.errMsg); + if (!PBParameterUtils.checkClientId(request.getClientId(), strBuff, reslut)) { + builder.setErrCode(reslut.getErrCode()); + builder.setErrMsg(reslut.getErrMsg()); return builder.build(); } - final String clientId = (String) paramCheckResult.checkData; - paramCheckResult = PBParameterUtils.checkGroupName(request.getGroupName(), strBuffer); - if (!paramCheckResult.result) { - builder.setErrCode(paramCheckResult.errCode); - builder.setErrMsg(paramCheckResult.errMsg); + final String clientId = (String) reslut.getRetData(); + if (!PBParameterUtils.checkGroupName(request.getGroupName(), strBuff, reslut)) { + builder.setErrCode(reslut.getErrCode()); + builder.setErrMsg(reslut.getErrMsg()); return builder.build(); } - final String groupName = (String) paramCheckResult.checkData; + final String groupName = (String) reslut.getRetData(); final long brokerConfigId = request.getBrokerConfigId(); final long topicMetaInfoId = request.getTopicMetaInfoId(); - checkNodeStatus(clientId, strBuffer); + checkNodeStatus(clientId, strBuff); // get control object ConsumeGroupInfo consumeGroupInfo = consumerHolder.getConsumeGroupInfo(groupName); if (consumeGroupInfo == null) { builder.setErrCode(TErrCodeConstants.HB_NO_NODE); - builder.setErrMsg(strBuffer.append("Not found groupName ") + builder.setErrMsg(strBuff.append("Not found groupName ") .append(groupName).append(" in holder!").toString()); - strBuffer.delete(0, strBuffer.length()); + strBuff.delete(0, strBuff.length()); return builder.build(); } ConsumerInfo inConsumerInfo = consumeGroupInfo.getConsumerInfo(clientId); if (inConsumerInfo == null) { builder.setErrCode(TErrCodeConstants.HB_NO_NODE); - builder.setErrMsg(strBuffer.append("Not found client ").append(clientId) + builder.setErrMsg(strBuff.append("Not found client ").append(clientId) .append(" in group(").append(groupName).append(")").toString()); - strBuffer.delete(0, strBuffer.length()); + strBuff.delete(0, strBuff.length()); return builder.build(); } // heartbeat check @@ -1593,14 +1556,14 @@ public class TMaster extends HasThread implements MasterService, Stoppable { heartbeatManager.updConsumerNode(getConsumerKey(groupName, clientId)); } catch (HeartbeatException e) { builder.setErrCode(TErrCodeConstants.HB_NO_NODE); - builder.setErrMsg(strBuffer + builder.setErrMsg(strBuff .append("Update consumer node exception:") .append(e.getMessage()).toString()); return builder.build(); } Tuple2<Long, List<String>> topicMetaInfoTuple = consumeGroupInfo.getTopicMetaInfo(); if (topicMetaInfoTuple.getF0() == TBaseConstants.META_VALUE_UNDEFINED) { - freshTopicMetaInfo(consumeGroupInfo, strBuffer); + freshTopicMetaInfo(consumeGroupInfo, strBuff); topicMetaInfoTuple = consumeGroupInfo.getTopicMetaInfo(); } builder.setTopicMetaInfoId(topicMetaInfoTuple.getF0()); diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeGroupInfo.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeGroupInfo.java index f82cd3eaf..58ccb2e35 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeGroupInfo.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeGroupInfo.java @@ -33,9 +33,9 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.collections.CollectionUtils; import org.apache.inlong.tubemq.corebase.TBaseConstants; import org.apache.inlong.tubemq.corebase.TErrCodeConstants; +import org.apache.inlong.tubemq.corebase.rv.ProcessResult; import org.apache.inlong.tubemq.corebase.utils.TStringUtils; import org.apache.inlong.tubemq.corebase.utils.Tuple2; -import org.apache.inlong.tubemq.server.common.paramcheck.ParamCheckResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -113,13 +113,12 @@ public class ConsumeGroupInfo { * Add consumer to consume group * * @param inConsumer consumer object - * @param sBuffer the string buffer + * @param strBuff the string buffer * @param result the process result * @return whether the addition is successful */ public boolean addConsumer(ConsumerInfo inConsumer, - StringBuilder sBuffer, - ParamCheckResult result) { + StringBuilder strBuff, ProcessResult result) { try { csmInfoRWLock.writeLock().lock(); if (this.consumerInfoMap.isEmpty()) { @@ -133,14 +132,14 @@ public class ConsumeGroupInfo { this.sourceCount = inConsumer.getSourceCount(); } } else { - if (!validConsumerInfo(inConsumer, sBuffer, result)) { + if (!validConsumerInfo(inConsumer, strBuff, result)) { return false; } ConsumerInfo curConsumerInfo = consumerInfoMap.get(inConsumer.getConsumerId()); if (curConsumerInfo != null) { curConsumerInfo.updCurConsumerInfo(inConsumer); - result.setCheckData(false); + result.setSuccResult(false); return true; } } @@ -148,7 +147,7 @@ public class ConsumeGroupInfo { if (consumeType == ConsumeType.CONSUME_BAND) { bookPartitionInfo(inConsumer); } - result.setCheckData(true); + result.setSuccResult(true); return true; } finally { csmInfoRWLock.writeLock().unlock(); @@ -620,38 +619,37 @@ public class ConsumeGroupInfo { * Check the validity of consumer's parameters * * @param inConsumer consumer info - * @param sBuffer string buffer + * @param strBuff string buffer * @param result process result * @return true if valid, or false if invalid */ private boolean validConsumerInfo(ConsumerInfo inConsumer, - StringBuilder sBuffer, - ParamCheckResult result) { + StringBuilder strBuff, ProcessResult result) { // check whether the consumer behavior is consistent if (inConsumer.getConsumeType() != this.consumeType) { - sBuffer.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId()) + strBuff.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId()) .append(" using ").append(inConsumer.getConsumeType().getName()) .append(" subscribe is inconsistency with other consumers using ") .append(this.consumeType.getName()) .append(" subscribe in the group"); - result.setCheckResult(false, - TErrCodeConstants.CLIENT_INCONSISTENT_CONSUMETYPE, sBuffer.toString()); - logger.warn(sBuffer.toString()); - sBuffer.delete(0, sBuffer.length()); + result.setFailResult( + TErrCodeConstants.CLIENT_INCONSISTENT_CONSUMETYPE, strBuff.toString()); + logger.warn(strBuff.toString()); + strBuff.delete(0, strBuff.length()); return false; } // check the topics of consumption if (CollectionUtils.isNotEmpty(topicSet) && (topicSet.size() != inConsumer.getTopicSet().size() || !topicSet.containsAll(inConsumer.getTopicSet()))) { - sBuffer.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId()) + strBuff.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId()) .append(" subscribed topics ").append(inConsumer.getTopicSet()) .append(" is inconsistency with other consumers in the group, existedTopics: ") .append(topicSet); - result.setCheckResult(false, - TErrCodeConstants.CLIENT_INCONSISTENT_TOPICSET, sBuffer.toString()); - logger.warn(sBuffer.toString()); - sBuffer.delete(0, sBuffer.length()); + result.setFailResult( + TErrCodeConstants.CLIENT_INCONSISTENT_TOPICSET, strBuff.toString()); + logger.warn(strBuff.toString()); + strBuff.delete(0, strBuff.length()); return false; } // check the topic conditions of consumption @@ -659,7 +657,7 @@ public class ConsumeGroupInfo { if (topicConditions.isEmpty()) { if (!inConsumer.getTopicConditions().isEmpty()) { isCondEqual = false; - sBuffer.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId()) + strBuff.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId()) .append(" subscribe with filter condition ") .append(inConsumer.getTopicConditions()) .append(" is inconsistency with other consumers in the group: topic without conditions"); @@ -668,7 +666,7 @@ public class ConsumeGroupInfo { // check the filter conditions of the topic if (inConsumer.getTopicConditions().isEmpty()) { isCondEqual = false; - sBuffer.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId()) + strBuff.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId()) .append(" subscribe without filter condition ") .append(" is inconsistency with other consumers in the group, existed topic conditions is ") .append(topicConditions); @@ -678,7 +676,7 @@ public class ConsumeGroupInfo { if (existedCondTopics.size() != reqCondTopics.size() || !existedCondTopics.containsAll(reqCondTopics)) { isCondEqual = false; - sBuffer.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId()) + strBuff.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId()) .append(" subscribe with filter condition ") .append(inConsumer.getTopicConditions()) .append(" is inconsistency with other consumers in the group, existed topic conditions is ") @@ -690,7 +688,7 @@ public class ConsumeGroupInfo { || (!topicConditions.get(topicKey).containsAll( inConsumer.getTopicConditions().get(topicKey)))) { isCondEqual = false; - sBuffer.append("[Inconsistency subscribe] ") + strBuff.append("[Inconsistency subscribe] ") .append(inConsumer.getConsumerId()) .append(" subscribe with filter condition ") .append(inConsumer.getTopicConditions()) @@ -704,25 +702,25 @@ public class ConsumeGroupInfo { } } if (!isCondEqual) { - result.setCheckResult(false, - TErrCodeConstants.CLIENT_INCONSISTENT_FILTERSET, sBuffer.toString()); - logger.warn(sBuffer.toString()); + result.setFailResult( + TErrCodeConstants.CLIENT_INCONSISTENT_FILTERSET, strBuff.toString()); + logger.warn(strBuff.toString()); return false; } // Check the validity of bound consumer's parameters if (this.consumeType == ConsumeType.CONSUME_BAND) { - if (!validBoundParameters(inConsumer, sBuffer, result)) { + if (!validBoundParameters(inConsumer, strBuff, result)) { return false; } } else if (this.consumeType == ConsumeType.CONSUME_CLIENT_REB) { if (this.sourceCount > 0) { if (this.sourceCount != inConsumer.getSourceCount()) { - sBuffer.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId()) + strBuff.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId()) .append("'s sourceCount is inconsistency with other consumers in the group, required is ") .append(sourceCount).append(", request is ").append(inConsumer.getSourceCount()); - result.setCheckResult(false, - TErrCodeConstants.CLIENT_INCONSISTENT_SOURCECOUNT, sBuffer.toString()); - logger.warn(sBuffer.toString()); + result.setFailResult( + TErrCodeConstants.CLIENT_INCONSISTENT_SOURCECOUNT, strBuff.toString()); + logger.warn(strBuff.toString()); return false; } boolean foundOccupied = false; @@ -741,19 +739,19 @@ public class ConsumeGroupInfo { } } if (foundOccupied) { - sBuffer.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId()) + strBuff.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId()) .append("'s nodeId value(").append(inConsumer.getNodeId()) .append(") is occupied by ").append(occupiedConsumerId) .append("'s nodeId value(").append(occupiedNodeId) .append(") in the group!"); - result.setCheckResult(false, - TErrCodeConstants.CLIENT_DUPLICATE_INDEXID, sBuffer.toString()); - logger.warn(sBuffer.toString()); + result.setFailResult( + TErrCodeConstants.CLIENT_DUPLICATE_INDEXID, strBuff.toString()); + logger.warn(strBuff.toString()); return false; } } } - result.setCheckData("Ok"); + result.setSuccResult("Ok"); return true; } @@ -761,50 +759,49 @@ public class ConsumeGroupInfo { * Check the validity of bound consumer's parameters * * @param inConsumer consumer info - * @param sBuffer string buffer + * @param strBuff string buffer * @param result process result * @return true if valid, or false if invalid */ private boolean validBoundParameters(ConsumerInfo inConsumer, - StringBuilder sBuffer, - ParamCheckResult result) { + StringBuilder strBuff, ProcessResult result) { if (consumeType != ConsumeType.CONSUME_BAND) { - result.setCheckData(""); + result.setSuccResult(""); return true; } // If the sessionKey is inconsistent, it means that the previous round of consumption has not completely // exited. In order to avoid the incomplete offset setting, it is necessary to completely clear the above // data before resetting and consuming this round of consumption if (!sessionKey.equals(inConsumer.getSessionKey())) { - sBuffer.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId()) + strBuff.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId()) .append("'s sessionKey is inconsistency with other consumers in the group, required is ") .append(sessionKey).append(", request is ").append(inConsumer.getSessionKey()); - result.setCheckResult(false, - TErrCodeConstants.CLIENT_INCONSISTENT_SESSIONKEY, sBuffer.toString()); - logger.warn(sBuffer.toString()); + result.setFailResult( + TErrCodeConstants.CLIENT_INCONSISTENT_SESSIONKEY, strBuff.toString()); + logger.warn(strBuff.toString()); return false; } // check the offset config if (isSelectedBig != inConsumer.isSelectedBig()) { - sBuffer.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId()) + strBuff.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId()) .append("'s isSelectBig is inconsistency with other consumers in the group, required is ") .append(isSelectedBig).append(", request is ").append(inConsumer.isSelectedBig()); - result.setCheckResult(false, - TErrCodeConstants.CLIENT_INCONSISTENT_SELECTBIG, sBuffer.toString()); - logger.warn(sBuffer.toString()); + result.setFailResult( + TErrCodeConstants.CLIENT_INCONSISTENT_SELECTBIG, strBuff.toString()); + logger.warn(strBuff.toString()); return false; } // check the consumers count if (sourceCount != inConsumer.getSourceCount()) { - sBuffer.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId()) + strBuff.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId()) .append("'s sourceCount is inconsistency with other consumers in the group, required is ") .append(sourceCount).append(", request is ").append(inConsumer.getSourceCount()); - result.setCheckResult(false, - TErrCodeConstants.CLIENT_INCONSISTENT_SOURCECOUNT, sBuffer.toString()); - logger.warn(sBuffer.toString()); + result.setFailResult( + TErrCodeConstants.CLIENT_INCONSISTENT_SOURCECOUNT, strBuff.toString()); + logger.warn(strBuff.toString()); return false; } - result.setCheckData("Ok"); + result.setSuccResult("Ok"); return true; } } diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java index db09ca02d..0f600bb94 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java @@ -24,9 +24,9 @@ import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.codec.binary.StringUtils; +import org.apache.inlong.tubemq.corebase.rv.ProcessResult; import org.apache.inlong.tubemq.corebase.utils.ConcurrentHashSet; import org.apache.inlong.tubemq.corebase.utils.Tuple2; -import org.apache.inlong.tubemq.server.common.paramcheck.ParamCheckResult; import org.apache.inlong.tubemq.server.common.utils.RowLock; import org.apache.inlong.tubemq.server.master.MasterConfig; import org.apache.inlong.tubemq.server.master.TMaster; @@ -347,12 +347,12 @@ public class ConsumerInfoHolder { * * @param consumer consumer info * @param isNotAllocated whether balanced - * @param sBuffer string buffer + * @param strBuff string buffer * @param result check result * @return process result */ public boolean addConsumer(ConsumerInfo consumer, boolean isNotAllocated, - StringBuilder sBuffer, ParamCheckResult result) { + StringBuilder strBuff, ProcessResult result) { ConsumeGroupInfo consumeGroupInfo; String group = consumer.getGroupName(); Integer lid = null; @@ -388,8 +388,8 @@ public class ConsumerInfoHolder { consumeGroupInfo.isClientBalance()); } } - if (consumeGroupInfo.addConsumer(consumer, sBuffer, result)) { - if ((Boolean) result.checkData) { + if (consumeGroupInfo.addConsumer(consumer, strBuff, result)) { + if ((Boolean) result.getRetData()) { MasterSrvStatsHolder.incConsumerCnt(false, consumeGroupInfo.isClientBalance()); } @@ -397,7 +397,7 @@ public class ConsumerInfoHolder { consumeGroupInfo.settAllocated(); } consumerIndexMap.put(consumer.getConsumerId(), group); - result.setCheckData(consumeGroupInfo); + result.setSuccResult(consumeGroupInfo); } } catch (IOException e) { logger.warn("Failed to lock.", e); @@ -406,7 +406,7 @@ public class ConsumerInfoHolder { groupRowLock.releaseRowLock(lid); } } - return result.result; + return result.isSuccess(); } /** diff --git a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/common/PBParameterTest.java b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/common/PBParameterTest.java index 103fd0f11..4ccfcc5b4 100644 --- a/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/common/PBParameterTest.java +++ b/inlong-tubemq/tubemq-server/src/test/java/org/apache/inlong/tubemq/server/common/PBParameterTest.java @@ -24,7 +24,6 @@ import java.util.Set; import org.apache.inlong.tubemq.corebase.TErrCodeConstants; import org.apache.inlong.tubemq.corebase.rv.ProcessResult; import org.apache.inlong.tubemq.server.common.paramcheck.PBParameterUtils; -import org.apache.inlong.tubemq.server.common.paramcheck.ParamCheckResult; import org.junit.Assert; import org.junit.Test; @@ -32,52 +31,54 @@ public class PBParameterTest { @Test public void checkProducerTopicTest() { - ParamCheckResult result = PBParameterUtils.checkProducerTopicList(null, null); - Assert.assertEquals(result.errCode, TErrCodeConstants.BAD_REQUEST); + ProcessResult result = new ProcessResult(); + StringBuilder strBuff = new StringBuilder(128); + Assert.assertFalse(PBParameterUtils.checkProducerTopicList(null, strBuff, result)); + Assert.assertEquals(result.getErrCode(), TErrCodeConstants.BAD_REQUEST); final List<String> topicList = new ArrayList<>(); topicList.add("test1"); - result = PBParameterUtils.checkProducerTopicList(topicList, new StringBuilder(128)); - Assert.assertEquals(result.errCode, TErrCodeConstants.SUCCESS); + Assert.assertTrue(PBParameterUtils.checkProducerTopicList(topicList, strBuff, result)); + Assert.assertEquals(result.getErrCode(), TErrCodeConstants.SUCCESS); for (int i = 0; i < 1025; i++) { topicList.add("test" + i); } - result = PBParameterUtils.checkProducerTopicList(topicList, new StringBuilder(128)); - Assert.assertEquals(result.errCode, TErrCodeConstants.BAD_REQUEST); + Assert.assertFalse(PBParameterUtils.checkProducerTopicList(topicList, strBuff, result)); + Assert.assertEquals(result.getErrCode(), TErrCodeConstants.BAD_REQUEST); } @Test public void checkConsumerTopicTest() { ProcessResult result = new ProcessResult(); - PBParameterUtils.checkConsumerTopicList(null, null, result, null); + StringBuilder strBuff = new StringBuilder(128); + PBParameterUtils.checkConsumerTopicList(null, null, strBuff, result); Assert.assertEquals(result.getErrCode(), TErrCodeConstants.BAD_REQUEST); final Set<String> depTopicList = new HashSet<>(); final List<String> reqTopicList = new ArrayList<>(); depTopicList.add("test1"); reqTopicList.add("test1"); - PBParameterUtils.checkConsumerTopicList(depTopicList, - reqTopicList, result, new StringBuilder(128)); + PBParameterUtils.checkConsumerTopicList(depTopicList, reqTopicList, strBuff, result); Assert.assertEquals(result.getErrCode(), TErrCodeConstants.SUCCESS); reqTopicList.add("test2"); - PBParameterUtils.checkConsumerTopicList(depTopicList, - reqTopicList, result, new StringBuilder(128)); + PBParameterUtils.checkConsumerTopicList(depTopicList, reqTopicList, strBuff, result); Assert.assertEquals(result.getErrCode(), TErrCodeConstants.TOPIC_NOT_DEPLOYED); for (int i = 0; i < 1025; i++) { reqTopicList.add("test" + i); } - PBParameterUtils.checkConsumerTopicList(depTopicList, - reqTopicList, result, new StringBuilder(128)); + PBParameterUtils.checkConsumerTopicList(depTopicList, reqTopicList, strBuff, result); Assert.assertEquals(result.getErrCode(), TErrCodeConstants.BAD_REQUEST); } @Test public void checkIdTest() { - ParamCheckResult result = PBParameterUtils.checkClientId("100", new StringBuilder(128)); - Assert.assertEquals(result.errCode, TErrCodeConstants.SUCCESS); - result = PBParameterUtils.checkClientId("", new StringBuilder(128)); - Assert.assertEquals(result.errCode, TErrCodeConstants.BAD_REQUEST); - result = PBParameterUtils.checkBrokerId("100", new StringBuilder(128)); - Assert.assertEquals(result.errCode, TErrCodeConstants.SUCCESS); - result = PBParameterUtils.checkBrokerId("", new StringBuilder(128)); - Assert.assertEquals(result.errCode, TErrCodeConstants.BAD_REQUEST); + ProcessResult result = new ProcessResult(); + StringBuilder strBuff = new StringBuilder(128); + Assert.assertTrue(PBParameterUtils.checkClientId("100", strBuff, result)); + Assert.assertEquals(result.getErrCode(), TErrCodeConstants.SUCCESS); + Assert.assertFalse(PBParameterUtils.checkClientId("", strBuff, result)); + Assert.assertEquals(result.getErrCode(), TErrCodeConstants.BAD_REQUEST); + Assert.assertTrue(PBParameterUtils.checkBrokerId("100", strBuff, result)); + Assert.assertEquals(result.getErrCode(), TErrCodeConstants.SUCCESS); + Assert.assertFalse(PBParameterUtils.checkBrokerId("", strBuff, result)); + Assert.assertEquals(result.getErrCode(), TErrCodeConstants.BAD_REQUEST); } }