This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch branch-1.5 in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 6cc584bbc8941814ba2b4f95329a2443ba043a7d Author: Goson Zhang <4675...@qq.com> AuthorDate: Sun Jan 8 20:31:23 2023 +0800 [INLONG-7184][TubeMQ] Replace CertifiedResult with ProcessResult (#7185) --- .../tubemq/server/broker/BrokerServiceServer.java | 73 ++++---- .../common/aaaserver/CertificateBrokerHandler.java | 13 +- .../common/aaaserver/CertificateMasterHandler.java | 15 +- .../{CertifiedResult.java => CertifiedInfo.java} | 42 ++--- .../aaaserver/SimpleCertificateBrokerHandler.java | 61 +++---- .../aaaserver/SimpleCertificateMasterHandler.java | 103 ++++++----- .../inlong/tubemq/server/master/TMaster.java | 188 +++++++++------------ 7 files changed, 223 insertions(+), 272 deletions(-) diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java index 03c661370..94a901804 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java @@ -72,7 +72,7 @@ import org.apache.inlong.tubemq.server.broker.stats.audit.AuditUtils; import org.apache.inlong.tubemq.server.common.TServerConstants; import org.apache.inlong.tubemq.server.common.TStatusConstants; import org.apache.inlong.tubemq.server.common.aaaserver.CertificateBrokerHandler; -import org.apache.inlong.tubemq.server.common.aaaserver.CertifiedResult; +import org.apache.inlong.tubemq.server.common.aaaserver.CertifiedInfo; import org.apache.inlong.tubemq.server.common.exception.HeartbeatException; import org.apache.inlong.tubemq.server.common.heartbeat.HeartbeatManager; import org.apache.inlong.tubemq.server.common.heartbeat.TimeoutInfo; @@ -607,13 +607,12 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic builder.setErrMsg("Write StoreService temporary unavailable!"); return builder.build(); } - CertifiedResult certResult = - serverAuthHandler.identityValidUserInfo(request.getAuthInfo(), true); - if (!certResult.result) { - builder.setErrCode(certResult.errCode); - builder.setErrMsg(certResult.errInfo); + if (!serverAuthHandler.identityValidUserInfo(request.getAuthInfo(), true, result)) { + builder.setErrCode(result.getErrCode()); + builder.setErrMsg(result.getErrMsg()); return builder.build(); } + CertifiedInfo certifiedInfo = (CertifiedInfo) result.getRetData(); // get and check clientId field if (!PBParameterUtils.checkClientId(request.getClientId(), strBuffer, result)) { builder.setErrCode(result.getErrCode()); @@ -659,12 +658,10 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic .append(checkSum).toString()); return builder.build(); } - CertifiedResult authorizeResult = - serverAuthHandler.validProduceAuthorizeInfo( - certResult.userName, topicName, msgType, rmtAddress); - if (!authorizeResult.result) { - builder.setErrCode(authorizeResult.errCode); - builder.setErrMsg(authorizeResult.errInfo); + if (!serverAuthHandler.validProduceAuthorizeInfo( + certifiedInfo.getUserName(), topicName, msgType, rmtAddress, result)) { + builder.setErrCode(result.getErrCode()); + builder.setErrMsg(result.getErrMsg()); return builder.build(); } try { @@ -682,7 +679,7 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic AuditUtils.addProduceRecord(topicName, request.getMsgType(), request.getMsgTime(), 1, dataLength); builder.setSuccess(true); - builder.setRequireAuth(certResult.reAuth); + builder.setRequireAuth(certifiedInfo.isReAuth()); builder.setErrCode(TErrCodeConstants.SUCCESS); // begin Deprecated, after 1.0, the ErrMsg set "Ok" or "" builder.setErrMsg(String.valueOf(appendResult.getMsgId())); @@ -811,21 +808,21 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic public RegisterResponseB2C consumerRegisterC2B(RegisterRequestC2B request, final String rmtAddress, boolean overtls) throws Throwable { + ProcessResult result = new ProcessResult(); RegisterResponseB2C.Builder builder = RegisterResponseB2C.newBuilder(); builder.setSuccess(false); builder.setCurrOffset(-1); - CertifiedResult certResult = serverAuthHandler.identityValidUserInfo(request.getAuthInfo(), false); if (!this.started.get()) { builder.setErrCode(TErrCodeConstants.SERVICE_UNAVAILABLE); builder.setErrMsg("StoreService temporary unavailable!"); return builder.build(); } - if (!certResult.result) { - builder.setErrCode(certResult.errCode); - builder.setErrMsg(certResult.errInfo); + if (!serverAuthHandler.identityValidUserInfo(request.getAuthInfo(), false, result)) { + builder.setErrCode(result.getErrCode()); + builder.setErrMsg(result.getErrMsg()); return builder.build(); } - ProcessResult result = new ProcessResult(); + CertifiedInfo certifiedInfo = (CertifiedInfo) result.getRetData(); final StringBuilder strBuffer = new StringBuilder(512); // get and check clientId field if (!PBParameterUtils.checkClientId(request.getClientId(), strBuffer, result)) { @@ -859,12 +856,10 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic } } } - CertifiedResult authorizeResult = - serverAuthHandler.validConsumeAuthorizeInfo(certResult.userName, - groupName, topicName, filterCondSet, isRegister, rmtAddress); - if (!authorizeResult.result) { - builder.setErrCode(authorizeResult.errCode); - builder.setErrMsg(authorizeResult.errInfo); + if (!serverAuthHandler.validConsumeAuthorizeInfo(certifiedInfo.getUserName(), + groupName, topicName, filterCondSet, isRegister, rmtAddress, result)) { + builder.setErrCode(result.getErrCode()); + builder.setErrMsg(result.getErrMsg()); return builder.build(); } Integer lid = null; @@ -1098,13 +1093,12 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic builder.setErrMsg("StoreService temporary unavailable!"); return builder.build(); } - CertifiedResult certResult = - serverAuthHandler.identityValidUserInfo(request.getAuthInfo(), false); - if (!certResult.result) { - builder.setErrCode(certResult.errCode); - builder.setErrMsg(certResult.errInfo); + if (!serverAuthHandler.identityValidUserInfo(request.getAuthInfo(), false, result)) { + builder.setErrCode(result.getErrCode()); + builder.setErrMsg(result.getErrMsg()); return builder.build(); } + CertifiedInfo certifiedInfo = (CertifiedInfo) result.getRetData(); // get and check clientId field if (!PBParameterUtils.checkClientId(request.getClientId(), strBuffer, result)) { builder.setErrCode(result.getErrCode()); @@ -1124,7 +1118,6 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic : TBaseConstants.META_VALUE_UNDEFINED; List<Partition> partitions = DataConverterUtil.convertPartitionInfo(request.getPartitionInfoList()); - CertifiedResult authorizeResult = null; boolean isAuthorized = false; List<String> failureInfo = new ArrayList<>(); for (Partition partition : partitions) { @@ -1135,7 +1128,7 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic if (consumerNodeInfo == null) { failureInfo.add(strBuffer.append(TErrCodeConstants.HB_NO_NODE) .append(TokenConstants.ATTR_SEP) - .append(partition.toString()).toString()); + .append(partition).toString()); strBuffer.delete(0, strBuffer.length()); logger.warn(strBuffer.append("[Heartbeat Check] UnRegistered Consumer:") .append(clientId).append(TokenConstants.SEGMENT_SEP) @@ -1145,7 +1138,7 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic } if (!clientId.equals(consumerNodeInfo.getConsumerId())) { failureInfo.add(strBuffer.append(TErrCodeConstants.DUPLICATE_PARTITION) - .append(TokenConstants.ATTR_SEP).append(partition.toString()).toString()); + .append(TokenConstants.ATTR_SEP).append(partition).toString()); strBuffer.delete(0, strBuffer.length()); strBuffer.append("[Heartbeat Check] Duplicated partition: Partition ").append(partStr) .append(" has been consumed by ").append(consumerNodeInfo.getConsumerId()) @@ -1155,13 +1148,10 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic continue; } if (!isAuthorized) { - authorizeResult = - serverAuthHandler.validConsumeAuthorizeInfo(certResult.userName, - groupName, topic, consumerNodeInfo.getFilterCondStrs(), true, rmtAddress); - if (!authorizeResult.result) { - builder.setRequireAuth(authorizeResult.reAuth); - builder.setErrCode(authorizeResult.errCode); - builder.setErrMsg(authorizeResult.errInfo); + if (!serverAuthHandler.validConsumeAuthorizeInfo(certifiedInfo.getUserName(), + groupName, topic, consumerNodeInfo.getFilterCondStrs(), true, rmtAddress, result)) { + builder.setErrCode(result.getErrCode()); + builder.setErrMsg(result.getErrMsg()); return builder.build(); } isAuthorized = true; @@ -1171,8 +1161,7 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic getHeartbeatNodeId(clientId, partStr)); } catch (HeartbeatException e) { failureInfo.add(strBuffer.append(TErrCodeConstants.HB_NO_NODE) - .append(TokenConstants.ATTR_SEP) - .append(partition.toString()).toString()); + .append(TokenConstants.ATTR_SEP).append(partition).toString()); strBuffer.delete(0, strBuffer.length()); logger.warn(strBuffer.append("[Heartbeat Check] Invalid Request") .append(clientId).append(TokenConstants.SEGMENT_SEP) @@ -1184,7 +1173,7 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic consumerNodeInfo.setQryPriorityId(reqQryPriorityId); } } - builder.setRequireAuth(certResult.reAuth); + builder.setRequireAuth(certifiedInfo.isReAuth()); builder.setSuccess(true); builder.setErrCode(TErrCodeConstants.SUCCESS); builder.setHasPartFailure(false); diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/aaaserver/CertificateBrokerHandler.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/aaaserver/CertificateBrokerHandler.java index 3c8a922e6..50a820d08 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/aaaserver/CertificateBrokerHandler.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/aaaserver/CertificateBrokerHandler.java @@ -20,6 +20,7 @@ package org.apache.inlong.tubemq.server.common.aaaserver; import java.util.Set; import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientBroker; import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster; +import org.apache.inlong.tubemq.corebase.rv.ProcessResult; public interface CertificateBrokerHandler { @@ -27,15 +28,15 @@ public interface CertificateBrokerHandler { void appendVisitToken(ClientMaster.MasterBrokerAuthorizedInfo authorizedInfo); - CertifiedResult identityValidUserInfo(ClientBroker.AuthorizedInfo authorizedInfo, - boolean isProduce); + boolean identityValidUserInfo(ClientBroker.AuthorizedInfo authorizedInfo, + boolean isProduce, ProcessResult result); - CertifiedResult validProduceAuthorizeInfo(String userName, String topicName, - String msgType, String clientIp); + boolean validProduceAuthorizeInfo(String userName, String topicName, + String msgType, String clientIp, ProcessResult result); - CertifiedResult validConsumeAuthorizeInfo(String userName, String groupName, + boolean validConsumeAuthorizeInfo(String userName, String groupName, String topicName, Set<String> msgTypeLst, - boolean isRegister, String clientIp); + boolean isRegister, String clientIp, ProcessResult result); boolean isEnableProduceAuthenticate(); diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/aaaserver/CertificateMasterHandler.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/aaaserver/CertificateMasterHandler.java index ad497a4ce..eaf6401e8 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/aaaserver/CertificateMasterHandler.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/aaaserver/CertificateMasterHandler.java @@ -21,17 +21,20 @@ import java.util.Map; import java.util.Set; import java.util.TreeSet; import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster; +import org.apache.inlong.tubemq.corebase.rv.ProcessResult; public interface CertificateMasterHandler { - CertifiedResult identityValidBrokerInfo(ClientMaster.MasterCertificateInfo authenticInfo); + boolean identityValidBrokerInfo( + ClientMaster.MasterCertificateInfo authenticInfo, ProcessResult result); - CertifiedResult identityValidUserInfo(ClientMaster.MasterCertificateInfo authenticInfo, - boolean isProduce); + boolean identityValidUserInfo(ClientMaster.MasterCertificateInfo authenticInfo, + boolean isProduce, ProcessResult result); - CertifiedResult validProducerAuthorizeInfo(String userName, Set<String> topics, String clientIp); + boolean validProducerAuthorizeInfo(String userName, + Set<String> topics, String clientIp, ProcessResult result); - CertifiedResult validConsumerAuthorizeInfo(String userName, String groupName, Set<String> topics, - Map<String, TreeSet<String>> topicConds, String clientIp); + boolean validConsumerAuthorizeInfo(String userName, String groupName, Set<String> topics, + Map<String, TreeSet<String>> topicConds, String clientIp, ProcessResult result); } diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/aaaserver/CertifiedResult.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/aaaserver/CertifiedInfo.java similarity index 53% rename from inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/aaaserver/CertifiedResult.java rename to inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/aaaserver/CertifiedInfo.java index ebab5442a..25040f768 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/aaaserver/CertifiedResult.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/aaaserver/CertifiedInfo.java @@ -17,42 +17,36 @@ package org.apache.inlong.tubemq.server.common.aaaserver; -import org.apache.inlong.tubemq.corebase.TErrCodeConstants; +public class CertifiedInfo { -public class CertifiedResult { + private String userName = ""; + private String authorizedToken = ""; + private boolean reAuth = false; - public boolean result = false; - public int errCode = TErrCodeConstants.BAD_REQUEST; - public String errInfo = "Not authenticate!"; - public String authorizedToken = ""; - public boolean reAuth = false; - public String userName = ""; + public CertifiedInfo() { - public CertifiedResult() { - - } - - public void setFailureResult(int errCode, final String resultInfo) { - this.result = false; - this.errCode = errCode; - this.errInfo = resultInfo; } - public void setSuccessResult(final String userName, final String authorizedToken) { - this.result = true; - this.errCode = TErrCodeConstants.SUCCESS; - this.errInfo = "Ok!"; + public CertifiedInfo(String userName, String authorizedToken) { this.userName = userName; this.authorizedToken = authorizedToken; } - public void setSuccessResult(final String userName, final String authorizedToken, boolean reAuth) { - this.result = true; - this.errCode = TErrCodeConstants.SUCCESS; - this.errInfo = "Ok!"; + public void setSuccessResult(String userName, String authorizedToken, boolean reAuth) { this.userName = userName; this.authorizedToken = authorizedToken; this.reAuth = reAuth; } + public String getUserName() { + return userName; + } + + public String getAuthorizedToken() { + return authorizedToken; + } + + public boolean isReAuth() { + return reAuth; + } } diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/aaaserver/SimpleCertificateBrokerHandler.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/aaaserver/SimpleCertificateBrokerHandler.java index cebd578de..f310765fd 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/aaaserver/SimpleCertificateBrokerHandler.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/aaaserver/SimpleCertificateBrokerHandler.java @@ -25,6 +25,7 @@ import org.apache.inlong.tubemq.corebase.TErrCodeConstants; import org.apache.inlong.tubemq.corebase.TokenConstants; import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientBroker; import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster; +import org.apache.inlong.tubemq.corebase.rv.ProcessResult; import org.apache.inlong.tubemq.corebase.utils.TStringUtils; import org.apache.inlong.tubemq.server.broker.TubeBroker; import org.slf4j.Logger; @@ -79,12 +80,12 @@ public class SimpleCertificateBrokerHandler implements CertificateBrokerHandler } lastUpdatedVisitTokens = curBrokerVisitTokens; String[] visitTokenItems = curBrokerVisitTokens.split(TokenConstants.ARRAY_SEP); - for (int i = 0; i < visitTokenItems.length; i++) { - if (TStringUtils.isBlank(visitTokenItems[i])) { + for (String visitTokenItem : visitTokenItems) { + if (TStringUtils.isBlank(visitTokenItem)) { continue; } try { - long curVisitToken = Long.parseLong(visitTokenItems[i].trim()); + long curVisitToken = Long.parseLong(visitTokenItem.trim()); List<Long> currList = visitTokenList.get(); if (!currList.contains(curVisitToken)) { while (true) { @@ -109,13 +110,12 @@ public class SimpleCertificateBrokerHandler implements CertificateBrokerHandler } @Override - public CertifiedResult identityValidUserInfo(final ClientBroker.AuthorizedInfo authorizedInfo, - boolean isProduce) { - CertifiedResult result = new CertifiedResult(); + public boolean identityValidUserInfo(ClientBroker.AuthorizedInfo authorizedInfo, + boolean isProduce, ProcessResult result) { if (authorizedInfo == null) { - result.setFailureResult(TErrCodeConstants.CERTIFICATE_FAILURE, + result.setFailResult(TErrCodeConstants.CERTIFICATE_FAILURE, "Authorized Info is required!"); - return result; + return result.isSuccess(); } if (enableVisitTokenCheck) { long curVisitToken = authorizedInfo.getVisitAuthorizedToken(); @@ -123,58 +123,53 @@ public class SimpleCertificateBrokerHandler implements CertificateBrokerHandler if (tubeBroker.isKeepAlive()) { if (!currList.contains(curVisitToken) && (System.currentTimeMillis() - tubeBroker.getLastRegTime() > inValidTokenCheckTimeMs)) { - result.setFailureResult(TErrCodeConstants.CERTIFICATE_FAILURE, + result.setFailResult(TErrCodeConstants.CERTIFICATE_FAILURE, "Visit Authorized Token is invalid!"); - return result; + return result.isSuccess(); } } } if ((isProduce && !enableProduceAuthenticate) || (!isProduce && !enableConsumeAuthenticate)) { - result.setSuccessResult("", ""); - return result; + result.setSuccResult(new CertifiedInfo()); + return result.isSuccess(); } if (TStringUtils.isBlank(authorizedInfo.getAuthAuthorizedToken())) { - result.setFailureResult(TErrCodeConstants.CERTIFICATE_FAILURE, + result.setFailResult(TErrCodeConstants.CERTIFICATE_FAILURE, "authAuthorizedToken is Blank!"); - return result; + return result.isSuccess(); } // process authAuthorizedToken info from certificate center begin // process authAuthorizedToken info from certificate center end // set userName, reAuth info - result.setSuccessResult("", "", false); - return result; + result.setSuccResult(new CertifiedInfo()); + return result.isSuccess(); } @Override - public CertifiedResult validConsumeAuthorizeInfo(final String userName, final String groupName, - final String topicName, final Set<String> msgTypeLst, - boolean isRegister, String clientIp) { - CertifiedResult result = new CertifiedResult(); + public boolean validConsumeAuthorizeInfo(String userName, String groupName, String topicName, + Set<String> msgTypeLst, boolean isRegister, String clientIp, ProcessResult result) { if (!enableConsumeAuthorize) { - result.setSuccessResult("", ""); - return result; + result.setSuccResult(new CertifiedInfo()); + return result.isSuccess(); } - // process authorize from authorize center begin // process authorize from authorize center end - result.setSuccessResult("", ""); - return result; + result.setSuccResult(new CertifiedInfo()); + return result.isSuccess(); } @Override - public CertifiedResult validProduceAuthorizeInfo(final String userName, final String topicName, - final String msgType, String clientIp) { - CertifiedResult result = new CertifiedResult(); + public boolean validProduceAuthorizeInfo(String userName, String topicName, + String msgType, String clientIp, ProcessResult result) { if (!enableProduceAuthorize) { - result.setSuccessResult("", ""); - return result; + result.setSuccResult(new CertifiedInfo()); + return result.isSuccess(); } - // process authorize from authorize center begin // process authorize from authorize center end - result.setSuccessResult("", ""); - return result; + result.setSuccResult(new CertifiedInfo()); + return result.isSuccess(); } @Override diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/aaaserver/SimpleCertificateMasterHandler.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/aaaserver/SimpleCertificateMasterHandler.java index da715adcc..1f8b56bfd 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/aaaserver/SimpleCertificateMasterHandler.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/aaaserver/SimpleCertificateMasterHandler.java @@ -22,6 +22,7 @@ import java.util.Set; import java.util.TreeSet; import org.apache.inlong.tubemq.corebase.TErrCodeConstants; import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster; +import org.apache.inlong.tubemq.corebase.rv.ProcessResult; import org.apache.inlong.tubemq.corebase.utils.TStringUtils; import org.apache.inlong.tubemq.server.master.MasterConfig; @@ -34,140 +35,138 @@ public class SimpleCertificateMasterHandler implements CertificateMasterHandler } @Override - public CertifiedResult identityValidBrokerInfo( - final ClientMaster.MasterCertificateInfo certificateInfo) { - CertifiedResult result = new CertifiedResult(); + public boolean identityValidBrokerInfo( + ClientMaster.MasterCertificateInfo certificateInfo, ProcessResult result) { if (!masterConfig.isNeedBrokerVisitAuth()) { - result.setSuccessResult("", ""); - return result; + result.setSuccResult(new CertifiedInfo()); + return result.isSuccess(); } if (certificateInfo == null) { - return result; + result.setSuccResult(new CertifiedInfo()); + return result.isSuccess(); } ClientMaster.AuthenticateInfo authInfo = certificateInfo.getAuthInfo(); if (authInfo == null) { - result.setFailureResult(TErrCodeConstants.CERTIFICATE_FAILURE, + result.setFailResult(TErrCodeConstants.CERTIFICATE_FAILURE, "Illegal value: AuthenticateInfo is null!"); - return result; + return result.isSuccess(); } if (TStringUtils.isBlank(authInfo.getUserName())) { - result.setFailureResult(TErrCodeConstants.CERTIFICATE_FAILURE, + result.setFailResult(TErrCodeConstants.CERTIFICATE_FAILURE, "Illegal value: authInfo.userName is Blank!"); - return result; + return result.isSuccess(); } String inUserName = authInfo.getUserName().trim(); if (TStringUtils.isBlank(authInfo.getSignature())) { - result.setFailureResult(TErrCodeConstants.CERTIFICATE_FAILURE, + result.setFailResult(TErrCodeConstants.CERTIFICATE_FAILURE, "Illegal value: authInfo.signature is Blank!"); - return result; + return result.isSuccess(); } String inSignature = authInfo.getSignature().trim(); if (!inUserName.equals(masterConfig.getVisitName())) { - result.setFailureResult(TErrCodeConstants.CERTIFICATE_FAILURE, + result.setFailResult(TErrCodeConstants.CERTIFICATE_FAILURE, "Illegal value: userName is not equal in authenticateToken!"); - return result; + return result.isSuccess(); } if (Math.abs(System.currentTimeMillis() - authInfo.getTimestamp()) > masterConfig .getAuthValidTimeStampPeriodMs()) { - result.setFailureResult(TErrCodeConstants.CERTIFICATE_FAILURE, + result.setFailResult(TErrCodeConstants.CERTIFICATE_FAILURE, "Illegal value: timestamp out of effective period in authenticateToken!"); - return result; + return result.isSuccess(); } String signature = TStringUtils.getAuthSignature(inUserName, masterConfig.getVisitPassword(), authInfo.getTimestamp(), authInfo.getNonce()); if (!inSignature.equals(signature)) { - result.setFailureResult(TErrCodeConstants.CERTIFICATE_FAILURE, + result.setFailResult(TErrCodeConstants.CERTIFICATE_FAILURE, "Illegal value: userName or password is not correct!"); - return result; + return result.isSuccess(); } - result.setSuccessResult("", ""); - return result; + result.setSuccResult(new CertifiedInfo()); + return result.isSuccess(); } @Override - public CertifiedResult identityValidUserInfo(final ClientMaster.MasterCertificateInfo certificateInfo, - boolean isProduce) { + public boolean identityValidUserInfo(ClientMaster.MasterCertificateInfo certificateInfo, + boolean isProduce, ProcessResult result) { String inUserName = ""; String authorizedToken = ""; String othParams = ""; - CertifiedResult result = new CertifiedResult(); if (isProduce) { if (!masterConfig.isStartProduceAuthenticate()) { - result.setSuccessResult(inUserName, authorizedToken); - return result; + result.setSuccResult(new CertifiedInfo(inUserName, authorizedToken)); + return result.isSuccess(); } } else { if (!masterConfig.isStartConsumeAuthenticate()) { - result.setSuccessResult(inUserName, authorizedToken); - return result; + result.setSuccResult(new CertifiedInfo(inUserName, authorizedToken)); + return result.isSuccess(); } } if (certificateInfo == null) { - result.setFailureResult(TErrCodeConstants.CERTIFICATE_FAILURE, + result.setFailResult(TErrCodeConstants.CERTIFICATE_FAILURE, "Server required MasterCertificateInfo!"); - return result; + return result.isSuccess(); } ClientMaster.AuthenticateInfo authInfo = certificateInfo.getAuthInfo(); if (authInfo == null) { - result.setFailureResult(TErrCodeConstants.CERTIFICATE_FAILURE, + result.setFailResult(TErrCodeConstants.CERTIFICATE_FAILURE, "Illegal value: AuthenticateInfo is null!"); - return result; + return result.isSuccess(); } if (TStringUtils.isBlank(authInfo.getUserName())) { - result.setFailureResult(TErrCodeConstants.CERTIFICATE_FAILURE, + result.setFailResult(TErrCodeConstants.CERTIFICATE_FAILURE, "Illegal value: authInfo.userName is Blank!"); - return result; + return result.isSuccess(); } inUserName = authInfo.getUserName().trim(); if (TStringUtils.isNotBlank(authInfo.getOthParams())) { othParams = authInfo.getOthParams().trim(); } if (TStringUtils.isBlank(authInfo.getSignature())) { - result.setFailureResult(TErrCodeConstants.CERTIFICATE_FAILURE, + result.setFailResult(TErrCodeConstants.CERTIFICATE_FAILURE, "Illegal value: authInfo.signature is Blank!"); - return result; + return result.isSuccess(); } String inSignature = authInfo.getSignature().trim(); if (Math.abs(System.currentTimeMillis() - authInfo.getTimestamp()) > masterConfig .getAuthValidTimeStampPeriodMs()) { - result.setFailureResult(TErrCodeConstants.CERTIFICATE_FAILURE, + result.setFailResult(TErrCodeConstants.CERTIFICATE_FAILURE, "Illegal value: timestamp out of effective period in authenticateToken!"); - return result; + return result.isSuccess(); } // get username and password from certificate center begin // get username and password from certificate center end // get identified userName and authorized token info and return - result.setSuccessResult(inUserName, authorizedToken); - return result; + result.setSuccResult(new CertifiedInfo(inUserName, authorizedToken)); + return result.isSuccess(); } @Override - public CertifiedResult validProducerAuthorizeInfo(String userName, Set<String> topics, String clientIp) { - CertifiedResult result = new CertifiedResult(); + public boolean validProducerAuthorizeInfo(String userName, Set<String> topics, + String clientIp, ProcessResult result) { if (!masterConfig.isStartProduceAuthorize()) { - result.setSuccessResult("", ""); - return result; + result.setSuccResult(new CertifiedInfo()); + return result.isSuccess(); } // call authorize center begin // call authorize center end - result.setSuccessResult("", ""); - return result; + result.setSuccResult(new CertifiedInfo()); + return result.isSuccess(); } @Override - public CertifiedResult validConsumerAuthorizeInfo(String userName, String groupName, Set<String> topics, - Map<String, TreeSet<String>> topicConds, String clientIp) { - CertifiedResult result = new CertifiedResult(); + public boolean validConsumerAuthorizeInfo(String userName, String groupName, Set<String> topics, + Map<String, TreeSet<String>> topicConds, String clientIp, ProcessResult result) { if (!masterConfig.isStartProduceAuthorize()) { - result.setSuccessResult("", ""); - return result; + result.setSuccResult(new CertifiedInfo()); + return result.isSuccess(); } // call authorize center begin // call authorize center end - result.setSuccessResult("", ""); - return result; + result.setSuccResult(new CertifiedInfo()); + return result.isSuccess(); } } diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java index 2e1a67075..9cb7ced13 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java @@ -90,7 +90,7 @@ import org.apache.inlong.tubemq.corerpc.exception.StandbyException; import org.apache.inlong.tubemq.corerpc.service.MasterService; import org.apache.inlong.tubemq.server.Stoppable; import org.apache.inlong.tubemq.server.common.aaaserver.CertificateMasterHandler; -import org.apache.inlong.tubemq.server.common.aaaserver.CertifiedResult; +import org.apache.inlong.tubemq.server.common.aaaserver.CertifiedInfo; import org.apache.inlong.tubemq.server.common.aaaserver.SimpleCertificateMasterHandler; import org.apache.inlong.tubemq.server.common.exception.HeartbeatException; import org.apache.inlong.tubemq.server.common.heartbeat.HeartbeatManager; @@ -314,13 +314,12 @@ public class TMaster extends HasThread implements MasterService, Stoppable { RegisterResponseM2P.Builder builder = RegisterResponseM2P.newBuilder(); builder.setSuccess(false); builder.setBrokerCheckSum(-1); - CertifiedResult certResult = - serverAuthHandler.identityValidUserInfo(request.getAuthInfo(), true); - if (!certResult.result) { - builder.setErrCode(certResult.errCode); - builder.setErrMsg(certResult.errInfo); + if (!serverAuthHandler.identityValidUserInfo(request.getAuthInfo(), true, result)) { + builder.setErrCode(result.getErrCode()); + builder.setErrMsg(result.getErrMsg()); return builder.build(); } + CertifiedInfo certifiedInfo = (CertifiedInfo) result.getRetData(); if (!PBParameterUtils.checkClientId(request.getClientId(), strBuff, result)) { builder.setErrCode(result.getErrCode()); builder.setErrMsg(result.getErrMsg()); @@ -345,12 +344,10 @@ public class TMaster extends HasThread implements MasterService, Stoppable { return builder.build(); } checkNodeStatus(producerId, strBuff); - CertifiedResult authorizeResult = - serverAuthHandler.validProducerAuthorizeInfo( - certResult.userName, transTopicSet, rmtAddress); - if (!authorizeResult.result) { - builder.setErrCode(authorizeResult.errCode); - builder.setErrMsg(authorizeResult.errInfo); + if (!serverAuthHandler.validProducerAuthorizeInfo( + certifiedInfo.getUserName(), transTopicSet, rmtAddress, result)) { + builder.setErrCode(result.getErrCode()); + builder.setErrMsg(result.getErrMsg()); return builder.build(); } final String clientJdkVer = request.hasJdkVersion() ? request.getJdkVersion() : ""; @@ -365,7 +362,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable { builder.setBrokerCheckSum(brokerStaticInfo.getF0()); builder.addAllBrokerInfos(brokerStaticInfo.getF1().values()); builder.setAuthorizedInfo(genAuthorizedInfo( - certResult.authorizedToken, false).build()); + certifiedInfo.getAuthorizedToken(), false).build()); ClientMaster.ApprovedClientConfig.Builder clientConfigBuilder = buildApprovedClientConfig(request.getAppdConfig(), prodTopicConfigTuple); if (clientConfigBuilder != null) { @@ -398,13 +395,12 @@ public class TMaster extends HasThread implements MasterService, Stoppable { HeartResponseM2P.Builder builder = HeartResponseM2P.newBuilder(); builder.setSuccess(false); builder.setBrokerCheckSum(-1); - CertifiedResult certResult = - serverAuthHandler.identityValidUserInfo(request.getAuthInfo(), true); - if (!certResult.result) { - builder.setErrCode(certResult.errCode); - builder.setErrMsg(certResult.errInfo); + if (!serverAuthHandler.identityValidUserInfo(request.getAuthInfo(), true, result)) { + builder.setErrCode(result.getErrCode()); + builder.setErrMsg(result.getErrMsg()); return builder.build(); } + CertifiedInfo certifiedInfo = (CertifiedInfo) result.getRetData(); if (!PBParameterUtils.checkClientId(request.getClientId(), strBuff, result)) { builder.setErrCode(result.getErrCode()); builder.setErrMsg(result.getErrMsg()); @@ -430,12 +426,10 @@ public class TMaster extends HasThread implements MasterService, Stoppable { } final long inBrokerCheckSum = request.getBrokerCheckSum(); checkNodeStatus(producerId, strBuff); - CertifiedResult authorizeResult = - serverAuthHandler.validProducerAuthorizeInfo( - certResult.userName, transTopicSet, rmtAddress); - if (!authorizeResult.result) { - builder.setErrCode(authorizeResult.errCode); - builder.setErrMsg(authorizeResult.errInfo); + if (!serverAuthHandler.validProducerAuthorizeInfo( + certifiedInfo.getUserName(), transTopicSet, rmtAddress, result)) { + builder.setErrCode(result.getErrCode()); + builder.setErrMsg(result.getErrMsg()); return builder.build(); } try { @@ -454,7 +448,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable { final Tuple3<Long, Integer, Map<String, String>> prodTopicConfigTuple = getTopicConfigureInfos(producerId, false); builder.setAuthorizedInfo(genAuthorizedInfo( - certResult.authorizedToken, false).build()); + certifiedInfo.getAuthorizedToken(), false).build()); builder.setBrokerCheckSum(brokerStaticInfo.getF0()); if (brokerStaticInfo.getF0() != inBrokerCheckSum) { builder.addAllBrokerInfos(brokerStaticInfo.getF1().values()); @@ -498,11 +492,9 @@ public class TMaster extends HasThread implements MasterService, Stoppable { final StringBuilder strBuff = new StringBuilder(512); CloseResponseM2P.Builder builder = CloseResponseM2P.newBuilder(); builder.setSuccess(false); - CertifiedResult certResult = - serverAuthHandler.identityValidUserInfo(request.getAuthInfo(), true); - if (!certResult.result) { - builder.setErrCode(certResult.errCode); - builder.setErrMsg(certResult.errInfo); + if (!serverAuthHandler.identityValidUserInfo(request.getAuthInfo(), true, result)) { + builder.setErrCode(result.getErrCode()); + builder.setErrMsg(result.getErrMsg()); return builder.build(); } if (!PBParameterUtils.checkClientId(request.getClientId(), strBuff, result)) { @@ -540,13 +532,12 @@ public class TMaster extends HasThread implements MasterService, Stoppable { final StringBuilder strBuff = new StringBuilder(512); RegisterResponseM2C.Builder builder = RegisterResponseM2C.newBuilder(); builder.setSuccess(false); - CertifiedResult certResult = - serverAuthHandler.identityValidUserInfo(request.getAuthInfo(), false); - if (!certResult.result) { - builder.setErrCode(certResult.errCode); - builder.setErrMsg(certResult.errInfo); + if (!serverAuthHandler.identityValidUserInfo(request.getAuthInfo(), false, result)) { + builder.setErrCode(result.getErrCode()); + builder.setErrMsg(result.getErrMsg()); return builder.build(); } + CertifiedInfo certifiedInfo = (CertifiedInfo) result.getRetData(); if (!PBParameterUtils.checkClientId(request.getClientId(), strBuff, result)) { builder.setErrCode(result.getErrCode()); builder.setErrMsg(result.getErrMsg()); @@ -618,12 +609,10 @@ public class TMaster extends HasThread implements MasterService, Stoppable { return builder.build(); } ConsumerInfo inConsumerInfo2 = (ConsumerInfo) result.getRetData(); - CertifiedResult authorizeResult = - serverAuthHandler.validConsumerAuthorizeInfo(certResult.userName, - groupName, reqTopicSet, reqTopicConditions, rmtAddress); - if (!authorizeResult.result) { - builder.setErrCode(authorizeResult.errCode); - builder.setErrMsg(authorizeResult.errInfo); + if (!serverAuthHandler.validConsumerAuthorizeInfo(certifiedInfo.getUserName(), + groupName, reqTopicSet, reqTopicConditions, rmtAddress, result)) { + builder.setErrCode(result.getErrCode()); + builder.setErrMsg(result.getErrMsg()); return builder.build(); } // need removed for authorize center begin @@ -708,7 +697,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable { } } } - builder.setAuthorizedInfo(genAuthorizedInfo(certResult.authorizedToken, false)); + builder.setAuthorizedInfo(genAuthorizedInfo(certifiedInfo.getAuthorizedToken(), false)); builder.setNotAllocated(consumeGroupInfo.isNotAllocate()); builder.setSuccess(true); builder.setErrCode(TErrCodeConstants.SUCCESS); @@ -736,13 +725,12 @@ public class TMaster extends HasThread implements MasterService, Stoppable { HeartResponseM2C.Builder builder = HeartResponseM2C.newBuilder(); builder.setSuccess(false); // identity valid - CertifiedResult certResult = - serverAuthHandler.identityValidUserInfo(request.getAuthInfo(), false); - if (!certResult.result) { - builder.setErrCode(certResult.errCode); - builder.setErrMsg(certResult.errInfo); + if (!serverAuthHandler.identityValidUserInfo(request.getAuthInfo(), false, result)) { + builder.setErrCode(result.getErrCode()); + builder.setErrMsg(result.getErrMsg()); return builder.build(); } + CertifiedInfo certifiedInfo = (CertifiedInfo) result.getRetData(); if (!PBParameterUtils.checkClientId(request.getClientId(), strBuff, result)) { builder.setErrCode(result.getErrCode()); builder.setErrMsg(result.getErrMsg()); @@ -764,13 +752,11 @@ public class TMaster extends HasThread implements MasterService, Stoppable { return builder.build(); } // authorize check - CertifiedResult authorizeResult = - serverAuthHandler.validConsumerAuthorizeInfo(certResult.userName, - groupName, consumeGroupInfo.getTopicSet(), - consumeGroupInfo.getTopicConditions(), rmtAddress); - if (!authorizeResult.result) { - builder.setErrCode(authorizeResult.errCode); - builder.setErrMsg(authorizeResult.errInfo); + if (!serverAuthHandler.validConsumerAuthorizeInfo(certifiedInfo.getUserName(), + groupName, consumeGroupInfo.getTopicSet(), + consumeGroupInfo.getTopicConditions(), rmtAddress, result)) { + builder.setErrCode(result.getErrCode()); + builder.setErrMsg(result.getErrMsg()); return builder.build(); } // heartbeat check @@ -879,7 +865,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable { } } } - builder.setAuthorizedInfo(genAuthorizedInfo(certResult.authorizedToken, false)); + builder.setAuthorizedInfo(genAuthorizedInfo(certifiedInfo.getAuthorizedToken(), false)); builder.setNotAllocated(consumeGroupInfo.isNotAllocate()); builder.setSuccess(true); builder.setErrCode(TErrCodeConstants.SUCCESS); @@ -904,11 +890,9 @@ public class TMaster extends HasThread implements MasterService, Stoppable { StringBuilder strBuff = new StringBuilder(512); CloseResponseM2C.Builder builder = CloseResponseM2C.newBuilder(); builder.setSuccess(false); - CertifiedResult certResult = - serverAuthHandler.identityValidUserInfo(request.getAuthInfo(), false); - if (!certResult.result) { - builder.setErrCode(certResult.errCode); - builder.setErrMsg(certResult.errInfo); + if (!serverAuthHandler.identityValidUserInfo(request.getAuthInfo(), false, result)) { + builder.setErrCode(result.getErrCode()); + builder.setErrMsg(result.getErrMsg()); return builder.build(); } if (!PBParameterUtils.checkClientId(request.getClientId(), strBuff, result)) { @@ -949,21 +933,19 @@ public class TMaster extends HasThread implements MasterService, Stoppable { final String rmtAddress, boolean overtls) throws Exception { // #lizard forgives + ProcessResult result = new ProcessResult(); + final StringBuilder strBuff = new StringBuilder(512); RegisterResponseM2B.Builder builder = RegisterResponseM2B.newBuilder(); builder.setSuccess(false); builder.setStopRead(false); builder.setStopWrite(false); builder.setTakeConfInfo(false); // auth - CertifiedResult cfResult = - serverAuthHandler.identityValidBrokerInfo(request.getAuthInfo()); - if (!cfResult.result) { - builder.setErrCode(cfResult.errCode); - builder.setErrMsg(cfResult.errInfo); + if (!serverAuthHandler.identityValidBrokerInfo(request.getAuthInfo(), result)) { + builder.setErrCode(result.getErrCode()); + builder.setErrMsg(result.getErrMsg()); return builder.build(); } - ProcessResult result = new ProcessResult(); - final StringBuilder strBuff = new StringBuilder(512); // get clientId and check valid if (!PBParameterUtils.checkClientId(request.getClientId(), strBuff, result)) { builder.setErrCode(result.getErrCode()); @@ -1061,6 +1043,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable { final String rmtAddress, boolean overtls) throws Exception { // #lizard forgives + ProcessResult result = new ProcessResult(); + final StringBuilder strBuff = new StringBuilder(512); // set response field HeartResponseM2B.Builder builder = HeartResponseM2B.newBuilder(); builder.setSuccess(false); @@ -1074,15 +1058,11 @@ public class TMaster extends HasThread implements MasterService, Stoppable { builder.setCurBrokerConfId(TBaseConstants.META_VALUE_UNDEFINED); builder.setConfCheckSumId(TBaseConstants.META_VALUE_UNDEFINED); // identity broker info - CertifiedResult certResult = - serverAuthHandler.identityValidBrokerInfo(request.getAuthInfo()); - if (!certResult.result) { - builder.setErrCode(certResult.errCode); - builder.setErrMsg(certResult.errInfo); + if (!serverAuthHandler.identityValidBrokerInfo(request.getAuthInfo(), result)) { + builder.setErrCode(result.getErrCode()); + builder.setErrMsg(result.getErrMsg()); return builder.build(); } - ProcessResult result = new ProcessResult(); - final StringBuilder strBuff = new StringBuilder(512); if (!PBParameterUtils.checkBrokerId(request.getBrokerId(), strBuff, result)) { builder.setErrCode(result.getErrCode()); builder.setErrMsg(result.getErrMsg()); @@ -1175,11 +1155,9 @@ public class TMaster extends HasThread implements MasterService, Stoppable { StringBuilder strBuff = new StringBuilder(512); CloseResponseM2B.Builder builder = CloseResponseM2B.newBuilder(); builder.setSuccess(false); - CertifiedResult cfResult = - serverAuthHandler.identityValidBrokerInfo(request.getAuthInfo()); - if (!cfResult.result) { - builder.setErrCode(cfResult.errCode); - builder.setErrMsg(cfResult.errInfo); + if (!serverAuthHandler.identityValidBrokerInfo(request.getAuthInfo(), result)) { + builder.setErrCode(result.getErrCode()); + builder.setErrMsg(result.getErrMsg()); return builder.build(); } if (!PBParameterUtils.checkBrokerId(request.getBrokerId(), strBuff, result)) { @@ -1216,13 +1194,12 @@ public class TMaster extends HasThread implements MasterService, Stoppable { ProcessResult result = new ProcessResult(); final StringBuilder strBuff = new StringBuilder(512); RegisterResponseM2CV2.Builder builder = RegisterResponseM2CV2.newBuilder(); - CertifiedResult certResult = - serverAuthHandler.identityValidUserInfo(request.getAuthInfo(), false); - if (!certResult.result) { - builder.setErrCode(certResult.errCode); - builder.setErrMsg(certResult.errInfo); + if (!serverAuthHandler.identityValidUserInfo(request.getAuthInfo(), false, result)) { + builder.setErrCode(result.getErrCode()); + builder.setErrMsg(result.getErrMsg()); return builder.build(); } + CertifiedInfo certifiedInfo = (CertifiedInfo) result.getRetData(); if (!PBParameterUtils.checkClientId(request.getClientId(), strBuff, result)) { builder.setErrCode(result.getErrCode()); builder.setErrMsg(result.getErrMsg()); @@ -1298,12 +1275,10 @@ public class TMaster extends HasThread implements MasterService, Stoppable { builder.setErrMsg(result.getErrMsg()); return builder.build(); } - CertifiedResult authorizeResult = - serverAuthHandler.validConsumerAuthorizeInfo(certResult.userName, - groupName, reqTopicSet, reqTopicConditions, rmtAddress); - if (!authorizeResult.result) { - builder.setErrCode(authorizeResult.errCode); - builder.setErrMsg(authorizeResult.errInfo); + if (!serverAuthHandler.validConsumerAuthorizeInfo(certifiedInfo.getUserName(), + groupName, reqTopicSet, reqTopicConditions, rmtAddress, result)) { + builder.setErrCode(result.getErrCode()); + builder.setErrMsg(result.getErrMsg()); return builder.build(); } Integer lid = null; @@ -1366,7 +1341,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable { builder.addAllBrokerConfigList(brokerStaticInfo.getF1().values()); } builder.setOpsTaskInfo(buildOpsTaskInfo(consumeGroupInfo, inConsumerInfo, opsTaskInfo)); - builder.setAuthorizedInfo(genAuthorizedInfo(certResult.authorizedToken, false)); + builder.setAuthorizedInfo(genAuthorizedInfo(certifiedInfo.getAuthorizedToken(), false)); builder.setErrCode(TErrCodeConstants.SUCCESS); builder.setErrMsg("OK!"); return builder.build(); @@ -1390,13 +1365,12 @@ public class TMaster extends HasThread implements MasterService, Stoppable { // response HeartResponseM2CV2.Builder builder = HeartResponseM2CV2.newBuilder(); // identity valid - CertifiedResult certResult = - serverAuthHandler.identityValidUserInfo(request.getAuthInfo(), false); - if (!certResult.result) { - builder.setErrCode(certResult.errCode); - builder.setErrMsg(certResult.errInfo); + if (!serverAuthHandler.identityValidUserInfo(request.getAuthInfo(), false, result)) { + builder.setErrCode(result.getErrCode()); + builder.setErrMsg(result.getErrMsg()); return builder.build(); } + CertifiedInfo certifiedInfo = (CertifiedInfo) result.getRetData(); if (!PBParameterUtils.checkClientId(request.getClientId(), strBuff, result)) { builder.setErrCode(result.getErrCode()); builder.setErrMsg(result.getErrMsg()); @@ -1436,13 +1410,11 @@ public class TMaster extends HasThread implements MasterService, Stoppable { return builder.build(); } // authorize check - CertifiedResult authorizeResult = - serverAuthHandler.validConsumerAuthorizeInfo(certResult.userName, - groupName, consumeGroupInfo.getTopicSet(), - consumeGroupInfo.getTopicConditions(), rmtAddress); - if (!authorizeResult.result) { - builder.setErrCode(authorizeResult.errCode); - builder.setErrMsg(authorizeResult.errInfo); + if (!serverAuthHandler.validConsumerAuthorizeInfo(certifiedInfo.getUserName(), + groupName, consumeGroupInfo.getTopicSet(), + consumeGroupInfo.getTopicConditions(), rmtAddress, result)) { + builder.setErrCode(result.getErrCode()); + builder.setErrMsg(result.getErrMsg()); return builder.build(); } // heartbeat check @@ -1488,7 +1460,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable { } } builder.setOpsTaskInfo(buildOpsTaskInfo(consumeGroupInfo, inConsumerInfo, opsTaskInfo)); - builder.setAuthorizedInfo(genAuthorizedInfo(certResult.authorizedToken, false)); + builder.setAuthorizedInfo(genAuthorizedInfo(certifiedInfo.getAuthorizedToken(), false)); builder.setErrCode(TErrCodeConstants.SUCCESS); builder.setErrMsg("OK!"); return builder.build(); @@ -1510,11 +1482,9 @@ public class TMaster extends HasThread implements MasterService, Stoppable { ProcessResult reslut = new ProcessResult(); StringBuilder strBuff = new StringBuilder(512); GetPartMetaResponseM2C.Builder builder = GetPartMetaResponseM2C.newBuilder(); - CertifiedResult certResult = - serverAuthHandler.identityValidUserInfo(request.getAuthInfo(), false); - if (!certResult.result) { - builder.setErrCode(certResult.errCode); - builder.setErrMsg(certResult.errInfo); + if (!serverAuthHandler.identityValidUserInfo(request.getAuthInfo(), false, reslut)) { + builder.setErrCode(reslut.getErrCode()); + builder.setErrMsg(reslut.getErrMsg()); return builder.build(); } if (!PBParameterUtils.checkClientId(request.getClientId(), strBuff, reslut)) {