This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c9dd7df9d [INLONG-11599][SDK] Optimize the configuration related 
content in the ProxyClientConfig class (#11600)
1c9dd7df9d is described below

commit 1c9dd7df9db2e5679457c9cee6acca7171f0bf45
Author: Goson Zhang <4675...@qq.com>
AuthorDate: Mon Dec 16 10:06:18 2024 +0800

    [INLONG-11599][SDK] Optimize the configuration related content in the 
ProxyClientConfig class (#11600)
    
    
    
    Co-authored-by: gosonzhang <gosonzh...@tencent.com>
---
 .../plugin/sinks/filecollect/SenderManager.java    |   7 +-
 .../sinks/filecollect/TestSenderManager.java       |   1 -
 .../inlong/sdk/dataproxy/ConfigConstants.java      |   7 +-
 .../inlong/sdk/dataproxy/DefaultMessageSender.java | 117 +++-----
 .../apache/inlong/sdk/dataproxy/MessageSender.java |   3 +-
 .../inlong/sdk/dataproxy/ProxyClientConfig.java    |  63 ++++-
 .../sdk/dataproxy/config/ProxyConfigEntry.java     |   1 +
 .../sdk/dataproxy/config/ProxyConfigManager.java   | 301 +++++++++++++++------
 .../inlong/sdk/dataproxy/network/ClientMgr.java    |   2 +-
 .../inlong/sdk/dataproxy/network/QueueObject.java  |   5 +-
 .../inlong/sdk/dataproxy/network/Sender.java       |  39 +--
 .../sdk/dataproxy/network/SyncMessageCallable.java |  11 +-
 .../sdk/dataproxy/threads/MetricWorkerThread.java  |   3 +-
 .../sdk/dataproxy/threads/TimeoutScanThread.java   |  10 +-
 14 files changed, 356 insertions(+), 214 deletions(-)

diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
index ec4502a7fb..9ac9083ad8 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
@@ -203,6 +203,7 @@ public class SenderManager {
                 authSecretKey);
         proxyClientConfig.setTotalAsyncCallbackSize(totalAsyncBufSize);
         proxyClientConfig.setAliveConnections(aliveConnectionNum);
+        proxyClientConfig.setRequestTimeoutMs(maxSenderTimeout * 1000L);
 
         proxyClientConfig.setIoThreadNum(ioThreadNum);
         proxyClientConfig.setEnableBusyWait(enableBusyWait);
@@ -242,7 +243,7 @@ public class SenderManager {
                         message.getTotalSize(), auditVersion);
                 asyncSendByMessageSender(cb, message.getDataList(), 
message.getGroupId(),
                         message.getStreamId(), message.getDataTime(), 
SEQUENTIAL_ID.getNextUuid(),
-                        maxSenderTimeout, TimeUnit.SECONDS, 
message.getExtraMap(), proxySend);
+                        message.getExtraMap(), proxySend);
                 getMetricItem(message.getGroupId(), 
message.getStreamId()).pluginSendCount.addAndGet(
                         message.getMsgCnt());
                 suc = true;
@@ -270,11 +271,9 @@ public class SenderManager {
 
     private void asyncSendByMessageSender(SendMessageCallback cb,
             List<byte[]> bodyList, String groupId, String streamId, long 
dataTime, String msgUUID,
-            long timeout, TimeUnit timeUnit,
             Map<String, String> extraAttrMap, boolean isProxySend) throws 
ProxysdkException {
         sender.asyncSendMessage(cb, bodyList, groupId,
-                streamId, dataTime, msgUUID,
-                timeout, timeUnit, extraAttrMap, isProxySend);
+                streamId, dataTime, msgUUID, extraAttrMap, isProxySend);
     }
 
     /**
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
index 9655e757ef..508e21588f 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
@@ -96,7 +96,6 @@ public class TestSenderManager {
                 return null;
             }).when(senderManager, "asyncSendByMessageSender", Mockito.any(),
                     Mockito.any(), Mockito.any(), Mockito.any(), 
Mockito.anyLong(), Mockito.any(),
-                    Mockito.anyLong(), Mockito.any(),
                     Mockito.any(), Mockito.anyBoolean());
             senderManager.Start();
             Long offset = 0L;
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java
index 7adc5087af..0216b77c2c 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java
@@ -32,7 +32,8 @@ public class ConfigConstants {
     public static final String REMOTE_ENCRYPT_CACHE_FILE_SUFFIX = ".pubKey";
     // authorization key
     public static final String BASIC_AUTH_HEADER = "authorization";
-
+    // default region name
+    public static final String VAL_DEF_REGION_NAME = "";
     // config info sync interval in minutes
     public static final int VAL_DEF_CONFIG_SYNC_INTERVAL_MIN = 3;
     public static final int VAL_MIN_CONFIG_SYNC_INTERVAL_MIN = 1;
@@ -43,6 +44,10 @@ public class ConfigConstants {
     public static final int VAL_MAX_RETRY_IF_CONFIG_SYNC_FAIL = 5;
     // cache config expired time in ms
     public static final long VAL_DEF_CACHE_CONFIG_EXPIRED_MS = 20 * 60 * 1000L;
+    // cache config fail status expired time in ms
+    public static final long VAL_DEF_CONFIG_FAIL_STATUS_EXPIRED_MS = 1000L;
+    public static final long VAL_MAX_CONFIG_FAIL_STATUS_EXPIRED_MS = 3 * 60 * 
1000L;
+
     // node force choose interval in ms
     public static final long VAL_DEF_FORCE_CHOOSE_INR_MS = 10 * 60 * 1000L;
     public static final long VAL_MIN_FORCE_CHOOSE_INR_MS = 30 * 1000L;
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
index 61a68e0d9c..02157fe95a 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
@@ -40,15 +40,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 
 public class DefaultMessageSender implements MessageSender {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(DefaultMessageSender.class);
-    private static final long DEFAULT_SEND_TIMEOUT = 100;
-    private static final TimeUnit DEFAULT_SEND_TIMEUNIT = 
TimeUnit.MILLISECONDS;
     private static final ConcurrentHashMap<Integer, DefaultMessageSender> 
CACHE_SENDER =
             new ConcurrentHashMap<>();
     private static final AtomicBoolean MANAGER_FETCHER_THREAD_STARTED = new 
AtomicBoolean(false);
@@ -137,6 +134,20 @@ public class DefaultMessageSender implements MessageSender 
{
         CACHE_SENDER.clear();
     }
 
+    @Override
+    public void close() {
+        LOGGER.info("ready to close resources, may need five minutes !");
+        if (sender.getClusterId() != -1) {
+            CACHE_SENDER.remove(sender.getClusterId());
+        }
+        sender.close();
+        shutdownInternalThreads();
+    }
+
+    public ProxyClientConfig getProxyClientConfig() {
+        return sender.getConfigure();
+    }
+
     public boolean isSupportLF() {
         return isSupportLF;
     }
@@ -433,25 +444,25 @@ public class DefaultMessageSender implements 
MessageSender {
     @Override
     public void asyncSendMessage(SendMessageCallback callback, byte[] body, 
String groupId, String streamId, long dt,
             String msgUUID, Map<String, String> extraAttrMap) throws 
ProxysdkException {
-
+        asyncSendMessage(callback, body, groupId, streamId, dt, msgUUID, 
extraAttrMap, false);
     }
 
     @Override
     public void asyncSendMessage(SendMessageCallback callback, byte[] body, 
String groupId, String streamId, long dt,
             String msgUUID) throws ProxysdkException {
-
+        asyncSendMessage(callback, body, groupId, streamId, dt, msgUUID, 
false);
     }
 
     @Override
     public void asyncSendMessage(SendMessageCallback callback, List<byte[]> 
bodyList, String groupId, String streamId,
             long dt, String msgUUID) throws ProxysdkException {
-
+        asyncSendMessage(callback, bodyList, groupId, streamId, dt, msgUUID, 
false);
     }
 
     @Override
     public void asyncSendMessage(SendMessageCallback callback, List<byte[]> 
bodyList, String groupId, String streamId,
             long dt, String msgUUID, Map<String, String> extraAttrMap) throws 
ProxysdkException {
-
+        asyncSendMessage(callback, bodyList, groupId, streamId, dt, msgUUID, 
extraAttrMap, false);
     }
 
     /**
@@ -508,11 +519,6 @@ public class DefaultMessageSender implements MessageSender 
{
         return null;
     }
 
-    public void asyncSendMessage(SendMessageCallback callback, byte[] body, 
String groupId, String streamId, long dt,
-            String msgUUID, long timeout, TimeUnit timeUnit) throws 
ProxysdkException {
-        asyncSendMessage(callback, body, groupId, streamId, dt, msgUUID, 
timeout, timeUnit, false);
-    }
-
     /**
      * async send single message
      *
@@ -522,13 +528,11 @@ public class DefaultMessageSender implements 
MessageSender {
      * @param streamId streamId
      * @param dt data report timestamp
      * @param msgUUID msg uuid
-     * @param timeout
-     * @param timeUnit
      * @param isProxySend true: dataproxy doesn't return response message 
until data is sent to MQ
      * @throws ProxysdkException
      */
-    public void asyncSendMessage(SendMessageCallback callback, byte[] body, 
String groupId, String streamId, long dt,
-            String msgUUID, long timeout, TimeUnit timeUnit, boolean 
isProxySend) throws ProxysdkException {
+    public void asyncSendMessage(SendMessageCallback callback, byte[] body, 
String groupId,
+            String streamId, long dt, String msgUUID, boolean isProxySend) 
throws ProxysdkException {
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
             throw new 
ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
@@ -549,7 +553,7 @@ public class DefaultMessageSender implements MessageSender {
                             isGroupIdTransfer, dt / 1000, 
idGenerator.getNextInt(),
                             groupId, streamId, proxySend);
             encodeObject.setSupportLF(isSupportLF);
-            sender.asyncSendMessage(encodeObject, callback, msgUUID, timeout, 
timeUnit);
+            sender.asyncSendMessage(encodeObject, callback, msgUUID);
         } else if (msgtype == 3 || msgtype == 5) {
             if (isCompressEnd) {
                 if (isProxySend) {
@@ -558,23 +562,15 @@ public class DefaultMessageSender implements 
MessageSender {
                 sender.asyncSendMessage(new 
EncodeObject(Collections.singletonList(body), "groupId="
                         + groupId + "&streamId=" + streamId + "&dt=" + dt + 
"&cp=snappy" + proxySend,
                         idGenerator.getNextId(), this.getMsgtype(), true, 
groupId),
-                        callback, msgUUID, timeout, timeUnit);
+                        callback, msgUUID);
             } else {
                 sender.asyncSendMessage(
                         new EncodeObject(Collections.singletonList(body), 
"groupId=" + groupId + "&streamId="
                                 + streamId + "&dt=" + dt + proxySend, 
idGenerator.getNextId(),
                                 this.getMsgtype(), false, groupId),
-                        callback,
-                        msgUUID, timeout, timeUnit);
+                        callback, msgUUID);
             }
         }
-
-    }
-
-    public void asyncSendMessage(SendMessageCallback callback, byte[] body, 
String groupId, String streamId, long dt,
-            String msgUUID, long timeout, TimeUnit timeUnit, Map<String, 
String> extraAttrMap)
-            throws ProxysdkException {
-        asyncSendMessage(callback, body, groupId, streamId, dt, msgUUID, 
timeout, timeUnit, extraAttrMap, false);
     }
 
     /**
@@ -586,15 +582,12 @@ public class DefaultMessageSender implements 
MessageSender {
      * @param streamId streamId
      * @param dt data report timestamp
      * @param msgUUID msg uuid
-     * @param timeout
-     * @param timeUnit
      * @param extraAttrMap extra attributes
      * @param isProxySend true: dataproxy doesn't return response message 
until data is sent to MQ
      * @throws ProxysdkException
      */
     public void asyncSendMessage(SendMessageCallback callback, byte[] body, 
String groupId, String streamId, long dt,
-            String msgUUID, long timeout, TimeUnit timeUnit, Map<String, 
String> extraAttrMap, boolean isProxySend)
-            throws ProxysdkException {
+            String msgUUID, Map<String, String> extraAttrMap, boolean 
isProxySend) throws ProxysdkException {
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt) || 
!ProxyUtils.isAttrKeysValid(extraAttrMap)) {
             throw new 
ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
@@ -615,28 +608,23 @@ public class DefaultMessageSender implements 
MessageSender {
                             isReport, isGroupIdTransfer, dt / 1000, 
idGenerator.getNextInt(),
                             groupId, streamId, attrs.toString());
             encodeObject.setSupportLF(isSupportLF);
-            sender.asyncSendMessage(encodeObject, callback, msgUUID, timeout, 
timeUnit);
+            sender.asyncSendMessage(encodeObject, callback, msgUUID);
         } else if (msgtype == 3 || msgtype == 5) {
             
attrs.append("&groupId=").append(groupId).append("&streamId=").append(streamId).append("&dt=").append(dt);
             if (isCompressEnd) {
                 attrs.append("&cp=snappy");
                 sender.asyncSendMessage(new 
EncodeObject(Collections.singletonList(body), attrs.toString(),
                         idGenerator.getNextId(), this.getMsgtype(), true, 
groupId),
-                        callback, msgUUID, timeout, timeUnit);
+                        callback, msgUUID);
             } else {
                 sender.asyncSendMessage(
                         new EncodeObject(Collections.singletonList(body), 
attrs.toString(), idGenerator.getNextId(),
                                 this.getMsgtype(), false, groupId),
-                        callback, msgUUID, timeout, timeUnit);
+                        callback, msgUUID);
             }
         }
     }
 
-    public void asyncSendMessage(SendMessageCallback callback, List<byte[]> 
bodyList, String groupId, String streamId,
-            long dt, String msgUUID, long timeout, TimeUnit timeUnit) throws 
ProxysdkException {
-        asyncSendMessage(callback, bodyList, groupId, streamId, dt, msgUUID, 
timeout, timeUnit, false);
-    }
-
     /**
      * async send a batch of messages
      *
@@ -646,14 +634,11 @@ public class DefaultMessageSender implements 
MessageSender {
      * @param streamId streamId
      * @param dt data report time
      * @param msgUUID msg uuid
-     * @param timeout
-     * @param timeUnit
      * @param isProxySend true: dataproxy doesn't return response message 
until data is sent to MQ
      * @throws ProxysdkException
      */
     public void asyncSendMessage(SendMessageCallback callback, List<byte[]> 
bodyList,
-            String groupId, String streamId, long dt, String msgUUID,
-            long timeout, TimeUnit timeUnit, boolean isProxySend) throws 
ProxysdkException {
+            String groupId, String streamId, long dt, String msgUUID, boolean 
isProxySend) throws ProxysdkException {
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)) {
             throw new 
ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
@@ -671,7 +656,7 @@ public class DefaultMessageSender implements MessageSender {
                     isReport, isGroupIdTransfer, dt / 1000, 
idGenerator.getNextInt(),
                     groupId, streamId, proxySend);
             encodeObject.setSupportLF(isSupportLF);
-            sender.asyncSendMessage(encodeObject, callback, msgUUID, timeout, 
timeUnit);
+            sender.asyncSendMessage(encodeObject, callback, msgUUID);
         } else if (msgtype == 3 || msgtype == 5) {
             if (isProxySend) {
                 proxySend = "&" + proxySend;
@@ -682,24 +667,18 @@ public class DefaultMessageSender implements 
MessageSender {
                                 + "&dt=" + dt + "&cp=snappy" + "&cnt=" + 
bodyList.size() + proxySend,
                                 idGenerator.getNextId(),
                                 this.getMsgtype(), true, groupId),
-                        callback, msgUUID, timeout, timeUnit);
+                        callback, msgUUID);
             } else {
                 sender.asyncSendMessage(
                         new EncodeObject(bodyList,
                                 "groupId=" + groupId + "&streamId=" + streamId 
+ "&dt=" + dt + "&cnt=" + bodyList.size()
                                         + proxySend,
                                 idGenerator.getNextId(), this.getMsgtype(), 
false, groupId),
-                        callback, msgUUID, timeout, timeUnit);
+                        callback, msgUUID);
             }
         }
     }
 
-    public void asyncSendMessage(SendMessageCallback callback,
-            List<byte[]> bodyList, String groupId, String streamId, long dt, 
String msgUUID,
-            long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) 
throws ProxysdkException {
-        asyncSendMessage(callback, bodyList, groupId, streamId, dt, msgUUID, 
timeout, timeUnit, extraAttrMap, false);
-    }
-
     /**
      * async send a batch of messages
      *
@@ -709,15 +688,12 @@ public class DefaultMessageSender implements 
MessageSender {
      * @param streamId streamId
      * @param dt data report time
      * @param msgUUID msg uuid
-     * @param timeout
-     * @param timeUnit
      * @param extraAttrMap extra attributes
      * @param isProxySend true: dataproxy doesn't return response message 
until data is sent to MQ
      * @throws ProxysdkException
      */
     public void asyncSendMessage(SendMessageCallback callback,
             List<byte[]> bodyList, String groupId, String streamId, long dt, 
String msgUUID,
-            long timeout, TimeUnit timeUnit,
             Map<String, String> extraAttrMap, boolean isProxySend) throws 
ProxysdkException {
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt) || 
!ProxyUtils.isAttrKeysValid(
@@ -739,17 +715,17 @@ public class DefaultMessageSender implements 
MessageSender {
                     isCompress, isReport, isGroupIdTransfer, dt / 1000, 
idGenerator.getNextInt(),
                     groupId, streamId, attrs.toString());
             encodeObject.setSupportLF(isSupportLF);
-            sender.asyncSendMessage(encodeObject, callback, msgUUID, timeout, 
timeUnit);
+            sender.asyncSendMessage(encodeObject, callback, msgUUID);
         } else if (msgtype == 3 || msgtype == 5) {
             
attrs.append("&groupId=").append(groupId).append("&streamId=").append(streamId)
                     
.append("&dt=").append(dt).append("&cnt=").append(bodyList.size());
             if (isCompress) {
                 attrs.append("&cp=snappy");
                 sender.asyncSendMessage(new EncodeObject(bodyList, 
attrs.toString(), idGenerator.getNextId(),
-                        this.getMsgtype(), true, groupId), callback, msgUUID, 
timeout, timeUnit);
+                        this.getMsgtype(), true, groupId), callback, msgUUID);
             } else {
                 sender.asyncSendMessage(new EncodeObject(bodyList, 
attrs.toString(), idGenerator.getNextId(),
-                        this.getMsgtype(), false, groupId), callback, msgUUID, 
timeout, timeUnit);
+                        this.getMsgtype(), false, groupId), callback, msgUUID);
             }
         }
 
@@ -767,8 +743,8 @@ public class DefaultMessageSender implements MessageSender {
     @Override
     public void asyncSendMessage(String inlongGroupId, String inlongStreamId, 
byte[] body, SendMessageCallback callback)
             throws ProxysdkException {
-        this.asyncSendMessage(callback, body, inlongGroupId, inlongStreamId, 
System.currentTimeMillis(),
-                idGenerator.getNextId(), DEFAULT_SEND_TIMEOUT, 
DEFAULT_SEND_TIMEUNIT);
+        this.asyncSendMessage(callback, body, inlongGroupId,
+                inlongStreamId, System.currentTimeMillis(), 
idGenerator.getNextId());
     }
 
     /**
@@ -783,8 +759,8 @@ public class DefaultMessageSender implements MessageSender {
      */
     public void asyncSendMessage(String inlongGroupId, String inlongStreamId, 
byte[] body, SendMessageCallback callback,
             boolean isProxySend) throws ProxysdkException {
-        this.asyncSendMessage(callback, body, inlongGroupId, inlongStreamId, 
System.currentTimeMillis(),
-                idGenerator.getNextId(), DEFAULT_SEND_TIMEOUT, 
DEFAULT_SEND_TIMEUNIT, isProxySend);
+        this.asyncSendMessage(callback, body, inlongGroupId,
+                inlongStreamId, System.currentTimeMillis(), 
idGenerator.getNextId(), isProxySend);
     }
 
     /**
@@ -799,8 +775,8 @@ public class DefaultMessageSender implements MessageSender {
     @Override
     public void asyncSendMessage(String inlongGroupId, String inlongStreamId, 
List<byte[]> bodyList,
             SendMessageCallback callback) throws ProxysdkException {
-        this.asyncSendMessage(callback, bodyList, inlongGroupId, 
inlongStreamId, System.currentTimeMillis(),
-                idGenerator.getNextId(), DEFAULT_SEND_TIMEOUT, 
DEFAULT_SEND_TIMEUNIT);
+        this.asyncSendMessage(callback, bodyList, inlongGroupId,
+                inlongStreamId, System.currentTimeMillis(), 
idGenerator.getNextId());
     }
 
     /**
@@ -815,8 +791,8 @@ public class DefaultMessageSender implements MessageSender {
      */
     public void asyncSendMessage(String inlongGroupId, String inlongStreamId, 
List<byte[]> bodyList,
             SendMessageCallback callback, boolean isProxySend) throws 
ProxysdkException {
-        this.asyncSendMessage(callback, bodyList, inlongGroupId, 
inlongStreamId, System.currentTimeMillis(),
-                idGenerator.getNextId(), DEFAULT_SEND_TIMEOUT, 
DEFAULT_SEND_TIMEUNIT, isProxySend);
+        this.asyncSendMessage(callback, bodyList, inlongGroupId,
+                inlongStreamId, System.currentTimeMillis(), 
idGenerator.getNextId(), isProxySend);
     }
 
     private void addIndexCnt(String groupId, String streamId, long cnt) {
@@ -837,13 +813,4 @@ public class DefaultMessageSender implements MessageSender 
{
         indexCol.shutDown();
         MANAGER_FETCHER_THREAD_STARTED.set(false);
     }
-
-    public void close() {
-        LOGGER.info("ready to close resources, may need five minutes !");
-        if (sender.getClusterId() != -1) {
-            CACHE_SENDER.remove(sender.getClusterId());
-        }
-        sender.close();
-        shutdownInternalThreads();
-    }
 }
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java
index e980e65974..2a4ae6313a 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java
@@ -26,6 +26,8 @@ import java.util.Map;
 
 public interface MessageSender {
 
+    void close();
+
     /**
      * This method provides a synchronized  function which you want to send 
data  without packing
      *
@@ -137,5 +139,4 @@ public interface MessageSender {
     void asyncSendMessage(String inlongGroupId, String inlongStreamId, 
List<byte[]> bodyList,
             SendMessageCallback callback) throws ProxysdkException;
 
-    void close();
 }
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
index 7af91dba34..6a80fbf21c 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
@@ -39,19 +39,24 @@ public class ProxyClientConfig {
     private String configStoreBasePath = System.getProperty("user.dir");
     // max expired time for config cache.
     private long configCacheExpiredMs = 
ConfigConstants.VAL_DEF_CACHE_CONFIG_EXPIRED_MS;
+    // max expired time for config query failure status
+    private long configFailStatusExpiredMs = 
ConfigConstants.VAL_DEF_CONFIG_FAIL_STATUS_EXPIRED_MS;
     // nodes force choose interval ms
     private long forceReChooseInrMs = 
ConfigConstants.VAL_DEF_FORCE_CHOOSE_INR_MS;
     private boolean enableAuthentication = false;
     private String authSecretId = "";
     private String authSecretKey = "";
     private String inlongGroupId;
+    private String regionName = ConfigConstants.VAL_DEF_REGION_NAME;
     private int aliveConnections = ConfigConstants.VAL_DEF_ALIVE_CONNECTIONS;
+    // data encrypt info
+    private boolean enableDataEncrypt = false;
+    private String rsaPubKeyUrl = "";
+    private String userName = "";
 
     private int syncThreadPoolSize;
     private int asyncCallbackSize;
 
-    private boolean isNeedDataEncry = false;
-    private String rsaPubKeyUrl = "";
     private String tlsServerCertFilePathAndName;
     private String tlsServerKey;
     private String tlsVersion = "TLSv1.2";
@@ -240,6 +245,15 @@ public class ProxyClientConfig {
         this.configCacheExpiredMs = configCacheExpiredMs;
     }
 
+    public long getConfigFailStatusExpiredMs() {
+        return configFailStatusExpiredMs;
+    }
+
+    public void setConfigFailStatusExpiredMs(long configFailStatusExpiredMs) {
+        this.configFailStatusExpiredMs =
+                Math.min(configFailStatusExpiredMs, 
ConfigConstants.VAL_MAX_CONFIG_FAIL_STATUS_EXPIRED_MS);
+    }
+
     public long getForceReChooseInrMs() {
         return forceReChooseInrMs;
     }
@@ -253,6 +267,16 @@ public class ProxyClientConfig {
         return inlongGroupId;
     }
 
+    public String getRegionName() {
+        return regionName;
+    }
+
+    public void setRegionName(String regionName) {
+        if (StringUtils.isNotBlank(regionName)) {
+            this.regionName = regionName.trim();
+        }
+    }
+
     public int getAliveConnections() {
         return this.aliveConnections;
     }
@@ -262,6 +286,33 @@ public class ProxyClientConfig {
                 Math.max(ConfigConstants.VAL_MIN_ALIVE_CONNECTIONS, 
aliveConnections);
     }
 
+    public boolean isEnableDataEncrypt() {
+        return enableDataEncrypt;
+    }
+
+    public String getRsaPubKeyUrl() {
+        return rsaPubKeyUrl;
+    }
+
+    public String getUserName() {
+        return userName;
+    }
+
+    public void enableDataEncrypt(boolean needDataEncrypt, String userName, 
String rsaPubKeyUrl) {
+        this.enableDataEncrypt = needDataEncrypt;
+        if (!this.enableDataEncrypt) {
+            return;
+        }
+        if (StringUtils.isBlank(userName)) {
+            throw new IllegalArgumentException("userName is Blank!");
+        }
+        if (StringUtils.isBlank(rsaPubKeyUrl)) {
+            throw new IllegalArgumentException("rsaPubKeyUrl is Blank!");
+        }
+        this.userName = userName.trim();
+        this.rsaPubKeyUrl = rsaPubKeyUrl.trim();
+    }
+
     public String getTlsServerCertFilePathAndName() {
         return tlsServerCertFilePathAndName;
     }
@@ -370,14 +421,6 @@ public class ProxyClientConfig {
         this.maxMsgInFlightPerConn = maxMsgInFlightPerConn;
     }
 
-    public String getRsaPubKeyUrl() {
-        return rsaPubKeyUrl;
-    }
-
-    public boolean isNeedDataEncry() {
-        return isNeedDataEncry;
-    }
-
     public void setHttpsInfo(String tlsServerCertFilePathAndName, String 
tlsServerKey) {
         if (StringUtils.isBlank(tlsServerCertFilePathAndName)) {
             throw new IllegalArgumentException("tlsServerCertFilePathAndName 
is Blank!");
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigEntry.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigEntry.java
index e37b9b2c0a..ebf76464ea 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigEntry.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigEntry.java
@@ -62,6 +62,7 @@ public class ProxyConfigEntry implements java.io.Serializable 
{
     public void setHostMap(Map<String, HostInfo> hostMap) {
         this.hostMap = hostMap;
     }
+
     public boolean isNodesEmpty() {
         return this.hostMap.isEmpty();
     }
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
index 033395667b..97952eb1ff 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
@@ -74,9 +74,10 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
@@ -90,17 +91,22 @@ public class ProxyConfigManager extends Thread {
     private static final Logger logger = 
LoggerFactory.getLogger(ProxyConfigManager.class);
     private static final LogCounter exptCounter = new LogCounter(10, 100000, 
60 * 1000L);
     private static final LogCounter parseCounter = new LogCounter(10, 100000, 
60 * 1000L);
+    private static final Map<String, Tuple2<AtomicLong, String>> 
fetchFailProxyMap =
+            new ConcurrentHashMap<>();
+    private static final Map<String, Tuple2<AtomicLong, String>> 
fetchFailEncryptMap =
+            new ConcurrentHashMap<>();
     private static final ReentrantReadWriteLock fileRw = new 
ReentrantReadWriteLock();
 
     private final String callerId;
-    private ProxyClientConfig clientConfig;
     private final Gson gson = new Gson();
     private final ClientMgr clientManager;
     private final ThreadLocalRandom random = ThreadLocalRandom.current();
     private final AtomicBoolean shutDown = new AtomicBoolean(false);
     // proxy configure info
+    private ProxyClientConfig clientConfig = null;
     private String localProxyConfigStoreFile;
     private String proxyConfigVisitUrl;
+    private String proxyQueryFailKey;
     private String proxyConfigCacheFile;
     private List<HostInfo> proxyInfoList = new ArrayList<>();
     private int oldStat = 0;
@@ -108,6 +114,7 @@ public class ProxyConfigManager extends Thread {
     private long lstUpdateTime = 0;
     // encrypt configure info
     private String encryptConfigVisitUrl;
+    private String encryptQueryFailKey;
     private String encryptConfigCacheFile;
     private EncryptConfigEntry userEncryptConfigEntry;
 
@@ -118,7 +125,9 @@ public class ProxyConfigManager extends Thread {
     public ProxyConfigManager(String callerId, ProxyClientConfig configure, 
ClientMgr clientManager) {
         this.callerId = callerId;
         this.clientManager = clientManager;
-        this.storeAndBuildMetaConfigure(configure);
+        if (configure != null) {
+            this.storeAndBuildMetaConfigure(configure);
+        }
         if (this.clientManager != null) {
             this.setName("ConfigManager-" + this.callerId);
             logger.info("ConfigManager({}) started, groupId={}",
@@ -130,19 +139,20 @@ public class ProxyConfigManager extends Thread {
      * Update proxy client configure for query case
      *
      * @param configure  proxy client configure
-     * @throws Exception exception
+     * @return process result
      */
-    public void updProxyClientConfig(ProxyClientConfig configure) throws 
Exception {
+    public Tuple2<Boolean, String> updProxyClientConfig(ProxyClientConfig 
configure) {
+        if (this.shutDown.get()) {
+            return new Tuple2<>(false, "SDK has shutdown!");
+        }
         if (configure == null) {
-            throw new Exception("ProxyClientConfig is null");
+            return new Tuple2<>(false, "ProxyClientConfig is null");
         }
         if (this.clientManager != null) {
-            throw new Exception("Not allowed for non meta-query case!");
-        }
-        if (shutDown.get()) {
-            return;
+            return new Tuple2<>(false, "Not allowed for non meta-query case!");
         }
         this.storeAndBuildMetaConfigure(configure);
+        return new Tuple2<>(true, "OK");
     }
 
     public void shutDown() {
@@ -166,6 +176,9 @@ public class ProxyConfigManager extends Thread {
         if (shutDown.get()) {
             return new Tuple2<>(null, "SDK has shutdown!");
         }
+        if (clientConfig == null) {
+            return new Tuple2<>(null, "Configure not initialized!");
+        }
         if (clientConfig.isOnlyUseLocalProxyConfig()) {
             return getLocalProxyListFromFile(this.localProxyConfigStoreFile);
         } else {
@@ -183,7 +196,7 @@ public class ProxyConfigManager extends Thread {
                         break;
                     }
                     // sleep then retry
-                    TimeUnit.MILLISECONDS.sleep(500);
+                    Thread.sleep(500L);
                 } while (++retryCount < 
clientConfig.getConfigSyncMaxRetryIfFail());
             }
             if (shutDown.get()) {
@@ -205,12 +218,15 @@ public class ProxyConfigManager extends Thread {
      * @throws Exception ex
      */
     public Tuple2<EncryptConfigEntry, String> getEncryptConfigure(boolean 
needRetry) throws Exception {
-        if (!clientConfig.isNeedDataEncry()) {
+        if (!clientConfig.isEnableDataEncrypt()) {
             return new Tuple2<>(null, "Not need data encrypt!");
         }
         if (shutDown.get()) {
             return new Tuple2<>(null, "SDK has shutdown!");
         }
+        if (clientConfig == null) {
+            return new Tuple2<>(null, "Configure not initialized!");
+        }
         EncryptConfigEntry encryptEntry = this.userEncryptConfigEntry;
         if (encryptEntry != null) {
             return new Tuple2<>(encryptEntry, "Ok");
@@ -228,7 +244,7 @@ public class ProxyConfigManager extends Thread {
                     break;
                 }
                 // sleep then retry
-                TimeUnit.MILLISECONDS.sleep(500);
+                Thread.sleep(500L);
             } while (++retryCount < 
clientConfig.getConfigSyncMaxRetryIfFail());
         }
         if (shutDown.get()) {
@@ -258,7 +274,7 @@ public class ProxyConfigManager extends Thread {
                 }
             }
             // update encrypt configure
-            if (clientConfig.isNeedDataEncry()) {
+            if (clientConfig.isEnableDataEncrypt()) {
                 try {
                     doEncryptConfigEntryQueryWork();
                 } catch (Throwable ex) {
@@ -288,7 +304,7 @@ public class ProxyConfigManager extends Thread {
      * @throws Exception
      */
     public void doProxyEntryQueryWork() throws Exception {
-        if (shutDown.get()) {
+        if (shutDown.get() || this.clientManager == null) {
             return;
         }
         /* Request the configuration from manager. */
@@ -306,7 +322,7 @@ public class ProxyConfigManager extends Thread {
                     break;
                 }
                 // sleep then retry.
-                TimeUnit.SECONDS.sleep(2);
+                Thread.sleep(2000L);
             } while (++retryCnt < 
this.clientConfig.getConfigSyncMaxRetryIfFail() && !shutDown.get());
             if (shutDown.get()) {
                 return;
@@ -340,7 +356,7 @@ public class ProxyConfigManager extends Thread {
     }
 
     private void doEncryptConfigEntryQueryWork() throws Exception {
-        if (shutDown.get()) {
+        if (shutDown.get() || this.clientManager == null) {
             return;
         }
         int retryCount = 0;
@@ -351,7 +367,7 @@ public class ProxyConfigManager extends Thread {
                 break;
             }
             // sleep then retry
-            TimeUnit.MILLISECONDS.sleep(500);
+            Thread.sleep(500L);
         } while (++retryCount < clientConfig.getConfigSyncMaxRetryIfFail());
         if (shutDown.get()) {
             return;
@@ -380,15 +396,21 @@ public class ProxyConfigManager extends Thread {
         if (StringUtils.isBlank(strRet)) {
             return new Tuple2<>(null, "Blank configure local file from " + 
filePath);
         }
-        return getProxyConfigEntry(strRet);
+        return getProxyConfigEntry(false, strRet);
     }
 
     private Tuple2<ProxyConfigEntry, String> requestProxyEntryQuietly() {
-        List<BasicNameValuePair> params = buildProxyNodeQueryParams();
+        // check cache failure
+        String qryResult = getManagerQryResultInFailStatus(true);
+        if (qryResult != null) {
+            return new Tuple2<>(null, "Query fail(" + qryResult + ") just now, 
please retry later!");
+        }
         // request meta info from manager
+        List<BasicNameValuePair> params = buildProxyNodeQueryParams();
         logger.debug("ConfigManager({}) request configure to manager({}), 
param={}",
                 this.callerId, this.proxyConfigVisitUrl, params);
-        Tuple2<Boolean, String> queryResult = 
requestConfiguration(this.proxyConfigVisitUrl, params);
+        Tuple2<Boolean, String> queryResult =
+                requestConfiguration(true, this.proxyConfigVisitUrl, params);
         if (!queryResult.getF0()) {
             return new Tuple2<>(null, queryResult.getF1());
         }
@@ -396,12 +418,20 @@ public class ProxyConfigManager extends Thread {
         logger.debug("ConfigManager({}) received configure, from manager({}), 
groupId={}, result={}",
                 callerId, proxyConfigVisitUrl, 
clientConfig.getInlongGroupId(), queryResult.getF1());
         try {
-            return getProxyConfigEntry(queryResult.getF1());
+            Tuple2<ProxyConfigEntry, String> parseResult =
+                    getProxyConfigEntry(true, queryResult.getF1());
+            if (parseResult.getF0() == null) {
+                bookManagerQryFailStatus(true, parseResult.getF1());
+            } else {
+                rmvManagerQryFailStatus(true);
+            }
+            return parseResult;
         } catch (Throwable ex) {
             if (exptCounter.shouldPrint()) {
                 logger.warn("ConfigManager({}) parse failure, from 
manager({}), groupId={}, result={}",
                         callerId, proxyConfigVisitUrl, 
clientConfig.getInlongGroupId(), queryResult.getF1(), ex);
             }
+            bookManagerQryFailStatus(true, ex.getMessage());
             return new Tuple2<>(null, ex.getMessage());
         }
     }
@@ -518,16 +548,23 @@ public class ProxyConfigManager extends Thread {
     }
 
     private Tuple2<EncryptConfigEntry, String> requestPubKeyFromManager() {
-        List<BasicNameValuePair> params = buildPubKeyQueryParams();
+        // check cache failure
+        String qryResult = getManagerQryResultInFailStatus(false);
+        if (qryResult != null) {
+            return new Tuple2<>(null, "Query fail(" + qryResult + ") just now, 
please retry later!");
+        }
         // request meta info from manager
+        List<BasicNameValuePair> params = buildPubKeyQueryParams();
         logger.debug("ConfigManager({}) request pubkey to manager({}), 
param={}",
                 this.callerId, this.encryptConfigVisitUrl, params);
-        Tuple2<Boolean, String> queryResult = 
requestConfiguration(this.encryptConfigVisitUrl, params);
+        Tuple2<Boolean, String> queryResult =
+                requestConfiguration(false, this.encryptConfigVisitUrl, 
params);
         if (!queryResult.getF0()) {
             return new Tuple2<>(null, queryResult.getF1());
         }
         logger.debug("ConfigManager({}) received pubkey from manager({}), 
result={}",
                 this.callerId, this.encryptConfigVisitUrl, 
queryResult.getF1());
+        String errorMsg;
         JsonObject pubKeyConf;
         try {
             pubKeyConf = 
JsonParser.parseString(queryResult.getF1()).getAsJsonObject();
@@ -536,62 +573,72 @@ public class ProxyConfigManager extends Thread {
                 logger.warn("ConfigManager({}) parse failure, secretId={}, 
config={}!",
                         this.callerId, this.clientConfig.getAuthSecretId(), 
queryResult.getF1());
             }
-            return new Tuple2<>(null, "parse pubkey failure:" + 
ex.getMessage());
+            errorMsg = "parse pubkey failure:" + ex.getMessage();
+            bookManagerQryFailStatus(false, errorMsg);
+            return new Tuple2<>(null, errorMsg);
         }
         if (pubKeyConf == null) {
-            return new Tuple2<>(null, "No public key information");
+            errorMsg = "No public key information";
+            bookManagerQryFailStatus(false, errorMsg);
+            return new Tuple2<>(null, errorMsg);
         }
-        if (!pubKeyConf.has("resultCode")) {
-            if (parseCounter.shouldPrint()) {
-                logger.warn("ConfigManager({}) config failure: resultCode 
field not exist, secretId={}, config={}!",
-                        this.callerId, this.clientConfig.getAuthSecretId(), 
queryResult.getF1());
-            }
-            return new Tuple2<>(null, "resultCode field not exist");
-        }
-        int resultCode = pubKeyConf.get("resultCode").getAsInt();
-        if (resultCode != 0) {
-            if (parseCounter.shouldPrint()) {
-                logger.warn("ConfigManager({}) config failure: resultCode != 
0, secretId={}, config={}!",
-                        this.callerId, this.clientConfig.getAuthSecretId(), 
queryResult.getF1());
-            }
-            return new Tuple2<>(null, "resultCode != 0!");
-        }
-        if (!pubKeyConf.has("resultData")) {
-            if (parseCounter.shouldPrint()) {
-                logger.warn("ConfigManager({}) config failure: resultData 
field not exist, secretId={}, config={}!",
-                        this.callerId, this.clientConfig.getAuthSecretId(), 
queryResult.getF1());
-            }
-            return new Tuple2<>(null, "resultData field not exist");
-        }
-        JsonObject resultData = pubKeyConf.get("resultData").getAsJsonObject();
-        if (resultData != null) {
-            String publicKey = resultData.get("publicKey").getAsString();
-            if (StringUtils.isBlank(publicKey)) {
+        try {
+            if (!pubKeyConf.has("resultCode")) {
                 if (parseCounter.shouldPrint()) {
-                    logger.warn("ConfigManager({}) config failure: publicKey 
is blank, secretId={}, config={}!",
+                    logger.warn("ConfigManager({}) config failure: resultCode 
field not exist, secretId={}, config={}!",
                             this.callerId, 
this.clientConfig.getAuthSecretId(), queryResult.getF1());
                 }
-                return new Tuple2<>(null, "publicKey is blank!");
+                throw new Exception("resultCode field not exist");
             }
-            String username = resultData.get("username").getAsString();
-            if (StringUtils.isBlank(username)) {
+            int resultCode = pubKeyConf.get("resultCode").getAsInt();
+            if (resultCode != 0) {
                 if (parseCounter.shouldPrint()) {
-                    logger.warn("ConfigManager({}) config failure: username is 
blank, secretId={}, config={}!",
+                    logger.warn("ConfigManager({}) config failure: resultCode 
!= 0, secretId={}, config={}!",
                             this.callerId, 
this.clientConfig.getAuthSecretId(), queryResult.getF1());
                 }
-                return new Tuple2<>(null, "username is blank!");
+                throw new Exception("resultCode != 0!");
             }
-            String versionStr = resultData.get("version").getAsString();
-            if (StringUtils.isBlank(versionStr)) {
+            if (!pubKeyConf.has("resultData")) {
                 if (parseCounter.shouldPrint()) {
-                    logger.warn("ConfigManager({}) config failure: version is 
blank, secretId={}, config={}!",
+                    logger.warn("ConfigManager({}) config failure: resultData 
field not exist, secretId={}, config={}!",
                             this.callerId, 
this.clientConfig.getAuthSecretId(), queryResult.getF1());
                 }
-                return new Tuple2<>(null, "version is blank!");
+                throw new Exception("resultData field not exist");
+            }
+            JsonObject resultData = 
pubKeyConf.get("resultData").getAsJsonObject();
+            if (resultData != null) {
+                String publicKey = resultData.get("publicKey").getAsString();
+                if (StringUtils.isBlank(publicKey)) {
+                    if (parseCounter.shouldPrint()) {
+                        logger.warn("ConfigManager({}) config failure: 
publicKey is blank, secretId={}, config={}!",
+                                this.callerId, 
this.clientConfig.getAuthSecretId(), queryResult.getF1());
+                    }
+                    throw new Exception("publicKey is blank!");
+                }
+                String username = resultData.get("username").getAsString();
+                if (StringUtils.isBlank(username)) {
+                    if (parseCounter.shouldPrint()) {
+                        logger.warn("ConfigManager({}) config failure: 
username is blank, secretId={}, config={}!",
+                                this.callerId, 
this.clientConfig.getAuthSecretId(), queryResult.getF1());
+                    }
+                    throw new Exception("username is blank!");
+                }
+                String versionStr = resultData.get("version").getAsString();
+                if (StringUtils.isBlank(versionStr)) {
+                    if (parseCounter.shouldPrint()) {
+                        logger.warn("ConfigManager({}) config failure: version 
is blank, secretId={}, config={}!",
+                                this.callerId, 
this.clientConfig.getAuthSecretId(), queryResult.getF1());
+                    }
+                    throw new Exception("version is blank!");
+                }
+                rmvManagerQryFailStatus(false);
+                return new Tuple2<>(new EncryptConfigEntry(username, 
versionStr, publicKey), "Ok");
             }
-            return new Tuple2<>(new EncryptConfigEntry(username, versionStr, 
publicKey), "Ok");
+            throw new Exception("resultData value is null!");
+        } catch (Throwable ex) {
+            bookManagerQryFailStatus(false, ex.getMessage());
+            return new Tuple2<>(null, ex.getMessage());
         }
-        return new Tuple2<>(null, "resultData value is null!");
     }
 
     private void updateEncryptConfigEntry(EncryptConfigEntry newEncryptEntry) {
@@ -674,7 +721,8 @@ public class ProxyConfigManager extends Thread {
     }
 
     /* Request new configurations from Manager. */
-    private Tuple2<Boolean, String> requestConfiguration(String url, 
List<BasicNameValuePair> params) {
+    private Tuple2<Boolean, String> requestConfiguration(
+            boolean queryProxyInfo, String url, List<BasicNameValuePair> 
params) {
         HttpParams myParams = new BasicHttpParams();
         HttpConnectionParams.setConnectionTimeout(myParams, 
clientConfig.getManagerConnTimeoutMs());
         HttpConnectionParams.setSoTimeout(myParams, 
clientConfig.getManagerSocketTimeoutMs());
@@ -702,13 +750,20 @@ public class ProxyConfigManager extends Thread {
                     new UrlEncodedFormEntity(params, StandardCharsets.UTF_8);
             httpPost.setEntity(urlEncodedFormEntity);
             HttpResponse response = httpClient.execute(httpPost);
+            String errMsg;
             if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
-                return new Tuple2<>(false, 
response.getStatusLine().getStatusCode()
-                        + ":" + response.getStatusLine().getStatusCode());
+                errMsg = response.getStatusLine().getStatusCode()
+                        + ":" + response.getStatusLine().getReasonPhrase();
+                if (response.getStatusLine().getStatusCode() >= 500) {
+                    bookManagerQryFailStatus(queryProxyInfo, errMsg);
+                }
+                return new Tuple2<>(false, errMsg);
             }
             String returnStr = EntityUtils.toString(response.getEntity());
             if (StringUtils.isBlank(returnStr)) {
-                return new Tuple2<>(false, "query result is blank!");
+                errMsg = "server return blank entity!";
+                bookManagerQryFailStatus(queryProxyInfo, errMsg);
+                return new Tuple2<>(false, errMsg);
             }
             return new Tuple2<>(true, returnStr);
         } catch (Throwable ex) {
@@ -755,6 +810,11 @@ public class ProxyConfigManager extends Thread {
                 
.append(ConfigConstants.MANAGER_DATAPROXY_API).append(clientConfig.getInlongGroupId())
                 .toString();
         strBuff.delete(0, strBuff.length());
+        this.proxyQueryFailKey = strBuff
+                .append("proxy:").append(clientConfig.getInlongGroupId())
+                .append("#").append(clientConfig.getRegionName())
+                .append("#").append(clientConfig.getProtocolType()).toString();
+        strBuff.delete(0, strBuff.length());
         this.localProxyConfigStoreFile = strBuff
                 .append(clientConfig.getConfigStoreBasePath())
                 .append(ConfigConstants.META_STORE_SUB_DIR)
@@ -770,6 +830,9 @@ public class ProxyConfigManager extends Thread {
                 .toString();
         strBuff.delete(0, strBuff.length());
         this.encryptConfigVisitUrl = clientConfig.getRsaPubKeyUrl();
+        this.encryptQueryFailKey = strBuff
+                
.append("encrypt:").append(clientConfig.getUserName()).toString();
+        strBuff.delete(0, strBuff.length());
         this.encryptConfigCacheFile = strBuff
                 .append(clientConfig.getConfigStoreBasePath())
                 .append(ConfigConstants.META_STORE_SUB_DIR)
@@ -795,23 +858,85 @@ public class ProxyConfigManager extends Thread {
     private List<BasicNameValuePair> buildPubKeyQueryParams() {
         List<BasicNameValuePair> params = new ArrayList<>();
         params.add(new BasicNameValuePair("operation", "query"));
-        params.add(new BasicNameValuePair("username", 
clientConfig.getAuthSecretId()));
+        params.add(new BasicNameValuePair("username", 
clientConfig.getUserName()));
         return params;
     }
 
-    private Tuple2<ProxyConfigEntry, String> getProxyConfigEntry(String 
strRet) {
-        DataProxyNodeResponse proxyCluster;
-        try {
-            proxyCluster = gson.fromJson(strRet, DataProxyNodeResponse.class);
-        } catch (Throwable ex) {
-            if (parseCounter.shouldPrint()) {
-                logger.warn("ConfigManager({}) parse exception, groupId={}, 
config={}",
-                        this.callerId, clientConfig.getInlongGroupId(), 
strRet, ex);
+    private void bookManagerQryFailStatus(boolean proxyQry, String errMsg) {
+        if (proxyQry) {
+            fetchFailProxyMap.put(proxyQueryFailKey,
+                    new Tuple2<>(new AtomicLong(System.currentTimeMillis()), 
errMsg));
+        } else {
+            fetchFailEncryptMap.put(encryptQueryFailKey,
+                    new Tuple2<>(new AtomicLong(System.currentTimeMillis()), 
errMsg));
+        }
+    }
+
+    private void rmvManagerQryFailStatus(boolean proxyQry) {
+        if (proxyQry) {
+            fetchFailProxyMap.remove(proxyQueryFailKey);
+        } else {
+            fetchFailEncryptMap.remove(encryptQueryFailKey);
+        }
+    }
+
+    private String getManagerQryResultInFailStatus(boolean proxyQry) {
+        if (clientConfig.getConfigFailStatusExpiredMs() <= 0) {
+            return null;
+        }
+        Tuple2<AtomicLong, String> queryResult;
+        if (proxyQry) {
+            queryResult = fetchFailProxyMap.get(proxyQueryFailKey);
+        } else {
+            queryResult = fetchFailEncryptMap.get(encryptQueryFailKey);
+        }
+        if (queryResult != null
+                && (System.currentTimeMillis() - queryResult.getF0().get() < 
clientConfig
+                        .getConfigFailStatusExpiredMs())) {
+            return queryResult.getF1();
+        }
+        return null;
+    }
+
+    private Tuple2<ProxyConfigEntry, String> getProxyConfigEntry(boolean 
fromManager, String strRet) {
+        DataProxyNodeResponse proxyNodeConfig;
+        if (fromManager) {
+            ProxyClusterConfig clusterConfig;
+            try {
+                clusterConfig = gson.fromJson(strRet, 
ProxyClusterConfig.class);
+            } catch (Throwable ex) {
+                if (parseCounter.shouldPrint()) {
+                    logger.warn("ConfigManager({}) parse exception, 
groupId={}, config={}",
+                            this.callerId, clientConfig.getInlongGroupId(), 
strRet, ex);
+                }
+                return new Tuple2<>(null, "parse failure:" + ex.getMessage());
+            }
+            if (clusterConfig == null) {
+                return new Tuple2<>(null, "content parse result is null!");
+            }
+            if (!clusterConfig.isSuccess()) {
+                return new Tuple2<>(null, clusterConfig.getErrMsg());
+            }
+            if (clusterConfig.getData() == null) {
+                return new Tuple2<>(null, "return data content is null!");
+            }
+            proxyNodeConfig = clusterConfig.getData();
+        } else {
+            try {
+                proxyNodeConfig = gson.fromJson(strRet, 
DataProxyNodeResponse.class);
+            } catch (Throwable ex) {
+                if (parseCounter.shouldPrint()) {
+                    logger.warn("ConfigManager({}) parse local file exception, 
groupId={}, config={}",
+                            this.callerId, clientConfig.getInlongGroupId(), 
strRet, ex);
+                }
+                return new Tuple2<>(null, "parse file failure:" + 
ex.getMessage());
+            }
+            if (proxyNodeConfig == null) {
+                return new Tuple2<>(null, "file content parse result is 
null!");
             }
-            return new Tuple2<>(null, "parse failure:" + ex.getMessage());
         }
         // parse nodeList
-        List<DataProxyNodeInfo> nodeList = proxyCluster.getNodeList();
+        List<DataProxyNodeInfo> nodeList = proxyNodeConfig.getNodeList();
         if (CollectionUtils.isEmpty(nodeList)) {
             return new Tuple2<>(null, "nodeList is empty!");
         }
@@ -836,23 +961,23 @@ public class ProxyConfigManager extends Thread {
         }
         // parse clusterId
         int clusterId = -1;
-        if (ObjectUtils.isNotEmpty(proxyCluster.getClusterId())) {
-            clusterId = proxyCluster.getClusterId();
+        if (ObjectUtils.isNotEmpty(proxyNodeConfig.getClusterId())) {
+            clusterId = proxyNodeConfig.getClusterId();
         }
         // parse load
         int load = ConfigConstants.LOAD_THRESHOLD;
-        if (ObjectUtils.isNotEmpty(proxyCluster.getLoad())) {
-            load = proxyCluster.getLoad() > 200 ? 200 : 
(Math.max(proxyCluster.getLoad(), 0));
+        if (ObjectUtils.isNotEmpty(proxyNodeConfig.getLoad())) {
+            load = proxyNodeConfig.getLoad() > 200 ? 200 : 
(Math.max(proxyNodeConfig.getLoad(), 0));
         }
         // parse isIntranet
         boolean isIntranet = true;
-        if (ObjectUtils.isNotEmpty(proxyCluster.getIsIntranet())) {
-            isIntranet = proxyCluster.getIsIntranet() == 1;
+        if (ObjectUtils.isNotEmpty(proxyNodeConfig.getIsIntranet())) {
+            isIntranet = proxyNodeConfig.getIsIntranet() == 1;
         }
         // parse isSwitch
         int isSwitch = 0;
-        if (ObjectUtils.isNotEmpty(proxyCluster.getIsSwitch())) {
-            isSwitch = proxyCluster.getIsSwitch();
+        if (ObjectUtils.isNotEmpty(proxyNodeConfig.getIsSwitch())) {
+            isSwitch = proxyNodeConfig.getIsSwitch();
         }
         // build ProxyConfigEntry
         ProxyConfigEntry proxyEntry = new ProxyConfigEntry();
@@ -863,7 +988,7 @@ public class ProxyConfigManager extends Thread {
         proxyEntry.setSwitchStat(isSwitch);
         proxyEntry.setLoad(load);
         proxyEntry.setMaxPacketLength(
-                proxyCluster.getMaxPacketLength() != null ? 
proxyCluster.getMaxPacketLength() : -1);
+                proxyNodeConfig.getMaxPacketLength() != null ? 
proxyNodeConfig.getMaxPacketLength() : -1);
         return new Tuple2<>(proxyEntry, "ok");
     }
 }
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
index 0444edbfd5..ea9fc15a51 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
@@ -345,7 +345,7 @@ public class ClientMgr {
                 if (!realHosts.isEmpty()) {
                     break;
                 }
-                Thread.sleep(1000);
+                Thread.sleep(1000L);
             } while (--maxCycleCnt > 0);
             // update active nodes
             if (realHosts.isEmpty()) {
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/QueueObject.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/QueueObject.java
index 3699213176..009d513bdb 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/QueueObject.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/QueueObject.java
@@ -19,7 +19,6 @@ package org.apache.inlong.sdk.dataproxy.network;
 
 import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
 
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 public class QueueObject {
@@ -32,11 +31,11 @@ public class QueueObject {
     private final int size;
 
     public QueueObject(NettyClient client, long sendTimeInMillis,
-            SendMessageCallback callback, int size, long timeout, TimeUnit 
timeUnit) {
+            SendMessageCallback callback, int size, long timeoutMs) {
         this.client = client;
         this.sendTimeInMillis = sendTimeInMillis;
         this.callback = callback;
-        this.timeoutInMillis = TimeUnit.MILLISECONDS.convert(timeout, 
timeUnit);
+        this.timeoutInMillis = timeoutMs;
         this.size = size;
     }
 
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
index 13920fb7a6..b56781c0ce 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
@@ -97,7 +97,7 @@ public class Sender {
             if (!configure.isEnableAuthentication()) {
                 throw new Exception("In OutNetwork isNeedAuthentication must 
be true!");
             }
-            if (!configure.isNeedDataEncry()) {
+            if (!configure.isEnableDataEncrypt()) {
                 throw new Exception("In OutNetwork isNeedDataEncry must be 
true!");
             }
         }
@@ -201,15 +201,13 @@ public class Sender {
                             
clientMgr.getStreamIdNum(encodeObject.getStreamId()));
                 }
             }
-            if (this.configure.isNeedDataEncry()) {
+            if (this.configure.isEnableDataEncrypt()) {
                 encodeObject.setEncryptEntry(true,
                         configure.getAuthSecretId(), 
clientMgr.getEncryptConfigureInfo());
-            } else {
-                encodeObject.setEncryptEntry(false, null, null);
             }
             encodeObject.setMsgUUID(msgUUID);
-            SyncMessageCallable callable = new 
SyncMessageCallable(clientResult.getF1(),
-                    encodeObject, configure.getRequestTimeoutMs(), 
TimeUnit.MILLISECONDS);
+            SyncMessageCallable callable = new SyncMessageCallable(
+                    clientResult.getF1(), encodeObject, 
configure.getRequestTimeoutMs());
             syncCallables.put(encodeObject.getMessageId(), callable);
             Future<SendResult> future = threadPool.submit(callable);
             message = future.get(configure.getRequestTimeoutMs(), 
TimeUnit.MILLISECONDS);
@@ -309,8 +307,8 @@ public class Sender {
     /**
      * Following methods used by asynchronously message sending.
      */
-    public void asyncSendMessage(EncodeObject encodeObject, 
SendMessageCallback callback, String msgUUID,
-            long timeout, TimeUnit timeUnit) throws ProxysdkException {
+    public void asyncSendMessage(EncodeObject encodeObject,
+            SendMessageCallback callback, String msgUUID) throws 
ProxysdkException {
         if (!started.get()) {
             if (callback != null) {
                 callback.onMessageAck(SendResult.SENDER_CLOSED);
@@ -381,8 +379,8 @@ public class Sender {
         }
         int size = 1;
         if (currentBufferSize.incrementAndGet() >= asyncCallbackMaxSize) {
-            currentBufferSize.decrementAndGet();
             clientResult.getF1().decMsgInFlight();
+            currentBufferSize.decrementAndGet();
             if (callback != null) {
                 callback.onMessageAck(SendResult.ASYNC_CALLBACK_BUFFER_FULL);
                 return;
@@ -393,7 +391,8 @@ public class Sender {
         ConcurrentHashMap<String, QueueObject> msgQueueMap =
                 callbacks.computeIfAbsent(clientResult.getF1().getChannel(), 
(k) -> new ConcurrentHashMap<>());
         QueueObject queueObject = 
msgQueueMap.putIfAbsent(encodeObject.getMessageId(),
-                new QueueObject(clientResult.getF1(), 
System.currentTimeMillis(), callback, size, timeout, timeUnit));
+                new QueueObject(clientResult.getF1(), 
System.currentTimeMillis(), callback,
+                        size, configure.getRequestTimeoutMs()));
         if (queueObject != null) {
             if (reqChkLoggCount.shouldPrint()) {
                 logger.warn("Sender({}) found message id {} has existed.",
@@ -407,11 +406,9 @@ public class Sender {
                         clientMgr.getStreamIdNum(encodeObject.getStreamId()));
             }
         }
-        if (this.configure.isNeedDataEncry()) {
+        if (this.configure.isEnableDataEncrypt()) {
             encodeObject.setEncryptEntry(true,
                     configure.getAuthSecretId(), 
clientMgr.getEncryptConfigureInfo());
-        } else {
-            encodeObject.setEncryptEntry(false, null, null);
         }
         encodeObject.setMsgUUID(msgUUID);
         clientResult.getF1().write(encodeObject);
@@ -501,7 +498,7 @@ public class Sender {
         }
     }
 
-    /* Deal with unexpected exception. only used for asyc send */
+    /* Deal with unexpected exception. only used for async send */
     public void waitForAckForChannel(Channel channel) {
         if (channel == null) {
             return;
@@ -513,14 +510,14 @@ public class Sender {
         }
         try {
             while (!queueObjMap.isEmpty()) {
+                if (System.currentTimeMillis() - startTime >= 
configure.getConCloseWaitPeriodMs()) {
+                    break;
+                }
                 try {
-                    Thread.sleep(100);
+                    Thread.sleep(100L);
                 } catch (InterruptedException ex1) {
                     //
                 }
-                if (System.currentTimeMillis() - startTime >= 
configure.getConCloseWaitPeriodMs()) {
-                    break;
-                }
             }
         } catch (Throwable ex) {
             if (exptCnt.shouldPrint()) {
@@ -559,13 +556,17 @@ public class Sender {
         return clientMgr;
     }
 
+    public ProxyClientConfig getConfigure() {
+        return configure;
+    }
+
     private void checkCallbackList() {
         // max wait for 1 min
         try {
             long startTime = System.currentTimeMillis();
             while (currentBufferSize.get() > 0
                     && System.currentTimeMillis() - startTime < 
configure.getConCloseWaitPeriodMs()) {
-                TimeUnit.MILLISECONDS.sleep(300);
+                Thread.sleep(300L);
             }
             if (currentBufferSize.get() > 0) {
                 logger.warn("Sender({}) callback size({}) not empty, force 
quit!",
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/SyncMessageCallable.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/SyncMessageCallable.java
index bce2ad4468..fa3f2bf14e 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/SyncMessageCallable.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/SyncMessageCallable.java
@@ -37,17 +37,14 @@ public class SyncMessageCallable implements 
Callable<SendResult> {
     private final NettyClient client;
     private final CountDownLatch awaitLatch = new CountDownLatch(1);
     private final EncodeObject encodeObject;
-    private final long timeout;
-    private final TimeUnit timeUnit;
+    private final long timeoutMs;
 
     private SendResult message;
 
-    public SyncMessageCallable(NettyClient client, EncodeObject encodeObject,
-            long timeout, TimeUnit timeUnit) {
+    public SyncMessageCallable(NettyClient client, EncodeObject encodeObject, 
long timeoutMs) {
         this.client = client;
         this.encodeObject = encodeObject;
-        this.timeout = timeout;
-        this.timeUnit = timeUnit;
+        this.timeoutMs = timeoutMs;
     }
 
     public void update(SendResult message) {
@@ -61,7 +58,7 @@ public class SyncMessageCallable implements 
Callable<SendResult> {
                 return SendResult.WRITE_OVER_WATERMARK;
             }
             ChannelFuture channelFuture = client.write(encodeObject);
-            awaitLatch.await(timeout, timeUnit);
+            awaitLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
         } catch (Throwable ex) {
             if (exptCnt.shouldPrint()) {
                 logger.warn("SyncMessageCallable write data throw exception", 
ex);
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java
index 39f8562b52..4d5459b8e1 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java
@@ -193,8 +193,7 @@ public class MetricWorkerThread extends Thread implements 
Closeable {
         callBack.increaseRetry();
         try {
             if (callBack.getRetryCount() < 4) {
-                sender.asyncSendMessage(encodeObject, callBack,
-                        String.valueOf(System.currentTimeMillis()), 20, 
TimeUnit.SECONDS);
+                sender.asyncSendMessage(encodeObject, callBack, 
String.valueOf(System.currentTimeMillis()));
             } else {
                 logger.error("Send metric failure: {}", 
encodeObject.getBodylist());
             }
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/TimeoutScanThread.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/TimeoutScanThread.java
index f53af9b7aa..5211ba9907 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/TimeoutScanThread.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/TimeoutScanThread.java
@@ -30,7 +30,6 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
 
 /**
  * Daemon threads to check timeout for asynchronous callback.
@@ -162,7 +161,6 @@ public class TimeoutScanThread extends Thread {
                     checkMessageIdBasedCallbacks(entry.getKey(), 
entry.getValue());
                 }
                 checkTimeoutChannel();
-                TimeUnit.SECONDS.sleep(1);
             } catch (Throwable ex) {
                 if (exptCnt.shouldPrint()) {
                     logger.warn("TimeoutScanThread({}) throw exception", 
sender.getInstanceId(), ex);
@@ -172,6 +170,14 @@ public class TimeoutScanThread extends Thread {
                 logger.info("TimeoutScanThread({}) scan, currentBufferSize={}",
                         sender.getInstanceId(), 
sender.getCurrentBufferSize().get());
             }
+            if (bShutDown) {
+                break;
+            }
+            try {
+                Thread.sleep(1000L);
+            } catch (InterruptedException e) {
+                //
+            }
         }
         logger.info("TimeoutScanThread({}) thread existed !", 
sender.getInstanceId());
     }

Reply via email to