This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

The following commit(s) were added to refs/heads/master by this push:
     new 36a9017c1f [INLONG-11589][SDK] Optimize the implementation of proxy configuration management (#11590)
36a9017c1f is described below

commit 36a9017c1f92e8a907b7654f2e997ce7287e259a
Author: Goson Zhang <4675...@qq.com>
AuthorDate: Mon Dec 9 11:36:06 2024 +0800

    [INLONG-11589][SDK] Optimize the implementation of proxy configuration management (#11590)
    
    
    
    Co-authored-by: gosonzhang <gosonzh...@tencent.com>
---
 .../inlong/sdk/dataproxy/ConfigConstants.java      |   48 +-
 .../inlong/sdk/dataproxy/DefaultMessageSender.java |   16 +-
 .../inlong/sdk/dataproxy/ProxyClientConfig.java    |  328 +++---
 .../sdk/dataproxy/config/ProxyConfigEntry.java     |   33 +-
 .../sdk/dataproxy/config/ProxyConfigManager.java   | 1111 +++++++++++---------
 .../sdk/dataproxy/example/HttpClientExample.java   |    5 +-
 .../sdk/dataproxy/example/TcpClientExample.java    |    4 +-
 .../inlong/sdk/dataproxy/network/ClientMgr.java    |   37 +-
 .../sdk/dataproxy/network/HttpProxySender.java     |   20 +-
 .../inlong/sdk/dataproxy/network/Sender.java       |   19 +-
 .../inlong/sdk/dataproxy/utils/ProxyUtils.java     |    8 +-
 .../apache/inlong/sdk/dataproxy/utils/Tuple2.java  |   71 ++
 .../sdk/dataproxy/ProxyConfigManagerTest.java      |    9 +-
 .../inlong/sdk/dirtydata/InlongSdkDirtySender.java |    2 +-
 14 files changed, 995 insertions(+), 716 deletions(-)

diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java
index 26f8d131b4..81b9f0dad0 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java
@@ -21,6 +21,42 @@ public class ConfigConstants {
 
     public static final String PROXY_SDK_VERSION = "1.2.11";
 
+    public static String HTTP = "http://";
+    public static String HTTPS = "https://";
+
+    // dataproxy node config
+    public static final String MANAGER_DATAPROXY_API = 
"/inlong/manager/openapi/dataproxy/getIpList/";
+    public static final String META_STORE_SUB_DIR = "/.inlong/";
+    public static final String LOCAL_DP_CONFIG_FILE_SUFFIX = ".local";
+    public static final String REMOTE_DP_CACHE_FILE_SUFFIX = ".proxyip";
+    public static final String REMOTE_ENCRYPT_CACHE_FILE_SUFFIX = ".pubKey";
+    // authorization key
+    public static final String BASIC_AUTH_HEADER = "authorization";
+
+    // config info sync interval in minutes
+    public static final int VAL_DEF_CONFIG_SYNC_INTERVAL_MIN = 3;
+    public static final int VAL_MIN_CONFIG_SYNC_INTERVAL_MIN = 1;
+    public static final int VAL_MAX_CONFIG_SYNC_INTERVAL_MIN = 30;
+    public static final long VAL_UNIT_MIN_TO_MS = 60 * 1000L;
+    // config info sync max retry if failure
+    public static final int VAL_DEF_RETRY_IF_CONFIG_SYNC_FAIL = 3;
+    public static final int VAL_MAX_RETRY_IF_CONFIG_SYNC_FAIL = 5;
+    // cache config expired time in ms
+    public static final long VAL_DEF_CACHE_CONFIG_EXPIRED_MS = 20 * 60 * 1000L;
+    // node force choose interval in ms
+    public static final long VAL_DEF_FORCE_CHOOSE_INR_MS = 10 * 60 * 1000L;
+    public static final long VAL_MIN_FORCE_CHOOSE_INR_MS = 30 * 1000L;
+
+    // connection timeout in milliseconds
+    public static final int VAL_DEF_CONNECT_TIMEOUT_MS = 10000;
+    public static final int VAL_MIN_CONNECT_TIMEOUT_MS = 2000;
+    public static final int VAL_MAX_CONNECT_TIMEOUT_MS = 60000;
+    public static final int VAL_DEF_CONNECT_CLOSE_DELAY_MS = 500;
+    // socket timeout in milliseconds
+    public static final int VAL_DEF_SOCKET_TIMEOUT_MS = 20000;
+    public static final int VAL_MIN_SOCKET_TIMEOUT_MS = 2000;
+    public static final int VAL_MAX_SOCKET_TIMEOUT_MS = 60000;
+
     public static final int ALIVE_CONNECTIONS = 3;
     public static final int MAX_TIMEOUT_CNT = 3;
     public static final int LOAD_THRESHOLD = 0;
@@ -43,17 +79,11 @@ public class ConfigConstants {
     /* one hour interval */
     public static final int PROXY_HTTP_UPDATE_INTERVAL_MINUTES = 60;
 
-    public static final int PROXY_UPDATE_MAX_RETRY = 10;
-
     public static final int MAX_LINE_CNT = 30;
 
-    // connection timeout in milliseconds
-    public static final long VAL_DEF_CONNECT_TIMEOUT_MS = 20000L;
-    public static final long VAL_MIN_CONNECT_TIMEOUT_MS = 1L;
-    public static final long VAL_DEF_CONNECT_CLOSE_DELAY_MS = 500L;
     // request timeout in milliseconds
     public static final long VAL_DEF_REQUEST_TIMEOUT_MS = 10000L;
-    public static final long VAL_MIN_REQUEST_TIMEOUT_MS = 1L;
+    public static final long VAL_MIN_REQUEST_TIMEOUT_MS = 500L;
 
     public static final int DEFAULT_SEND_BUFFER_SIZE = 16777216;
     public static final int DEFAULT_RECEIVE_BUFFER_SIZE = 16777216;
@@ -65,14 +95,10 @@ public class ConfigConstants {
     public static final int FLAG_ALLOW_ENCRYPT = 1 << 6;
     public static final int FLAG_ALLOW_COMPRESS = 1 << 5;
 
-    public static final String MANAGER_DATAPROXY_API = 
"/inlong/manager/openapi/dataproxy/getIpList/";
     public static LoadBalance DEFAULT_LOAD_BALANCE = LoadBalance.ROBIN;
     public static int DEFAULT_VIRTUAL_NODE = 1000;
     public static int DEFAULT_RANDOM_MAX_RETRY = 1000;
 
-    public static String HTTP = "http://";
-    public static String HTTPS = "https://";
-
     public static int DEFAULT_SENDER_MAX_ATTEMPT = 1;
 
     /* Reserved attribute data size(bytes). */
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
index dd22e63cce..b99f3726db 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
@@ -30,6 +30,7 @@ import org.apache.inlong.sdk.dataproxy.network.Sender;
 import org.apache.inlong.sdk.dataproxy.network.SequentialID;
 import org.apache.inlong.sdk.dataproxy.threads.IndexCollectThread;
 import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
+import org.apache.inlong.sdk.dataproxy.utils.Tuple2;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -107,17 +108,20 @@ public class DefaultMessageSender implements 
MessageSender {
         }
         LOGGER.info("Initial tcp sender, configure is {}", configure);
         // initial sender object
-        ProxyConfigManager proxyConfigManager = new 
ProxyConfigManager(configure, null);
-        proxyConfigManager.setInlongGroupId(configure.getInlongGroupId());
-        ProxyConfigEntry entry = proxyConfigManager.getGroupIdConfigure();
-        DefaultMessageSender sender = CACHE_SENDER.get(entry.getClusterId());
+        ProxyConfigManager proxyConfigManager = new 
ProxyConfigManager(configure);
+        Tuple2<ProxyConfigEntry, String> result =
+                proxyConfigManager.getGroupIdConfigure(true);
+        if (result.getF0() == null) {
+            throw new Exception(result.getF1());
+        }
+        DefaultMessageSender sender = 
CACHE_SENDER.get(result.getF0().getClusterId());
         if (sender != null) {
             return sender;
         } else {
             DefaultMessageSender tmpMessageSender =
                     new DefaultMessageSender(configure, selfDefineFactory);
-            tmpMessageSender.setMaxPacketLength(entry.getMaxPacketLength());
-            CACHE_SENDER.put(entry.getClusterId(), tmpMessageSender);
+            
tmpMessageSender.setMaxPacketLength(result.getF0().getMaxPacketLength());
+            CACHE_SENDER.put(result.getF0().getClusterId(), tmpMessageSender);
             return tmpMessageSender;
         }
     }
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
index ce8a3b3b39..412b1950c5 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
@@ -27,41 +27,40 @@ import org.apache.commons.lang3.StringUtils;
 @Data
 public class ProxyClientConfig {
 
+    private String managerIP = "";
+    private int managerPort = 8099;
+    private boolean visitManagerByHttp = true;
+    private boolean onlyUseLocalProxyConfig = false;
+    private int managerConnTimeoutMs = 
ConfigConstants.VAL_DEF_CONNECT_TIMEOUT_MS;
+    private int managerSocketTimeoutMs = 
ConfigConstants.VAL_DEF_SOCKET_TIMEOUT_MS;
+    private long managerConfigSyncInrMs =
+            ConfigConstants.VAL_DEF_CONFIG_SYNC_INTERVAL_MIN * 
ConfigConstants.VAL_UNIT_MIN_TO_MS;
+    private int configSyncMaxRetryIfFail = 
ConfigConstants.VAL_DEF_RETRY_IF_CONFIG_SYNC_FAIL;
+    private String configStoreBasePath = System.getProperty("user.dir");
+    // max expired time for config cache.
+    private long configCacheExpiredMs = 
ConfigConstants.VAL_DEF_CACHE_CONFIG_EXPIRED_MS;
+    // nodes force choose interval ms
+    private long forceReChooseInrMs = 
ConfigConstants.VAL_DEF_FORCE_CHOOSE_INR_MS;
+    private boolean enableAuthentication = false;
+    private String authSecretId = "";
+    private String authSecretKey = "";
+
     private int aliveConnections;
     private int syncThreadPoolSize;
     private int asyncCallbackSize;
-    private int managerPort = 8099;
-    private String managerIP = "";
-    private String managerAddress;
 
-    private String managerUrl = "";
-    private int proxyUpdateIntervalMinutes;
-    private int proxyUpdateMaxRetry;
     private String inlongGroupId;
-    private boolean requestByHttp = true;
     private boolean isNeedDataEncry = false;
-    private boolean needAuthentication = false;
-    private String userName = "";
-    private String secretKey = "";
     private String rsaPubKeyUrl = "";
-    private String confStoreBasePath = System.getProperty("user.dir") + 
"/.inlong/";
     private String tlsServerCertFilePathAndName;
     private String tlsServerKey;
     private String tlsVersion = "TLSv1.2";
     private int maxTimeoutCnt = ConfigConstants.MAX_TIMEOUT_CNT;
-    private String authSecretId;
-    private String authSecretKey;
     private String protocolType;
 
     // metric configure
     private MetricConfig metricConfig = new MetricConfig();
 
-    private int managerConnectionTimeout = 10000;
-    // http socket timeout in milliseconds
-    private int managerSocketTimeout = 30 * 1000;
-
-    private boolean readProxyIPFromLocal = false;
-
     // connect timeout in milliseconds
     private long connectTimeoutMs = ConfigConstants.VAL_DEF_CONNECT_TIMEOUT_MS;
     // request timeout in milliseconds
@@ -79,8 +78,6 @@ public class ProxyClientConfig {
     // interval for async worker in microseconds.
     private int asyncWorkerInterval = 500;
     private boolean cleanHttpCacheWhenClosing = false;
-    // max cache time for proxy config.
-    private long maxProxyCacheTimeInMs = 30 * 60 * 1000;
 
     private int ioThreadNum = Runtime.getRuntime().availableProcessors();
     private boolean enableBusyWait = false;
@@ -93,32 +90,30 @@ public class ProxyClientConfig {
     private int senderMaxAttempt = ConfigConstants.DEFAULT_SENDER_MAX_ATTEMPT;
 
     /* pay attention to the last url parameter ip */
-    public ProxyClientConfig(String localHost, boolean requestByHttp, String 
managerIp,
+    public ProxyClientConfig(String localHost, boolean visitManagerByHttp, 
String managerIp,
             int managerPort, String inlongGroupId, String authSecretId, String 
authSecretKey,
             LoadBalance loadBalance, int virtualNode, int maxRetry) throws 
ProxysdkException {
         if (StringUtils.isBlank(localHost)) {
             throw new ProxysdkException("localHost is blank!");
         }
         if (StringUtils.isBlank(managerIp)) {
-            throw new IllegalArgumentException("managerIp is Blank!");
+            throw new ProxysdkException("managerIp is Blank!");
+        }
+        if (managerPort <= 0) {
+            throw new ProxysdkException("managerPort <= 0!");
         }
         if (StringUtils.isBlank(inlongGroupId)) {
             throw new ProxysdkException("groupId is blank!");
         }
-        this.inlongGroupId = inlongGroupId;
-        this.requestByHttp = requestByHttp;
+        this.inlongGroupId = inlongGroupId.trim();
+        this.visitManagerByHttp = visitManagerByHttp;
         this.managerPort = managerPort;
         this.managerIP = managerIp;
-        this.managerAddress = getManagerAddress(managerIp, managerPort, 
requestByHttp);
-        this.managerUrl =
-                getManagerUrl(managerAddress, inlongGroupId);
         IpUtils.validLocalIp(localHost);
         this.aliveConnections = ConfigConstants.ALIVE_CONNECTIONS;
         this.syncThreadPoolSize = ConfigConstants.SYNC_THREAD_POOL_SIZE;
         this.asyncCallbackSize = ConfigConstants.ASYNC_CALLBACK_SIZE;
-        this.proxyUpdateIntervalMinutes = 
ConfigConstants.PROXY_UPDATE_INTERVAL_MINUTES;
         this.proxyHttpUpdateIntervalMinutes = 
ConfigConstants.PROXY_HTTP_UPDATE_INTERVAL_MINUTES;
-        this.proxyUpdateMaxRetry = ConfigConstants.PROXY_UPDATE_MAX_RETRY;
         this.authSecretId = authSecretId;
         this.authSecretKey = authSecretKey;
         this.loadBalance = loadBalance;
@@ -129,25 +124,15 @@ public class ProxyClientConfig {
     /* pay attention to the last url parameter ip */
     public ProxyClientConfig(String managerAddress, String inlongGroupId, 
String authSecretId, String authSecretKey,
             LoadBalance loadBalance, int virtualNode, int maxRetry) throws 
ProxysdkException {
-        if (StringUtils.isBlank(managerAddress) || 
(!managerAddress.startsWith(ConfigConstants.HTTP)
-                && !managerAddress.startsWith(ConfigConstants.HTTPS))) {
-            throw new ProxysdkException("managerAddress is blank or missing 
http/https protocol ");
-        }
+        checkAndParseAddress(managerAddress);
         if (StringUtils.isBlank(inlongGroupId)) {
             throw new ProxysdkException("groupId is blank!");
         }
-        if (managerAddress.startsWith(ConfigConstants.HTTPS)) {
-            this.requestByHttp = false;
-        }
-        this.managerAddress = managerAddress;
-        this.managerUrl = getManagerUrl(managerAddress, inlongGroupId);
-        this.inlongGroupId = inlongGroupId;
+        this.inlongGroupId = inlongGroupId.trim();
         this.aliveConnections = ConfigConstants.ALIVE_CONNECTIONS;
         this.syncThreadPoolSize = ConfigConstants.SYNC_THREAD_POOL_SIZE;
         this.asyncCallbackSize = ConfigConstants.ASYNC_CALLBACK_SIZE;
-        this.proxyUpdateIntervalMinutes = 
ConfigConstants.PROXY_UPDATE_INTERVAL_MINUTES;
         this.proxyHttpUpdateIntervalMinutes = 
ConfigConstants.PROXY_HTTP_UPDATE_INTERVAL_MINUTES;
-        this.proxyUpdateMaxRetry = ConfigConstants.PROXY_UPDATE_MAX_RETRY;
         this.authSecretId = authSecretId;
         this.authSecretKey = authSecretKey;
         this.loadBalance = loadBalance;
@@ -155,21 +140,9 @@ public class ProxyClientConfig {
         this.maxRetry = maxRetry;
     }
 
-    private String getManagerUrl(String managerAddress, String inlongGroupId) {
-        return managerAddress + ConfigConstants.MANAGER_DATAPROXY_API + 
inlongGroupId;
-    }
-
-    private String getManagerAddress(String managerIp, int managerPort, 
boolean requestByHttp) {
-        String protocolType = ConfigConstants.HTTPS;
-        if (requestByHttp) {
-            protocolType = ConfigConstants.HTTP;
-        }
-        return protocolType + managerIp + ":" + managerPort;
-    }
-
-    public ProxyClientConfig(String localHost, boolean requestByHttp, String 
managerIp, int managerPort,
+    public ProxyClientConfig(String localHost, boolean visitManagerByHttp, 
String managerIp, int managerPort,
             String inlongGroupId, String authSecretId, String authSecretKey) 
throws ProxysdkException {
-        this(localHost, requestByHttp, managerIp, managerPort, inlongGroupId, 
authSecretId, authSecretKey,
+        this(localHost, visitManagerByHttp, managerIp, managerPort, 
inlongGroupId, authSecretId, authSecretKey,
                 ConfigConstants.DEFAULT_LOAD_BALANCE, 
ConfigConstants.DEFAULT_VIRTUAL_NODE,
                 ConfigConstants.DEFAULT_RANDOM_MAX_RETRY);
     }
@@ -181,40 +154,131 @@ public class ProxyClientConfig {
                 ConfigConstants.DEFAULT_RANDOM_MAX_RETRY);
     }
 
-    public String getTlsServerCertFilePathAndName() {
-        return tlsServerCertFilePathAndName;
+    public String getManagerIP() {
+        return managerIP;
     }
 
-    public String getTlsServerKey() {
-        return tlsServerKey;
+    public int getManagerPort() {
+        return managerPort;
     }
 
-    public boolean isRequestByHttp() {
-        return requestByHttp;
+    public boolean isVisitManagerByHttp() {
+        return visitManagerByHttp;
     }
 
-    public String getInlongGroupId() {
-        return inlongGroupId;
+    public boolean isOnlyUseLocalProxyConfig() {
+        return onlyUseLocalProxyConfig;
     }
 
-    public void setInlongGroupId(String inlongGroupId) {
-        this.inlongGroupId = inlongGroupId;
+    public void setOnlyUseLocalProxyConfig(boolean onlyUseLocalProxyConfig) {
+        this.onlyUseLocalProxyConfig = onlyUseLocalProxyConfig;
     }
 
-    public int getManagerPort() {
-        return managerPort;
+    public boolean isEnableAuthentication() {
+        return this.enableAuthentication;
     }
 
-    public String getManagerIP() {
-        return managerIP;
+    public String getAuthSecretId() {
+        return authSecretId;
     }
 
-    public String getConfStoreBasePath() {
-        return confStoreBasePath;
+    public String getAuthSecretKey() {
+        return authSecretKey;
     }
 
-    public void setConfStoreBasePath(String confStoreBasePath) {
-        this.confStoreBasePath = confStoreBasePath;
+    public void setAuthenticationInfo(boolean needAuthentication, String 
secretId, String secretKey) {
+        this.enableAuthentication = needAuthentication;
+        if (!this.enableAuthentication) {
+            return;
+        }
+        if (StringUtils.isBlank(secretId)) {
+            throw new IllegalArgumentException("secretId is Blank!");
+        }
+        if (StringUtils.isBlank(secretKey)) {
+            throw new IllegalArgumentException("secretKey is Blank!");
+        }
+        this.authSecretId = secretId.trim();
+        this.authSecretKey = secretKey.trim();
+    }
+
+    public long getManagerConfigSyncInrMs() {
+        return managerConfigSyncInrMs;
+    }
+
+    public void setManagerConfigSyncInrMin(int managerConfigSyncInrMin) {
+        int tmpValue =
+                Math.min(ConfigConstants.VAL_MAX_CONFIG_SYNC_INTERVAL_MIN,
+                        
Math.max(ConfigConstants.VAL_MIN_CONFIG_SYNC_INTERVAL_MIN, 
managerConfigSyncInrMin));
+        this.managerConfigSyncInrMs = tmpValue * 
ConfigConstants.VAL_UNIT_MIN_TO_MS;
+    }
+
+    public int getManagerConnTimeoutMs() {
+        return managerConnTimeoutMs;
+    }
+
+    public void setManagerConnTimeoutMs(int managerConnTimeoutMs) {
+        this.managerConnTimeoutMs =
+                Math.min(ConfigConstants.VAL_MAX_CONNECT_TIMEOUT_MS,
+                        Math.max(ConfigConstants.VAL_MIN_CONNECT_TIMEOUT_MS, 
managerConnTimeoutMs));
+    }
+
+    public int getManagerSocketTimeoutMs() {
+        return managerSocketTimeoutMs;
+    }
+
+    public void setManagerSocketTimeoutMs(int managerSocketTimeoutMs) {
+        this.managerSocketTimeoutMs =
+                Math.min(ConfigConstants.VAL_MAX_SOCKET_TIMEOUT_MS,
+                        Math.max(ConfigConstants.VAL_MIN_SOCKET_TIMEOUT_MS, 
managerSocketTimeoutMs));
+    }
+
+    public int getConfigSyncMaxRetryIfFail() {
+        return configSyncMaxRetryIfFail;
+    }
+
+    public void setConfigSyncMaxRetryIfFail(int configSyncMaxRetryIfFail) {
+        this.configSyncMaxRetryIfFail =
+                Math.min(configSyncMaxRetryIfFail, 
ConfigConstants.VAL_MAX_RETRY_IF_CONFIG_SYNC_FAIL);
+    }
+
+    public String getConfigStoreBasePath() {
+        return configStoreBasePath;
+    }
+
+    public void setConfigStoreBasePath(String configStoreBasePath) {
+        if (StringUtils.isBlank(configStoreBasePath)) {
+            return;
+        }
+        this.configStoreBasePath = configStoreBasePath.trim();
+    }
+
+    public long getConfigCacheExpiredMs() {
+        return configCacheExpiredMs;
+    }
+
+    public void setConfigCacheExpiredMs(long configCacheExpiredMs) {
+        this.configCacheExpiredMs = configCacheExpiredMs;
+    }
+
+    public long getForceReChooseInrMs() {
+        return forceReChooseInrMs;
+    }
+
+    public void setForceReChooseInrMs(long forceReChooseInrMs) {
+        this.forceReChooseInrMs =
+                Math.max(ConfigConstants.VAL_MIN_FORCE_CHOOSE_INR_MS, 
forceReChooseInrMs);
+    }
+
+    public String getTlsServerCertFilePathAndName() {
+        return tlsServerCertFilePathAndName;
+    }
+
+    public String getTlsServerKey() {
+        return tlsServerKey;
+    }
+
+    public String getInlongGroupId() {
+        return inlongGroupId;
     }
 
     public int getAliveConnections() {
@@ -244,10 +308,6 @@ public class ProxyClientConfig {
         this.asyncCallbackSize = asyncCallbackSize;
     }
 
-    public String getManagerUrl() {
-        return managerUrl;
-    }
-
     public int getMaxTimeoutCnt() {
         return maxTimeoutCnt;
     }
@@ -259,22 +319,6 @@ public class ProxyClientConfig {
         this.maxTimeoutCnt = maxTimeoutCnt;
     }
 
-    public int getProxyUpdateIntervalMinutes() {
-        return proxyUpdateIntervalMinutes;
-    }
-
-    public void setProxyUpdateIntervalMinutes(int proxyUpdateIntervalMinutes) {
-        this.proxyUpdateIntervalMinutes = proxyUpdateIntervalMinutes;
-    }
-
-    public int getProxyUpdateMaxRetry() {
-        return proxyUpdateMaxRetry;
-    }
-
-    public void setProxyUpdateMaxRetry(int proxyUpdateMaxRetry) {
-        this.proxyUpdateMaxRetry = proxyUpdateMaxRetry;
-    }
-
     public long getConnectTimeoutMs() {
         return connectTimeoutMs;
     }
@@ -315,26 +359,6 @@ public class ProxyClientConfig {
         return isNeedDataEncry;
     }
 
-    public boolean isNeedAuthentication() {
-        return this.needAuthentication;
-    }
-
-    public void setAuthenticationInfo(boolean needAuthentication, boolean 
needDataEncry,
-            final String userName, final String secretKey) {
-        this.needAuthentication = needAuthentication;
-        this.isNeedDataEncry = needDataEncry;
-        if (this.needAuthentication || this.isNeedDataEncry) {
-            if (StringUtils.isBlank(userName)) {
-                throw new IllegalArgumentException("userName is Blank!");
-            }
-            if (StringUtils.isBlank(secretKey)) {
-                throw new IllegalArgumentException("secretKey is Blank!");
-            }
-        }
-        this.userName = userName.trim();
-        this.secretKey = secretKey.trim();
-    }
-
     public void setHttpsInfo(String tlsServerCertFilePathAndName, String 
tlsServerKey) {
         if (StringUtils.isBlank(tlsServerCertFilePathAndName)) {
             throw new IllegalArgumentException("tlsServerCertFilePathAndName 
is Blank!");
@@ -354,22 +378,6 @@ public class ProxyClientConfig {
         this.tlsVersion = tlsVersion;
     }
 
-    public String getUserName() {
-        return userName;
-    }
-
-    public String getSecretKey() {
-        return secretKey;
-    }
-
-    public boolean isReadProxyIPFromLocal() {
-        return readProxyIPFromLocal;
-    }
-
-    public void setReadProxyIPFromLocal(boolean readProxyIPFromLocal) {
-        this.readProxyIPFromLocal = readProxyIPFromLocal;
-    }
-
     public int getProxyHttpUpdateIntervalMinutes() {
         return proxyHttpUpdateIntervalMinutes;
     }
@@ -402,14 +410,6 @@ public class ProxyClientConfig {
         this.asyncWorkerInterval = asyncWorkerInterval;
     }
 
-    public int getManagerSocketTimeout() {
-        return managerSocketTimeout;
-    }
-
-    public void setManagerSocketTimeout(int managerSocketTimeout) {
-        this.managerSocketTimeout = managerSocketTimeout;
-    }
-
     public boolean isCleanHttpCacheWhenClosing() {
         return cleanHttpCacheWhenClosing;
     }
@@ -418,22 +418,6 @@ public class ProxyClientConfig {
         this.cleanHttpCacheWhenClosing = cleanHttpCacheWhenClosing;
     }
 
-    public long getMaxProxyCacheTimeInMs() {
-        return maxProxyCacheTimeInMs;
-    }
-
-    public void setMaxProxyCacheTimeInMs(long maxProxyCacheTimeInMs) {
-        this.maxProxyCacheTimeInMs = maxProxyCacheTimeInMs;
-    }
-
-    public int getManagerConnectionTimeout() {
-        return managerConnectionTimeout;
-    }
-
-    public void setManagerConnectionTimeout(int managerConnectionTimeout) {
-        this.managerConnectionTimeout = managerConnectionTimeout;
-    }
-
     public MetricConfig getMetricConfig() {
         return metricConfig;
     }
@@ -495,4 +479,40 @@ public class ProxyClientConfig {
     public void setSenderAttempt(int senderMaxAttempt) {
         this.senderMaxAttempt = senderMaxAttempt;
     }
+
+    private void checkAndParseAddress(String managerAddress) throws 
ProxysdkException {
+        if (StringUtils.isBlank(managerAddress)
+                || (!managerAddress.startsWith(ConfigConstants.HTTP)
+                        && !managerAddress.startsWith(ConfigConstants.HTTPS))) 
{
+            throw new ProxysdkException("managerAddress is blank or missing 
http/https protocol");
+        }
+        String hostPortInfo;
+        if (managerAddress.startsWith(ConfigConstants.HTTPS)) {
+            this.visitManagerByHttp = false;
+            hostPortInfo = 
managerAddress.substring(ConfigConstants.HTTPS.length() + 1);
+        } else {
+            hostPortInfo = 
managerAddress.substring(ConfigConstants.HTTP.length() + 1);
+        }
+        if (StringUtils.isBlank(hostPortInfo)) {
+            throw new ProxysdkException("managerAddress must include host:port 
info!");
+        }
+        String[] fields = hostPortInfo.split(":");
+        if (fields.length == 1) {
+            throw new ProxysdkException("managerAddress must include port 
info!");
+        } else if (fields.length > 2) {
+            throw new ProxysdkException("managerAddress must only include 
host:port info!");
+        }
+        if (StringUtils.isBlank(fields[0])) {
+            throw new ProxysdkException("managerAddress's host is blank!");
+        }
+        this.managerIP = fields[0].trim();
+        if (StringUtils.isBlank(fields[1])) {
+            throw new ProxysdkException("managerAddress's port is blank!");
+        }
+        try {
+            this.managerPort = Integer.parseInt(fields[1]);
+        } catch (Throwable ex) {
+            throw new ProxysdkException("managerAddress's port must be 
number!");
+        }
+    }
 }
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigEntry.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigEntry.java
index 441c62abe5..e37b9b2c0a 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigEntry.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigEntry.java
@@ -17,13 +17,14 @@
 
 package org.apache.inlong.sdk.dataproxy.config;
 
+import org.apache.commons.lang3.builder.ToStringBuilder;
+
 import java.util.Map;
 
 public class ProxyConfigEntry implements java.io.Serializable {
 
     private int clusterId;
     private String groupId;
-    private int size;
     private Map<String, HostInfo> hostMap;
     private int load;
     private int switchStat;
@@ -59,16 +60,14 @@ public class ProxyConfigEntry implements 
java.io.Serializable {
     }
 
     public void setHostMap(Map<String, HostInfo> hostMap) {
-        this.size = hostMap.size();
         this.hostMap = hostMap;
     }
-
-    public int getSize() {
-        return size;
+    public boolean isNodesEmpty() {
+        return this.hostMap.isEmpty();
     }
 
-    public void setSize(int size) {
-        this.size = size;
+    public int getSize() {
+        return hostMap.size();
     }
 
     public String getGroupId() {
@@ -87,13 +86,6 @@ public class ProxyConfigEntry implements 
java.io.Serializable {
         isInterVisit = interVisit;
     }
 
-    @Override
-    public String toString() {
-        return "ProxyConfigEntry [hostMap=" + hostMap + ", load=" + load + ", 
size=" + size + ", isInterVisit="
-                + isInterVisit + ", groupId=" + groupId + ", switch=" + 
switchStat + ", maxPacketLength="
-                + maxPacketLength + "]";
-    }
-
     public int getClusterId() {
         return clusterId;
     }
@@ -101,4 +93,17 @@ public class ProxyConfigEntry implements 
java.io.Serializable {
     public void setClusterId(int clusterId) {
         this.clusterId = clusterId;
     }
+
+    @Override
+    public String toString() {
+        return new ToStringBuilder(this)
+                .append("clusterId", clusterId)
+                .append("groupId", groupId)
+                .append("hostMap", hostMap)
+                .append("load", load)
+                .append("switchStat", switchStat)
+                .append("isInterVisit", isInterVisit)
+                .append("maxPacketLength", maxPacketLength)
+                .toString();
+    }
 }
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
index d259cdff08..986c59457e 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
@@ -21,20 +21,18 @@ import 
org.apache.inlong.common.pojo.dataproxy.DataProxyNodeInfo;
 import org.apache.inlong.common.pojo.dataproxy.DataProxyNodeResponse;
 import org.apache.inlong.common.util.BasicAuth;
 import org.apache.inlong.sdk.dataproxy.ConfigConstants;
-import org.apache.inlong.sdk.dataproxy.LoadBalance;
 import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
 import org.apache.inlong.sdk.dataproxy.network.ClientMgr;
-import org.apache.inlong.sdk.dataproxy.network.HashRing;
 import org.apache.inlong.sdk.dataproxy.network.IpUtils;
+import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
+import org.apache.inlong.sdk.dataproxy.utils.Tuple2;
 
 import com.google.gson.Gson;
-import com.google.gson.JsonArray;
 import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
 import com.google.gson.stream.JsonReader;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.ObjectUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.http.Header;
@@ -44,7 +42,6 @@ import org.apache.http.client.config.RequestConfig;
 import org.apache.http.client.entity.UrlEncodedFormEntity;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
-import org.apache.http.entity.StringEntity;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.DefaultHttpClient;
 import org.apache.http.impl.client.HttpClients;
@@ -67,7 +64,7 @@ import java.io.FileReader;
 import java.io.FileWriter;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
-import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.security.KeyManagementException;
@@ -77,8 +74,9 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
@@ -89,161 +87,199 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
  */
 public class ProxyConfigManager extends Thread {
 
-    public static final String APPLICATION_JSON = "application/json";
     private static final Logger logger = 
LoggerFactory.getLogger(ProxyConfigManager.class);
-    private final ProxyClientConfig clientConfig;
-    private final ClientMgr clientManager;
-    private final ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
+    private static final LogCounter exptCounter = new LogCounter(10, 100000, 
60 * 1000L);
+    private static final LogCounter parseCounter = new LogCounter(10, 100000, 
60 * 1000L);
+    private static final ReentrantReadWriteLock fileRw = new 
ReentrantReadWriteLock();
+
+    private final String callerId;
+    private ProxyClientConfig clientConfig;
     private final Gson gson = new Gson();
-    private final HashRing hashRing = HashRing.getInstance();
-    private List<HostInfo> proxyInfoList = new ArrayList<HostInfo>();
-    /* the status of the cluster.if this value is changed,we need rechoose 
three proxy */
+    private final ClientMgr clientManager;
+    private final ThreadLocalRandom random = ThreadLocalRandom.current();
+    private final AtomicBoolean shutDown = new AtomicBoolean(false);
+    // proxy configure info
+    private String localProxyConfigStoreFile;
+    private String proxyConfigVisitUrl;
+    private String proxyConfigCacheFile;
+    private List<HostInfo> proxyInfoList = new ArrayList<>();
     private int oldStat = 0;
-    private String inlongGroupId;
     private String localMd5;
-    private boolean bShutDown = false;
-    private long lstUpdatedTime = 0;
+    private long lstUpdateTime = 0;
+    // encrypt configure info
+    private String encryptConfigVisitUrl;
+    private String encryptConfigCacheFile;
     private EncryptConfigEntry userEncryptConfigEntry;
 
-    public ProxyConfigManager(final ProxyClientConfig configure, final 
ClientMgr clientManager) {
-        this.clientConfig = configure;
-        this.clientManager = clientManager;
-        this.hashRing.setVirtualNode(configure.getVirtualNode());
+    public ProxyConfigManager(ProxyClientConfig configure) {
+        this("MetaQuery", configure, null);
     }
 
-    public String getInlongGroupId() {
-        return inlongGroupId;
+    public ProxyConfigManager(String callerId, ProxyClientConfig configure, 
ClientMgr clientManager) {
+        this.callerId = callerId;
+        this.clientManager = clientManager;
+        this.storeAndBuildMetaConfigure(configure);
+        if (this.clientManager != null) {
+            this.setName("ConfigManager-" + this.callerId);
+            logger.info("ConfigManager({}) started, groupId={}",
+                    this.callerId, clientConfig.getInlongGroupId());
+        }
     }
 
-    public void setInlongGroupId(String inlongGroupId) {
-        this.inlongGroupId = inlongGroupId;
+    /**
+     * Update proxy client configure for query case
+     *
+     * @param configure  proxy client configure
+     * @throws Exception if configure is null or this manager is bound to a ClientMgr
+     */
+    public void updProxyClientConfig(ProxyClientConfig configure) throws 
Exception {
+        if (configure == null) {
+            throw new Exception("ProxyClientConfig is null");
+        }
+        if (this.clientManager != null) {
+            throw new Exception("Not allowed for non meta-query case!");
+        }
+        if (shutDown.get()) {
+            return;
+        }
+        this.storeAndBuildMetaConfigure(configure);
     }
 
     public void shutDown() {
-        logger.info("Begin to shut down ProxyConfigManager!");
-        bShutDown = true;
-    }
-
-    @Override
-    public void run() {
-        while (!bShutDown) {
-            try {
-                doProxyEntryQueryWork();
-                updateEncryptConfigEntry();
-                logger.info("ProxyConf update!");
-            } catch (Throwable e) {
-                logger.error("Refresh proxy ip list runs into exception {}, 
{}", e.toString(), e.getStackTrace());
-                e.printStackTrace();
-            }
-
-            /* Sleep some time.240-360s */
-            try {
-                Random random = new Random();
-                int proxyUpdateIntervalSec = 
this.clientConfig.getProxyUpdateIntervalMinutes() * 60;
-
-                int sleepTimeSec = proxyUpdateIntervalSec;
-                if (proxyUpdateIntervalSec > 5) {
-                    sleepTimeSec = proxyUpdateIntervalSec + random.nextInt() % 
(proxyUpdateIntervalSec / 5);
-                }
-                logger.info("sleep time {}", sleepTimeSec);
-                Thread.sleep(sleepTimeSec * 1000);
-            } catch (Throwable e2) {
-                //
-            }
+        if (clientManager == null) {
+            return;
+        }
+        if (shutDown.compareAndSet(false, true)) {
+            this.interrupt();
+            logger.info("ConfigManager({}) begin to shutdown, groupId={}!",
+                    this.callerId, clientConfig.getInlongGroupId());
         }
-        logger.info("ProxyConfigManager worker existed!");
     }
 
     /**
-     * try to read cache of proxy entry
+     * Get the proxy configure of the groupId, from local file, cache file or manager
      *
-     * @return
+     * @return tuple of the proxy config entry (null on failure) and a result message
+     * @throws Exception if the wait between retries is interrupted
      */
-    private ProxyConfigEntry tryToReadCacheProxyEntry(String configCachePath) {
-        rw.readLock().lock();
-        try {
-            File file = new File(configCachePath);
-            long diffTime = System.currentTimeMillis() - file.lastModified();
-
-            if (diffTime < clientConfig.getMaxProxyCacheTimeInMs()) {
-                JsonReader reader = new JsonReader(new 
FileReader(configCachePath));
-                ProxyConfigEntry proxyConfigEntry = gson.fromJson(reader, 
ProxyConfigEntry.class);
-                logger.info("{} has a backup! {}", inlongGroupId, 
proxyConfigEntry);
-                return proxyConfigEntry;
-            }
-        } catch (Exception ex) {
-            logger.warn("try to read local cache, caught {}", ex.getMessage());
-        } finally {
-            rw.readLock().unlock();
+    public Tuple2<ProxyConfigEntry, String> getGroupIdConfigure(boolean 
needRetry) throws Exception {
+        if (shutDown.get()) {
+            return new Tuple2<>(null, "SDK has shutdown!");
         }
-        return null;
-    }
-
-    private void tryToWriteCacheProxyEntry(ProxyConfigEntry entry, String 
configCachePath) {
-        rw.writeLock().lock();
-        try {
-            File file = new File(configCachePath);
-            if (!file.getParentFile().exists()) {
-                // try to create parent
-                file.getParentFile().mkdirs();
+        if (clientConfig.isOnlyUseLocalProxyConfig()) {
+            return getLocalProxyListFromFile(this.localProxyConfigStoreFile);
+        } else {
+            boolean readFromRmt = false;
+            Tuple2<ProxyConfigEntry, String> result;
+            result = tryToReadCacheProxyEntry();
+            if (result.getF0() == null) {
+                int retryCount = 0;
+                do {
+                    result = requestProxyEntryQuietly();
+                    if (result.getF0() != null || !needRetry || 
shutDown.get()) {
+                        if (result.getF0() != null) {
+                            readFromRmt = true;
+                        }
+                        break;
+                    }
+                    // sleep then retry
+                    TimeUnit.MILLISECONDS.sleep(500);
+                } while (++retryCount < 
clientConfig.getConfigSyncMaxRetryIfFail());
             }
-            logger.info("try to write {}} to local cache {}", entry, 
configCachePath);
-            FileWriter fileWriter = new FileWriter(configCachePath);
-            gson.toJson(entry, fileWriter);
-            fileWriter.flush();
-            fileWriter.close();
-        } catch (Exception ex) {
-            logger.warn("try to write local cache, caught {}", 
ex.getMessage());
-        } finally {
-            rw.writeLock().unlock();
-        }
-    }
-
-    private ProxyConfigEntry requestProxyEntryQuietly() {
-        try {
-            return requestProxyList(this.clientConfig.getManagerUrl());
-        } catch (Exception e) {
-            logger.warn("try to request proxy list by http, caught {}", 
e.getMessage());
+            if (shutDown.get()) {
+                return new Tuple2<>(null, "SDK has shutdown!");
+            }
+            if (result.getF0() == null) {
+                return new Tuple2<>(null, "Visit manager error:" + 
result.getF1());
+            } else if (readFromRmt) {
+                tryToWriteCacheProxyEntry(result.getF0());
+            }
+            return result;
         }
-        return null;
     }
 
     /**
-     * get groupId config
+     * get encrypt config
      *
      * @return proxyConfigEntry
-     * @throws Exception
+     * @throws Exception ex
      */
-    public ProxyConfigEntry getGroupIdConfigure() throws Exception {
-        ProxyConfigEntry proxyEntry;
-        String configAddr = clientConfig.getConfStoreBasePath() + 
inlongGroupId;
-        if (this.clientConfig.isReadProxyIPFromLocal()) {
-            configAddr = configAddr + ".local";
-            proxyEntry = getLocalProxyListFromFile(configAddr);
-        } else {
-            configAddr = configAddr + ".proxyip";
-
-            proxyEntry = tryToReadCacheProxyEntry(configAddr);
-            if (proxyEntry == null) {
-                proxyEntry = requestProxyEntryQuietly();
-                int requestCount = 0;
-
-                while (requestCount < 3 && proxyEntry == null) {
-                    proxyEntry = requestProxyEntryQuietly();
-                    requestCount += 1;
-                    if (proxyEntry == null) {
-                        // sleep then retry
-                        TimeUnit.MILLISECONDS.sleep(500);
+    public Tuple2<EncryptConfigEntry, String> getEncryptConfigure(boolean 
needRetry) throws Exception {
+        if (!clientConfig.isNeedDataEncry()) {
+            return new Tuple2<>(null, "Not need data encrypt!");
+        }
+        if (shutDown.get()) {
+            return new Tuple2<>(null, "SDK has shutdown!");
+        }
+        EncryptConfigEntry encryptEntry = this.userEncryptConfigEntry;
+        if (encryptEntry != null) {
+            return new Tuple2<>(encryptEntry, "Ok");
+        }
+        boolean readFromRmt = false;
+        Tuple2<EncryptConfigEntry, String> result = readCachedPubKeyEntry();
+        if (result.getF0() == null) {
+            int retryCount = 0;
+            do {
+                result = requestPubKeyFromManager();
+                if (result.getF0() != null || !needRetry || shutDown.get()) {
+                    if (result.getF0() != null) {
+                        readFromRmt = true;
                     }
+                    break;
+                }
+                // sleep then retry
+                TimeUnit.MILLISECONDS.sleep(500);
+            } while (++retryCount < 
clientConfig.getConfigSyncMaxRetryIfFail());
+        }
+        if (shutDown.get()) {
+            return new Tuple2<>(null, "SDK has shutdown!");
+        }
+        if (result.getF0() == null) {
+            return new Tuple2<>(null, "Visit manager error:" + result.getF1());
+        } else if (readFromRmt) {
+            updateEncryptConfigEntry(result.getF0());
+            writeCachePubKeyEntryFile(result.getF0());
+        }
+        return result;
+    }
+
+    @Override
+    public void run() {
+        logger.info("ConfigManager({}) thread start, groupId={}",
+                this.callerId, clientConfig.getInlongGroupId());
+        while (!shutDown.get()) {
+            // update proxy nodes meta configures
+            try {
+                doProxyEntryQueryWork();
+            } catch (Throwable ex) {
+                if (exptCounter.shouldPrint()) {
+                    logger.warn("ConfigManager({}) refresh proxy configure 
exception, groupId={}",
+                            this.callerId, clientConfig.getInlongGroupId(), 
ex);
                 }
             }
-            if (proxyEntry == null) {
-                throw new Exception("Visit manager error, please check log!");
-            } else {
-                tryToWriteCacheProxyEntry(proxyEntry, configAddr);
+            // update encrypt configure
+            if (clientConfig.isNeedDataEncry()) {
+                try {
+                    doEncryptConfigEntryQueryWork();
+                } catch (Throwable ex) {
+                    if (exptCounter.shouldPrint()) {
+                        logger.warn("ConfigManager({}) refresh encrypt info 
exception, groupId={}",
+                                this.callerId, 
clientConfig.getInlongGroupId(), ex);
+                    }
+                }
+            }
+            if (shutDown.get()) {
+                break;
+            }
+            // sleep some time
+            try {
+                Thread.sleep(clientConfig.getManagerConfigSyncInrMs() + 
random.nextInt(100) * 100);
+            } catch (Throwable e2) {
+                // ignore, the loop condition re-checks the shutdown flag
             }
         }
-        return proxyEntry;
+        logger.info("ConfigManager({}) worker existed, groupId={}",
+                this.callerId, this.clientConfig.getInlongGroupId());
     }
 
     /**
@@ -252,55 +288,140 @@ public class ProxyConfigManager extends Thread {
      * @throws Exception
      */
     public void doProxyEntryQueryWork() throws Exception {
+        if (shutDown.get()) {
+            return;
+        }
         /* Request the configuration from manager. */
         if (localMd5 == null) {
             localMd5 = calcHostInfoMd5(proxyInfoList);
         }
-        ProxyConfigEntry proxyEntry = null;
-        String configAddr = clientConfig.getConfStoreBasePath() + 
inlongGroupId;
-        if (clientConfig.isReadProxyIPFromLocal()) {
-            configAddr = configAddr + ".local";
-            proxyEntry = getLocalProxyListFromFile(configAddr);
+        Tuple2<ProxyConfigEntry, String> result;
+        if (clientConfig.isOnlyUseLocalProxyConfig()) {
+            result = getLocalProxyListFromFile(this.localProxyConfigStoreFile);
         } else {
-            /* Do a compare and see if it needs to re-choose the channel. */
-            configAddr = configAddr + ".managerip";
-            int retryCount = 1;
-            while (proxyEntry == null && retryCount < 
this.clientConfig.getProxyUpdateMaxRetry()) {
-                proxyEntry = requestProxyEntryQuietly();
-                retryCount++;
-                if (proxyEntry == null) {
-                    // sleep then retry.
-                    TimeUnit.SECONDS.sleep(1);
+            int retryCnt = 0;
+            do {
+                result = requestProxyEntryQuietly();
+                if (result.getF0() != null || shutDown.get()) {
+                    break;
                 }
+                // sleep then retry.
+                TimeUnit.SECONDS.sleep(2);
+            } while (++retryCnt < 
this.clientConfig.getConfigSyncMaxRetryIfFail() && !shutDown.get());
+            if (shutDown.get()) {
+                return;
             }
-            if (proxyEntry != null) {
-                tryToWriteCacheProxyEntry(proxyEntry, configAddr);
+            if (result.getF0() != null) {
+                tryToWriteCacheProxyEntry(result.getF0());
             }
-            /* We should exit if no local IP list and can't request it from 
manager. */
-            if (localMd5 == null && proxyEntry == null) {
-                logger.error("Can't connect manager at the start of proxy API 
{}",
-                        this.clientConfig.getManagerUrl());
-                proxyEntry = tryToReadCacheProxyEntry(configAddr);
+            /* No in-memory node list and the manager request failed: fall back 
to the locally cached configure. */
+            if (localMd5 == null && result.getF0() == null) {
+                if (exptCounter.shouldPrint()) {
+                    logger.warn("ConfigManager({}) connect manager({}) 
failure, get cached configure, groupId={}",
+                            this.callerId, this.proxyConfigVisitUrl, 
this.clientConfig.getInlongGroupId());
+                }
+                result = tryToReadCacheProxyEntry();
             }
-            if (localMd5 != null && proxyEntry == null && proxyInfoList != 
null) {
-                StringBuffer s = new StringBuffer();
-                for (HostInfo tmp : proxyInfoList) {
-                    
s.append(tmp.getHostName()).append(";").append(tmp.getPortNumber())
-                            .append(",");
+            if (localMd5 != null && result.getF0() == null && proxyInfoList != 
null) {
+                if (exptCounter.shouldPrint()) {
+                    logger.warn("ConfigManager({}) connect manager({}) 
failure, using the last configure, groupId={}",
+                            this.callerId, this.proxyConfigVisitUrl, 
this.clientConfig.getInlongGroupId());
                 }
-                logger.warn("Backup proxyEntry [{}]", s);
             }
         }
-        if (localMd5 == null && proxyEntry == null && proxyInfoList == null) {
-            if (clientConfig.isReadProxyIPFromLocal()) {
-                throw new Exception("Local proxy address configure "
-                        + "read failure, please check first!");
+        if (localMd5 == null && result.getF0() == null && proxyInfoList == 
null) {
+            if (clientConfig.isOnlyUseLocalProxyConfig()) {
+                throw new Exception("Read local proxy configure failure, 
please check first!");
             } else {
                 throw new Exception("Connect Manager failure, please check 
first!");
             }
         }
-        compareProxyList(proxyEntry);
+        compareAndUpdateProxyList(result.getF0());
+    }
+
+    private void doEncryptConfigEntryQueryWork() throws Exception {
+        if (shutDown.get()) {
+            return;
+        }
+        int retryCount = 0;
+        Tuple2<EncryptConfigEntry, String> result;
+        do {
+            result = requestPubKeyFromManager();
+            if (result.getF0() != null || shutDown.get()) {
+                break;
+            }
+            // sleep then retry
+            TimeUnit.MILLISECONDS.sleep(500);
+        } while (++retryCount < clientConfig.getConfigSyncMaxRetryIfFail());
+        if (shutDown.get()) {
+            return;
+        }
+        if (result.getF0() == null) {
+            if (this.userEncryptConfigEntry != null) {
+                logger.warn("ConfigManager({}) connect manager({}) failure, 
using the last pubKey, secretId={}",
+                        this.callerId, this.encryptConfigVisitUrl, 
this.clientConfig.getAuthSecretId());
+                return;
+            }
+            throw new Exception("Visit manager error:" + result.getF1());
+        }
+        updateEncryptConfigEntry(result.getF0());
+        writeCachePubKeyEntryFile(result.getF0());
+    }
+
+    public Tuple2<ProxyConfigEntry, String> getLocalProxyListFromFile(String 
filePath) {
+        String strRet;
+        try {
+            byte[] fileBytes = Files.readAllBytes(Paths.get(filePath));
+            strRet = new String(fileBytes);
+        } catch (Throwable ex) {
+            return new Tuple2<>(null, "Read local configure failure from "
+                    + filePath + ", reason is " + ex.getMessage());
+        }
+        if (StringUtils.isBlank(strRet)) {
+            return new Tuple2<>(null, "Blank configure local file from " + 
filePath);
+        }
+        return getProxyConfigEntry(strRet);
+    }
 
+    private Tuple2<ProxyConfigEntry, String> requestProxyEntryQuietly() {
+        List<BasicNameValuePair> params = buildProxyNodeQueryParams();
+        // request meta info from manager
+        logger.debug("ConfigManager({}) request configure to manager({}), 
param={}",
+                this.callerId, this.proxyConfigVisitUrl, params);
+        Tuple2<Boolean, String> queryResult = 
requestConfiguration(this.proxyConfigVisitUrl, params);
+        if (!queryResult.getF0()) {
+            return new Tuple2<>(null, queryResult.getF1());
+        }
+        // parse result
+        logger.debug("ConfigManager({}) received configure, from manager({}), 
groupId={}, result={}",
+                callerId, proxyConfigVisitUrl, 
clientConfig.getInlongGroupId(), queryResult.getF1());
+        try {
+            return getProxyConfigEntry(queryResult.getF1());
+        } catch (Throwable ex) {
+            if (exptCounter.shouldPrint()) {
+                logger.warn("ConfigManager({}) parse failure, from 
manager({}), groupId={}, result={}",
+                        callerId, proxyConfigVisitUrl, 
clientConfig.getInlongGroupId(), queryResult.getF1(), ex);
+            }
+            return new Tuple2<>(null, ex.getMessage());
+        }
+    }
+
+    private String calcHostInfoMd5(List<HostInfo> hostInfoList) {
+        if (hostInfoList == null || hostInfoList.isEmpty()) {
+            return null;
+        }
+        Collections.sort(hostInfoList);
+        StringBuilder hostInfoMd5 = new StringBuilder();
+        for (HostInfo hostInfo : hostInfoList) {
+            if (hostInfo == null) {
+                continue;
+            }
+            hostInfoMd5.append(hostInfo.getHostName());
+            hostInfoMd5.append(":");
+            hostInfoMd5.append(hostInfo.getPortNumber());
+            hostInfoMd5.append(";");
+        }
+        return DigestUtils.md5Hex(hostInfoMd5.toString());
     }
 
     /**
@@ -308,139 +429,206 @@ public class ProxyConfigManager extends Thread {
      *
      * @param proxyEntry
      */
-    private void compareProxyList(ProxyConfigEntry proxyEntry) {
-        if (proxyEntry != null) {
-            logger.info("{}", proxyEntry.toString());
-            if (proxyEntry.getSize() != 0) {
-                /* Initialize the current proxy information list first. */
-                clientManager.setLoadThreshold(proxyEntry.getLoad());
-
-                List<HostInfo> newProxyInfoList = new ArrayList<HostInfo>();
-                for (Map.Entry<String, HostInfo> entry : 
proxyEntry.getHostMap().entrySet()) {
-                    newProxyInfoList.add(entry.getValue());
-                }
-
-                String newMd5 = calcHostInfoMd5(newProxyInfoList);
-                String oldMd5 = calcHostInfoMd5(proxyInfoList);
-                if (newMd5 != null && !newMd5.equals(oldMd5)) {
-                    /* Choose random alive connections to send messages. */
-                    logger.info("old md5 {} new md5 {}", oldMd5, newMd5);
-                    proxyInfoList.clear();
-                    proxyInfoList = newProxyInfoList;
-                    clientManager.setProxyInfoList(proxyInfoList);
-                    lstUpdatedTime = System.currentTimeMillis();
-                } else if (proxyEntry.getSwitchStat() != oldStat) {
-                    /* judge cluster's switch state */
-                    oldStat = proxyEntry.getSwitchStat();
-                    if ((System.currentTimeMillis() - lstUpdatedTime) > 3 * 60 
* 1000) {
-                        logger.info("switch the cluster!");
-                        proxyInfoList.clear();
-                        proxyInfoList = newProxyInfoList;
-                        clientManager.setProxyInfoList(proxyInfoList);
-                    } else {
-                        logger.info("only change oldStat ");
-                    }
-                } else {
-                    newProxyInfoList.clear();
-                    logger.info("proxy IP list doesn't change, load {}", 
proxyEntry.getLoad());
-                }
-                if (clientConfig.getLoadBalance() == 
LoadBalance.CONSISTENCY_HASH) {
-                    updateHashRing(proxyInfoList);
-                }
-            } else {
-                logger.error("proxyEntry's size is zero");
+    private void compareAndUpdateProxyList(ProxyConfigEntry proxyEntry) {
+        if ((proxyEntry == null || proxyEntry.isNodesEmpty())
+                && (proxyInfoList.isEmpty()
+                        || (System.currentTimeMillis() - lstUpdateTime) < 
clientConfig.getForceReChooseInrMs())) {
+            return;
+        }
+        int newSwitchStat;
+        List<HostInfo> newBusInfoList;
+        if (proxyEntry == null || proxyEntry.isNodesEmpty()) {
+            newSwitchStat = oldStat;
+            newBusInfoList = new ArrayList<>(proxyInfoList.size());
+            newBusInfoList.addAll(proxyInfoList);
+        } else {
+            /* Initialize the current nodes information list first. */
+            clientManager.setLoadThreshold(proxyEntry.getLoad());
+            newSwitchStat = proxyEntry.getSwitchStat();
+            newBusInfoList = new ArrayList<>(proxyEntry.getSize());
+            for (Map.Entry<String, HostInfo> entry : 
proxyEntry.getHostMap().entrySet()) {
+                newBusInfoList.add(entry.getValue());
             }
         }
+        String newMd5 = calcHostInfoMd5(newBusInfoList);
+        String oldMd5 = calcHostInfoMd5(proxyInfoList);
+        boolean nodeChanged = newMd5 != null && !newMd5.equals(oldMd5);
+        if (nodeChanged || newSwitchStat != oldStat
+                || (System.currentTimeMillis() - lstUpdateTime) >= 
clientConfig.getForceReChooseInrMs()) {
+            proxyInfoList = newBusInfoList;
+            clientManager.setProxyInfoList(proxyInfoList);
+            lstUpdateTime = System.currentTimeMillis();
+            oldStat = newSwitchStat;
+        }
     }
 
-    public EncryptConfigEntry getEncryptConfigEntry(final String userName) {
-        if (StringUtils.isBlank(userName)) {
-            return null;
+    private void tryToWriteCacheProxyEntry(ProxyConfigEntry entry) {
+        logger.debug("ConfigManager({}) write {} to cache file ({})",
+                this.callerId, entry, this.proxyConfigCacheFile);
+        fileRw.writeLock().lock();
+        try {
+            File file = new File(this.proxyConfigCacheFile);
+            if (!file.getParentFile().exists()) {
+                // try to create parent
+                file.getParentFile().mkdirs();
+            }
+            FileWriter fileWriter = new FileWriter(this.proxyConfigCacheFile);
+            gson.toJson(entry, fileWriter);
+            fileWriter.flush();
+            fileWriter.close();
+        } catch (Throwable ex) {
+            if (exptCounter.shouldPrint()) {
+                logger.warn("ConfigManager({}) write cache file({}) exception, 
groupId={}, data={}",
+                        this.callerId, this.clientConfig.getInlongGroupId(),
+                        this.proxyConfigCacheFile, entry.toString(), ex);
+            }
+        } finally {
+            fileRw.writeLock().unlock();
         }
-        EncryptConfigEntry encryptEntry = this.userEncryptConfigEntry;
-        if (encryptEntry == null) {
-            int retryCount = 0;
-            encryptEntry = requestPubKey(this.clientConfig.getRsaPubKeyUrl(), 
userName, false);
-            while (encryptEntry == null && retryCount < 
this.clientConfig.getProxyUpdateMaxRetry()) {
-                encryptEntry = 
requestPubKey(this.clientConfig.getRsaPubKeyUrl(), userName, false);
-                retryCount++;
-            }
-            if (encryptEntry == null) {
-                encryptEntry = getStoredPubKeyEntry(userName);
-                if (encryptEntry != null) {
-                    encryptEntry.getRsaEncryptedKey();
-                    synchronized (this) {
-                        if (this.userEncryptConfigEntry == null) {
-                            this.userEncryptConfigEntry = encryptEntry;
-                        } else {
-                            encryptEntry = this.userEncryptConfigEntry;
-                        }
-                    }
+    }
+
+    /**
+     * try to read cache of proxy entry
+     *
+     * @return read result
+     */
+    private Tuple2<ProxyConfigEntry, String> tryToReadCacheProxyEntry() {
+        fileRw.readLock().lock();
+        try {
+            File file = new File(this.proxyConfigCacheFile);
+            if (file.exists()) {
+                long diffTime = System.currentTimeMillis() - 
file.lastModified();
+                if (clientConfig.getConfigCacheExpiredMs() > 0
+                        && diffTime < clientConfig.getConfigCacheExpiredMs()) {
+                    JsonReader reader = new JsonReader(new 
FileReader(this.proxyConfigCacheFile));
+                    ProxyConfigEntry proxyConfigEntry = gson.fromJson(reader, 
ProxyConfigEntry.class);
+                    return new Tuple2<>(proxyConfigEntry, "Ok");
                 }
+                return new Tuple2<>(null, "cache configure expired!");
             } else {
-                synchronized (this) {
-                    if (this.userEncryptConfigEntry == null || 
this.userEncryptConfigEntry != encryptEntry) {
-                        storePubKeyEntry(encryptEntry);
-                        encryptEntry.getRsaEncryptedKey();
-                        this.userEncryptConfigEntry = encryptEntry;
-                    } else {
-                        encryptEntry = this.userEncryptConfigEntry;
-                    }
-                }
+                return new Tuple2<>(null, "no cache configure!");
+            }
+        } catch (Throwable ex) {
+            if (exptCounter.shouldPrint()) {
+                logger.warn("ConfigManager({}) read cache file({}) exception, 
groupId={}",
+                        this.callerId, this.proxyConfigCacheFile, 
this.clientConfig.getInlongGroupId(), ex);
             }
+            return new Tuple2<>(null, "read cache configure failure:" + 
ex.getMessage());
+        } finally {
+            fileRw.readLock().unlock();
         }
-        return encryptEntry;
     }
 
-    private void updateEncryptConfigEntry() {
-        if (StringUtils.isBlank(this.clientConfig.getUserName())) {
-            return;
+    private Tuple2<EncryptConfigEntry, String> requestPubKeyFromManager() {
+        List<BasicNameValuePair> params = buildPubKeyQueryParams();
+        // request meta info from manager
+        logger.debug("ConfigManager({}) request pubkey to manager({}), 
param={}",
+                this.callerId, this.encryptConfigVisitUrl, params);
+        Tuple2<Boolean, String> queryResult = 
requestConfiguration(this.encryptConfigVisitUrl, params);
+        if (!queryResult.getF0()) {
+            return new Tuple2<>(null, queryResult.getF1());
+        }
+        logger.debug("ConfigManager({}) received pubkey from manager({}), 
result={}",
+                this.callerId, this.encryptConfigVisitUrl, 
queryResult.getF1());
+        JsonObject pubKeyConf;
+        try {
+            pubKeyConf = 
JsonParser.parseString(queryResult.getF1()).getAsJsonObject();
+        } catch (Throwable ex) {
+            if (parseCounter.shouldPrint()) {
+                logger.warn("ConfigManager({}) parse failure, secretId={}, 
config={}!",
+                        this.callerId, this.clientConfig.getAuthSecretId(), 
queryResult.getF1());
+            }
+            return new Tuple2<>(null, "parse pubkey failure:" + 
ex.getMessage());
         }
-        int retryCount = 0;
-        EncryptConfigEntry encryptConfigEntry = 
requestPubKey(this.clientConfig.getRsaPubKeyUrl(),
-                this.clientConfig.getUserName(), false);
-        while (encryptConfigEntry == null && retryCount < 
this.clientConfig.getProxyUpdateMaxRetry()) {
-            encryptConfigEntry = 
requestPubKey(this.clientConfig.getRsaPubKeyUrl(),
-                    this.clientConfig.getUserName(), false);
-            retryCount++;
-        }
-        if (encryptConfigEntry == null) {
-            return;
+        if (pubKeyConf == null) {
+            return new Tuple2<>(null, "No public key information");
+        }
+        if (!pubKeyConf.has("resultCode")) {
+            if (parseCounter.shouldPrint()) {
+                logger.warn("ConfigManager({}) config failure: resultCode 
field not exist, secretId={}, config={}!",
+                        this.callerId, this.clientConfig.getAuthSecretId(), 
queryResult.getF1());
+            }
+            return new Tuple2<>(null, "resultCode field not exist");
+        }
+        int resultCode = pubKeyConf.get("resultCode").getAsInt();
+        if (resultCode != 0) {
+            if (parseCounter.shouldPrint()) {
+                logger.warn("ConfigManager({}) config failure: resultCode != 
0, secretId={}, config={}!",
+                        this.callerId, this.clientConfig.getAuthSecretId(), 
queryResult.getF1());
+            }
+            return new Tuple2<>(null, "resultCode != 0!");
         }
-        synchronized (this) {
-            if (this.userEncryptConfigEntry == null || 
this.userEncryptConfigEntry != encryptConfigEntry) {
-                storePubKeyEntry(encryptConfigEntry);
-                encryptConfigEntry.getRsaEncryptedKey();
-                this.userEncryptConfigEntry = encryptConfigEntry;
+        if (!pubKeyConf.has("resultData")) {
+            if (parseCounter.shouldPrint()) {
+                logger.warn("ConfigManager({}) config failure: resultData 
field not exist, secretId={}, config={}!",
+                        this.callerId, this.clientConfig.getAuthSecretId(), 
queryResult.getF1());
             }
+            return new Tuple2<>(null, "resultData field not exist");
         }
-        return;
+        JsonObject resultData = pubKeyConf.get("resultData").getAsJsonObject();
+        if (resultData != null) {
+            String publicKey = resultData.get("publicKey").getAsString();
+            if (StringUtils.isBlank(publicKey)) {
+                if (parseCounter.shouldPrint()) {
+                    logger.warn("ConfigManager({}) config failure: publicKey 
is blank, secretId={}, config={}!",
+                            this.callerId, 
this.clientConfig.getAuthSecretId(), queryResult.getF1());
+                }
+                return new Tuple2<>(null, "publicKey is blank!");
+            }
+            String username = resultData.get("username").getAsString();
+            if (StringUtils.isBlank(username)) {
+                if (parseCounter.shouldPrint()) {
+                    logger.warn("ConfigManager({}) config failure: username is 
blank, secretId={}, config={}!",
+                            this.callerId, 
this.clientConfig.getAuthSecretId(), queryResult.getF1());
+                }
+                return new Tuple2<>(null, "username is blank!");
+            }
+            String versionStr = resultData.get("version").getAsString();
+            if (StringUtils.isBlank(versionStr)) {
+                if (parseCounter.shouldPrint()) {
+                    logger.warn("ConfigManager({}) config failure: version is 
blank, secretId={}, config={}!",
+                            this.callerId, 
this.clientConfig.getAuthSecretId(), queryResult.getF1());
+                }
+                return new Tuple2<>(null, "version is blank!");
+            }
+            return new Tuple2<>(new EncryptConfigEntry(username, versionStr, 
publicKey), "Ok");
+        }
+        return new Tuple2<>(null, "resultData value is null!");
     }
 
-    private EncryptConfigEntry getStoredPubKeyEntry(String userName) {
-        if (StringUtils.isBlank(userName)) {
-            logger.warn(" userName(" + userName + ") is not available");
-            return null;
-        }
-        EncryptConfigEntry entry;
+    private void updateEncryptConfigEntry(EncryptConfigEntry newEncryptEntry) {
+        newEncryptEntry.getRsaEncryptedKey();
+        this.userEncryptConfigEntry = newEncryptEntry;
+    }
+
+    private Tuple2<EncryptConfigEntry, String> readCachedPubKeyEntry() {
+        ObjectInputStream is;
         FileInputStream fis = null;
-        ObjectInputStream is = null;
-        rw.readLock().lock();
+        EncryptConfigEntry entry;
+        fileRw.readLock().lock();
         try {
-            File file = new File(clientConfig.getConfStoreBasePath() + 
userName + ".pubKey");
+            File file = new File(this.encryptConfigCacheFile);
             if (file.exists()) {
-                fis = new FileInputStream(file);
-                is = new ObjectInputStream(fis);
-                entry = (EncryptConfigEntry) is.readObject();
-                // is.close();
-                fis.close();
-                return entry;
+                long diffTime = System.currentTimeMillis() - 
file.lastModified();
+                if (clientConfig.getConfigCacheExpiredMs() > 0
+                        && diffTime < clientConfig.getConfigCacheExpiredMs()) {
+                    fis = new FileInputStream(file);
+                    is = new ObjectInputStream(fis);
+                    entry = (EncryptConfigEntry) is.readObject();
+                    // is.close();
+                    fis.close();
+                    return new Tuple2<>(entry, "Ok");
+                }
+                return new Tuple2<>(null, "cache PubKeyEntry expired!");
             } else {
-                return null;
+                return new Tuple2<>(null, "no PubKeyEntry file!");
             }
-        } catch (Throwable e1) {
-            logger.error("Read " + userName + " stored PubKeyEntry error ", 
e1);
-            return null;
+        } catch (Throwable ex) {
+            if (exptCounter.shouldPrint()) {
+                logger.warn("ConfigManager({}) read({}) file exception, 
secretId={}",
+                        callerId, encryptConfigCacheFile, 
clientConfig.getAuthSecretId(), ex);
+            }
+            return new Tuple2<>(null, "read PubKeyEntry file failure:" + 
ex.getMessage());
         } finally {
             if (fis != null) {
                 try {
@@ -449,16 +637,16 @@ public class ProxyConfigManager extends Thread {
                     //
                 }
             }
-            rw.readLock().unlock();
+            fileRw.readLock().unlock();
         }
     }
 
-    private void storePubKeyEntry(EncryptConfigEntry entry) {
+    private void writeCachePubKeyEntryFile(EncryptConfigEntry entry) {
+        ObjectOutputStream p;
         FileOutputStream fos = null;
-        ObjectOutputStream p = null;
-        rw.writeLock().lock();
+        fileRw.writeLock().lock();
         try {
-            File file = new File(clientConfig.getConfStoreBasePath() + 
entry.getUserName() + ".pubKey");
+            File file = new File(this.encryptConfigCacheFile);
             if (!file.getParentFile().exists()) {
                 file.getParentFile().mkdir();
             }
@@ -470,9 +658,11 @@ public class ProxyConfigManager extends Thread {
             p.writeObject(entry);
             p.flush();
             // p.close();
-        } catch (Throwable e) {
-            logger.error("store EncryptConfigEntry " + entry.toString() + " 
exception ", e);
-            e.printStackTrace();
+        } catch (Throwable ex) {
+            if (exptCounter.shouldPrint()) {
+                logger.warn("ConfigManager({}) write file({}) exception, 
secretId={}, content={}",
+                        callerId, encryptConfigCacheFile, 
clientConfig.getAuthSecretId(), entry.toString(), ex);
+            }
         } finally {
             if (fos != null) {
                 try {
@@ -481,153 +671,192 @@ public class ProxyConfigManager extends Thread {
                     //
                 }
             }
-            rw.writeLock().unlock();
+            fileRw.writeLock().unlock();
         }
     }
 
-    private String calcHostInfoMd5(List<HostInfo> hostInfoList) {
-        if (hostInfoList == null || hostInfoList.isEmpty()) {
-            return null;
-        }
-        Collections.sort(hostInfoList);
-        StringBuffer hostInfoMd5 = new StringBuffer();
-        for (HostInfo hostInfo : hostInfoList) {
-            if (hostInfo == null) {
-                continue;
+    /* Request new configurations from Manager. */
+    private Tuple2<Boolean, String> requestConfiguration(String url, 
List<BasicNameValuePair> params) {
+        HttpParams myParams = new BasicHttpParams();
+        HttpConnectionParams.setConnectionTimeout(myParams, 
clientConfig.getManagerConnTimeoutMs());
+        HttpConnectionParams.setSoTimeout(myParams, 
clientConfig.getManagerSocketTimeoutMs());
+        CloseableHttpClient httpClient;
+        // build http(s) client
+        try {
+            if (this.clientConfig.isVisitManagerByHttp()) {
+                httpClient = new DefaultHttpClient(myParams);
+            } else {
+                httpClient = getCloseableHttpClient(params);
             }
-            hostInfoMd5.append(hostInfo.getHostName());
-            hostInfoMd5.append(";");
-            hostInfoMd5.append(hostInfo.getPortNumber());
-            hostInfoMd5.append(";");
-        }
-
-        return DigestUtils.md5Hex(hostInfoMd5.toString());
-    }
-
-    private EncryptConfigEntry requestPubKey(String pubKeyUrl, String 
userName, boolean needGet) {
-        if (StringUtils.isBlank(userName)) {
-            logger.error("Queried userName is null!");
-            return null;
-        }
-        List<BasicNameValuePair> params = new ArrayList<BasicNameValuePair>();
-        params.add(new BasicNameValuePair("operation", "query"));
-        params.add(new BasicNameValuePair("username", userName));
-        String returnStr = requestConfiguration(pubKeyUrl, params);
-        if (StringUtils.isBlank(returnStr)) {
-            logger.info("No public key information returned from manager");
-            return null;
-        }
-        JsonObject pubKeyConf = 
JsonParser.parseString(returnStr).getAsJsonObject();
-        if (pubKeyConf == null) {
-            logger.info("No public key information returned from manager");
-            return null;
-        }
-        if (!pubKeyConf.has("resultCode")) {
-            logger.info("Parse pubKeyConf failure: No resultCode key 
information returned from manager");
-            return null;
-        }
-        int resultCode = pubKeyConf.get("resultCode").getAsInt();
-        if (resultCode != 0) {
-            logger.info("query pubKeyConf failure, error code is " + 
resultCode + ", errInfo is "
-                    + pubKeyConf.get("message").getAsString());
-            return null;
-        }
-        if (!pubKeyConf.has("resultData")) {
-            logger.info("Parse pubKeyConf failure: No resultData key 
information returned from manager");
-            return null;
+        } catch (Throwable eHttp) {
+            if (exptCounter.shouldPrint()) {
+                logger.warn("ConfigManager({}) create Http(s) client failure, 
url={}, params={}",
+                        this.callerId, url, params, eHttp);
+            }
+            return new Tuple2<>(false, eHttp.getMessage());
         }
-        JsonObject resultData = pubKeyConf.get("resultData").getAsJsonObject();
-        if (resultData != null) {
-            String publicKey = resultData.get("publicKey").getAsString();
-            if (StringUtils.isBlank(publicKey)) {
-                return null;
+        // post request and get response
+        HttpPost httpPost = null;
+        try {
+            httpPost = new HttpPost(url);
+            this.addAuthorizationInfo(httpPost);
+            UrlEncodedFormEntity urlEncodedFormEntity =
+                    new UrlEncodedFormEntity(params, StandardCharsets.UTF_8);
+            httpPost.setEntity(urlEncodedFormEntity);
+            HttpResponse response = httpClient.execute(httpPost);
+            if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
+                return new Tuple2<>(false, 
response.getStatusLine().getStatusCode()
+                        + ":" + response.getStatusLine().getStatusCode());
             }
-            String username = resultData.get("username").getAsString();
-            if (StringUtils.isBlank(username)) {
-                return null;
+            String returnStr = EntityUtils.toString(response.getEntity());
+            if (StringUtils.isBlank(returnStr)) {
+                return new Tuple2<>(false, "query result is blank!");
             }
-            String versionStr = resultData.get("version").getAsString();
-            if (StringUtils.isBlank(versionStr)) {
-                return null;
+            return new Tuple2<>(true, returnStr);
+        } catch (Throwable ex) {
+            if (exptCounter.shouldPrint()) {
+                logger.warn("ConfigManager({}) connect manager({}) exception, 
params={}",
+                        this.callerId, url, params, ex);
+            }
+            return new Tuple2<>(false, ex.getMessage());
+        } finally {
+            if (httpPost != null) {
+                httpPost.releaseConnection();
+            }
+            if (httpClient != null) {
+                httpClient.getConnectionManager().shutdown();
             }
-            return new EncryptConfigEntry(username, versionStr, publicKey);
         }
-        return null;
     }
 
-    public ProxyConfigEntry getLocalProxyListFromFile(String filePath) throws 
Exception {
-        DataProxyNodeResponse proxyCluster;
-        try {
-            byte[] fileBytes = Files.readAllBytes(Paths.get(filePath));
-            proxyCluster = gson.fromJson(new String(fileBytes), 
DataProxyNodeResponse.class);
-        } catch (Throwable e) {
-            throw new Exception("Read local proxyList File failure by " + 
filePath + ", reason is " + e.getCause());
-        }
-        if (ObjectUtils.isEmpty(proxyCluster)) {
-            logger.warn("no proxyCluster configure from local file");
-            return null;
+    private CloseableHttpClient 
getCloseableHttpClient(List<BasicNameValuePair> params)
+            throws NoSuchAlgorithmException, KeyManagementException {
+        CloseableHttpClient httpClient;
+        ArrayList<Header> headers = new ArrayList<>();
+        for (BasicNameValuePair paramItem : params) {
+            headers.add(new BasicHeader(paramItem.getName(), 
paramItem.getValue()));
         }
+        RequestConfig requestConfig = RequestConfig.custom()
+                .setConnectTimeout(clientConfig.getManagerConnTimeoutMs())
+                
.setSocketTimeout(clientConfig.getManagerSocketTimeoutMs()).build();
+        SSLContext sslContext = SSLContexts.custom().build();
+        SSLConnectionSocketFactory sslSf = new 
SSLConnectionSocketFactory(sslContext,
+                new String[]{clientConfig.getTlsVersion()}, null,
+                SSLConnectionSocketFactory.getDefaultHostnameVerifier());
+        httpClient = 
HttpClients.custom().setDefaultHeaders(headers).setDefaultRequestConfig(requestConfig)
+                .setSSLSocketFactory(sslSf).build();
+        return httpClient;
+    }
 
-        return getProxyConfigEntry(proxyCluster);
+    private void storeAndBuildMetaConfigure(ProxyClientConfig config) {
+        this.clientConfig = config;
+        StringBuilder strBuff = new StringBuilder(512);
+        this.proxyConfigVisitUrl = strBuff
+                .append(clientConfig.isVisitManagerByHttp() ? 
ConfigConstants.HTTP : ConfigConstants.HTTPS)
+                
.append(clientConfig.getManagerIP()).append(":").append(clientConfig.getManagerPort())
+                
.append(ConfigConstants.MANAGER_DATAPROXY_API).append(clientConfig.getInlongGroupId())
+                .toString();
+        strBuff.delete(0, strBuff.length());
+        this.localProxyConfigStoreFile = strBuff
+                .append(clientConfig.getConfigStoreBasePath())
+                .append(ConfigConstants.META_STORE_SUB_DIR)
+                .append(clientConfig.getInlongGroupId())
+                .append(ConfigConstants.LOCAL_DP_CONFIG_FILE_SUFFIX)
+                .toString();
+        strBuff.delete(0, strBuff.length());
+        this.proxyConfigCacheFile = strBuff
+                .append(clientConfig.getConfigStoreBasePath())
+                .append(ConfigConstants.META_STORE_SUB_DIR)
+                .append(clientConfig.getInlongGroupId())
+                .append(ConfigConstants.REMOTE_DP_CACHE_FILE_SUFFIX)
+                .toString();
+        strBuff.delete(0, strBuff.length());
+        this.encryptConfigVisitUrl = clientConfig.getRsaPubKeyUrl();
+        this.encryptConfigCacheFile = strBuff
+                .append(clientConfig.getConfigStoreBasePath())
+                .append(ConfigConstants.META_STORE_SUB_DIR)
+                .append(clientConfig.getAuthSecretId())
+                .append(ConfigConstants.REMOTE_ENCRYPT_CACHE_FILE_SUFFIX)
+                .toString();
+        strBuff.delete(0, strBuff.length());
     }
 
-    private Map<String, Integer> getStreamIdMap(JsonObject localProxyAddrJson) 
{
-        Map<String, Integer> streamIdMap = new HashMap<String, Integer>();
-        if (localProxyAddrJson.has("tsn")) {
-            JsonArray jsonStreamId = localProxyAddrJson.getAsJsonArray("tsn");
-            for (int i = 0; i < jsonStreamId.size(); i++) {
-                JsonObject jsonItem = jsonStreamId.get(i).getAsJsonObject();
-                if (jsonItem != null && jsonItem.has("streamId") && 
jsonItem.has("sn")) {
-                    streamIdMap.put(jsonItem.get("streamId").getAsString(), 
jsonItem.get("sn").getAsInt());
-                }
-            }
-        }
-        return streamIdMap;
+    private void addAuthorizationInfo(HttpPost httpPost) {
+        httpPost.addHeader(BasicAuth.BASIC_AUTH_HEADER,
+                
BasicAuth.genBasicAuthCredential(clientConfig.getAuthSecretId(),
+                        clientConfig.getAuthSecretKey()));
     }
 
-    public ProxyConfigEntry requestProxyList(String url) {
-        ArrayList<BasicNameValuePair> params = new 
ArrayList<BasicNameValuePair>();
+    private List<BasicNameValuePair> buildProxyNodeQueryParams() {
+        ArrayList<BasicNameValuePair> params = new ArrayList<>();
         params.add(new BasicNameValuePair("ip", IpUtils.getLocalIp()));
         params.add(new BasicNameValuePair("protocolType", 
clientConfig.getProtocolType()));
-        logger.info("Begin to get configure from manager {}, param is {}", 
url, params);
-
-        String resultStr = requestConfiguration(url, params);
-        ProxyClusterConfig clusterConfig = gson.fromJson(resultStr, 
ProxyClusterConfig.class);
-        if (clusterConfig == null || !clusterConfig.isSuccess() || 
clusterConfig.getData() == null) {
-            return null;
-        }
+        return params;
+    }
 
-        DataProxyNodeResponse proxyCluster = clusterConfig.getData();
-        return getProxyConfigEntry(proxyCluster);
+    private List<BasicNameValuePair> buildPubKeyQueryParams() {
+        List<BasicNameValuePair> params = new ArrayList<>();
+        params.add(new BasicNameValuePair("operation", "query"));
+        params.add(new BasicNameValuePair("username", 
clientConfig.getAuthSecretId()));
+        return params;
     }
 
-    private ProxyConfigEntry getProxyConfigEntry(DataProxyNodeResponse 
proxyCluster) {
+    private Tuple2<ProxyConfigEntry, String> getProxyConfigEntry(String 
strRet) {
+        DataProxyNodeResponse proxyCluster;
+        try {
+            proxyCluster = gson.fromJson(strRet, DataProxyNodeResponse.class);
+        } catch (Throwable ex) {
+            if (parseCounter.shouldPrint()) {
+                logger.warn("ConfigManager({}) parse exception, groupId={}, 
config={}",
+                        this.callerId, clientConfig.getInlongGroupId(), 
strRet, ex);
+            }
+            return new Tuple2<>(null, "parse failure:" + ex.getMessage());
+        }
+        // parse nodeList
         List<DataProxyNodeInfo> nodeList = proxyCluster.getNodeList();
         if (CollectionUtils.isEmpty(nodeList)) {
-            logger.error("dataproxy nodeList is empty in 
DataProxyNodeResponse!");
-            return null;
+            return new Tuple2<>(null, "nodeList is empty!");
         }
-        Map<String, HostInfo> hostMap = formatHostInfoMap(nodeList);
-        if (MapUtils.isEmpty(hostMap)) {
-            return null;
+        HostInfo tmpHostInfo;
+        Map<String, HostInfo> hostMap = new HashMap<>();
+        for (DataProxyNodeInfo proxy : nodeList) {
+            if (ObjectUtils.isEmpty(proxy.getId())
+                    || StringUtils.isEmpty(proxy.getIp())
+                    || ObjectUtils.isEmpty(proxy.getPort())
+                    || proxy.getPort() < 0) {
+                if (exptCounter.shouldPrint()) {
+                    logger.warn("Invalid proxy node: groupId={}, id={}, ip={}, 
port={}",
+                            clientConfig.getInlongGroupId(), proxy.getId(), 
proxy.getIp(), proxy.getPort());
+                }
+                continue;
+            }
+            tmpHostInfo = new HostInfo(proxy.getIp(), proxy.getPort());
+            hostMap.put(tmpHostInfo.getReferenceName(), tmpHostInfo);
         }
-
+        if (hostMap.isEmpty()) {
+            return new Tuple2<>(null, "no valid nodeList records!");
+        }
+        // parse clusterId
         int clusterId = -1;
         if (ObjectUtils.isNotEmpty(proxyCluster.getClusterId())) {
             clusterId = proxyCluster.getClusterId();
         }
+        // parse load
         int load = ConfigConstants.LOAD_THRESHOLD;
         if (ObjectUtils.isNotEmpty(proxyCluster.getLoad())) {
             load = proxyCluster.getLoad() > 200 ? 200 : 
(Math.max(proxyCluster.getLoad(), 0));
         }
+        // parse isIntranet
         boolean isIntranet = true;
-        if (ObjectUtils.isNotEmpty(proxyCluster.getIsSwitch())) {
-            isIntranet = proxyCluster.getIsIntranet() == 1 ? true : false;
+        if (ObjectUtils.isNotEmpty(proxyCluster.getIsIntranet())) {
+            isIntranet = proxyCluster.getIsIntranet() == 1;
         }
+        // parse isSwitch
         int isSwitch = 0;
         if (ObjectUtils.isNotEmpty(proxyCluster.getIsSwitch())) {
             isSwitch = proxyCluster.getIsSwitch();
         }
+        // build ProxyConfigEntry
         ProxyConfigEntry proxyEntry = new ProxyConfigEntry();
         proxyEntry.setClusterId(clusterId);
         proxyEntry.setGroupId(clientConfig.getInlongGroupId());
@@ -635,114 +864,8 @@ public class ProxyConfigManager extends Thread {
         proxyEntry.setHostMap(hostMap);
         proxyEntry.setSwitchStat(isSwitch);
         proxyEntry.setLoad(load);
-        proxyEntry.setSize(nodeList.size());
         proxyEntry.setMaxPacketLength(
                 proxyCluster.getMaxPacketLength() != null ? 
proxyCluster.getMaxPacketLength() : -1);
-        return proxyEntry;
-    }
-
-    private Map<String, HostInfo> formatHostInfoMap(List<DataProxyNodeInfo> 
nodeList) {
-        HostInfo tmpHostInfo;
-        Map<String, HostInfo> hostMap = new HashMap<>();
-        for (DataProxyNodeInfo proxy : nodeList) {
-            if (ObjectUtils.isEmpty(proxy.getId()) || 
StringUtils.isEmpty(proxy.getIp()) || ObjectUtils
-                    .isEmpty(proxy.getPort()) || proxy.getPort() < 0) {
-                logger.error("invalid proxy node, id:{}, ip:{}, port:{}", 
proxy.getId(), proxy.getIp(),
-                        proxy.getPort());
-                continue;
-            }
-            tmpHostInfo = new HostInfo(proxy.getIp(), proxy.getPort());
-            hostMap.put(tmpHostInfo.getReferenceName(), tmpHostInfo);
-
-        }
-        if (hostMap.isEmpty()) {
-            logger.error("Parse proxyList failure: address is empty for 
response from manager!");
-            return null;
-        }
-        return hostMap;
-    }
-
-    /* Request new configurations from Manager. */
-    private String requestConfiguration(String url, List<BasicNameValuePair> 
params) {
-        if (StringUtils.isBlank(url)) {
-            logger.error("request url is null");
-            return null;
-        }
-        HttpPost httpPost = null;
-        HttpParams myParams = new BasicHttpParams();
-        HttpConnectionParams.setConnectionTimeout(myParams, 10000);
-        HttpConnectionParams.setSoTimeout(myParams, 
clientConfig.getManagerSocketTimeout());
-        CloseableHttpClient httpClient;
-        if (this.clientConfig.isRequestByHttp()) {
-            httpClient = new DefaultHttpClient(myParams);
-        } else {
-            try {
-                httpClient = getCloseableHttpClient(params);
-            } catch (Throwable eHttps) {
-                logger.error("Create Https cliet failure, error 1 is ", 
eHttps);
-                eHttps.printStackTrace();
-                return null;
-            }
-        }
-        logger.info("Request url : {}, params : {}", url, params);
-        try {
-            httpPost = new HttpPost(url);
-            httpPost.addHeader(BasicAuth.BASIC_AUTH_HEADER,
-                    
BasicAuth.genBasicAuthCredential(clientConfig.getAuthSecretId(),
-                            clientConfig.getAuthSecretKey()));
-            UrlEncodedFormEntity urlEncodedFormEntity = new 
UrlEncodedFormEntity(params, "UTF-8");
-            httpPost.setEntity(urlEncodedFormEntity);
-            HttpResponse response = httpClient.execute(httpPost);
-            String returnStr = EntityUtils.toString(response.getEntity());
-            if (StringUtils.isNotBlank(returnStr)
-                    && response.getStatusLine().getStatusCode() == 
HttpStatus.SC_OK) {
-                logger.info("Get configure from manager is {}", returnStr);
-                return returnStr;
-            }
-            return null;
-        } catch (Throwable e) {
-            logger.error("Connect Manager error, message: {}, url is {}", 
e.getMessage(), url);
-            return null;
-        } finally {
-            if (httpPost != null) {
-                httpPost.releaseConnection();
-            }
-            if (httpClient != null) {
-                httpClient.getConnectionManager().shutdown();
-            }
-        }
-    }
-
-    private StringEntity getEntity(List<BasicNameValuePair> params) throws 
UnsupportedEncodingException {
-        JsonObject jsonObject = new JsonObject();
-        for (BasicNameValuePair pair : params) {
-            jsonObject.addProperty(pair.getName(), pair.getValue());
-        }
-        StringEntity se = new StringEntity(jsonObject.toString());
-        se.setContentType(APPLICATION_JSON);
-        return se;
-    }
-
-    private CloseableHttpClient 
getCloseableHttpClient(List<BasicNameValuePair> params)
-            throws NoSuchAlgorithmException, KeyManagementException {
-        CloseableHttpClient httpClient;
-        ArrayList<Header> headers = new ArrayList<Header>();
-        for (BasicNameValuePair paramItem : params) {
-            headers.add(new BasicHeader(paramItem.getName(), 
paramItem.getValue()));
-        }
-        RequestConfig requestConfig = 
RequestConfig.custom().setConnectTimeout(10000)
-                
.setSocketTimeout(clientConfig.getManagerSocketTimeout()).build();
-        SSLContext sslContext = SSLContexts.custom().build();
-        SSLConnectionSocketFactory sslsf = new 
SSLConnectionSocketFactory(sslContext,
-                new String[]{clientConfig.getTlsVersion()}, null,
-                SSLConnectionSocketFactory.getDefaultHostnameVerifier());
-        httpClient = 
HttpClients.custom().setDefaultHeaders(headers).setDefaultRequestConfig(requestConfig)
-                .setSSLSocketFactory(sslsf).build();
-        return httpClient;
-    }
-
-    public void updateHashRing(List<HostInfo> newHosts) {
-        this.hashRing.updateNode(newHosts);
-        logger.debug("update hash ring {}", 
hashRing.getVirtualNode2RealNode());
+        return new Tuple2<>(proxyEntry, "ok");
     }
 }
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java
index 3999390f9b..2ba1938409 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java
@@ -54,9 +54,8 @@ public class HttpClientExample {
             proxyConfig = new ProxyClientConfig(localIP, requestByHttp, 
inLongManagerAddr,
                     Integer.valueOf(inLongManagerPort),
                     inlongGroupId, "admin", "inlong");// user and password of 
manager
-            proxyConfig.setInlongGroupId(inlongGroupId);
-            proxyConfig.setConfStoreBasePath(configBasePath);
-            proxyConfig.setReadProxyIPFromLocal(isReadProxyIPFromLocal);
+            proxyConfig.setConfigStoreBasePath(configBasePath);
+            proxyConfig.setOnlyUseLocalProxyConfig(isReadProxyIPFromLocal);
             proxyConfig.setDiscardOldMessage(true);
             proxyConfig.setProtocolType(ProtocolType.HTTP);
             sender = new HttpProxySender(proxyConfig);
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java
index eda90bdbca..85012af172 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java
@@ -70,9 +70,9 @@ public class TcpClientExample {
             dataProxyConfig = new ProxyClientConfig(localIP, requestByHttp, 
inLongManagerAddr,
                     Integer.valueOf(inLongManagerPort), inlongGroupId, 
"admin", "inlong");
             if (StringUtils.isNotEmpty(configBasePath)) {
-                dataProxyConfig.setConfStoreBasePath(configBasePath);
+                dataProxyConfig.setConfigStoreBasePath(configBasePath);
             }
-            dataProxyConfig.setReadProxyIPFromLocal(isReadProxyIPFromLocal);
+            dataProxyConfig.setOnlyUseLocalProxyConfig(isReadProxyIPFromLocal);
             dataProxyConfig.setProtocolType(ProtocolType.TCP);
             dataProxyConfig.setRequestTimeoutMs(20000L);
             messageSender = 
DefaultMessageSender.generateSenderByClusterId(dataProxyConfig);
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
index 45c95e042d..55a26918a0 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
@@ -27,6 +27,7 @@ import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
 import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
 import org.apache.inlong.sdk.dataproxy.utils.ConsistencyHashUtil;
 import org.apache.inlong.sdk.dataproxy.utils.EventLoopUtil;
+import org.apache.inlong.sdk.dataproxy.utils.Tuple2;
 
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.Channel;
@@ -84,7 +85,7 @@ public class ClientMgr {
     private int aliveConnections;
     private int realSize;
     private SendHBThread sendHBThread;
-    private ProxyConfigManager ipManager;
+    private ProxyConfigManager configManager;
     private int groupIdNum = 0;
     private String groupId = "";
     private Map<String, Integer> streamIdMap = new HashMap<String, Integer>();
@@ -115,10 +116,9 @@ public class ClientMgr {
         bootstrap.option(ChannelOption.SO_SNDBUF, 
ConfigConstants.DEFAULT_SEND_BUFFER_SIZE);
         bootstrap.handler(new ClientPipelineFactory(this, sender));
         /* ready to Start the thread which refreshes the proxy list. */
-        ipManager = new ProxyConfigManager(configure, this);
-        ipManager.setName("proxyConfigManager");
+        configManager = new ProxyConfigManager(sender.getInstanceId(), 
configure, this);
+        configManager.setName("proxyConfigManager");
         if (configure.getInlongGroupId() != null) {
-            ipManager.setInlongGroupId(configure.getInlongGroupId());
             groupId = configure.getInlongGroupId();
         }
 
@@ -131,13 +131,13 @@ public class ClientMgr {
         this.loadBalance = configure.getLoadBalance();
 
         try {
-            ipManager.doProxyEntryQueryWork();
+            configManager.doProxyEntryQueryWork();
         } catch (IOException e) {
             e.printStackTrace();
             logger.info(e.getMessage());
         }
-        ipManager.setDaemon(true);
-        ipManager.start();
+        configManager.setDaemon(true);
+        configManager.start();
 
         this.sendHBThread = new SendHBThread();
         this.sendHBThread.setName("SendHBThread");
@@ -181,7 +181,13 @@ public class ClientMgr {
     }
 
     public EncryptConfigEntry getEncryptConfigEntry() {
-        return this.ipManager.getEncryptConfigEntry(configure.getUserName());
+        Tuple2<EncryptConfigEntry, String> result;
+        try {
+            result = configManager.getEncryptConfigure(false);
+            return result.getF0(); // only field 0 is used; field 1 of the tuple is not consulted here
+        } catch (Throwable ex) {
+            return null; // NOTE(review): any failure is swallowed without logging and surfaces as null
+        }
     }
 
     public List<HostInfo> getProxyInfoList() {
@@ -273,7 +279,12 @@ public class ClientMgr {
     }
 
     public ProxyConfigEntry getGroupIdConfigureInfo() throws Exception {
-        return ipManager.getGroupIdConfigure();
+        Tuple2<ProxyConfigEntry, String> result =
+                configManager.getGroupIdConfigure(true);
+        if (result.getF0() == null) { // no entry obtained: field 1 is used as the exception message
+            throw new Exception(result.getF1());
+        }
+        return result.getF0();
     }
 
     /**
@@ -531,7 +542,7 @@ public class ClientMgr {
     public void shutDown() {
         bootstrap.config().group().shutdownGracefully();
 
-        ipManager.shutDown();
+        configManager.shutDown();
 
         // connectionCheckThread.shutDown();
         sendHBThread.shutDown();
@@ -851,9 +862,9 @@ public class ClientMgr {
                 
Collections.singletonList(hbMsg.getBytes(StandardCharsets.UTF_8)),
                 8, false, false, false, System.currentTimeMillis() / 1000, 1, 
"", "", "");
         try {
-            if (configure.isNeedAuthentication()) {
-                encodeObject.setAuth(configure.isNeedAuthentication(),
-                        configure.getUserName(), configure.getSecretKey());
+            if (configure.isEnableAuthentication()) {
+                encodeObject.setAuth(configure.isEnableAuthentication(),
+                        configure.getAuthSecretId(), 
configure.getAuthSecretKey());
             }
             client.write(encodeObject);
         } catch (Throwable e) {
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java
index 5d31f05f8a..1cf9939ff9 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java
@@ -26,6 +26,7 @@ import 
org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
 import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
 import org.apache.inlong.sdk.dataproxy.http.InternalHttpSender;
 import org.apache.inlong.sdk.dataproxy.utils.ConcurrentHashSet;
+import org.apache.inlong.sdk.dataproxy.utils.Tuple2;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -75,15 +76,14 @@ public class HttpProxySender extends Thread {
     private void initTDMClientAndRequest(ProxyClientConfig configure) throws 
Exception {
 
         try {
-            proxyConfigManager = new ProxyConfigManager(configure, null);
-            proxyConfigManager.setInlongGroupId(configure.getInlongGroupId());
+            proxyConfigManager = new ProxyConfigManager(configure);
             ProxyConfigEntry proxyConfigEntry = retryGettingProxyConfig();
             hostList.addAll(proxyConfigEntry.getHostMap().values());
 
             this.setDaemon(true);
             this.start();
         } catch (Throwable e) {
-            if (configure.isReadProxyIPFromLocal()) {
+            if (configure.isOnlyUseLocalProxyConfig()) {
                 throw new Exception("Get local proxy configure failure! e = 
{}", e);
             } else {
                 throw new Exception("Visit TDManager error! e = {}", e);
@@ -98,7 +98,9 @@ public class HttpProxySender extends Thread {
      * @return proxy config entry.
      */
     private ProxyConfigEntry retryGettingProxyConfig() throws Exception {
-        return proxyConfigManager.getGroupIdConfigure();
+        Tuple2<ProxyConfigEntry, String> result =
+                proxyConfigManager.getGroupIdConfigure(true);
+        return result.getF0();
     }
 
     /**
@@ -112,9 +114,13 @@ public class HttpProxySender extends Thread {
                 int randSleepTime = 
proxyClientConfig.getProxyHttpUpdateIntervalMinutes() * 60 + rand;
                 TimeUnit.MILLISECONDS.sleep(randSleepTime * 1000);
                 if (proxyConfigManager != null) {
-                    ProxyConfigEntry proxyConfigEntry = 
proxyConfigManager.getGroupIdConfigure();
-                    hostList.addAll(proxyConfigEntry.getHostMap().values());
-                    hostList.retainAll(proxyConfigEntry.getHostMap().values());
+                    Tuple2<ProxyConfigEntry, String> result =
+                            proxyConfigManager.getGroupIdConfigure(false);
+                    if (result.getF0() == null) {
+                        throw new Exception(result.getF1());
+                    }
+                    hostList.addAll(result.getF0().getHostMap().values());
+                    hostList.retainAll(result.getF0().getHostMap().values());
                 } else {
                     logger.error("manager is null, please check it!");
                 }
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
index 9f001205e3..e4f820e3ed 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
@@ -43,12 +43,13 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
 
 public class Sender {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(Sender.class);
-
+    private static final AtomicLong senderIdGen = new AtomicLong(0L);
     /* Store the callback used by asynchronously message sending. */
     private final ConcurrentHashMap<Channel, ConcurrentHashMap<String, 
QueueObject>> callbacks =
             new ConcurrentHashMap<>();
@@ -61,6 +62,7 @@ public class Sender {
     private final AtomicInteger currentBufferSize = new AtomicInteger(0);
     private final TimeoutScanThread scanThread;
     private final ClientMgr clientMgr;
+    private final String instanceId;
     private final ProxyClientConfig configure;
     private MetricWorkerThread metricWorker = null;
     private int clusterId = -1;
@@ -74,6 +76,7 @@ public class Sender {
      */
     public Sender(ProxyClientConfig configure, ThreadFactory 
selfDefineFactory) throws Exception {
         this.configure = configure;
+        this.instanceId = "sender-" + senderIdGen.incrementAndGet();
         this.asyncCallbackMaxSize = configure.getTotalAsyncCallbackSize();
         this.threadPool = Executors.newCachedThreadPool();
         this.clientMgr = new ClientMgr(configure, this, selfDefineFactory);
@@ -82,14 +85,14 @@ public class Sender {
             proxyConfigEntry = this.clientMgr.getGroupIdConfigureInfo();
             setClusterId(proxyConfigEntry.getClusterId());
         } catch (Throwable e) {
-            if (configure.isReadProxyIPFromLocal()) {
+            if (configure.isOnlyUseLocalProxyConfig()) {
                 throw new Exception("Get local proxy configure failure!", 
e.getCause());
             } else {
                 throw new Exception("Visit manager error!", e.getCause());
             }
         }
         if (!proxyConfigEntry.isInterVisit()) {
-            if (!configure.isNeedAuthentication()) {
+            if (!configure.isEnableAuthentication()) {
                 throw new Exception("In OutNetwork isNeedAuthentication must 
be true!");
             }
             if (!configure.isNeedDataEncry()) {
@@ -200,7 +203,8 @@ public class Sender {
             }
         }
         if (this.configure.isNeedDataEncry()) {
-            encodeObject.setEncryptEntry(true, configure.getUserName(), 
clientMgr.getEncryptConfigEntry());
+            encodeObject.setEncryptEntry(true,
+                    configure.getAuthSecretId(), 
clientMgr.getEncryptConfigEntry());
         } else {
             encodeObject.setEncryptEntry(false, null, null);
         }
@@ -371,7 +375,8 @@ public class Sender {
             }
         }
         if (this.configure.isNeedDataEncry()) {
-            encodeObject.setEncryptEntry(true, configure.getUserName(), 
clientMgr.getEncryptConfigEntry());
+            encodeObject.setEncryptEntry(true,
+                    configure.getAuthSecretId(), 
clientMgr.getEncryptConfigEntry());
         } else {
             encodeObject.setEncryptEntry(false, null, null);
         }
@@ -498,6 +503,10 @@ public class Sender {
         this.clusterId = clusterId;
     }
 
+    public String getInstanceId() { // id is set once in the constructor as "sender-" + a static counter value
+        return instanceId;
+    }
+
     /**
      * check whether clientChannel is idle; if idle, need send hb to keep alive
      *
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
index 371dcf661c..b7bd42ab2b 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
@@ -150,11 +150,11 @@ public class ProxyUtils {
      * @param clientConfig
      */
     public static void validClientConfig(ProxyClientConfig clientConfig) {
-        if (clientConfig.isNeedAuthentication()) {
-            if (StringUtils.isBlank(clientConfig.getUserName())) {
-                throw new IllegalArgumentException("Authentication require 
userName not Blank!");
+        if (clientConfig.isEnableAuthentication()) {
+            if (StringUtils.isBlank(clientConfig.getAuthSecretId())) {
+                throw new IllegalArgumentException("Authentication require 
secretId not Blank!");
             }
-            if (StringUtils.isBlank(clientConfig.getSecretKey())) {
+            if (StringUtils.isBlank(clientConfig.getAuthSecretKey())) {
                 throw new IllegalArgumentException("Authentication require 
secretKey not Blank!");
             }
         }
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/Tuple2.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/Tuple2.java
new file mode 100644
index 0000000000..e5ba8c2bc2
--- /dev/null
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/Tuple2.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.utils;
+
+public class Tuple2<T0, T1> { // mutable two-field holder; fields are read via getF0()/getF1()
+
+    /** Field 0 of the tuple; stays null until a constructor or setF0AndF1 assigns it. */
+    private T0 f0 = null;
+    /** Field 1 of the tuple; stays null until a constructor or setF0AndF1 assigns it. */
+    private T1 f1 = null;
+
+    /**
+     * Creates a new tuple where all fields are null.
+     */
+    public Tuple2() {
+
+    }
+
+    /**
+     * Creates a new tuple with field 0 specified; field 1 is left null.
+     *
+     * @param value0 The value for field 0
+     */
+    public Tuple2(T0 value0) {
+        this.f0 = value0;
+    }
+
+    /**
+     * Creates a new tuple and assigns the given values to the tuple's fields.
+     *
+     * @param value0 The value for field 0
+     * @param value1 The value for field 1
+     */
+    public Tuple2(T0 value0, T1 value1) {
+        setF0AndF1(value0, value1); // NOTE(review): public, non-final method invoked from a constructor
+    }
+
+    public T0 getF0() { // returns field 0 as currently stored (may be null)
+        return f0;
+    }
+
+    public T1 getF1() { // returns field 1 as currently stored (may be null)
+        return f1;
+    }
+
+    /**
+     * Sets both field values, replacing whatever was stored before.
+     *
+     * @param value0 The value for field 0
+     * @param value1 The value for field 1
+     */
+    public void setF0AndF1(T0 value0, T1 value1) {
+        this.f0 = value0;
+        this.f1 = value1;
+    }
+}
diff --git 
a/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyConfigManagerTest.java
 
b/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyConfigManagerTest.java
index 5f3ba92c75..40fec3b57d 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyConfigManagerTest.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyConfigManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sdk.dataproxy;
 import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
 import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
 import org.apache.inlong.sdk.dataproxy.network.ClientMgr;
+import org.apache.inlong.sdk.dataproxy.utils.Tuple2;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -36,14 +37,18 @@ public class ProxyConfigManagerTest {
             .toString();
     private final ProxyClientConfig clientConfig = 
PowerMockito.mock(ProxyClientConfig.class);
     private final ClientMgr clientMgr = PowerMockito.mock(ClientMgr.class);
-    private final ProxyConfigManager proxyConfigManager = new 
ProxyConfigManager(clientConfig, clientMgr);
+    private final ProxyConfigManager proxyConfigManager;
 
     public ProxyConfigManagerTest() throws URISyntaxException {
+        clientConfig.setConfigStoreBasePath(localFile);
+        proxyConfigManager =
+                new ProxyConfigManager("test", clientConfig, clientMgr);
     }
 
     @Test
     public void testProxyConfigParse() throws Exception {
-        ProxyConfigEntry proxyEntry = 
proxyConfigManager.getLocalProxyListFromFile(localFile);
+        Tuple2<ProxyConfigEntry, String> result = 
proxyConfigManager.getLocalProxyListFromFile(localFile);
+        ProxyConfigEntry proxyEntry = result.getF0();
         Assert.assertEquals(proxyEntry.isInterVisit(), false);
         Assert.assertEquals(proxyEntry.getLoad(), 12);
         Assert.assertEquals(proxyEntry.getClusterId(), 1);
diff --git 
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java
 
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java
index 74cfcffa21..1965ef37e3 100644
--- 
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java
+++ 
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java
@@ -62,7 +62,7 @@ public class InlongSdkDirtySender {
         ProxyClientConfig proxyClientConfig =
                 new 
ProxyClientConfig(InetAddress.getLocalHost().getHostAddress(), true,
                         inlongManagerAddr, inlongManagerPort, inlongGroupId, 
authId, authKey);
-        proxyClientConfig.setReadProxyIPFromLocal(false);
+        proxyClientConfig.setOnlyUseLocalProxyConfig(false);
         proxyClientConfig.setAsyncCallbackSize(maxCallbackSize);
         this.sender = 
DefaultMessageSender.generateSenderByClusterId(proxyClientConfig);
         this.sender.setMsgtype(7);

Reply via email to