This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 36a9017c1f [INLONG-11589][SDK] Optimize the implementation of proxy configuration management (#11590) 36a9017c1f is described below commit 36a9017c1f92e8a907b7654f2e997ce7287e259a Author: Goson Zhang <4675...@qq.com> AuthorDate: Mon Dec 9 11:36:06 2024 +0800 [INLONG-11589][SDK] Optimize the implementation of proxy configuration management (#11590) Co-authored-by: gosonzhang <gosonzh...@tencent.com> --- .../inlong/sdk/dataproxy/ConfigConstants.java | 48 +- .../inlong/sdk/dataproxy/DefaultMessageSender.java | 16 +- .../inlong/sdk/dataproxy/ProxyClientConfig.java | 328 +++--- .../sdk/dataproxy/config/ProxyConfigEntry.java | 33 +- .../sdk/dataproxy/config/ProxyConfigManager.java | 1111 +++++++++++--------- .../sdk/dataproxy/example/HttpClientExample.java | 5 +- .../sdk/dataproxy/example/TcpClientExample.java | 4 +- .../inlong/sdk/dataproxy/network/ClientMgr.java | 37 +- .../sdk/dataproxy/network/HttpProxySender.java | 20 +- .../inlong/sdk/dataproxy/network/Sender.java | 19 +- .../inlong/sdk/dataproxy/utils/ProxyUtils.java | 8 +- .../apache/inlong/sdk/dataproxy/utils/Tuple2.java | 71 ++ .../sdk/dataproxy/ProxyConfigManagerTest.java | 9 +- .../inlong/sdk/dirtydata/InlongSdkDirtySender.java | 2 +- 14 files changed, 995 insertions(+), 716 deletions(-) diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java index 26f8d131b4..81b9f0dad0 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java @@ -21,6 +21,42 @@ public class ConfigConstants { public static final String PROXY_SDK_VERSION = "1.2.11"; + public static String HTTP = "http://"; + public static String HTTPS = "https://"; + + // dataproxy node config + public 
static final String MANAGER_DATAPROXY_API = "/inlong/manager/openapi/dataproxy/getIpList/"; + public static final String META_STORE_SUB_DIR = "/.inlong/"; + public static final String LOCAL_DP_CONFIG_FILE_SUFFIX = ".local"; + public static final String REMOTE_DP_CACHE_FILE_SUFFIX = ".proxyip"; + public static final String REMOTE_ENCRYPT_CACHE_FILE_SUFFIX = ".pubKey"; + // authorization key + public static final String BASIC_AUTH_HEADER = "authorization"; + + // config info sync interval in minutes + public static final int VAL_DEF_CONFIG_SYNC_INTERVAL_MIN = 3; + public static final int VAL_MIN_CONFIG_SYNC_INTERVAL_MIN = 1; + public static final int VAL_MAX_CONFIG_SYNC_INTERVAL_MIN = 30; + public static final long VAL_UNIT_MIN_TO_MS = 60 * 1000L; + // config info sync max retry if failure + public static final int VAL_DEF_RETRY_IF_CONFIG_SYNC_FAIL = 3; + public static final int VAL_MAX_RETRY_IF_CONFIG_SYNC_FAIL = 5; + // cache config expired time in ms + public static final long VAL_DEF_CACHE_CONFIG_EXPIRED_MS = 20 * 60 * 1000L; + // node force choose interval in ms + public static final long VAL_DEF_FORCE_CHOOSE_INR_MS = 10 * 60 * 1000L; + public static final long VAL_MIN_FORCE_CHOOSE_INR_MS = 30 * 1000L; + + // connection timeout in milliseconds + public static final int VAL_DEF_CONNECT_TIMEOUT_MS = 10000; + public static final int VAL_MIN_CONNECT_TIMEOUT_MS = 2000; + public static final int VAL_MAX_CONNECT_TIMEOUT_MS = 60000; + public static final int VAL_DEF_CONNECT_CLOSE_DELAY_MS = 500; + // socket timeout in milliseconds + public static final int VAL_DEF_SOCKET_TIMEOUT_MS = 20000; + public static final int VAL_MIN_SOCKET_TIMEOUT_MS = 2000; + public static final int VAL_MAX_SOCKET_TIMEOUT_MS = 60000; + public static final int ALIVE_CONNECTIONS = 3; public static final int MAX_TIMEOUT_CNT = 3; public static final int LOAD_THRESHOLD = 0; @@ -43,17 +79,11 @@ public class ConfigConstants { /* one hour interval */ public static final int 
PROXY_HTTP_UPDATE_INTERVAL_MINUTES = 60; - public static final int PROXY_UPDATE_MAX_RETRY = 10; - public static final int MAX_LINE_CNT = 30; - // connection timeout in milliseconds - public static final long VAL_DEF_CONNECT_TIMEOUT_MS = 20000L; - public static final long VAL_MIN_CONNECT_TIMEOUT_MS = 1L; - public static final long VAL_DEF_CONNECT_CLOSE_DELAY_MS = 500L; // request timeout in milliseconds public static final long VAL_DEF_REQUEST_TIMEOUT_MS = 10000L; - public static final long VAL_MIN_REQUEST_TIMEOUT_MS = 1L; + public static final long VAL_MIN_REQUEST_TIMEOUT_MS = 500L; public static final int DEFAULT_SEND_BUFFER_SIZE = 16777216; public static final int DEFAULT_RECEIVE_BUFFER_SIZE = 16777216; @@ -65,14 +95,10 @@ public class ConfigConstants { public static final int FLAG_ALLOW_ENCRYPT = 1 << 6; public static final int FLAG_ALLOW_COMPRESS = 1 << 5; - public static final String MANAGER_DATAPROXY_API = "/inlong/manager/openapi/dataproxy/getIpList/"; public static LoadBalance DEFAULT_LOAD_BALANCE = LoadBalance.ROBIN; public static int DEFAULT_VIRTUAL_NODE = 1000; public static int DEFAULT_RANDOM_MAX_RETRY = 1000; - public static String HTTP = "http://"; - public static String HTTPS = "https://"; - public static int DEFAULT_SENDER_MAX_ATTEMPT = 1; /* Reserved attribute data size(bytes). 
*/ diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java index dd22e63cce..b99f3726db 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java @@ -30,6 +30,7 @@ import org.apache.inlong.sdk.dataproxy.network.Sender; import org.apache.inlong.sdk.dataproxy.network.SequentialID; import org.apache.inlong.sdk.dataproxy.threads.IndexCollectThread; import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils; +import org.apache.inlong.sdk.dataproxy.utils.Tuple2; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -107,17 +108,20 @@ public class DefaultMessageSender implements MessageSender { } LOGGER.info("Initial tcp sender, configure is {}", configure); // initial sender object - ProxyConfigManager proxyConfigManager = new ProxyConfigManager(configure, null); - proxyConfigManager.setInlongGroupId(configure.getInlongGroupId()); - ProxyConfigEntry entry = proxyConfigManager.getGroupIdConfigure(); - DefaultMessageSender sender = CACHE_SENDER.get(entry.getClusterId()); + ProxyConfigManager proxyConfigManager = new ProxyConfigManager(configure); + Tuple2<ProxyConfigEntry, String> result = + proxyConfigManager.getGroupIdConfigure(true); + if (result.getF0() == null) { + throw new Exception(result.getF1()); + } + DefaultMessageSender sender = CACHE_SENDER.get(result.getF0().getClusterId()); if (sender != null) { return sender; } else { DefaultMessageSender tmpMessageSender = new DefaultMessageSender(configure, selfDefineFactory); - tmpMessageSender.setMaxPacketLength(entry.getMaxPacketLength()); - CACHE_SENDER.put(entry.getClusterId(), tmpMessageSender); + tmpMessageSender.setMaxPacketLength(result.getF0().getMaxPacketLength()); + 
CACHE_SENDER.put(result.getF0().getClusterId(), tmpMessageSender); return tmpMessageSender; } } diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java index ce8a3b3b39..412b1950c5 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java @@ -27,41 +27,40 @@ import org.apache.commons.lang3.StringUtils; @Data public class ProxyClientConfig { + private String managerIP = ""; + private int managerPort = 8099; + private boolean visitManagerByHttp = true; + private boolean onlyUseLocalProxyConfig = false; + private int managerConnTimeoutMs = ConfigConstants.VAL_DEF_CONNECT_TIMEOUT_MS; + private int managerSocketTimeoutMs = ConfigConstants.VAL_DEF_SOCKET_TIMEOUT_MS; + private long managerConfigSyncInrMs = + ConfigConstants.VAL_DEF_CONFIG_SYNC_INTERVAL_MIN * ConfigConstants.VAL_UNIT_MIN_TO_MS; + private int configSyncMaxRetryIfFail = ConfigConstants.VAL_DEF_RETRY_IF_CONFIG_SYNC_FAIL; + private String configStoreBasePath = System.getProperty("user.dir"); + // max expired time for config cache. 
+ private long configCacheExpiredMs = ConfigConstants.VAL_DEF_CACHE_CONFIG_EXPIRED_MS; + // nodes force choose interval ms + private long forceReChooseInrMs = ConfigConstants.VAL_DEF_FORCE_CHOOSE_INR_MS; + private boolean enableAuthentication = false; + private String authSecretId = ""; + private String authSecretKey = ""; + private int aliveConnections; private int syncThreadPoolSize; private int asyncCallbackSize; - private int managerPort = 8099; - private String managerIP = ""; - private String managerAddress; - private String managerUrl = ""; - private int proxyUpdateIntervalMinutes; - private int proxyUpdateMaxRetry; private String inlongGroupId; - private boolean requestByHttp = true; private boolean isNeedDataEncry = false; - private boolean needAuthentication = false; - private String userName = ""; - private String secretKey = ""; private String rsaPubKeyUrl = ""; - private String confStoreBasePath = System.getProperty("user.dir") + "/.inlong/"; private String tlsServerCertFilePathAndName; private String tlsServerKey; private String tlsVersion = "TLSv1.2"; private int maxTimeoutCnt = ConfigConstants.MAX_TIMEOUT_CNT; - private String authSecretId; - private String authSecretKey; private String protocolType; // metric configure private MetricConfig metricConfig = new MetricConfig(); - private int managerConnectionTimeout = 10000; - // http socket timeout in milliseconds - private int managerSocketTimeout = 30 * 1000; - - private boolean readProxyIPFromLocal = false; - // connect timeout in milliseconds private long connectTimeoutMs = ConfigConstants.VAL_DEF_CONNECT_TIMEOUT_MS; // request timeout in milliseconds @@ -79,8 +78,6 @@ public class ProxyClientConfig { // interval for async worker in microseconds. private int asyncWorkerInterval = 500; private boolean cleanHttpCacheWhenClosing = false; - // max cache time for proxy config. 
- private long maxProxyCacheTimeInMs = 30 * 60 * 1000; private int ioThreadNum = Runtime.getRuntime().availableProcessors(); private boolean enableBusyWait = false; @@ -93,32 +90,30 @@ public class ProxyClientConfig { private int senderMaxAttempt = ConfigConstants.DEFAULT_SENDER_MAX_ATTEMPT; /* pay attention to the last url parameter ip */ - public ProxyClientConfig(String localHost, boolean requestByHttp, String managerIp, + public ProxyClientConfig(String localHost, boolean visitManagerByHttp, String managerIp, int managerPort, String inlongGroupId, String authSecretId, String authSecretKey, LoadBalance loadBalance, int virtualNode, int maxRetry) throws ProxysdkException { if (StringUtils.isBlank(localHost)) { throw new ProxysdkException("localHost is blank!"); } if (StringUtils.isBlank(managerIp)) { - throw new IllegalArgumentException("managerIp is Blank!"); + throw new ProxysdkException("managerIp is Blank!"); + } + if (managerPort <= 0) { + throw new ProxysdkException("managerPort <= 0!"); } if (StringUtils.isBlank(inlongGroupId)) { throw new ProxysdkException("groupId is blank!"); } - this.inlongGroupId = inlongGroupId; - this.requestByHttp = requestByHttp; + this.inlongGroupId = inlongGroupId.trim(); + this.visitManagerByHttp = visitManagerByHttp; this.managerPort = managerPort; this.managerIP = managerIp; - this.managerAddress = getManagerAddress(managerIp, managerPort, requestByHttp); - this.managerUrl = - getManagerUrl(managerAddress, inlongGroupId); IpUtils.validLocalIp(localHost); this.aliveConnections = ConfigConstants.ALIVE_CONNECTIONS; this.syncThreadPoolSize = ConfigConstants.SYNC_THREAD_POOL_SIZE; this.asyncCallbackSize = ConfigConstants.ASYNC_CALLBACK_SIZE; - this.proxyUpdateIntervalMinutes = ConfigConstants.PROXY_UPDATE_INTERVAL_MINUTES; this.proxyHttpUpdateIntervalMinutes = ConfigConstants.PROXY_HTTP_UPDATE_INTERVAL_MINUTES; - this.proxyUpdateMaxRetry = ConfigConstants.PROXY_UPDATE_MAX_RETRY; this.authSecretId = authSecretId; this.authSecretKey 
= authSecretKey; this.loadBalance = loadBalance; @@ -129,25 +124,15 @@ public class ProxyClientConfig { /* pay attention to the last url parameter ip */ public ProxyClientConfig(String managerAddress, String inlongGroupId, String authSecretId, String authSecretKey, LoadBalance loadBalance, int virtualNode, int maxRetry) throws ProxysdkException { - if (StringUtils.isBlank(managerAddress) || (!managerAddress.startsWith(ConfigConstants.HTTP) - && !managerAddress.startsWith(ConfigConstants.HTTPS))) { - throw new ProxysdkException("managerAddress is blank or missing http/https protocol "); - } + checkAndParseAddress(managerAddress); if (StringUtils.isBlank(inlongGroupId)) { throw new ProxysdkException("groupId is blank!"); } - if (managerAddress.startsWith(ConfigConstants.HTTPS)) { - this.requestByHttp = false; - } - this.managerAddress = managerAddress; - this.managerUrl = getManagerUrl(managerAddress, inlongGroupId); - this.inlongGroupId = inlongGroupId; + this.inlongGroupId = inlongGroupId.trim(); this.aliveConnections = ConfigConstants.ALIVE_CONNECTIONS; this.syncThreadPoolSize = ConfigConstants.SYNC_THREAD_POOL_SIZE; this.asyncCallbackSize = ConfigConstants.ASYNC_CALLBACK_SIZE; - this.proxyUpdateIntervalMinutes = ConfigConstants.PROXY_UPDATE_INTERVAL_MINUTES; this.proxyHttpUpdateIntervalMinutes = ConfigConstants.PROXY_HTTP_UPDATE_INTERVAL_MINUTES; - this.proxyUpdateMaxRetry = ConfigConstants.PROXY_UPDATE_MAX_RETRY; this.authSecretId = authSecretId; this.authSecretKey = authSecretKey; this.loadBalance = loadBalance; @@ -155,21 +140,9 @@ public class ProxyClientConfig { this.maxRetry = maxRetry; } - private String getManagerUrl(String managerAddress, String inlongGroupId) { - return managerAddress + ConfigConstants.MANAGER_DATAPROXY_API + inlongGroupId; - } - - private String getManagerAddress(String managerIp, int managerPort, boolean requestByHttp) { - String protocolType = ConfigConstants.HTTPS; - if (requestByHttp) { - protocolType = ConfigConstants.HTTP; - } - 
return protocolType + managerIp + ":" + managerPort; - } - - public ProxyClientConfig(String localHost, boolean requestByHttp, String managerIp, int managerPort, + public ProxyClientConfig(String localHost, boolean visitManagerByHttp, String managerIp, int managerPort, String inlongGroupId, String authSecretId, String authSecretKey) throws ProxysdkException { - this(localHost, requestByHttp, managerIp, managerPort, inlongGroupId, authSecretId, authSecretKey, + this(localHost, visitManagerByHttp, managerIp, managerPort, inlongGroupId, authSecretId, authSecretKey, ConfigConstants.DEFAULT_LOAD_BALANCE, ConfigConstants.DEFAULT_VIRTUAL_NODE, ConfigConstants.DEFAULT_RANDOM_MAX_RETRY); } @@ -181,40 +154,131 @@ public class ProxyClientConfig { ConfigConstants.DEFAULT_RANDOM_MAX_RETRY); } - public String getTlsServerCertFilePathAndName() { - return tlsServerCertFilePathAndName; + public String getManagerIP() { + return managerIP; } - public String getTlsServerKey() { - return tlsServerKey; + public int getManagerPort() { + return managerPort; } - public boolean isRequestByHttp() { - return requestByHttp; + public boolean isVisitManagerByHttp() { + return visitManagerByHttp; } - public String getInlongGroupId() { - return inlongGroupId; + public boolean isOnlyUseLocalProxyConfig() { + return onlyUseLocalProxyConfig; } - public void setInlongGroupId(String inlongGroupId) { - this.inlongGroupId = inlongGroupId; + public void setOnlyUseLocalProxyConfig(boolean onlyUseLocalProxyConfig) { + this.onlyUseLocalProxyConfig = onlyUseLocalProxyConfig; } - public int getManagerPort() { - return managerPort; + public boolean isEnableAuthentication() { + return this.enableAuthentication; } - public String getManagerIP() { - return managerIP; + public String getAuthSecretId() { + return authSecretId; } - public String getConfStoreBasePath() { - return confStoreBasePath; + public String getAuthSecretKey() { + return authSecretKey; } - public void setConfStoreBasePath(String 
confStoreBasePath) { - this.confStoreBasePath = confStoreBasePath; + public void setAuthenticationInfo(boolean needAuthentication, String secretId, String secretKey) { + this.enableAuthentication = needAuthentication; + if (!this.enableAuthentication) { + return; + } + if (StringUtils.isBlank(secretId)) { + throw new IllegalArgumentException("secretId is Blank!"); + } + if (StringUtils.isBlank(secretKey)) { + throw new IllegalArgumentException("secretKey is Blank!"); + } + this.authSecretId = secretId.trim(); + this.authSecretKey = secretKey.trim(); + } + + public long getManagerConfigSyncInrMs() { + return managerConfigSyncInrMs; + } + + public void setManagerConfigSyncInrMin(int managerConfigSyncInrMin) { + int tmpValue = + Math.min(ConfigConstants.VAL_MAX_CONFIG_SYNC_INTERVAL_MIN, + Math.max(ConfigConstants.VAL_MIN_CONFIG_SYNC_INTERVAL_MIN, managerConfigSyncInrMin)); + this.managerConfigSyncInrMs = tmpValue * ConfigConstants.VAL_UNIT_MIN_TO_MS; + } + + public int getManagerConnTimeoutMs() { + return managerConnTimeoutMs; + } + + public void setManagerConnTimeoutMs(int managerConnTimeoutMs) { + this.managerConnTimeoutMs = + Math.min(ConfigConstants.VAL_MAX_CONNECT_TIMEOUT_MS, + Math.max(ConfigConstants.VAL_MIN_CONNECT_TIMEOUT_MS, managerConnTimeoutMs)); + } + + public int getManagerSocketTimeoutMs() { + return managerSocketTimeoutMs; + } + + public void setManagerSocketTimeoutMs(int managerSocketTimeoutMs) { + this.managerSocketTimeoutMs = + Math.min(ConfigConstants.VAL_MAX_SOCKET_TIMEOUT_MS, + Math.max(ConfigConstants.VAL_MIN_SOCKET_TIMEOUT_MS, managerSocketTimeoutMs)); + } + + public int getConfigSyncMaxRetryIfFail() { + return configSyncMaxRetryIfFail; + } + + public void setConfigSyncMaxRetryIfFail(int configSyncMaxRetryIfFail) { + this.configSyncMaxRetryIfFail = + Math.min(configSyncMaxRetryIfFail, ConfigConstants.VAL_MAX_RETRY_IF_CONFIG_SYNC_FAIL); + } + + public String getConfigStoreBasePath() { + return configStoreBasePath; + } + + public void 
setConfigStoreBasePath(String configStoreBasePath) { + if (StringUtils.isBlank(configStoreBasePath)) { + return; + } + this.configStoreBasePath = configStoreBasePath.trim(); + } + + public long getConfigCacheExpiredMs() { + return configCacheExpiredMs; + } + + public void setConfigCacheExpiredMs(long configCacheExpiredMs) { + this.configCacheExpiredMs = configCacheExpiredMs; + } + + public long getForceReChooseInrMs() { + return forceReChooseInrMs; + } + + public void setForceReChooseInrMs(long forceReChooseInrMs) { + this.forceReChooseInrMs = + Math.max(ConfigConstants.VAL_MIN_FORCE_CHOOSE_INR_MS, forceReChooseInrMs); + } + + public String getTlsServerCertFilePathAndName() { + return tlsServerCertFilePathAndName; + } + + public String getTlsServerKey() { + return tlsServerKey; + } + + public String getInlongGroupId() { + return inlongGroupId; } public int getAliveConnections() { @@ -244,10 +308,6 @@ public class ProxyClientConfig { this.asyncCallbackSize = asyncCallbackSize; } - public String getManagerUrl() { - return managerUrl; - } - public int getMaxTimeoutCnt() { return maxTimeoutCnt; } @@ -259,22 +319,6 @@ public class ProxyClientConfig { this.maxTimeoutCnt = maxTimeoutCnt; } - public int getProxyUpdateIntervalMinutes() { - return proxyUpdateIntervalMinutes; - } - - public void setProxyUpdateIntervalMinutes(int proxyUpdateIntervalMinutes) { - this.proxyUpdateIntervalMinutes = proxyUpdateIntervalMinutes; - } - - public int getProxyUpdateMaxRetry() { - return proxyUpdateMaxRetry; - } - - public void setProxyUpdateMaxRetry(int proxyUpdateMaxRetry) { - this.proxyUpdateMaxRetry = proxyUpdateMaxRetry; - } - public long getConnectTimeoutMs() { return connectTimeoutMs; } @@ -315,26 +359,6 @@ public class ProxyClientConfig { return isNeedDataEncry; } - public boolean isNeedAuthentication() { - return this.needAuthentication; - } - - public void setAuthenticationInfo(boolean needAuthentication, boolean needDataEncry, - final String userName, final String secretKey) { 
- this.needAuthentication = needAuthentication; - this.isNeedDataEncry = needDataEncry; - if (this.needAuthentication || this.isNeedDataEncry) { - if (StringUtils.isBlank(userName)) { - throw new IllegalArgumentException("userName is Blank!"); - } - if (StringUtils.isBlank(secretKey)) { - throw new IllegalArgumentException("secretKey is Blank!"); - } - } - this.userName = userName.trim(); - this.secretKey = secretKey.trim(); - } - public void setHttpsInfo(String tlsServerCertFilePathAndName, String tlsServerKey) { if (StringUtils.isBlank(tlsServerCertFilePathAndName)) { throw new IllegalArgumentException("tlsServerCertFilePathAndName is Blank!"); @@ -354,22 +378,6 @@ public class ProxyClientConfig { this.tlsVersion = tlsVersion; } - public String getUserName() { - return userName; - } - - public String getSecretKey() { - return secretKey; - } - - public boolean isReadProxyIPFromLocal() { - return readProxyIPFromLocal; - } - - public void setReadProxyIPFromLocal(boolean readProxyIPFromLocal) { - this.readProxyIPFromLocal = readProxyIPFromLocal; - } - public int getProxyHttpUpdateIntervalMinutes() { return proxyHttpUpdateIntervalMinutes; } @@ -402,14 +410,6 @@ public class ProxyClientConfig { this.asyncWorkerInterval = asyncWorkerInterval; } - public int getManagerSocketTimeout() { - return managerSocketTimeout; - } - - public void setManagerSocketTimeout(int managerSocketTimeout) { - this.managerSocketTimeout = managerSocketTimeout; - } - public boolean isCleanHttpCacheWhenClosing() { return cleanHttpCacheWhenClosing; } @@ -418,22 +418,6 @@ public class ProxyClientConfig { this.cleanHttpCacheWhenClosing = cleanHttpCacheWhenClosing; } - public long getMaxProxyCacheTimeInMs() { - return maxProxyCacheTimeInMs; - } - - public void setMaxProxyCacheTimeInMs(long maxProxyCacheTimeInMs) { - this.maxProxyCacheTimeInMs = maxProxyCacheTimeInMs; - } - - public int getManagerConnectionTimeout() { - return managerConnectionTimeout; - } - - public void 
setManagerConnectionTimeout(int managerConnectionTimeout) { - this.managerConnectionTimeout = managerConnectionTimeout; - } - public MetricConfig getMetricConfig() { return metricConfig; } @@ -495,4 +479,40 @@ public class ProxyClientConfig { public void setSenderAttempt(int senderMaxAttempt) { this.senderMaxAttempt = senderMaxAttempt; } + + private void checkAndParseAddress(String managerAddress) throws ProxysdkException { + if (StringUtils.isBlank(managerAddress) + || (!managerAddress.startsWith(ConfigConstants.HTTP) + && !managerAddress.startsWith(ConfigConstants.HTTPS))) { + throw new ProxysdkException("managerAddress is blank or missing http/https protocol"); + } + String hostPortInfo; + if (managerAddress.startsWith(ConfigConstants.HTTPS)) { + this.visitManagerByHttp = false; + hostPortInfo = managerAddress.substring(ConfigConstants.HTTPS.length() + 1); + } else { + hostPortInfo = managerAddress.substring(ConfigConstants.HTTP.length() + 1); + } + if (StringUtils.isBlank(hostPortInfo)) { + throw new ProxysdkException("managerAddress must include host:port info!"); + } + String[] fields = hostPortInfo.split(":"); + if (fields.length == 1) { + throw new ProxysdkException("managerAddress must include port info!"); + } else if (fields.length > 2) { + throw new ProxysdkException("managerAddress must only include host:port info!"); + } + if (StringUtils.isBlank(fields[0])) { + throw new ProxysdkException("managerAddress's host is blank!"); + } + this.managerIP = fields[0].trim(); + if (StringUtils.isBlank(fields[1])) { + throw new ProxysdkException("managerAddress's port is blank!"); + } + try { + this.managerPort = Integer.parseInt(fields[1]); + } catch (Throwable ex) { + throw new ProxysdkException("managerAddress's port must be number!"); + } + } } diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigEntry.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigEntry.java 
index 441c62abe5..e37b9b2c0a 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigEntry.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigEntry.java @@ -17,13 +17,14 @@ package org.apache.inlong.sdk.dataproxy.config; +import org.apache.commons.lang3.builder.ToStringBuilder; + import java.util.Map; public class ProxyConfigEntry implements java.io.Serializable { private int clusterId; private String groupId; - private int size; private Map<String, HostInfo> hostMap; private int load; private int switchStat; @@ -59,16 +60,14 @@ public class ProxyConfigEntry implements java.io.Serializable { } public void setHostMap(Map<String, HostInfo> hostMap) { - this.size = hostMap.size(); this.hostMap = hostMap; } - - public int getSize() { - return size; + public boolean isNodesEmpty() { + return this.hostMap.isEmpty(); } - public void setSize(int size) { - this.size = size; + public int getSize() { + return hostMap.size(); } public String getGroupId() { @@ -87,13 +86,6 @@ public class ProxyConfigEntry implements java.io.Serializable { isInterVisit = interVisit; } - @Override - public String toString() { - return "ProxyConfigEntry [hostMap=" + hostMap + ", load=" + load + ", size=" + size + ", isInterVisit=" - + isInterVisit + ", groupId=" + groupId + ", switch=" + switchStat + ", maxPacketLength=" - + maxPacketLength + "]"; - } - public int getClusterId() { return clusterId; } @@ -101,4 +93,17 @@ public class ProxyConfigEntry implements java.io.Serializable { public void setClusterId(int clusterId) { this.clusterId = clusterId; } + + @Override + public String toString() { + return new ToStringBuilder(this) + .append("clusterId", clusterId) + .append("groupId", groupId) + .append("hostMap", hostMap) + .append("load", load) + .append("switchStat", switchStat) + .append("isInterVisit", isInterVisit) + .append("maxPacketLength", maxPacketLength) + .toString(); + } } diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java index d259cdff08..986c59457e 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java @@ -21,20 +21,18 @@ import org.apache.inlong.common.pojo.dataproxy.DataProxyNodeInfo; import org.apache.inlong.common.pojo.dataproxy.DataProxyNodeResponse; import org.apache.inlong.common.util.BasicAuth; import org.apache.inlong.sdk.dataproxy.ConfigConstants; -import org.apache.inlong.sdk.dataproxy.LoadBalance; import org.apache.inlong.sdk.dataproxy.ProxyClientConfig; import org.apache.inlong.sdk.dataproxy.network.ClientMgr; -import org.apache.inlong.sdk.dataproxy.network.HashRing; import org.apache.inlong.sdk.dataproxy.network.IpUtils; +import org.apache.inlong.sdk.dataproxy.utils.LogCounter; +import org.apache.inlong.sdk.dataproxy.utils.Tuple2; import com.google.gson.Gson; -import com.google.gson.JsonArray; import com.google.gson.JsonObject; import com.google.gson.JsonParser; import com.google.gson.stream.JsonReader; import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.collections.MapUtils; import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.StringUtils; import org.apache.http.Header; @@ -44,7 +42,6 @@ import org.apache.http.client.config.RequestConfig; import org.apache.http.client.entity.UrlEncodedFormEntity; import org.apache.http.client.methods.HttpPost; import org.apache.http.conn.ssl.SSLConnectionSocketFactory; -import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.DefaultHttpClient; import 
org.apache.http.impl.client.HttpClients; @@ -67,7 +64,7 @@ import java.io.FileReader; import java.io.FileWriter; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; -import java.io.UnsupportedEncodingException; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Paths; import java.security.KeyManagementException; @@ -77,8 +74,9 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantReadWriteLock; /** @@ -89,161 +87,199 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; */ public class ProxyConfigManager extends Thread { - public static final String APPLICATION_JSON = "application/json"; private static final Logger logger = LoggerFactory.getLogger(ProxyConfigManager.class); - private final ProxyClientConfig clientConfig; - private final ClientMgr clientManager; - private final ReentrantReadWriteLock rw = new ReentrantReadWriteLock(); + private static final LogCounter exptCounter = new LogCounter(10, 100000, 60 * 1000L); + private static final LogCounter parseCounter = new LogCounter(10, 100000, 60 * 1000L); + private static final ReentrantReadWriteLock fileRw = new ReentrantReadWriteLock(); + + private final String callerId; + private ProxyClientConfig clientConfig; private final Gson gson = new Gson(); - private final HashRing hashRing = HashRing.getInstance(); - private List<HostInfo> proxyInfoList = new ArrayList<HostInfo>(); - /* the status of the cluster.if this value is changed,we need rechoose three proxy */ + private final ClientMgr clientManager; + private final ThreadLocalRandom random = ThreadLocalRandom.current(); + private final AtomicBoolean shutDown = new AtomicBoolean(false); + // proxy configure info + private String 
localProxyConfigStoreFile; + private String proxyConfigVisitUrl; + private String proxyConfigCacheFile; + private List<HostInfo> proxyInfoList = new ArrayList<>(); private int oldStat = 0; - private String inlongGroupId; private String localMd5; - private boolean bShutDown = false; - private long lstUpdatedTime = 0; + private long lstUpdateTime = 0; + // encrypt configure info + private String encryptConfigVisitUrl; + private String encryptConfigCacheFile; private EncryptConfigEntry userEncryptConfigEntry; - public ProxyConfigManager(final ProxyClientConfig configure, final ClientMgr clientManager) { - this.clientConfig = configure; - this.clientManager = clientManager; - this.hashRing.setVirtualNode(configure.getVirtualNode()); + public ProxyConfigManager(ProxyClientConfig configure) { + this("MetaQuery", configure, null); } - public String getInlongGroupId() { - return inlongGroupId; + public ProxyConfigManager(String callerId, ProxyClientConfig configure, ClientMgr clientManager) { + this.callerId = callerId; + this.clientManager = clientManager; + this.storeAndBuildMetaConfigure(configure); + if (this.clientManager != null) { + this.setName("ConfigManager-" + this.callerId); + logger.info("ConfigManager({}) started, groupId={}", + this.callerId, clientConfig.getInlongGroupId()); + } } - public void setInlongGroupId(String inlongGroupId) { - this.inlongGroupId = inlongGroupId; + /** + * Update proxy client configure for query case + * + * @param configure proxy client configure + * @throws Exception exception + */ + public void updProxyClientConfig(ProxyClientConfig configure) throws Exception { + if (configure == null) { + throw new Exception("ProxyClientConfig is null"); + } + if (this.clientManager != null) { + throw new Exception("Not allowed for non meta-query case!"); + } + if (shutDown.get()) { + return; + } + this.storeAndBuildMetaConfigure(configure); } public void shutDown() { - logger.info("Begin to shut down ProxyConfigManager!"); - bShutDown = true; 
- } - - @Override - public void run() { - while (!bShutDown) { - try { - doProxyEntryQueryWork(); - updateEncryptConfigEntry(); - logger.info("ProxyConf update!"); - } catch (Throwable e) { - logger.error("Refresh proxy ip list runs into exception {}, {}", e.toString(), e.getStackTrace()); - e.printStackTrace(); - } - - /* Sleep some time.240-360s */ - try { - Random random = new Random(); - int proxyUpdateIntervalSec = this.clientConfig.getProxyUpdateIntervalMinutes() * 60; - - int sleepTimeSec = proxyUpdateIntervalSec; - if (proxyUpdateIntervalSec > 5) { - sleepTimeSec = proxyUpdateIntervalSec + random.nextInt() % (proxyUpdateIntervalSec / 5); - } - logger.info("sleep time {}", sleepTimeSec); - Thread.sleep(sleepTimeSec * 1000); - } catch (Throwable e2) { - // - } + if (clientManager == null) { + return; + } + if (shutDown.compareAndSet(false, true)) { + this.interrupt(); + logger.info("ConfigManager({}) begin to shutdown, groupId={}!", + this.callerId, clientConfig.getInlongGroupId()); } - logger.info("ProxyConfigManager worker existed!"); } /** - * try to read cache of proxy entry + * get groupId config * - * @return + * @return proxyConfigEntry + * @throws Exception ex */ - private ProxyConfigEntry tryToReadCacheProxyEntry(String configCachePath) { - rw.readLock().lock(); - try { - File file = new File(configCachePath); - long diffTime = System.currentTimeMillis() - file.lastModified(); - - if (diffTime < clientConfig.getMaxProxyCacheTimeInMs()) { - JsonReader reader = new JsonReader(new FileReader(configCachePath)); - ProxyConfigEntry proxyConfigEntry = gson.fromJson(reader, ProxyConfigEntry.class); - logger.info("{} has a backup! 
{}", inlongGroupId, proxyConfigEntry); - return proxyConfigEntry; - } - } catch (Exception ex) { - logger.warn("try to read local cache, caught {}", ex.getMessage()); - } finally { - rw.readLock().unlock(); + public Tuple2<ProxyConfigEntry, String> getGroupIdConfigure(boolean needRetry) throws Exception { + if (shutDown.get()) { + return new Tuple2<>(null, "SDK has shutdown!"); } - return null; - } - - private void tryToWriteCacheProxyEntry(ProxyConfigEntry entry, String configCachePath) { - rw.writeLock().lock(); - try { - File file = new File(configCachePath); - if (!file.getParentFile().exists()) { - // try to create parent - file.getParentFile().mkdirs(); + if (clientConfig.isOnlyUseLocalProxyConfig()) { + return getLocalProxyListFromFile(this.localProxyConfigStoreFile); + } else { + boolean readFromRmt = false; + Tuple2<ProxyConfigEntry, String> result; + result = tryToReadCacheProxyEntry(); + if (result.getF0() == null) { + int retryCount = 0; + do { + result = requestProxyEntryQuietly(); + if (result.getF0() != null || !needRetry || shutDown.get()) { + if (result.getF0() != null) { + readFromRmt = true; + } + break; + } + // sleep then retry + TimeUnit.MILLISECONDS.sleep(500); + } while (++retryCount < clientConfig.getConfigSyncMaxRetryIfFail()); } - logger.info("try to write {}} to local cache {}", entry, configCachePath); - FileWriter fileWriter = new FileWriter(configCachePath); - gson.toJson(entry, fileWriter); - fileWriter.flush(); - fileWriter.close(); - } catch (Exception ex) { - logger.warn("try to write local cache, caught {}", ex.getMessage()); - } finally { - rw.writeLock().unlock(); - } - } - - private ProxyConfigEntry requestProxyEntryQuietly() { - try { - return requestProxyList(this.clientConfig.getManagerUrl()); - } catch (Exception e) { - logger.warn("try to request proxy list by http, caught {}", e.getMessage()); + if (shutDown.get()) { + return new Tuple2<>(null, "SDK has shutdown!"); + } + if (result.getF0() == null) { + return new 
Tuple2<>(null, "Visit manager error:" + result.getF1()); + } else if (readFromRmt) { + tryToWriteCacheProxyEntry(result.getF0()); + } + return result; } - return null; } /** - * get groupId config + * get encrypt config * * @return proxyConfigEntry - * @throws Exception + * @throws Exception ex */ - public ProxyConfigEntry getGroupIdConfigure() throws Exception { - ProxyConfigEntry proxyEntry; - String configAddr = clientConfig.getConfStoreBasePath() + inlongGroupId; - if (this.clientConfig.isReadProxyIPFromLocal()) { - configAddr = configAddr + ".local"; - proxyEntry = getLocalProxyListFromFile(configAddr); - } else { - configAddr = configAddr + ".proxyip"; - - proxyEntry = tryToReadCacheProxyEntry(configAddr); - if (proxyEntry == null) { - proxyEntry = requestProxyEntryQuietly(); - int requestCount = 0; - - while (requestCount < 3 && proxyEntry == null) { - proxyEntry = requestProxyEntryQuietly(); - requestCount += 1; - if (proxyEntry == null) { - // sleep then retry - TimeUnit.MILLISECONDS.sleep(500); + public Tuple2<EncryptConfigEntry, String> getEncryptConfigure(boolean needRetry) throws Exception { + if (!clientConfig.isNeedDataEncry()) { + return new Tuple2<>(null, "Not need data encrypt!"); + } + if (shutDown.get()) { + return new Tuple2<>(null, "SDK has shutdown!"); + } + EncryptConfigEntry encryptEntry = this.userEncryptConfigEntry; + if (encryptEntry != null) { + return new Tuple2<>(encryptEntry, "Ok"); + } + boolean readFromRmt = false; + Tuple2<EncryptConfigEntry, String> result = readCachedPubKeyEntry(); + if (result.getF0() == null) { + int retryCount = 0; + do { + result = requestPubKeyFromManager(); + if (result.getF0() != null || !needRetry || shutDown.get()) { + if (result.getF0() != null) { + readFromRmt = true; } + break; + } + // sleep then retry + TimeUnit.MILLISECONDS.sleep(500); + } while (++retryCount < clientConfig.getConfigSyncMaxRetryIfFail()); + } + if (shutDown.get()) { + return new Tuple2<>(null, "SDK has shutdown!"); + } + if 
(result.getF0() == null) { + return new Tuple2<>(null, "Visit manager error:" + result.getF1()); + } else if (readFromRmt) { + updateEncryptConfigEntry(result.getF0()); + writeCachePubKeyEntryFile(result.getF0()); + } + return result; + } + + @Override + public void run() { + logger.info("ConfigManager({}) thread start, groupId={}", + this.callerId, clientConfig.getInlongGroupId()); + while (!shutDown.get()) { + // update proxy nodes meta configures + try { + doProxyEntryQueryWork(); + } catch (Throwable ex) { + if (exptCounter.shouldPrint()) { + logger.warn("ConfigManager({}) refresh proxy configure exception, groupId={}", + this.callerId, clientConfig.getInlongGroupId(), ex); } } - if (proxyEntry == null) { - throw new Exception("Visit manager error, please check log!"); - } else { - tryToWriteCacheProxyEntry(proxyEntry, configAddr); + // update encrypt configure + if (clientConfig.isNeedDataEncry()) { + try { + doEncryptConfigEntryQueryWork(); + } catch (Throwable ex) { + if (exptCounter.shouldPrint()) { + logger.warn("ConfigManager({}) refresh encrypt info exception, groupId={}", + this.callerId, clientConfig.getInlongGroupId(), ex); + } + } + } + if (shutDown.get()) { + break; + } + // sleep some time + try { + Thread.sleep(clientConfig.getManagerConfigSyncInrMs() + random.nextInt(100) * 100); + } catch (Throwable e2) { + // } } - return proxyEntry; + logger.info("ConfigManager({}) worker existed, groupId={}", + this.callerId, this.clientConfig.getInlongGroupId()); } /** @@ -252,55 +288,140 @@ public class ProxyConfigManager extends Thread { * @throws Exception */ public void doProxyEntryQueryWork() throws Exception { + if (shutDown.get()) { + return; + } /* Request the configuration from manager. 
*/ if (localMd5 == null) { localMd5 = calcHostInfoMd5(proxyInfoList); } - ProxyConfigEntry proxyEntry = null; - String configAddr = clientConfig.getConfStoreBasePath() + inlongGroupId; - if (clientConfig.isReadProxyIPFromLocal()) { - configAddr = configAddr + ".local"; - proxyEntry = getLocalProxyListFromFile(configAddr); + Tuple2<ProxyConfigEntry, String> result; + if (clientConfig.isOnlyUseLocalProxyConfig()) { + result = getLocalProxyListFromFile(this.localProxyConfigStoreFile); } else { - /* Do a compare and see if it needs to re-choose the channel. */ - configAddr = configAddr + ".managerip"; - int retryCount = 1; - while (proxyEntry == null && retryCount < this.clientConfig.getProxyUpdateMaxRetry()) { - proxyEntry = requestProxyEntryQuietly(); - retryCount++; - if (proxyEntry == null) { - // sleep then retry. - TimeUnit.SECONDS.sleep(1); + int retryCnt = 0; + do { + result = requestProxyEntryQuietly(); + if (result.getF0() != null || shutDown.get()) { + break; } + // sleep then retry. + TimeUnit.SECONDS.sleep(2); + } while (++retryCnt < this.clientConfig.getConfigSyncMaxRetryIfFail() && !shutDown.get()); + if (shutDown.get()) { + return; } - if (proxyEntry != null) { - tryToWriteCacheProxyEntry(proxyEntry, configAddr); + if (result.getF0() != null) { + tryToWriteCacheProxyEntry(result.getF0()); } - /* We should exit if no local IP list and can't request it from manager. */ - if (localMd5 == null && proxyEntry == null) { - logger.error("Can't connect manager at the start of proxy API {}", - this.clientConfig.getManagerUrl()); - proxyEntry = tryToReadCacheProxyEntry(configAddr); + /* We should exit if no local IP list and can't request it from TDManager. 
*/ + if (localMd5 == null && result.getF0() == null) { + if (exptCounter.shouldPrint()) { + logger.warn("ConfigManager({}) connect manager({}) failure, get cached configure, groupId={}", + this.callerId, this.proxyConfigVisitUrl, this.clientConfig.getInlongGroupId()); + } + result = tryToReadCacheProxyEntry(); } - if (localMd5 != null && proxyEntry == null && proxyInfoList != null) { - StringBuffer s = new StringBuffer(); - for (HostInfo tmp : proxyInfoList) { - s.append(tmp.getHostName()).append(";").append(tmp.getPortNumber()) - .append(","); + if (localMd5 != null && result.getF0() == null && proxyInfoList != null) { + if (exptCounter.shouldPrint()) { + logger.warn("ConfigManager({}) connect manager({}) failure, using the last configure, groupId={}", + this.callerId, this.proxyConfigVisitUrl, this.clientConfig.getInlongGroupId()); } - logger.warn("Backup proxyEntry [{}]", s); } } - if (localMd5 == null && proxyEntry == null && proxyInfoList == null) { - if (clientConfig.isReadProxyIPFromLocal()) { - throw new Exception("Local proxy address configure " - + "read failure, please check first!"); + if (localMd5 == null && result.getF0() == null && proxyInfoList == null) { + if (clientConfig.isOnlyUseLocalProxyConfig()) { + throw new Exception("Read local proxy configure failure, please check first!"); } else { throw new Exception("Connect Manager failure, please check first!"); } } - compareProxyList(proxyEntry); + compareAndUpdateProxyList(result.getF0()); + } + + private void doEncryptConfigEntryQueryWork() throws Exception { + if (shutDown.get()) { + return; + } + int retryCount = 0; + Tuple2<EncryptConfigEntry, String> result; + do { + result = requestPubKeyFromManager(); + if (result.getF0() != null || shutDown.get()) { + break; + } + // sleep then retry + TimeUnit.MILLISECONDS.sleep(500); + } while (++retryCount < clientConfig.getConfigSyncMaxRetryIfFail()); + if (shutDown.get()) { + return; + } + if (result.getF0() == null) { + if (this.userEncryptConfigEntry 
!= null) { + logger.warn("ConfigManager({}) connect manager({}) failure, using the last pubKey, secretId={}", + this.callerId, this.encryptConfigVisitUrl, this.clientConfig.getAuthSecretId()); + return; + } + throw new Exception("Visit manager error:" + result.getF1()); + } + updateEncryptConfigEntry(result.getF0()); + writeCachePubKeyEntryFile(result.getF0()); + } + + public Tuple2<ProxyConfigEntry, String> getLocalProxyListFromFile(String filePath) { + String strRet; + try { + byte[] fileBytes = Files.readAllBytes(Paths.get(filePath)); + strRet = new String(fileBytes); + } catch (Throwable ex) { + return new Tuple2<>(null, "Read local configure failure from " + + filePath + ", reason is " + ex.getMessage()); + } + if (StringUtils.isBlank(strRet)) { + return new Tuple2<>(null, "Blank configure local file from " + filePath); + } + return getProxyConfigEntry(strRet); + } + private Tuple2<ProxyConfigEntry, String> requestProxyEntryQuietly() { + List<BasicNameValuePair> params = buildProxyNodeQueryParams(); + // request meta info from manager + logger.debug("ConfigManager({}) request configure to manager({}), param={}", + this.callerId, this.proxyConfigVisitUrl, params); + Tuple2<Boolean, String> queryResult = requestConfiguration(this.proxyConfigVisitUrl, params); + if (!queryResult.getF0()) { + return new Tuple2<>(null, queryResult.getF1()); + } + // parse result + logger.debug("ConfigManager({}) received configure, from manager({}), groupId={}, result={}", + callerId, proxyConfigVisitUrl, clientConfig.getInlongGroupId(), queryResult.getF1()); + try { + return getProxyConfigEntry(queryResult.getF1()); + } catch (Throwable ex) { + if (exptCounter.shouldPrint()) { + logger.warn("ConfigManager({}) parse failure, from manager({}), groupId={}, result={}", + callerId, proxyConfigVisitUrl, clientConfig.getInlongGroupId(), queryResult.getF1(), ex); + } + return new Tuple2<>(null, ex.getMessage()); + } + } + + private String calcHostInfoMd5(List<HostInfo> hostInfoList) { + 
if (hostInfoList == null || hostInfoList.isEmpty()) { + return null; + } + Collections.sort(hostInfoList); + StringBuilder hostInfoMd5 = new StringBuilder(); + for (HostInfo hostInfo : hostInfoList) { + if (hostInfo == null) { + continue; + } + hostInfoMd5.append(hostInfo.getHostName()); + hostInfoMd5.append(":"); + hostInfoMd5.append(hostInfo.getPortNumber()); + hostInfoMd5.append(";"); + } + return DigestUtils.md5Hex(hostInfoMd5.toString()); } /** @@ -308,139 +429,206 @@ public class ProxyConfigManager extends Thread { * * @param proxyEntry */ - private void compareProxyList(ProxyConfigEntry proxyEntry) { - if (proxyEntry != null) { - logger.info("{}", proxyEntry.toString()); - if (proxyEntry.getSize() != 0) { - /* Initialize the current proxy information list first. */ - clientManager.setLoadThreshold(proxyEntry.getLoad()); - - List<HostInfo> newProxyInfoList = new ArrayList<HostInfo>(); - for (Map.Entry<String, HostInfo> entry : proxyEntry.getHostMap().entrySet()) { - newProxyInfoList.add(entry.getValue()); - } - - String newMd5 = calcHostInfoMd5(newProxyInfoList); - String oldMd5 = calcHostInfoMd5(proxyInfoList); - if (newMd5 != null && !newMd5.equals(oldMd5)) { - /* Choose random alive connections to send messages. 
*/ - logger.info("old md5 {} new md5 {}", oldMd5, newMd5); - proxyInfoList.clear(); - proxyInfoList = newProxyInfoList; - clientManager.setProxyInfoList(proxyInfoList); - lstUpdatedTime = System.currentTimeMillis(); - } else if (proxyEntry.getSwitchStat() != oldStat) { - /* judge cluster's switch state */ - oldStat = proxyEntry.getSwitchStat(); - if ((System.currentTimeMillis() - lstUpdatedTime) > 3 * 60 * 1000) { - logger.info("switch the cluster!"); - proxyInfoList.clear(); - proxyInfoList = newProxyInfoList; - clientManager.setProxyInfoList(proxyInfoList); - } else { - logger.info("only change oldStat "); - } - } else { - newProxyInfoList.clear(); - logger.info("proxy IP list doesn't change, load {}", proxyEntry.getLoad()); - } - if (clientConfig.getLoadBalance() == LoadBalance.CONSISTENCY_HASH) { - updateHashRing(proxyInfoList); - } - } else { - logger.error("proxyEntry's size is zero"); + private void compareAndUpdateProxyList(ProxyConfigEntry proxyEntry) { + if ((proxyEntry == null || proxyEntry.isNodesEmpty()) + && (proxyInfoList.isEmpty() + || (System.currentTimeMillis() - lstUpdateTime) < clientConfig.getForceReChooseInrMs())) { + return; + } + int newSwitchStat; + List<HostInfo> newBusInfoList; + if (proxyEntry == null || proxyEntry.isNodesEmpty()) { + newSwitchStat = oldStat; + newBusInfoList = new ArrayList<>(proxyInfoList.size()); + newBusInfoList.addAll(proxyInfoList); + } else { + /* Initialize the current nodes information list first. 
*/ + clientManager.setLoadThreshold(proxyEntry.getLoad()); + newSwitchStat = proxyEntry.getSwitchStat(); + newBusInfoList = new ArrayList<>(proxyEntry.getSize()); + for (Map.Entry<String, HostInfo> entry : proxyEntry.getHostMap().entrySet()) { + newBusInfoList.add(entry.getValue()); } } + String newMd5 = calcHostInfoMd5(newBusInfoList); + String oldMd5 = calcHostInfoMd5(proxyInfoList); + boolean nodeChanged = newMd5 != null && !newMd5.equals(oldMd5); + if (nodeChanged || newSwitchStat != oldStat + || (System.currentTimeMillis() - lstUpdateTime) >= clientConfig.getForceReChooseInrMs()) { + proxyInfoList = newBusInfoList; + clientManager.setProxyInfoList(proxyInfoList); + lstUpdateTime = System.currentTimeMillis(); + oldStat = newSwitchStat; + } } - public EncryptConfigEntry getEncryptConfigEntry(final String userName) { - if (StringUtils.isBlank(userName)) { - return null; + private void tryToWriteCacheProxyEntry(ProxyConfigEntry entry) { + logger.debug("ConfigManager({}) write {} to cache file ({})", + this.callerId, entry, this.proxyConfigCacheFile); + fileRw.writeLock().lock(); + try { + File file = new File(this.proxyConfigCacheFile); + if (!file.getParentFile().exists()) { + // try to create parent + file.getParentFile().mkdirs(); + } + FileWriter fileWriter = new FileWriter(this.proxyConfigCacheFile); + gson.toJson(entry, fileWriter); + fileWriter.flush(); + fileWriter.close(); + } catch (Throwable ex) { + if (exptCounter.shouldPrint()) { + logger.warn("ConfigManager({}) write cache file({}) exception, groupId={}, data={}", + this.callerId, this.clientConfig.getInlongGroupId(), + this.proxyConfigCacheFile, entry.toString(), ex); + } + } finally { + fileRw.writeLock().unlock(); } - EncryptConfigEntry encryptEntry = this.userEncryptConfigEntry; - if (encryptEntry == null) { - int retryCount = 0; - encryptEntry = requestPubKey(this.clientConfig.getRsaPubKeyUrl(), userName, false); - while (encryptEntry == null && retryCount < 
this.clientConfig.getProxyUpdateMaxRetry()) { - encryptEntry = requestPubKey(this.clientConfig.getRsaPubKeyUrl(), userName, false); - retryCount++; - } - if (encryptEntry == null) { - encryptEntry = getStoredPubKeyEntry(userName); - if (encryptEntry != null) { - encryptEntry.getRsaEncryptedKey(); - synchronized (this) { - if (this.userEncryptConfigEntry == null) { - this.userEncryptConfigEntry = encryptEntry; - } else { - encryptEntry = this.userEncryptConfigEntry; - } - } + } + + /** + * try to read cache of proxy entry + * + * @return read result + */ + private Tuple2<ProxyConfigEntry, String> tryToReadCacheProxyEntry() { + fileRw.readLock().lock(); + try { + File file = new File(this.proxyConfigCacheFile); + if (file.exists()) { + long diffTime = System.currentTimeMillis() - file.lastModified(); + if (clientConfig.getConfigCacheExpiredMs() > 0 + && diffTime < clientConfig.getConfigCacheExpiredMs()) { + JsonReader reader = new JsonReader(new FileReader(this.proxyConfigCacheFile)); + ProxyConfigEntry proxyConfigEntry = gson.fromJson(reader, ProxyConfigEntry.class); + return new Tuple2<>(proxyConfigEntry, "Ok"); } + return new Tuple2<>(null, "cache configure expired!"); } else { - synchronized (this) { - if (this.userEncryptConfigEntry == null || this.userEncryptConfigEntry != encryptEntry) { - storePubKeyEntry(encryptEntry); - encryptEntry.getRsaEncryptedKey(); - this.userEncryptConfigEntry = encryptEntry; - } else { - encryptEntry = this.userEncryptConfigEntry; - } - } + return new Tuple2<>(null, "no cache configure!"); + } + } catch (Throwable ex) { + if (exptCounter.shouldPrint()) { + logger.warn("ConfigManager({}) read cache file({}) exception, groupId={}", + this.callerId, this.proxyConfigCacheFile, this.clientConfig.getInlongGroupId(), ex); } + return new Tuple2<>(null, "read cache configure failure:" + ex.getMessage()); + } finally { + fileRw.readLock().unlock(); } - return encryptEntry; } - private void updateEncryptConfigEntry() { - if 
(StringUtils.isBlank(this.clientConfig.getUserName())) { - return; + private Tuple2<EncryptConfigEntry, String> requestPubKeyFromManager() { + List<BasicNameValuePair> params = buildPubKeyQueryParams(); + // request meta info from manager + logger.debug("ConfigManager({}) request pubkey to manager({}), param={}", + this.callerId, this.encryptConfigVisitUrl, params); + Tuple2<Boolean, String> queryResult = requestConfiguration(this.encryptConfigVisitUrl, params); + if (!queryResult.getF0()) { + return new Tuple2<>(null, queryResult.getF1()); + } + logger.debug("ConfigManager({}) received pubkey from manager({}), result={}", + this.callerId, this.encryptConfigVisitUrl, queryResult.getF1()); + JsonObject pubKeyConf; + try { + pubKeyConf = JsonParser.parseString(queryResult.getF1()).getAsJsonObject(); + } catch (Throwable ex) { + if (parseCounter.shouldPrint()) { + logger.warn("ConfigManager({}) parse failure, secretId={}, config={}!", + this.callerId, this.clientConfig.getAuthSecretId(), queryResult.getF1()); + } + return new Tuple2<>(null, "parse pubkey failure:" + ex.getMessage()); } - int retryCount = 0; - EncryptConfigEntry encryptConfigEntry = requestPubKey(this.clientConfig.getRsaPubKeyUrl(), - this.clientConfig.getUserName(), false); - while (encryptConfigEntry == null && retryCount < this.clientConfig.getProxyUpdateMaxRetry()) { - encryptConfigEntry = requestPubKey(this.clientConfig.getRsaPubKeyUrl(), - this.clientConfig.getUserName(), false); - retryCount++; - } - if (encryptConfigEntry == null) { - return; + if (pubKeyConf == null) { + return new Tuple2<>(null, "No public key information"); + } + if (!pubKeyConf.has("resultCode")) { + if (parseCounter.shouldPrint()) { + logger.warn("ConfigManager({}) config failure: resultCode field not exist, secretId={}, config={}!", + this.callerId, this.clientConfig.getAuthSecretId(), queryResult.getF1()); + } + return new Tuple2<>(null, "resultCode field not exist"); + } + int resultCode = 
pubKeyConf.get("resultCode").getAsInt(); + if (resultCode != 0) { + if (parseCounter.shouldPrint()) { + logger.warn("ConfigManager({}) config failure: resultCode != 0, secretId={}, config={}!", + this.callerId, this.clientConfig.getAuthSecretId(), queryResult.getF1()); + } + return new Tuple2<>(null, "resultCode != 0!"); } - synchronized (this) { - if (this.userEncryptConfigEntry == null || this.userEncryptConfigEntry != encryptConfigEntry) { - storePubKeyEntry(encryptConfigEntry); - encryptConfigEntry.getRsaEncryptedKey(); - this.userEncryptConfigEntry = encryptConfigEntry; + if (!pubKeyConf.has("resultData")) { + if (parseCounter.shouldPrint()) { + logger.warn("ConfigManager({}) config failure: resultData field not exist, secretId={}, config={}!", + this.callerId, this.clientConfig.getAuthSecretId(), queryResult.getF1()); } + return new Tuple2<>(null, "resultData field not exist"); } - return; + JsonObject resultData = pubKeyConf.get("resultData").getAsJsonObject(); + if (resultData != null) { + String publicKey = resultData.get("publicKey").getAsString(); + if (StringUtils.isBlank(publicKey)) { + if (parseCounter.shouldPrint()) { + logger.warn("ConfigManager({}) config failure: publicKey is blank, secretId={}, config={}!", + this.callerId, this.clientConfig.getAuthSecretId(), queryResult.getF1()); + } + return new Tuple2<>(null, "publicKey is blank!"); + } + String username = resultData.get("username").getAsString(); + if (StringUtils.isBlank(username)) { + if (parseCounter.shouldPrint()) { + logger.warn("ConfigManager({}) config failure: username is blank, secretId={}, config={}!", + this.callerId, this.clientConfig.getAuthSecretId(), queryResult.getF1()); + } + return new Tuple2<>(null, "username is blank!"); + } + String versionStr = resultData.get("version").getAsString(); + if (StringUtils.isBlank(versionStr)) { + if (parseCounter.shouldPrint()) { + logger.warn("ConfigManager({}) config failure: version is blank, secretId={}, config={}!", + this.callerId, 
this.clientConfig.getAuthSecretId(), queryResult.getF1()); + } + return new Tuple2<>(null, "version is blank!"); + } + return new Tuple2<>(new EncryptConfigEntry(username, versionStr, publicKey), "Ok"); + } + return new Tuple2<>(null, "resultData value is null!"); } - private EncryptConfigEntry getStoredPubKeyEntry(String userName) { - if (StringUtils.isBlank(userName)) { - logger.warn(" userName(" + userName + ") is not available"); - return null; - } - EncryptConfigEntry entry; + private void updateEncryptConfigEntry(EncryptConfigEntry newEncryptEntry) { + newEncryptEntry.getRsaEncryptedKey(); + this.userEncryptConfigEntry = newEncryptEntry; + } + + private Tuple2<EncryptConfigEntry, String> readCachedPubKeyEntry() { + ObjectInputStream is; FileInputStream fis = null; - ObjectInputStream is = null; - rw.readLock().lock(); + EncryptConfigEntry entry; + fileRw.readLock().lock(); try { - File file = new File(clientConfig.getConfStoreBasePath() + userName + ".pubKey"); + File file = new File(this.encryptConfigCacheFile); if (file.exists()) { - fis = new FileInputStream(file); - is = new ObjectInputStream(fis); - entry = (EncryptConfigEntry) is.readObject(); - // is.close(); - fis.close(); - return entry; + long diffTime = System.currentTimeMillis() - file.lastModified(); + if (clientConfig.getConfigCacheExpiredMs() > 0 + && diffTime < clientConfig.getConfigCacheExpiredMs()) { + fis = new FileInputStream(file); + is = new ObjectInputStream(fis); + entry = (EncryptConfigEntry) is.readObject(); + // is.close(); + fis.close(); + return new Tuple2<>(entry, "Ok"); + } + return new Tuple2<>(null, "cache PubKeyEntry expired!"); } else { - return null; + return new Tuple2<>(null, "no PubKeyEntry file!"); } - } catch (Throwable e1) { - logger.error("Read " + userName + " stored PubKeyEntry error ", e1); - return null; + } catch (Throwable ex) { + if (exptCounter.shouldPrint()) { + logger.warn("ConfigManager({}) read({}) file exception, secretId={}", + callerId, 
encryptConfigCacheFile, clientConfig.getAuthSecretId(), ex); + } + return new Tuple2<>(null, "read PubKeyEntry file failure:" + ex.getMessage()); } finally { if (fis != null) { try { @@ -449,16 +637,16 @@ public class ProxyConfigManager extends Thread { // } } - rw.readLock().unlock(); + fileRw.readLock().unlock(); } } - private void storePubKeyEntry(EncryptConfigEntry entry) { + private void writeCachePubKeyEntryFile(EncryptConfigEntry entry) { + ObjectOutputStream p; FileOutputStream fos = null; - ObjectOutputStream p = null; - rw.writeLock().lock(); + fileRw.writeLock().lock(); try { - File file = new File(clientConfig.getConfStoreBasePath() + entry.getUserName() + ".pubKey"); + File file = new File(this.encryptConfigCacheFile); if (!file.getParentFile().exists()) { file.getParentFile().mkdir(); } @@ -470,9 +658,11 @@ public class ProxyConfigManager extends Thread { p.writeObject(entry); p.flush(); // p.close(); - } catch (Throwable e) { - logger.error("store EncryptConfigEntry " + entry.toString() + " exception ", e); - e.printStackTrace(); + } catch (Throwable ex) { + if (exptCounter.shouldPrint()) { + logger.warn("ConfigManager({}) write file({}) exception, secretId={}, content={}", + callerId, encryptConfigCacheFile, clientConfig.getAuthSecretId(), entry.toString(), ex); + } } finally { if (fos != null) { try { @@ -481,153 +671,192 @@ public class ProxyConfigManager extends Thread { // } } - rw.writeLock().unlock(); + fileRw.writeLock().unlock(); } } - private String calcHostInfoMd5(List<HostInfo> hostInfoList) { - if (hostInfoList == null || hostInfoList.isEmpty()) { - return null; - } - Collections.sort(hostInfoList); - StringBuffer hostInfoMd5 = new StringBuffer(); - for (HostInfo hostInfo : hostInfoList) { - if (hostInfo == null) { - continue; + /* Request new configurations from Manager. 
*/ + private Tuple2<Boolean, String> requestConfiguration(String url, List<BasicNameValuePair> params) { + HttpParams myParams = new BasicHttpParams(); + HttpConnectionParams.setConnectionTimeout(myParams, clientConfig.getManagerConnTimeoutMs()); + HttpConnectionParams.setSoTimeout(myParams, clientConfig.getManagerSocketTimeoutMs()); + CloseableHttpClient httpClient; + // build http(s) client + try { + if (this.clientConfig.isVisitManagerByHttp()) { + httpClient = new DefaultHttpClient(myParams); + } else { + httpClient = getCloseableHttpClient(params); } - hostInfoMd5.append(hostInfo.getHostName()); - hostInfoMd5.append(";"); - hostInfoMd5.append(hostInfo.getPortNumber()); - hostInfoMd5.append(";"); - } - - return DigestUtils.md5Hex(hostInfoMd5.toString()); - } - - private EncryptConfigEntry requestPubKey(String pubKeyUrl, String userName, boolean needGet) { - if (StringUtils.isBlank(userName)) { - logger.error("Queried userName is null!"); - return null; - } - List<BasicNameValuePair> params = new ArrayList<BasicNameValuePair>(); - params.add(new BasicNameValuePair("operation", "query")); - params.add(new BasicNameValuePair("username", userName)); - String returnStr = requestConfiguration(pubKeyUrl, params); - if (StringUtils.isBlank(returnStr)) { - logger.info("No public key information returned from manager"); - return null; - } - JsonObject pubKeyConf = JsonParser.parseString(returnStr).getAsJsonObject(); - if (pubKeyConf == null) { - logger.info("No public key information returned from manager"); - return null; - } - if (!pubKeyConf.has("resultCode")) { - logger.info("Parse pubKeyConf failure: No resultCode key information returned from manager"); - return null; - } - int resultCode = pubKeyConf.get("resultCode").getAsInt(); - if (resultCode != 0) { - logger.info("query pubKeyConf failure, error code is " + resultCode + ", errInfo is " - + pubKeyConf.get("message").getAsString()); - return null; - } - if (!pubKeyConf.has("resultData")) { - logger.info("Parse 
pubKeyConf failure: No resultData key information returned from manager"); - return null; + } catch (Throwable eHttp) { + if (exptCounter.shouldPrint()) { + logger.warn("ConfigManager({}) create Http(s) client failure, url={}, params={}", + this.callerId, url, params, eHttp); + } + return new Tuple2<>(false, eHttp.getMessage()); } - JsonObject resultData = pubKeyConf.get("resultData").getAsJsonObject(); - if (resultData != null) { - String publicKey = resultData.get("publicKey").getAsString(); - if (StringUtils.isBlank(publicKey)) { - return null; + // post request and get response + HttpPost httpPost = null; + try { + httpPost = new HttpPost(url); + this.addAuthorizationInfo(httpPost); + UrlEncodedFormEntity urlEncodedFormEntity = + new UrlEncodedFormEntity(params, StandardCharsets.UTF_8); + httpPost.setEntity(urlEncodedFormEntity); + HttpResponse response = httpClient.execute(httpPost); + if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) { + return new Tuple2<>(false, response.getStatusLine().getStatusCode() + + ":" + response.getStatusLine().getStatusCode()); } - String username = resultData.get("username").getAsString(); - if (StringUtils.isBlank(username)) { - return null; + String returnStr = EntityUtils.toString(response.getEntity()); + if (StringUtils.isBlank(returnStr)) { + return new Tuple2<>(false, "query result is blank!"); } - String versionStr = resultData.get("version").getAsString(); - if (StringUtils.isBlank(versionStr)) { - return null; + return new Tuple2<>(true, returnStr); + } catch (Throwable ex) { + if (exptCounter.shouldPrint()) { + logger.warn("ConfigManager({}) connect manager({}) exception, params={}", + this.callerId, url, params, ex); + } + return new Tuple2<>(false, ex.getMessage()); + } finally { + if (httpPost != null) { + httpPost.releaseConnection(); + } + if (httpClient != null) { + httpClient.getConnectionManager().shutdown(); } - return new EncryptConfigEntry(username, versionStr, publicKey); } - return null; } - 
public ProxyConfigEntry getLocalProxyListFromFile(String filePath) throws Exception { - DataProxyNodeResponse proxyCluster; - try { - byte[] fileBytes = Files.readAllBytes(Paths.get(filePath)); - proxyCluster = gson.fromJson(new String(fileBytes), DataProxyNodeResponse.class); - } catch (Throwable e) { - throw new Exception("Read local proxyList File failure by " + filePath + ", reason is " + e.getCause()); - } - if (ObjectUtils.isEmpty(proxyCluster)) { - logger.warn("no proxyCluster configure from local file"); - return null; + private CloseableHttpClient getCloseableHttpClient(List<BasicNameValuePair> params) + throws NoSuchAlgorithmException, KeyManagementException { + CloseableHttpClient httpClient; + ArrayList<Header> headers = new ArrayList<>(); + for (BasicNameValuePair paramItem : params) { + headers.add(new BasicHeader(paramItem.getName(), paramItem.getValue())); } + RequestConfig requestConfig = RequestConfig.custom() + .setConnectTimeout(clientConfig.getManagerConnTimeoutMs()) + .setSocketTimeout(clientConfig.getManagerSocketTimeoutMs()).build(); + SSLContext sslContext = SSLContexts.custom().build(); + SSLConnectionSocketFactory sslSf = new SSLConnectionSocketFactory(sslContext, + new String[]{clientConfig.getTlsVersion()}, null, + SSLConnectionSocketFactory.getDefaultHostnameVerifier()); + httpClient = HttpClients.custom().setDefaultHeaders(headers).setDefaultRequestConfig(requestConfig) + .setSSLSocketFactory(sslSf).build(); + return httpClient; + } - return getProxyConfigEntry(proxyCluster); + private void storeAndBuildMetaConfigure(ProxyClientConfig config) { + this.clientConfig = config; + StringBuilder strBuff = new StringBuilder(512); + this.proxyConfigVisitUrl = strBuff + .append(clientConfig.isVisitManagerByHttp() ? 
ConfigConstants.HTTP : ConfigConstants.HTTPS) + .append(clientConfig.getManagerIP()).append(":").append(clientConfig.getManagerPort()) + .append(ConfigConstants.MANAGER_DATAPROXY_API).append(clientConfig.getInlongGroupId()) + .toString(); + strBuff.delete(0, strBuff.length()); + this.localProxyConfigStoreFile = strBuff + .append(clientConfig.getConfigStoreBasePath()) + .append(ConfigConstants.META_STORE_SUB_DIR) + .append(clientConfig.getInlongGroupId()) + .append(ConfigConstants.LOCAL_DP_CONFIG_FILE_SUFFIX) + .toString(); + strBuff.delete(0, strBuff.length()); + this.proxyConfigCacheFile = strBuff + .append(clientConfig.getConfigStoreBasePath()) + .append(ConfigConstants.META_STORE_SUB_DIR) + .append(clientConfig.getInlongGroupId()) + .append(ConfigConstants.REMOTE_DP_CACHE_FILE_SUFFIX) + .toString(); + strBuff.delete(0, strBuff.length()); + this.encryptConfigVisitUrl = clientConfig.getRsaPubKeyUrl(); + this.encryptConfigCacheFile = strBuff + .append(clientConfig.getConfigStoreBasePath()) + .append(ConfigConstants.META_STORE_SUB_DIR) + .append(clientConfig.getAuthSecretId()) + .append(ConfigConstants.REMOTE_ENCRYPT_CACHE_FILE_SUFFIX) + .toString(); + strBuff.delete(0, strBuff.length()); } - private Map<String, Integer> getStreamIdMap(JsonObject localProxyAddrJson) { - Map<String, Integer> streamIdMap = new HashMap<String, Integer>(); - if (localProxyAddrJson.has("tsn")) { - JsonArray jsonStreamId = localProxyAddrJson.getAsJsonArray("tsn"); - for (int i = 0; i < jsonStreamId.size(); i++) { - JsonObject jsonItem = jsonStreamId.get(i).getAsJsonObject(); - if (jsonItem != null && jsonItem.has("streamId") && jsonItem.has("sn")) { - streamIdMap.put(jsonItem.get("streamId").getAsString(), jsonItem.get("sn").getAsInt()); - } - } - } - return streamIdMap; + private void addAuthorizationInfo(HttpPost httpPost) { + httpPost.addHeader(BasicAuth.BASIC_AUTH_HEADER, + BasicAuth.genBasicAuthCredential(clientConfig.getAuthSecretId(), + clientConfig.getAuthSecretKey())); } - public 
ProxyConfigEntry requestProxyList(String url) { - ArrayList<BasicNameValuePair> params = new ArrayList<BasicNameValuePair>(); + private List<BasicNameValuePair> buildProxyNodeQueryParams() { + ArrayList<BasicNameValuePair> params = new ArrayList<>(); params.add(new BasicNameValuePair("ip", IpUtils.getLocalIp())); params.add(new BasicNameValuePair("protocolType", clientConfig.getProtocolType())); - logger.info("Begin to get configure from manager {}, param is {}", url, params); - - String resultStr = requestConfiguration(url, params); - ProxyClusterConfig clusterConfig = gson.fromJson(resultStr, ProxyClusterConfig.class); - if (clusterConfig == null || !clusterConfig.isSuccess() || clusterConfig.getData() == null) { - return null; - } + return params; + } - DataProxyNodeResponse proxyCluster = clusterConfig.getData(); - return getProxyConfigEntry(proxyCluster); + private List<BasicNameValuePair> buildPubKeyQueryParams() { + List<BasicNameValuePair> params = new ArrayList<>(); + params.add(new BasicNameValuePair("operation", "query")); + params.add(new BasicNameValuePair("username", clientConfig.getAuthSecretId())); + return params; } - private ProxyConfigEntry getProxyConfigEntry(DataProxyNodeResponse proxyCluster) { + private Tuple2<ProxyConfigEntry, String> getProxyConfigEntry(String strRet) { + DataProxyNodeResponse proxyCluster; + try { + proxyCluster = gson.fromJson(strRet, DataProxyNodeResponse.class); + } catch (Throwable ex) { + if (parseCounter.shouldPrint()) { + logger.warn("ConfigManager({}) parse exception, groupId={}, config={}", + this.callerId, clientConfig.getInlongGroupId(), strRet, ex); + } + return new Tuple2<>(null, "parse failure:" + ex.getMessage()); + } + // parse nodeList List<DataProxyNodeInfo> nodeList = proxyCluster.getNodeList(); if (CollectionUtils.isEmpty(nodeList)) { - logger.error("dataproxy nodeList is empty in DataProxyNodeResponse!"); - return null; + return new Tuple2<>(null, "nodeList is empty!"); } - Map<String, HostInfo> 
hostMap = formatHostInfoMap(nodeList); - if (MapUtils.isEmpty(hostMap)) { - return null; + HostInfo tmpHostInfo; + Map<String, HostInfo> hostMap = new HashMap<>(); + for (DataProxyNodeInfo proxy : nodeList) { + if (ObjectUtils.isEmpty(proxy.getId()) + || StringUtils.isEmpty(proxy.getIp()) + || ObjectUtils.isEmpty(proxy.getPort()) + || proxy.getPort() < 0) { + if (exptCounter.shouldPrint()) { + logger.warn("Invalid proxy node: groupId={}, id={}, ip={}, port={}", + clientConfig.getInlongGroupId(), proxy.getId(), proxy.getIp(), proxy.getPort()); + } + continue; + } + tmpHostInfo = new HostInfo(proxy.getIp(), proxy.getPort()); + hostMap.put(tmpHostInfo.getReferenceName(), tmpHostInfo); } - + if (hostMap.isEmpty()) { + return new Tuple2<>(null, "no valid nodeList records!"); + } + // parse clusterId int clusterId = -1; if (ObjectUtils.isNotEmpty(proxyCluster.getClusterId())) { clusterId = proxyCluster.getClusterId(); } + // parse load int load = ConfigConstants.LOAD_THRESHOLD; if (ObjectUtils.isNotEmpty(proxyCluster.getLoad())) { load = proxyCluster.getLoad() > 200 ? 200 : (Math.max(proxyCluster.getLoad(), 0)); } + // parse isIntranet boolean isIntranet = true; - if (ObjectUtils.isNotEmpty(proxyCluster.getIsSwitch())) { - isIntranet = proxyCluster.getIsIntranet() == 1 ? 
true : false; + if (ObjectUtils.isNotEmpty(proxyCluster.getIsIntranet())) { + isIntranet = proxyCluster.getIsIntranet() == 1; } + // parse isSwitch int isSwitch = 0; if (ObjectUtils.isNotEmpty(proxyCluster.getIsSwitch())) { isSwitch = proxyCluster.getIsSwitch(); } + // build ProxyConfigEntry ProxyConfigEntry proxyEntry = new ProxyConfigEntry(); proxyEntry.setClusterId(clusterId); proxyEntry.setGroupId(clientConfig.getInlongGroupId()); @@ -635,114 +864,8 @@ public class ProxyConfigManager extends Thread { proxyEntry.setHostMap(hostMap); proxyEntry.setSwitchStat(isSwitch); proxyEntry.setLoad(load); - proxyEntry.setSize(nodeList.size()); proxyEntry.setMaxPacketLength( proxyCluster.getMaxPacketLength() != null ? proxyCluster.getMaxPacketLength() : -1); - return proxyEntry; - } - - private Map<String, HostInfo> formatHostInfoMap(List<DataProxyNodeInfo> nodeList) { - HostInfo tmpHostInfo; - Map<String, HostInfo> hostMap = new HashMap<>(); - for (DataProxyNodeInfo proxy : nodeList) { - if (ObjectUtils.isEmpty(proxy.getId()) || StringUtils.isEmpty(proxy.getIp()) || ObjectUtils - .isEmpty(proxy.getPort()) || proxy.getPort() < 0) { - logger.error("invalid proxy node, id:{}, ip:{}, port:{}", proxy.getId(), proxy.getIp(), - proxy.getPort()); - continue; - } - tmpHostInfo = new HostInfo(proxy.getIp(), proxy.getPort()); - hostMap.put(tmpHostInfo.getReferenceName(), tmpHostInfo); - - } - if (hostMap.isEmpty()) { - logger.error("Parse proxyList failure: address is empty for response from manager!"); - return null; - } - return hostMap; - } - - /* Request new configurations from Manager. 
*/ - private String requestConfiguration(String url, List<BasicNameValuePair> params) { - if (StringUtils.isBlank(url)) { - logger.error("request url is null"); - return null; - } - HttpPost httpPost = null; - HttpParams myParams = new BasicHttpParams(); - HttpConnectionParams.setConnectionTimeout(myParams, 10000); - HttpConnectionParams.setSoTimeout(myParams, clientConfig.getManagerSocketTimeout()); - CloseableHttpClient httpClient; - if (this.clientConfig.isRequestByHttp()) { - httpClient = new DefaultHttpClient(myParams); - } else { - try { - httpClient = getCloseableHttpClient(params); - } catch (Throwable eHttps) { - logger.error("Create Https cliet failure, error 1 is ", eHttps); - eHttps.printStackTrace(); - return null; - } - } - logger.info("Request url : {}, params : {}", url, params); - try { - httpPost = new HttpPost(url); - httpPost.addHeader(BasicAuth.BASIC_AUTH_HEADER, - BasicAuth.genBasicAuthCredential(clientConfig.getAuthSecretId(), - clientConfig.getAuthSecretKey())); - UrlEncodedFormEntity urlEncodedFormEntity = new UrlEncodedFormEntity(params, "UTF-8"); - httpPost.setEntity(urlEncodedFormEntity); - HttpResponse response = httpClient.execute(httpPost); - String returnStr = EntityUtils.toString(response.getEntity()); - if (StringUtils.isNotBlank(returnStr) - && response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) { - logger.info("Get configure from manager is {}", returnStr); - return returnStr; - } - return null; - } catch (Throwable e) { - logger.error("Connect Manager error, message: {}, url is {}", e.getMessage(), url); - return null; - } finally { - if (httpPost != null) { - httpPost.releaseConnection(); - } - if (httpClient != null) { - httpClient.getConnectionManager().shutdown(); - } - } - } - - private StringEntity getEntity(List<BasicNameValuePair> params) throws UnsupportedEncodingException { - JsonObject jsonObject = new JsonObject(); - for (BasicNameValuePair pair : params) { - jsonObject.addProperty(pair.getName(), 
pair.getValue()); - } - StringEntity se = new StringEntity(jsonObject.toString()); - se.setContentType(APPLICATION_JSON); - return se; - } - - private CloseableHttpClient getCloseableHttpClient(List<BasicNameValuePair> params) - throws NoSuchAlgorithmException, KeyManagementException { - CloseableHttpClient httpClient; - ArrayList<Header> headers = new ArrayList<Header>(); - for (BasicNameValuePair paramItem : params) { - headers.add(new BasicHeader(paramItem.getName(), paramItem.getValue())); - } - RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(10000) - .setSocketTimeout(clientConfig.getManagerSocketTimeout()).build(); - SSLContext sslContext = SSLContexts.custom().build(); - SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(sslContext, - new String[]{clientConfig.getTlsVersion()}, null, - SSLConnectionSocketFactory.getDefaultHostnameVerifier()); - httpClient = HttpClients.custom().setDefaultHeaders(headers).setDefaultRequestConfig(requestConfig) - .setSSLSocketFactory(sslsf).build(); - return httpClient; - } - - public void updateHashRing(List<HostInfo> newHosts) { - this.hashRing.updateNode(newHosts); - logger.debug("update hash ring {}", hashRing.getVirtualNode2RealNode()); + return new Tuple2<>(proxyEntry, "ok"); } } diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java index 3999390f9b..2ba1938409 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java @@ -54,9 +54,8 @@ public class HttpClientExample { proxyConfig = new ProxyClientConfig(localIP, requestByHttp, inLongManagerAddr, Integer.valueOf(inLongManagerPort), inlongGroupId, "admin", "inlong");// user and password of manager - 
proxyConfig.setInlongGroupId(inlongGroupId); - proxyConfig.setConfStoreBasePath(configBasePath); - proxyConfig.setReadProxyIPFromLocal(isReadProxyIPFromLocal); + proxyConfig.setConfigStoreBasePath(configBasePath); + proxyConfig.setOnlyUseLocalProxyConfig(isReadProxyIPFromLocal); proxyConfig.setDiscardOldMessage(true); proxyConfig.setProtocolType(ProtocolType.HTTP); sender = new HttpProxySender(proxyConfig); diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java index eda90bdbca..85012af172 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java @@ -70,9 +70,9 @@ public class TcpClientExample { dataProxyConfig = new ProxyClientConfig(localIP, requestByHttp, inLongManagerAddr, Integer.valueOf(inLongManagerPort), inlongGroupId, "admin", "inlong"); if (StringUtils.isNotEmpty(configBasePath)) { - dataProxyConfig.setConfStoreBasePath(configBasePath); + dataProxyConfig.setConfigStoreBasePath(configBasePath); } - dataProxyConfig.setReadProxyIPFromLocal(isReadProxyIPFromLocal); + dataProxyConfig.setOnlyUseLocalProxyConfig(isReadProxyIPFromLocal); dataProxyConfig.setProtocolType(ProtocolType.TCP); dataProxyConfig.setRequestTimeoutMs(20000L); messageSender = DefaultMessageSender.generateSenderByClusterId(dataProxyConfig); diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java index 45c95e042d..55a26918a0 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java @@ 
-27,6 +27,7 @@ import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry; import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager; import org.apache.inlong.sdk.dataproxy.utils.ConsistencyHashUtil; import org.apache.inlong.sdk.dataproxy.utils.EventLoopUtil; +import org.apache.inlong.sdk.dataproxy.utils.Tuple2; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; @@ -84,7 +85,7 @@ public class ClientMgr { private int aliveConnections; private int realSize; private SendHBThread sendHBThread; - private ProxyConfigManager ipManager; + private ProxyConfigManager configManager; private int groupIdNum = 0; private String groupId = ""; private Map<String, Integer> streamIdMap = new HashMap<String, Integer>(); @@ -115,10 +116,9 @@ public class ClientMgr { bootstrap.option(ChannelOption.SO_SNDBUF, ConfigConstants.DEFAULT_SEND_BUFFER_SIZE); bootstrap.handler(new ClientPipelineFactory(this, sender)); /* ready to Start the thread which refreshes the proxy list. */ - ipManager = new ProxyConfigManager(configure, this); - ipManager.setName("proxyConfigManager"); + configManager = new ProxyConfigManager(sender.getInstanceId(), configure, this); + configManager.setName("proxyConfigManager"); if (configure.getInlongGroupId() != null) { - ipManager.setInlongGroupId(configure.getInlongGroupId()); groupId = configure.getInlongGroupId(); } @@ -131,13 +131,13 @@ public class ClientMgr { this.loadBalance = configure.getLoadBalance(); try { - ipManager.doProxyEntryQueryWork(); + configManager.doProxyEntryQueryWork(); } catch (IOException e) { e.printStackTrace(); logger.info(e.getMessage()); } - ipManager.setDaemon(true); - ipManager.start(); + configManager.setDaemon(true); + configManager.start(); this.sendHBThread = new SendHBThread(); this.sendHBThread.setName("SendHBThread"); @@ -181,7 +181,13 @@ public class ClientMgr { } public EncryptConfigEntry getEncryptConfigEntry() { - return this.ipManager.getEncryptConfigEntry(configure.getUserName()); + 
Tuple2<EncryptConfigEntry, String> result; + try { + result = configManager.getEncryptConfigure(false); + return result.getF0(); + } catch (Throwable ex) { + return null; + } } public List<HostInfo> getProxyInfoList() { @@ -273,7 +279,12 @@ public class ClientMgr { } public ProxyConfigEntry getGroupIdConfigureInfo() throws Exception { - return ipManager.getGroupIdConfigure(); + Tuple2<ProxyConfigEntry, String> result = + configManager.getGroupIdConfigure(true); + if (result.getF0() == null) { + throw new Exception(result.getF1()); + } + return result.getF0(); } /** @@ -531,7 +542,7 @@ public class ClientMgr { public void shutDown() { bootstrap.config().group().shutdownGracefully(); - ipManager.shutDown(); + configManager.shutDown(); // connectionCheckThread.shutDown(); sendHBThread.shutDown(); @@ -851,9 +862,9 @@ public class ClientMgr { Collections.singletonList(hbMsg.getBytes(StandardCharsets.UTF_8)), 8, false, false, false, System.currentTimeMillis() / 1000, 1, "", "", ""); try { - if (configure.isNeedAuthentication()) { - encodeObject.setAuth(configure.isNeedAuthentication(), - configure.getUserName(), configure.getSecretKey()); + if (configure.isEnableAuthentication()) { + encodeObject.setAuth(configure.isEnableAuthentication(), + configure.getAuthSecretId(), configure.getAuthSecretKey()); } client.write(encodeObject); } catch (Throwable e) { diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java index 5d31f05f8a..1cf9939ff9 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java @@ -26,6 +26,7 @@ import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry; import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager; import 
org.apache.inlong.sdk.dataproxy.http.InternalHttpSender; import org.apache.inlong.sdk.dataproxy.utils.ConcurrentHashSet; +import org.apache.inlong.sdk.dataproxy.utils.Tuple2; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,15 +76,14 @@ public class HttpProxySender extends Thread { private void initTDMClientAndRequest(ProxyClientConfig configure) throws Exception { try { - proxyConfigManager = new ProxyConfigManager(configure, null); - proxyConfigManager.setInlongGroupId(configure.getInlongGroupId()); + proxyConfigManager = new ProxyConfigManager(configure); ProxyConfigEntry proxyConfigEntry = retryGettingProxyConfig(); hostList.addAll(proxyConfigEntry.getHostMap().values()); this.setDaemon(true); this.start(); } catch (Throwable e) { - if (configure.isReadProxyIPFromLocal()) { + if (configure.isOnlyUseLocalProxyConfig()) { throw new Exception("Get local proxy configure failure! e = {}", e); } else { throw new Exception("Visit TDManager error! e = {}", e); @@ -98,7 +98,9 @@ public class HttpProxySender extends Thread { * @return proxy config entry. 
*/ private ProxyConfigEntry retryGettingProxyConfig() throws Exception { - return proxyConfigManager.getGroupIdConfigure(); + Tuple2<ProxyConfigEntry, String> result = + proxyConfigManager.getGroupIdConfigure(true); + return result.getF0(); } /** @@ -112,9 +114,13 @@ public class HttpProxySender extends Thread { int randSleepTime = proxyClientConfig.getProxyHttpUpdateIntervalMinutes() * 60 + rand; TimeUnit.MILLISECONDS.sleep(randSleepTime * 1000); if (proxyConfigManager != null) { - ProxyConfigEntry proxyConfigEntry = proxyConfigManager.getGroupIdConfigure(); - hostList.addAll(proxyConfigEntry.getHostMap().values()); - hostList.retainAll(proxyConfigEntry.getHostMap().values()); + Tuple2<ProxyConfigEntry, String> result = + proxyConfigManager.getGroupIdConfigure(false); + if (result.getF0() == null) { + throw new Exception(result.getF1()); + } + hostList.addAll(result.getF0().getHostMap().values()); + hostList.retainAll(result.getF0().getHostMap().values()); } else { logger.error("manager is null, please check it!"); } diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java index 9f001205e3..e4f820e3ed 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java @@ -43,12 +43,13 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; public class Sender { private static final Logger LOGGER = LoggerFactory.getLogger(Sender.class); - + private static final AtomicLong senderIdGen = new AtomicLong(0L); /* Store the callback used by asynchronously message sending. 
*/ private final ConcurrentHashMap<Channel, ConcurrentHashMap<String, QueueObject>> callbacks = new ConcurrentHashMap<>(); @@ -61,6 +62,7 @@ public class Sender { private final AtomicInteger currentBufferSize = new AtomicInteger(0); private final TimeoutScanThread scanThread; private final ClientMgr clientMgr; + private final String instanceId; private final ProxyClientConfig configure; private MetricWorkerThread metricWorker = null; private int clusterId = -1; @@ -74,6 +76,7 @@ public class Sender { */ public Sender(ProxyClientConfig configure, ThreadFactory selfDefineFactory) throws Exception { this.configure = configure; + this.instanceId = "sender-" + senderIdGen.incrementAndGet(); this.asyncCallbackMaxSize = configure.getTotalAsyncCallbackSize(); this.threadPool = Executors.newCachedThreadPool(); this.clientMgr = new ClientMgr(configure, this, selfDefineFactory); @@ -82,14 +85,14 @@ public class Sender { proxyConfigEntry = this.clientMgr.getGroupIdConfigureInfo(); setClusterId(proxyConfigEntry.getClusterId()); } catch (Throwable e) { - if (configure.isReadProxyIPFromLocal()) { + if (configure.isOnlyUseLocalProxyConfig()) { throw new Exception("Get local proxy configure failure!", e.getCause()); } else { throw new Exception("Visit manager error!", e.getCause()); } } if (!proxyConfigEntry.isInterVisit()) { - if (!configure.isNeedAuthentication()) { + if (!configure.isEnableAuthentication()) { throw new Exception("In OutNetwork isNeedAuthentication must be true!"); } if (!configure.isNeedDataEncry()) { @@ -200,7 +203,8 @@ public class Sender { } } if (this.configure.isNeedDataEncry()) { - encodeObject.setEncryptEntry(true, configure.getUserName(), clientMgr.getEncryptConfigEntry()); + encodeObject.setEncryptEntry(true, + configure.getAuthSecretId(), clientMgr.getEncryptConfigEntry()); } else { encodeObject.setEncryptEntry(false, null, null); } @@ -371,7 +375,8 @@ public class Sender { } } if (this.configure.isNeedDataEncry()) { - 
encodeObject.setEncryptEntry(true, configure.getUserName(), clientMgr.getEncryptConfigEntry()); + encodeObject.setEncryptEntry(true, + configure.getAuthSecretId(), clientMgr.getEncryptConfigEntry()); } else { encodeObject.setEncryptEntry(false, null, null); } @@ -498,6 +503,10 @@ public class Sender { this.clusterId = clusterId; } + public String getInstanceId() { + return instanceId; + } + /** * check whether clientChannel is idle; if idle, need send hb to keep alive * diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java index 371dcf661c..b7bd42ab2b 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java @@ -150,11 +150,11 @@ public class ProxyUtils { * @param clientConfig */ public static void validClientConfig(ProxyClientConfig clientConfig) { - if (clientConfig.isNeedAuthentication()) { - if (StringUtils.isBlank(clientConfig.getUserName())) { - throw new IllegalArgumentException("Authentication require userName not Blank!"); + if (clientConfig.isEnableAuthentication()) { + if (StringUtils.isBlank(clientConfig.getAuthSecretId())) { + throw new IllegalArgumentException("Authentication require secretId not Blank!"); } - if (StringUtils.isBlank(clientConfig.getSecretKey())) { + if (StringUtils.isBlank(clientConfig.getAuthSecretKey())) { throw new IllegalArgumentException("Authentication require secretKey not Blank!"); } } diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/Tuple2.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/Tuple2.java new file mode 100644 index 0000000000..e5ba8c2bc2 --- /dev/null +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/Tuple2.java @@ 
-0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.dataproxy.utils; + +public class Tuple2<T0, T1> { + + /** Field 0 of the tuple. */ + private T0 f0 = null; + /** Field 1 of the tuple. */ + private T1 f1 = null; + + /** + * Creates a new tuple where all fields are null. + */ + public Tuple2() { + + } + + /** + * Creates a new tuple with field 0 specified. + * + * @param value0 The value for field 0 + */ + public Tuple2(T0 value0) { + this.f0 = value0; + } + + /** + * Creates a new tuple and assigns the given values to the tuple's fields. 
+ * + * @param value0 The value for field 0 + * @param value1 The value for field 1 + */ + public Tuple2(T0 value0, T1 value1) { + setF0AndF1(value0, value1); + } + + public T0 getF0() { + return f0; + } + + public T1 getF1() { + return f1; + } + + /** + * Set all field values + * + * @param value0 The value for field 0 + * @param value1 The value for field 1 + */ + public void setF0AndF1(T0 value0, T1 value1) { + this.f0 = value0; + this.f1 = value1; + } +} diff --git a/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyConfigManagerTest.java b/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyConfigManagerTest.java index 5f3ba92c75..40fec3b57d 100644 --- a/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyConfigManagerTest.java +++ b/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyConfigManagerTest.java @@ -20,6 +20,7 @@ package org.apache.inlong.sdk.dataproxy; import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry; import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager; import org.apache.inlong.sdk.dataproxy.network.ClientMgr; +import org.apache.inlong.sdk.dataproxy.utils.Tuple2; import org.junit.Assert; import org.junit.Test; @@ -36,14 +37,18 @@ public class ProxyConfigManagerTest { .toString(); private final ProxyClientConfig clientConfig = PowerMockito.mock(ProxyClientConfig.class); private final ClientMgr clientMgr = PowerMockito.mock(ClientMgr.class); - private final ProxyConfigManager proxyConfigManager = new ProxyConfigManager(clientConfig, clientMgr); + private final ProxyConfigManager proxyConfigManager; public ProxyConfigManagerTest() throws URISyntaxException { + clientConfig.setConfigStoreBasePath(localFile); + proxyConfigManager = + new ProxyConfigManager("test", clientConfig, clientMgr); } @Test public void testProxyConfigParse() throws Exception { - ProxyConfigEntry proxyEntry = 
proxyConfigManager.getLocalProxyListFromFile(localFile); + Tuple2<ProxyConfigEntry, String> result = proxyConfigManager.getLocalProxyListFromFile(localFile); + ProxyConfigEntry proxyEntry = result.getF0(); Assert.assertEquals(proxyEntry.isInterVisit(), false); Assert.assertEquals(proxyEntry.getLoad(), 12); Assert.assertEquals(proxyEntry.getClusterId(), 1); diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java index 74cfcffa21..1965ef37e3 100644 --- a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java +++ b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java @@ -62,7 +62,7 @@ public class InlongSdkDirtySender { ProxyClientConfig proxyClientConfig = new ProxyClientConfig(InetAddress.getLocalHost().getHostAddress(), true, inlongManagerAddr, inlongManagerPort, inlongGroupId, authId, authKey); - proxyClientConfig.setReadProxyIPFromLocal(false); + proxyClientConfig.setOnlyUseLocalProxyConfig(false); proxyClientConfig.setAsyncCallbackSize(maxCallbackSize); this.sender = DefaultMessageSender.generateSenderByClusterId(proxyClientConfig); this.sender.setMsgtype(7);