This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

The following commit(s) were added to refs/heads/master by this push:
     new dae7074d9e [INLONG-11564][SDK] DataProxy SDK Implementation 
Optimization (#11581)
dae7074d9e is described below

commit dae7074d9ee219135138e8a9a8a3db8948366bfd
Author: Goson Zhang <4675...@qq.com>
AuthorDate: Fri Dec 6 08:57:13 2024 +0800

    [INLONG-11564][SDK] DataProxy SDK Implementation Optimization (#11581)
    
    Co-authored-by: gosonzhang <gosonzh...@tencent.com>
---
 .../inlong/sdk/dataproxy/DefaultMessageSender.java |   2 -
 .../inlong/sdk/dataproxy/ProxyClientConfig.java    |  24 --
 .../sdk/dataproxy/codec/ProtocolDecoder.java       |  53 ++--
 .../sdk/dataproxy/codec/ProtocolEncoder.java       |  32 +--
 .../inlong/sdk/dataproxy/common/SendResult.java    |   2 +-
 .../sdk/dataproxy/config/EncryptConfigEntry.java   |   5 +-
 .../inlong/sdk/dataproxy/config/HostInfo.java      |   4 +-
 .../sdk/dataproxy/config/ProxyConfigManager.java   | 270 +++++++-------------
 .../sdk/dataproxy/network/SyncMessageCallable.java |  13 +-
 .../sdk/dataproxy/network/TimeScanObject.java      |   4 +-
 .../dataproxy/threads/ManagerFetcherThread.java    |  68 -----
 .../inlong/sdk/dataproxy/utils/EventLoopUtil.java  |   3 +-
 .../sdk/dataproxy/utils/ServiceDiscoveryUtils.java | 279 ---------------------
 13 files changed, 153 insertions(+), 606 deletions(-)

diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
index 6fa6f6ff9a..dd22e63cce 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
@@ -29,7 +29,6 @@ import 
org.apache.inlong.sdk.dataproxy.network.ProxysdkException;
 import org.apache.inlong.sdk.dataproxy.network.Sender;
 import org.apache.inlong.sdk.dataproxy.network.SequentialID;
 import org.apache.inlong.sdk.dataproxy.threads.IndexCollectThread;
-import org.apache.inlong.sdk.dataproxy.threads.ManagerFetcherThread;
 import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
 
 import org.slf4j.Logger;
@@ -52,7 +51,6 @@ public class DefaultMessageSender implements MessageSender {
     private static final ConcurrentHashMap<Integer, DefaultMessageSender> 
CACHE_SENDER =
             new ConcurrentHashMap<>();
     private static final AtomicBoolean MANAGER_FETCHER_THREAD_STARTED = new 
AtomicBoolean(false);
-    private static ManagerFetcherThread managerFetcherThread;
     private static final SequentialID idGenerator = new SequentialID();
     private final Sender sender;
     private final IndexCollectThread indexCol;
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
index e7a0f6a3e6..ce8a3b3b39 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
@@ -34,7 +34,6 @@ public class ProxyClientConfig {
     private String managerIP = "";
     private String managerAddress;
 
-    private String managerIpLocalPath = System.getProperty("user.dir") + 
"/.inlong/.managerIps";
     private String managerUrl = "";
     private int proxyUpdateIntervalMinutes;
     private int proxyUpdateMaxRetry;
@@ -54,7 +53,6 @@ public class ProxyClientConfig {
     private String authSecretKey;
     private String protocolType;
 
-    private boolean enableSaveManagerVIps = false;
     // metric configure
     private MetricConfig metricConfig = new MetricConfig();
 
@@ -211,28 +209,6 @@ public class ProxyClientConfig {
         return managerIP;
     }
 
-    public String getManagerIpLocalPath() {
-        return managerIpLocalPath;
-    }
-
-    public void setManagerIpLocalPath(String managerIpLocalPath) throws 
ProxysdkException {
-        if (StringUtils.isEmpty(managerIpLocalPath)) {
-            throw new ProxysdkException("managerIpLocalPath is empty.");
-        }
-        if (managerIpLocalPath.charAt(managerIpLocalPath.length() - 1) == '/') 
{
-            managerIpLocalPath = managerIpLocalPath.substring(0, 
managerIpLocalPath.length() - 1);
-        }
-        this.managerIpLocalPath = managerIpLocalPath + "/.managerIps";
-    }
-
-    public boolean isEnableSaveManagerVIps() {
-        return enableSaveManagerVIps;
-    }
-
-    public void setEnableSaveManagerVIps(boolean enable) {
-        this.enableSaveManagerVIps = enable;
-    }
-
     public String getConfStoreBasePath() {
         return confStoreBasePath;
     }
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolDecoder.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolDecoder.java
index 2038a8b8d6..b7c31cba79 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolDecoder.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolDecoder.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.sdk.dataproxy.codec;
 
+import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.MessageToMessageDecoder;
@@ -29,18 +31,19 @@ import java.util.List;
 
 public class ProtocolDecoder extends MessageToMessageDecoder<ByteBuf> {
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(ProtocolDecoder.class);
-
+    private static final Logger logger = 
LoggerFactory.getLogger(ProtocolDecoder.class);
+    private static final LogCounter decExptCounter = new LogCounter(10, 
200000, 60 * 1000L);
     @Override
     protected void decode(ChannelHandlerContext ctx,
             ByteBuf buffer, List<Object> out) throws Exception {
         buffer.markReaderIndex();
         // totallen
         int totalLen = buffer.readInt();
-        LOGGER.debug("decode totalLen : {}", totalLen);
         if (totalLen != buffer.readableBytes()) {
-            LOGGER.error("totalLen is not equal readableBytes.total:" + 
totalLen
-                    + ";readableBytes:" + buffer.readableBytes());
+            if (decExptCounter.shouldPrint()) {
+                logger.error("Length not equal, 
totalLen={},readableBytes={},from={}",
+                        totalLen, buffer.readableBytes(), ctx.channel());
+            }
             buffer.resetReaderIndex();
             throw new Exception("totalLen is not equal readableBytes.total");
         }
@@ -48,14 +51,17 @@ public class ProtocolDecoder extends 
MessageToMessageDecoder<ByteBuf> {
         int msgType = buffer.readByte() & 0x1f;
 
         if (msgType == 4) {
-            LOGGER.info("debug decode");
-        }
-        if (msgType == 3 | msgType == 5) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("debug decode");
+            }
+        } else if (msgType == 3 | msgType == 5) {
             // bodylen
             int bodyLength = buffer.readInt();
             if (bodyLength >= totalLen) {
-                LOGGER.error("bodyLen is greater than totalLen.totalLen:" + 
totalLen
-                        + ";bodyLen:" + bodyLength);
+                if (decExptCounter.shouldPrint()) {
+                    logger.error("bodyLen greater than totalLen, 
totalLen={},bodyLen={},from={}",
+                            totalLen, bodyLength, ctx.channel());
+                }
                 buffer.resetReaderIndex();
                 throw new Exception("bodyLen is greater than 
totalLen.totalLen");
             }
@@ -64,20 +70,19 @@ public class ProtocolDecoder extends 
MessageToMessageDecoder<ByteBuf> {
                 bodyBytes = new byte[bodyLength];
                 buffer.readBytes(bodyBytes);
             }
-
             // attrlen
+            String attrInfo = "";
             int attrLength = buffer.readInt();
-            byte[] attrBytes = null;
             if (attrLength > 0) {
-                attrBytes = new byte[attrLength];
+                byte[] attrBytes = new byte[attrLength];
                 buffer.readBytes(attrBytes);
+                attrInfo = new String(attrBytes, StandardCharsets.UTF_8);
             }
             EncodeObject object;
             if (bodyBytes == null) {
-                object = new EncodeObject(new String(attrBytes, 
StandardCharsets.UTF_8));
+                object = new EncodeObject(attrInfo);
             } else {
-                object = new EncodeObject(Collections.singletonList(bodyBytes),
-                        new String(attrBytes, StandardCharsets.UTF_8));
+                object = new 
EncodeObject(Collections.singletonList(bodyBytes), attrInfo);
             }
             object.setMsgtype(5);
             out.add(object);
@@ -85,12 +90,13 @@ public class ProtocolDecoder extends 
MessageToMessageDecoder<ByteBuf> {
 
             int seqId = buffer.readInt();
             int attrLen = buffer.readShort();
-            byte[] attrBytes = null;
+            String attrInfo = "";
             if (attrLen > 0) {
-                attrBytes = new byte[attrLen];
+                byte[] attrBytes = new byte[attrLen];
                 buffer.readBytes(attrBytes);
+                attrInfo = new String(attrBytes, StandardCharsets.UTF_8);
             }
-            EncodeObject object = new EncodeObject(new String(attrBytes, 
StandardCharsets.UTF_8));
+            EncodeObject object = new EncodeObject(attrInfo);
             object.setMessageId(String.valueOf(seqId));
 
             buffer.readShort();
@@ -103,15 +109,14 @@ public class ProtocolDecoder extends 
MessageToMessageDecoder<ByteBuf> {
             buffer.skipBytes(4 + 1 + 4); // skip datatime, body_ver and 
body_len
             final short load = buffer.readShort(); // read from body
             int attrLen = buffer.readShort();
-            byte[] attrBytes = null;
+            String attrInfo = "";
             if (attrLen > 0) {
-                attrBytes = new byte[attrLen];
+                byte[] attrBytes = new byte[attrLen];
                 buffer.readBytes(attrBytes);
+                attrInfo = new String(attrBytes, StandardCharsets.UTF_8);
             }
             buffer.skipBytes(2); // skip magic
-
-            String attrs = (attrBytes == null ? "" : new String(attrBytes, 
StandardCharsets.UTF_8));
-            EncodeObject object = new EncodeObject(attrs);
+            EncodeObject object = new EncodeObject(attrInfo);
             object.setMsgtype(8);
             object.setLoad(load);
             out.add(object);
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolEncoder.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolEncoder.java
index c3d463987b..1a20766e55 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolEncoder.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolEncoder.java
@@ -99,16 +99,17 @@ public class ProtocolEncoder extends 
MessageToMessageEncoder<EncodeObject> {
             if (object.isAuth()) {
                 msgType |= FLAG_ALLOW_AUTH;
             }
-            int totalLength = 1 + 4 + 1 + 4 + 2 + 
endAttr.getBytes("utf8").length + 2;
+            byte[] attrData = endAttr.getBytes(StandardCharsets.UTF_8);
+            int totalLength = 1 + 4 + 1 + 4 + 2 + attrData.length + 2;
             buf = ByteBufAllocator.DEFAULT.buffer(4 + totalLength);
             buf.writeInt(totalLength);
             buf.writeByte(msgType);
             buf.writeInt((int) object.getDt());
             buf.writeByte(1);
             buf.writeInt(0);
-            buf.writeShort(endAttr.getBytes("utf8").length);
-            if (endAttr.getBytes("utf8").length > 0) {
-                buf.writeBytes(endAttr.getBytes("utf8"));
+            buf.writeShort(attrData.length);
+            if (attrData.length > 0) {
+                buf.writeBytes(attrData);
             }
             buf.writeShort(0xee01);
         } catch (Throwable ex) {
@@ -160,7 +161,8 @@ public class ProtocolEncoder extends 
MessageToMessageEncoder<EncodeObject> {
             if (object.isCompress()) {
                 msgType |= FLAG_ALLOW_COMPRESS;
             }
-            totalLength = totalLength + body.length + 
endAttr.getBytes("utf8").length;
+            byte[] attrData = endAttr.getBytes(StandardCharsets.UTF_8);
+            totalLength = totalLength + body.length + attrData.length;
             buf = ByteBufAllocator.DEFAULT.buffer(4 + totalLength);
             buf.writeInt(totalLength);
             buf.writeByte(msgType);
@@ -181,8 +183,8 @@ public class ProtocolEncoder extends 
MessageToMessageEncoder<EncodeObject> {
             buf.writeInt(body.length);
             buf.writeBytes(body);
 
-            buf.writeShort(endAttr.getBytes("utf8").length);
-            buf.writeBytes(endAttr.getBytes("utf8"));
+            buf.writeShort(attrData.length);
+            buf.writeBytes(attrData);
             buf.writeShort(0xee01);
         }
         return buf;
@@ -207,7 +209,7 @@ public class ProtocolEncoder extends 
MessageToMessageEncoder<EncodeObject> {
                     ByteArrayOutputStream data = new ByteArrayOutputStream();
                     for (byte[] entry : object.getBodylist()) {
                         if (totalCnt++ > 0) {
-                            data.write("\n".getBytes("utf8"));
+                            
data.write(AttributeConstants.LINE_FEED_SEP.getBytes(StandardCharsets.UTF_8));
                         }
                         data.write(entry);
                     }
@@ -280,14 +282,15 @@ public class ProtocolEncoder extends 
MessageToMessageEncoder<EncodeObject> {
                 if (object.isEncrypt()) {
                     msgType |= FLAG_ALLOW_ENCRYPT;
                 }
-                totalLength = totalLength + body.length + 
msgAttrs.getBytes("utf8").length;
+                byte[] attrData = msgAttrs.getBytes(StandardCharsets.UTF_8);
+                totalLength = totalLength + body.length + attrData.length;
                 buf = ByteBufAllocator.DEFAULT.buffer(4 + totalLength);
                 buf.writeInt(totalLength);
                 buf.writeByte(msgType);
                 buf.writeInt(body.length);
                 buf.writeBytes(body);
-                buf.writeInt(msgAttrs.getBytes("utf8").length);
-                buf.writeBytes(msgAttrs.getBytes("utf8"));
+                buf.writeInt(attrData.length);
+                buf.writeBytes(attrData);
             }
         } catch (Throwable ex) {
             if (exptCounter.shouldPrint()) {
@@ -344,14 +347,15 @@ public class ProtocolEncoder extends 
MessageToMessageEncoder<EncodeObject> {
                 if (object.isEncrypt()) {
                     msgType |= FLAG_ALLOW_ENCRYPT;
                 }
-                totalLength = totalLength + body.length + 
msgAttrs.getBytes("utf8").length;
+                byte[] attrData = msgAttrs.getBytes(StandardCharsets.UTF_8);
+                totalLength = totalLength + body.length + attrData.length;
                 buf = ByteBufAllocator.DEFAULT.buffer(4 + totalLength);
                 buf.writeInt(totalLength);
                 buf.writeByte(msgType);
                 buf.writeInt(body.length);
                 buf.writeBytes(body);
-                buf.writeInt(msgAttrs.getBytes("utf8").length);
-                buf.writeBytes(msgAttrs.getBytes("utf8"));
+                buf.writeInt(attrData.length);
+                buf.writeBytes(attrData);
             }
         } catch (Throwable ex) {
             if (exptCounter.shouldPrint()) {
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendResult.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendResult.java
index adab601e0a..f336702ee4 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendResult.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendResult.java
@@ -18,8 +18,8 @@
 package org.apache.inlong.sdk.dataproxy.common;
 
 public enum SendResult {
-    INVALID_ATTRIBUTES, // including DataProxyErrCode(100,101,102,112)
     OK,
+    INVALID_ATTRIBUTES, // including DataProxyErrCode(100,101,102,112)
     TIMEOUT,
     CONNECTION_BREAK,
     THREAD_INTERRUPT,
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/EncryptConfigEntry.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/EncryptConfigEntry.java
index 6acfe09d8a..47f6cd1ed7 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/EncryptConfigEntry.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/EncryptConfigEntry.java
@@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory;
 
 import java.net.URLEncoder;
 import java.security.interfaces.RSAPublicKey;
+import java.util.Objects;
 import java.util.concurrent.atomic.AtomicLong;
 
 public class EncryptConfigEntry implements java.io.Serializable {
@@ -122,7 +123,7 @@ public class EncryptConfigEntry implements 
java.io.Serializable {
 
     @Override
     public boolean equals(Object other) {
-        if (other == null || !(other instanceof EncryptConfigEntry)) {
+        if (!(other instanceof EncryptConfigEntry)) {
             return false;
         }
         if (other == this) {
@@ -131,7 +132,7 @@ public class EncryptConfigEntry implements 
java.io.Serializable {
         EncryptConfigEntry info = (EncryptConfigEntry) other;
         return (this.userName.equals(info.getUserName()))
                 && (this.version.equals(info.getVersion()))
-                && (this.pubKey == info.getPubKey());
+                && (Objects.equals(this.pubKey, info.getPubKey()));
     }
 
     public String toString() {
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/HostInfo.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/HostInfo.java
index c93c872ea5..071cf4b6eb 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/HostInfo.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/HostInfo.java
@@ -25,10 +25,10 @@ public class HostInfo implements Comparable<HostInfo>, 
java.io.Serializable {
     private final String hostName;
     private final int portNumber;
 
-    public HostInfo(String referenceName, String hostName, int portNumber) {
-        this.referenceName = referenceName;
+    public HostInfo(String hostName, int portNumber) {
         this.hostName = hostName;
         this.portNumber = portNumber;
+        this.referenceName = hostName + ":" + portNumber;
     }
 
     public String getReferenceName() {
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
index b10e533037..d259cdff08 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
@@ -35,7 +35,6 @@ import com.google.gson.stream.JsonReader;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
-import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.ObjectUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.http.Header;
@@ -91,11 +90,10 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 public class ProxyConfigManager extends Thread {
 
     public static final String APPLICATION_JSON = "application/json";
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(ProxyConfigManager.class);
+    private static final Logger logger = 
LoggerFactory.getLogger(ProxyConfigManager.class);
     private final ProxyClientConfig clientConfig;
     private final ClientMgr clientManager;
     private final ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
-    private final JsonParser jsonParser = new JsonParser();
     private final Gson gson = new Gson();
     private final HashRing hashRing = HashRing.getInstance();
     private List<HostInfo> proxyInfoList = new ArrayList<HostInfo>();
@@ -104,8 +102,8 @@ public class ProxyConfigManager extends Thread {
     private String inlongGroupId;
     private String localMd5;
     private boolean bShutDown = false;
-    private long doworkTime = 0;
-    private EncryptConfigEntry userEncryConfigEntry;
+    private long lstUpdatedTime = 0;
+    private EncryptConfigEntry userEncryptConfigEntry;
 
     public ProxyConfigManager(final ProxyClientConfig configure, final 
ClientMgr clientManager) {
         this.clientConfig = configure;
@@ -122,7 +120,7 @@ public class ProxyConfigManager extends Thread {
     }
 
     public void shutDown() {
-        LOGGER.info("Begin to shut down ProxyConfigManager!");
+        logger.info("Begin to shut down ProxyConfigManager!");
         bShutDown = true;
     }
 
@@ -132,9 +130,9 @@ public class ProxyConfigManager extends Thread {
             try {
                 doProxyEntryQueryWork();
                 updateEncryptConfigEntry();
-                LOGGER.info("ProxyConf update!");
+                logger.info("ProxyConf update!");
             } catch (Throwable e) {
-                LOGGER.error("Refresh proxy ip list runs into exception {}, 
{}", e.toString(), e.getStackTrace());
+                logger.error("Refresh proxy ip list runs into exception {}, 
{}", e.toString(), e.getStackTrace());
                 e.printStackTrace();
             }
 
@@ -147,13 +145,13 @@ public class ProxyConfigManager extends Thread {
                 if (proxyUpdateIntervalSec > 5) {
                     sleepTimeSec = proxyUpdateIntervalSec + random.nextInt() % 
(proxyUpdateIntervalSec / 5);
                 }
-                LOGGER.info("sleep time {}", sleepTimeSec);
+                logger.info("sleep time {}", sleepTimeSec);
                 Thread.sleep(sleepTimeSec * 1000);
             } catch (Throwable e2) {
                 //
             }
         }
-        LOGGER.info("ProxyConfigManager worker existed!");
+        logger.info("ProxyConfigManager worker existed!");
     }
 
     /**
@@ -170,11 +168,11 @@ public class ProxyConfigManager extends Thread {
             if (diffTime < clientConfig.getMaxProxyCacheTimeInMs()) {
                 JsonReader reader = new JsonReader(new 
FileReader(configCachePath));
                 ProxyConfigEntry proxyConfigEntry = gson.fromJson(reader, 
ProxyConfigEntry.class);
-                LOGGER.info("{} has a backup! {}", inlongGroupId, 
proxyConfigEntry);
+                logger.info("{} has a backup! {}", inlongGroupId, 
proxyConfigEntry);
                 return proxyConfigEntry;
             }
         } catch (Exception ex) {
-            LOGGER.warn("try to read local cache, caught {}", ex.getMessage());
+            logger.warn("try to read local cache, caught {}", ex.getMessage());
         } finally {
             rw.readLock().unlock();
         }
@@ -189,13 +187,13 @@ public class ProxyConfigManager extends Thread {
                 // try to create parent
                 file.getParentFile().mkdirs();
             }
-            LOGGER.info("try to write {}} to local cache {}", entry, 
configCachePath);
+            logger.info("try to write {}} to local cache {}", entry, 
configCachePath);
             FileWriter fileWriter = new FileWriter(configCachePath);
             gson.toJson(entry, fileWriter);
             fileWriter.flush();
             fileWriter.close();
         } catch (Exception ex) {
-            LOGGER.warn("try to write local cache, caught {}", 
ex.getMessage());
+            logger.warn("try to write local cache, caught {}", 
ex.getMessage());
         } finally {
             rw.writeLock().unlock();
         }
@@ -205,7 +203,7 @@ public class ProxyConfigManager extends Thread {
         try {
             return requestProxyList(this.clientConfig.getManagerUrl());
         } catch (Exception e) {
-            LOGGER.warn("try to request proxy list by http, caught {}", 
e.getMessage());
+            logger.warn("try to request proxy list by http, caught {}", 
e.getMessage());
         }
         return null;
     }
@@ -280,7 +278,7 @@ public class ProxyConfigManager extends Thread {
             }
             /* We should exit if no local IP list and can't request it from 
manager. */
             if (localMd5 == null && proxyEntry == null) {
-                LOGGER.error("Can't connect manager at the start of proxy API 
{}",
+                logger.error("Can't connect manager at the start of proxy API 
{}",
                         this.clientConfig.getManagerUrl());
                 proxyEntry = tryToReadCacheProxyEntry(configAddr);
             }
@@ -290,7 +288,7 @@ public class ProxyConfigManager extends Thread {
                     
s.append(tmp.getHostName()).append(";").append(tmp.getPortNumber())
                             .append(",");
                 }
-                LOGGER.warn("Backup proxyEntry [{}]", s);
+                logger.warn("Backup proxyEntry [{}]", s);
             }
         }
         if (localMd5 == null && proxyEntry == null && proxyInfoList == null) {
@@ -312,7 +310,7 @@ public class ProxyConfigManager extends Thread {
      */
     private void compareProxyList(ProxyConfigEntry proxyEntry) {
         if (proxyEntry != null) {
-            LOGGER.info("{}", proxyEntry.toString());
+            logger.info("{}", proxyEntry.toString());
             if (proxyEntry.getSize() != 0) {
                 /* Initialize the current proxy information list first. */
                 clientManager.setLoadThreshold(proxyEntry.getLoad());
@@ -326,31 +324,31 @@ public class ProxyConfigManager extends Thread {
                 String oldMd5 = calcHostInfoMd5(proxyInfoList);
                 if (newMd5 != null && !newMd5.equals(oldMd5)) {
                     /* Choose random alive connections to send messages. */
-                    LOGGER.info("old md5 {} new md5 {}", oldMd5, newMd5);
+                    logger.info("old md5 {} new md5 {}", oldMd5, newMd5);
                     proxyInfoList.clear();
                     proxyInfoList = newProxyInfoList;
                     clientManager.setProxyInfoList(proxyInfoList);
-                    doworkTime = System.currentTimeMillis();
+                    lstUpdatedTime = System.currentTimeMillis();
                 } else if (proxyEntry.getSwitchStat() != oldStat) {
                     /* judge cluster's switch state */
                     oldStat = proxyEntry.getSwitchStat();
-                    if ((System.currentTimeMillis() - doworkTime) > 3 * 60 * 
1000) {
-                        LOGGER.info("switch the cluster!");
+                    if ((System.currentTimeMillis() - lstUpdatedTime) > 3 * 60 
* 1000) {
+                        logger.info("switch the cluster!");
                         proxyInfoList.clear();
                         proxyInfoList = newProxyInfoList;
                         clientManager.setProxyInfoList(proxyInfoList);
                     } else {
-                        LOGGER.info("only change oldStat ");
+                        logger.info("only change oldStat ");
                     }
                 } else {
                     newProxyInfoList.clear();
-                    LOGGER.info("proxy IP list doesn't change, load {}", 
proxyEntry.getLoad());
+                    logger.info("proxy IP list doesn't change, load {}", 
proxyEntry.getLoad());
                 }
                 if (clientConfig.getLoadBalance() == 
LoadBalance.CONSISTENCY_HASH) {
                     updateHashRing(proxyInfoList);
                 }
             } else {
-                LOGGER.error("proxyEntry's size is zero");
+                logger.error("proxyEntry's size is zero");
             }
         }
     }
@@ -359,7 +357,7 @@ public class ProxyConfigManager extends Thread {
         if (StringUtils.isBlank(userName)) {
             return null;
         }
-        EncryptConfigEntry encryptEntry = this.userEncryConfigEntry;
+        EncryptConfigEntry encryptEntry = this.userEncryptConfigEntry;
         if (encryptEntry == null) {
             int retryCount = 0;
             encryptEntry = requestPubKey(this.clientConfig.getRsaPubKeyUrl(), 
userName, false);
@@ -372,21 +370,21 @@ public class ProxyConfigManager extends Thread {
                 if (encryptEntry != null) {
                     encryptEntry.getRsaEncryptedKey();
                     synchronized (this) {
-                        if (this.userEncryConfigEntry == null) {
-                            this.userEncryConfigEntry = encryptEntry;
+                        if (this.userEncryptConfigEntry == null) {
+                            this.userEncryptConfigEntry = encryptEntry;
                         } else {
-                            encryptEntry = this.userEncryConfigEntry;
+                            encryptEntry = this.userEncryptConfigEntry;
                         }
                     }
                 }
             } else {
                 synchronized (this) {
-                    if (this.userEncryConfigEntry == null || 
this.userEncryConfigEntry != encryptEntry) {
+                    if (this.userEncryptConfigEntry == null || 
this.userEncryptConfigEntry != encryptEntry) {
                         storePubKeyEntry(encryptEntry);
                         encryptEntry.getRsaEncryptedKey();
-                        this.userEncryConfigEntry = encryptEntry;
+                        this.userEncryptConfigEntry = encryptEntry;
                     } else {
-                        encryptEntry = this.userEncryConfigEntry;
+                        encryptEntry = this.userEncryptConfigEntry;
                     }
                 }
             }
@@ -410,10 +408,10 @@ public class ProxyConfigManager extends Thread {
             return;
         }
         synchronized (this) {
-            if (this.userEncryConfigEntry == null || this.userEncryConfigEntry 
!= encryptConfigEntry) {
+            if (this.userEncryptConfigEntry == null || 
this.userEncryptConfigEntry != encryptConfigEntry) {
                 storePubKeyEntry(encryptConfigEntry);
                 encryptConfigEntry.getRsaEncryptedKey();
-                this.userEncryConfigEntry = encryptConfigEntry;
+                this.userEncryptConfigEntry = encryptConfigEntry;
             }
         }
         return;
@@ -421,7 +419,7 @@ public class ProxyConfigManager extends Thread {
 
     private EncryptConfigEntry getStoredPubKeyEntry(String userName) {
         if (StringUtils.isBlank(userName)) {
-            LOGGER.warn(" userName(" + userName + ") is not available");
+            logger.warn(" userName(" + userName + ") is not available");
             return null;
         }
         EncryptConfigEntry entry;
@@ -441,7 +439,7 @@ public class ProxyConfigManager extends Thread {
                 return null;
             }
         } catch (Throwable e1) {
-            LOGGER.error("Read " + userName + " stored PubKeyEntry error ", 
e1);
+            logger.error("Read " + userName + " stored PubKeyEntry error ", 
e1);
             return null;
         } finally {
             if (fis != null) {
@@ -473,7 +471,7 @@ public class ProxyConfigManager extends Thread {
             p.flush();
             // p.close();
         } catch (Throwable e) {
-            LOGGER.error("store EncryptConfigEntry " + entry.toString() + " 
exception ", e);
+            logger.error("store EncryptConfigEntry " + entry.toString() + " 
exception ", e);
             e.printStackTrace();
         } finally {
             if (fos != null) {
@@ -508,7 +506,7 @@ public class ProxyConfigManager extends Thread {
 
     private EncryptConfigEntry requestPubKey(String pubKeyUrl, String 
userName, boolean needGet) {
         if (StringUtils.isBlank(userName)) {
-            LOGGER.error("Queried userName is null!");
+            logger.error("Queried userName is null!");
             return null;
         }
         List<BasicNameValuePair> params = new ArrayList<BasicNameValuePair>();
@@ -516,26 +514,26 @@ public class ProxyConfigManager extends Thread {
         params.add(new BasicNameValuePair("username", userName));
         String returnStr = requestConfiguration(pubKeyUrl, params);
         if (StringUtils.isBlank(returnStr)) {
-            LOGGER.info("No public key information returned from manager");
+            logger.info("No public key information returned from manager");
             return null;
         }
-        JsonObject pubKeyConf = jsonParser.parse(returnStr).getAsJsonObject();
+        JsonObject pubKeyConf = 
JsonParser.parseString(returnStr).getAsJsonObject();
         if (pubKeyConf == null) {
-            LOGGER.info("No public key information returned from manager");
+            logger.info("No public key information returned from manager");
             return null;
         }
         if (!pubKeyConf.has("resultCode")) {
-            LOGGER.info("Parse pubKeyConf failure: No resultCode key 
information returned from manager");
+            logger.info("Parse pubKeyConf failure: No resultCode key 
information returned from manager");
             return null;
         }
         int resultCode = pubKeyConf.get("resultCode").getAsInt();
         if (resultCode != 0) {
-            LOGGER.info("query pubKeyConf failure, error code is " + 
resultCode + ", errInfo is "
+            logger.info("query pubKeyConf failure, error code is " + 
resultCode + ", errInfo is "
                     + pubKeyConf.get("message").getAsString());
             return null;
         }
         if (!pubKeyConf.has("resultData")) {
-            LOGGER.info("Parse pubKeyConf failure: No resultData key 
information returned from manager");
+            logger.info("Parse pubKeyConf failure: No resultData key 
information returned from manager");
             return null;
         }
         JsonObject resultData = pubKeyConf.get("resultData").getAsJsonObject();
@@ -566,7 +564,7 @@ public class ProxyConfigManager extends Thread {
             throw new Exception("Read local proxyList File failure by " + 
filePath + ", reason is " + e.getCause());
         }
         if (ObjectUtils.isEmpty(proxyCluster)) {
-            LOGGER.warn("no proxyCluster configure from local file");
+            logger.warn("no proxyCluster configure from local file");
             return null;
         }
 
@@ -591,7 +589,7 @@ public class ProxyConfigManager extends Thread {
         ArrayList<BasicNameValuePair> params = new 
ArrayList<BasicNameValuePair>();
         params.add(new BasicNameValuePair("ip", IpUtils.getLocalIp()));
         params.add(new BasicNameValuePair("protocolType", 
clientConfig.getProtocolType()));
-        LOGGER.info("Begin to get configure from manager {}, param is {}", 
url, params);
+        logger.info("Begin to get configure from manager {}, param is {}", 
url, params);
 
         String resultStr = requestConfiguration(url, params);
         ProxyClusterConfig clusterConfig = gson.fromJson(resultStr, 
ProxyClusterConfig.class);
@@ -606,7 +604,7 @@ public class ProxyConfigManager extends Thread {
     private ProxyConfigEntry getProxyConfigEntry(DataProxyNodeResponse 
proxyCluster) {
         List<DataProxyNodeInfo> nodeList = proxyCluster.getNodeList();
         if (CollectionUtils.isEmpty(nodeList)) {
-            LOGGER.error("dataproxy nodeList is empty in 
DataProxyNodeResponse!");
+            logger.error("dataproxy nodeList is empty in 
DataProxyNodeResponse!");
             return null;
         }
         Map<String, HostInfo> hostMap = formatHostInfoMap(nodeList);
@@ -644,133 +642,73 @@ public class ProxyConfigManager extends Thread {
     }
 
     private Map<String, HostInfo> formatHostInfoMap(List<DataProxyNodeInfo> 
nodeList) {
+        HostInfo tmpHostInfo;
         Map<String, HostInfo> hostMap = new HashMap<>();
         for (DataProxyNodeInfo proxy : nodeList) {
             if (ObjectUtils.isEmpty(proxy.getId()) || 
StringUtils.isEmpty(proxy.getIp()) || ObjectUtils
                     .isEmpty(proxy.getPort()) || proxy.getPort() < 0) {
-                LOGGER.error("invalid proxy node, id:{}, ip:{}, port:{}", 
proxy.getId(), proxy.getIp(),
+                logger.error("invalid proxy node, id:{}, ip:{}, port:{}", 
proxy.getId(), proxy.getIp(),
                         proxy.getPort());
                 continue;
             }
-            String refId = proxy.getIp() + ":" + proxy.getPort();
-            hostMap.put(refId, new HostInfo(refId, proxy.getIp(), 
proxy.getPort()));
+            tmpHostInfo = new HostInfo(proxy.getIp(), proxy.getPort());
+            hostMap.put(tmpHostInfo.getReferenceName(), tmpHostInfo);
 
         }
         if (hostMap.isEmpty()) {
-            LOGGER.error("Parse proxyList failure: address is empty for 
response from manager!");
+            logger.error("Parse proxyList failure: address is empty for 
response from manager!");
             return null;
         }
         return hostMap;
     }
 
-    private String updateUrl(String url, int tryIdx, String 
localManagerIpList) {
-        if (tryIdx == 0) {
-            return url;
-        }
-
-        int headerIdx = url.indexOf("://");
-        if (headerIdx == -1) {
-            return null;
-        }
-        String header = "";
-        header = url.substring(0, headerIdx + 3);
-        String tmpUrl = url.substring(headerIdx + 3);
-        int tailerIdx = tmpUrl.indexOf("/");
-        if (tailerIdx == -1) {
-            return null;
-        }
-        String tailer = "";
-        tailer = tmpUrl.substring(tailerIdx);
-        String[] managerIps = localManagerIpList.split(",");
-        String currentManagerIp = "";
-        int idx = 1;
-        for (String managerIp : managerIps) {
-            if (idx++ == tryIdx) {
-                currentManagerIp = managerIp;
-                break;
-            }
-        }
-        if (!currentManagerIp.equals("")) {
-            return header + currentManagerIp + ":" + 
clientConfig.getManagerPort() + tailer;
-        }
-        return null;
-    }
-
     /* Request new configurations from Manager. */
     private String requestConfiguration(String url, List<BasicNameValuePair> 
params) {
         if (StringUtils.isBlank(url)) {
-            LOGGER.error("request url is null");
+            logger.error("request url is null");
             return null;
         }
-        // get local managerIpList
-        String localManagerIps = "";
-        int tryIdx = 0;
-        while (true) {
-            HttpPost httpPost = null;
-            String returnStr = null;
-            HttpParams myParams = new BasicHttpParams();
-            HttpConnectionParams.setConnectionTimeout(myParams, 10000);
-            HttpConnectionParams.setSoTimeout(myParams, 
clientConfig.getManagerSocketTimeout());
-            CloseableHttpClient httpClient = null;
-            if (this.clientConfig.isRequestByHttp()) {
-                httpClient = new DefaultHttpClient(myParams);
-            } else {
-                try {
-                    httpClient = getCloseableHttpClient(params);
-                } catch (Throwable eHttps) {
-                    LOGGER.error("Create Https cliet failure, error 1 is ", 
eHttps);
-                    eHttps.printStackTrace();
-                    return null;
-                }
-            }
-
-            if (!clientConfig.isEnableSaveManagerVIps() && tryIdx > 0) {
+        HttpPost httpPost = null;
+        HttpParams myParams = new BasicHttpParams();
+        HttpConnectionParams.setConnectionTimeout(myParams, 10000);
+        HttpConnectionParams.setSoTimeout(myParams, 
clientConfig.getManagerSocketTimeout());
+        CloseableHttpClient httpClient;
+        if (this.clientConfig.isRequestByHttp()) {
+            httpClient = new DefaultHttpClient(myParams);
+        } else {
+            try {
+                httpClient = getCloseableHttpClient(params);
+            } catch (Throwable eHttps) {
+                logger.error("Create Https cliet failure, error 1 is ", 
eHttps);
+                eHttps.printStackTrace();
                 return null;
             }
-            // change url's manager host port when occur error
-            url = updateUrl(url, tryIdx, localManagerIps);
-            if (url == null) {
-                return null;
+        }
+        logger.info("Request url : {}, params : {}", url, params);
+        try {
+            httpPost = new HttpPost(url);
+            httpPost.addHeader(BasicAuth.BASIC_AUTH_HEADER,
+                    
BasicAuth.genBasicAuthCredential(clientConfig.getAuthSecretId(),
+                            clientConfig.getAuthSecretKey()));
+            UrlEncodedFormEntity urlEncodedFormEntity = new 
UrlEncodedFormEntity(params, "UTF-8");
+            httpPost.setEntity(urlEncodedFormEntity);
+            HttpResponse response = httpClient.execute(httpPost);
+            String returnStr = EntityUtils.toString(response.getEntity());
+            if (StringUtils.isNotBlank(returnStr)
+                    && response.getStatusLine().getStatusCode() == 
HttpStatus.SC_OK) {
+                logger.info("Get configure from manager is {}", returnStr);
+                return returnStr;
             }
-            tryIdx++;
-
-            LOGGER.info("Request url : " + url + ", localManagerIps : " + 
localManagerIps);
-            try {
-                httpPost = new HttpPost(url);
-                httpPost.addHeader(BasicAuth.BASIC_AUTH_HEADER,
-                        
BasicAuth.genBasicAuthCredential(clientConfig.getAuthSecretId(),
-                                clientConfig.getAuthSecretKey()));
-                UrlEncodedFormEntity urlEncodedFormEntity = new 
UrlEncodedFormEntity(params, "UTF-8");
-                httpPost.setEntity(urlEncodedFormEntity);
-                HttpResponse response = httpClient.execute(httpPost);
-                returnStr = EntityUtils.toString(response.getEntity());
-                if (StringUtils.isNotBlank(returnStr)
-                        && response.getStatusLine().getStatusCode() == 
HttpStatus.SC_OK) {
-                    LOGGER.info("Get configure from manager is " + returnStr);
-                    return returnStr;
-                }
-
-                if (!clientConfig.isRequestByHttp()) {
-                    return null;
-                }
-            } catch (Throwable e) {
-                LOGGER.error("Connect Manager error, message: {}, url is {}", 
e.getMessage(), url);
-
-                if (!clientConfig.isRequestByHttp()) {
-                    return null;
-                }
-                // get localManagerIps
-                localManagerIps = getLocalManagerIps();
-                if (localManagerIps == null) {
-                    return null;
-                }
-            } finally {
-                if (httpPost != null) {
-                    httpPost.releaseConnection();
-                }
-                if (httpClient != null) {
-                    httpClient.getConnectionManager().shutdown();
-                }
+            return null;
+        } catch (Throwable e) {
+            logger.error("Connect Manager error, message: {}, url is {}", 
e.getMessage(), url);
+            return null;
+        } finally {
+            if (httpPost != null) {
+                httpPost.releaseConnection();
+            }
+            if (httpClient != null) {
+                httpClient.getConnectionManager().shutdown();
             }
         }
     }
@@ -803,36 +741,8 @@ public class ProxyConfigManager extends Thread {
         return httpClient;
     }
 
-    private String getLocalManagerIps() {
-        String localManagerIps;
-        try {
-            File localManagerIpsFile = new 
File(clientConfig.getManagerIpLocalPath());
-            if (localManagerIpsFile.exists()) {
-                byte[] serialized;
-                serialized = 
FileUtils.readFileToByteArray(localManagerIpsFile);
-                if (serialized == null) {
-                    LOGGER.error("Local managerIp file is empty, file path : "
-                            + clientConfig.getManagerIpLocalPath());
-                    return null;
-                }
-                localManagerIps = new String(serialized, "UTF-8");
-            } else {
-                if (!localManagerIpsFile.getParentFile().exists()) {
-                    localManagerIpsFile.getParentFile().mkdirs();
-                }
-                localManagerIps = "";
-                LOGGER.error("Get local managerIpList not exist, file path : "
-                        + clientConfig.getManagerIpLocalPath());
-            }
-        } catch (Throwable t) {
-            localManagerIps = "";
-            LOGGER.error("Get local managerIpList occur exception,", t);
-        }
-        return localManagerIps;
-    }
-
     public void updateHashRing(List<HostInfo> newHosts) {
         this.hashRing.updateNode(newHosts);
-        LOGGER.debug("update hash ring {}", 
hashRing.getVirtualNode2RealNode());
+        logger.debug("update hash ring {}", 
hashRing.getVirtualNode2RealNode());
     }
 }
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/SyncMessageCallable.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/SyncMessageCallable.java
index 8e00ff8c3e..2f75226925 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/SyncMessageCallable.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/SyncMessageCallable.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sdk.dataproxy.network;
 
 import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
 import org.apache.inlong.sdk.dataproxy.common.SendResult;
+import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
 
 import io.netty.channel.ChannelFuture;
 import org.slf4j.Logger;
@@ -30,8 +31,8 @@ import java.util.concurrent.TimeUnit;
 
 public class SyncMessageCallable implements Callable<SendResult> {
 
-    private static final Logger logger = LoggerFactory
-            .getLogger(SyncMessageCallable.class);
+    private static final Logger logger = 
LoggerFactory.getLogger(SyncMessageCallable.class);
+    private static final LogCounter exptCnt = new LogCounter(10, 100000, 60 * 
1000L);
 
     private final NettyClient client;
     private final CountDownLatch awaitLatch = new CountDownLatch(1);
@@ -55,13 +56,13 @@ public class SyncMessageCallable implements 
Callable<SendResult> {
     }
 
     public SendResult call() throws Exception {
-        // TODO Auto-generated method stub
         try {
             ChannelFuture channelFuture = client.write(encodeObject);
             awaitLatch.await(timeout, timeUnit);
-        } catch (Exception e) {
-            logger.error("SendResult call", e);
-            e.printStackTrace();
+        } catch (Throwable ex) {
+            if (exptCnt.shouldPrint()) {
+                logger.warn("SyncMessageCallable write data throw exception", 
ex);
+            }
             return SendResult.UNKOWN_ERROR;
         }
         return message;
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/TimeScanObject.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/TimeScanObject.java
index 01d36c23fb..19291e4336 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/TimeScanObject.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/TimeScanObject.java
@@ -22,8 +22,8 @@ import java.util.concurrent.atomic.AtomicLong;
 
 public class TimeScanObject {
 
-    private AtomicInteger count = new AtomicInteger(0);
-    private AtomicLong time = new AtomicLong(0);
+    private final AtomicInteger count = new AtomicInteger(0);
+    private final AtomicLong time = new AtomicLong(0);
 
     public TimeScanObject() {
         this.count.set(0);
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/ManagerFetcherThread.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/ManagerFetcherThread.java
deleted file mode 100644
index 1424c3cb8f..0000000000
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/ManagerFetcherThread.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dataproxy.threads;
-
-import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
-import org.apache.inlong.sdk.dataproxy.utils.ServiceDiscoveryUtils;
-
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * manager fetch thread
- */
-public class ManagerFetcherThread extends Thread {
-
-    private final Logger logger = 
LoggerFactory.getLogger(ManagerFetcherThread.class);
-    private volatile boolean isShutdown;
-    private final ProxyClientConfig proxyClientConfig;
-
-    public ManagerFetcherThread(ProxyClientConfig proxyClientConfig) {
-        isShutdown = false;
-        this.proxyClientConfig = proxyClientConfig;
-        this.setDaemon(true);
-        this.setName("ManagerFetcherThread");
-    }
-
-    public void shutdown() {
-        logger.info("Begin to shutdown ManagerFetcherThread.");
-        isShutdown = true;
-    }
-
-    @Override
-    public void run() {
-        logger.info("ManagerFetcherThread Thread=" + 
Thread.currentThread().getId() + " started !");
-        while (!isShutdown) {
-            try {
-                String managerIpList = 
ServiceDiscoveryUtils.getManagerIpList(proxyClientConfig);
-                if (StringUtils.isBlank(managerIpList)) {
-                    logger.error("ManagerFetcher get managerIpList is blank.");
-                } else {
-                    
ServiceDiscoveryUtils.updateManagerInfo2Local(managerIpList,
-                            proxyClientConfig.getManagerIpLocalPath());
-                }
-                TimeUnit.MILLISECONDS.sleep((long) 
proxyClientConfig.getProxyUpdateIntervalMinutes() * 60 * 1000);
-            } catch (Throwable e) {
-                logger.error("ManagerFetcher get or save managerIpList occur 
error,", e);
-            }
-        }
-    }
-}
\ No newline at end of file
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/EventLoopUtil.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/EventLoopUtil.java
index bf26d3fc59..eb99c43b9e 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/EventLoopUtil.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/EventLoopUtil.java
@@ -53,12 +53,11 @@ public class EventLoopUtil {
         } else if (!enableBusyWait) {
             return new EpollEventLoopGroup(nThreads, threadFactory);
         } else {
-            EpollEventLoopGroup eventLoopGroup = new 
EpollEventLoopGroup(nThreads, threadFactory, () -> {
+            return new EpollEventLoopGroup(nThreads, threadFactory, () -> {
                 return (selectSupplier, hasTasks) -> {
                     return -3;
                 };
             });
-            return eventLoopGroup;
         }
     }
 
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ServiceDiscoveryUtils.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ServiceDiscoveryUtils.java
deleted file mode 100644
index 6352e25308..0000000000
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ServiceDiscoveryUtils.java
+++ /dev/null
@@ -1,279 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dataproxy.utils;
-
-import org.apache.inlong.common.util.BasicAuth;
-import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
-import org.apache.inlong.sdk.dataproxy.network.IpUtils;
-
-import com.google.gson.JsonArray;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.http.Header;
-import org.apache.http.HttpResponse;
-import org.apache.http.client.config.RequestConfig;
-import org.apache.http.client.entity.UrlEncodedFormEntity;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.DefaultHttpClient;
-import org.apache.http.impl.client.HttpClients;
-import org.apache.http.message.BasicHeader;
-import org.apache.http.message.BasicNameValuePair;
-import org.apache.http.params.BasicHttpParams;
-import org.apache.http.params.HttpConnectionParams;
-import org.apache.http.params.HttpParams;
-import org.apache.http.ssl.SSLContexts;
-import org.apache.http.util.EntityUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.net.ssl.SSLContext;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.nio.charset.StandardCharsets;
-import java.security.SecureRandom;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Utils for service discovery
- */
-public class ServiceDiscoveryUtils {
-
-    private static final Logger log = 
LoggerFactory.getLogger(ServiceDiscoveryUtils.class);
-
-    private static final String GET_MANAGER_IP_LIST_API = 
"/inlong/manager/openapi/agent/getManagerIpList";
-    private static String latestManagerIPList = "";
-    private static String arraySed = ",";
-
-    /**
-     * Get Inlong-Manager IP list from the given proxy client config
-     */
-    public static String getManagerIpList(ProxyClientConfig clientConfig) {
-        String managerAddress = clientConfig.getManagerAddress();
-        if (StringUtils.isBlank(managerAddress)) {
-            log.error("ServiceDiscovery get managerIpList but managerAddress 
is blank, just return");
-            return null;
-        }
-
-        String managerIpList = getManagerIpListByHttp(managerAddress, 
clientConfig);
-        if (StringUtils.isNotBlank(managerIpList)) {
-            latestManagerIPList = managerIpList;
-            return managerIpList;
-        }
-
-        log.error("ServiceDiscovery get managerIpList from {} occur error, try 
to get from latestManagerIPList",
-                managerAddress);
-
-        String[] managerIps = latestManagerIPList.split(arraySed);
-        if (managerIps.length > 0) {
-            for (String managerIp : managerIps) {
-                if (StringUtils.isBlank(managerIp)) {
-                    log.error("ServiceDiscovery managerIp is null, 
latestManagerIPList is {}", latestManagerIPList);
-                    continue;
-                }
-
-                String currentAddress = managerIp + ":" + 
clientConfig.getManagerPort();
-                managerIpList = getManagerIpListByHttp(currentAddress, 
clientConfig);
-                if (StringUtils.isBlank(managerIpList)) {
-                    log.error("ServiceDiscovery get latestManagerIPList from 
{} but got nothing, will try next ip",
-                            managerIp);
-                    continue;
-                }
-                latestManagerIPList = managerIpList;
-                return managerIpList;
-            }
-        } else {
-            log.error("ServiceDiscovery latestManagerIpList {} format error, 
or not contain ip", latestManagerIPList);
-        }
-
-        String existedIpList = 
getLocalManagerIpList(clientConfig.getManagerIpLocalPath());
-        if (StringUtils.isNotBlank(existedIpList)) {
-            String[] existedIps = existedIpList.split(arraySed);
-            if (existedIps.length > 0) {
-                for (String existedIp : existedIps) {
-                    if (StringUtils.isBlank(existedIp)) {
-                        log.error("ServiceDiscovery get illegal format ipList 
from local file, "
-                                + "exist ip is empty, managerIpList is {}, 
local file is {}",
-                                existedIpList, 
clientConfig.getManagerIpLocalPath());
-                        continue;
-                    }
-
-                    String currentAddress = existedIp + ":" + 
clientConfig.getManagerPort();
-                    managerIpList = getManagerIpListByHttp(currentAddress, 
clientConfig);
-                    if (StringUtils.isBlank(managerIpList)) {
-                        log.error("ServiceDiscovery get {} from local file {} 
but got nothing, will try next ip",
-                                existedIp, 
clientConfig.getManagerIpLocalPath());
-                        continue;
-                    }
-                    latestManagerIPList = managerIpList;
-                    return managerIpList;
-                }
-            } else {
-                log.error("ServiceDiscovery get illegal format ipList from 
local file, "
-                        + "exist ip is empty, managerIpList is {}, local file 
is {}",
-                        existedIpList, clientConfig.getManagerIpLocalPath());
-            }
-        } else {
-            log.error("ServiceDiscovery get empty ipList from local file {}", 
clientConfig.getManagerIpLocalPath());
-        }
-
-        return managerIpList;
-    }
-
-    /**
-     * Get Inlong-Manager IP list from the given managerIp and proxy client 
config
-     */
-    public static String getManagerIpListByHttp(String managerIp, 
ProxyClientConfig proxyClientConfig) {
-        String url = managerIp + GET_MANAGER_IP_LIST_API;
-        ArrayList<BasicNameValuePair> params = new 
ArrayList<BasicNameValuePair>();
-        params.add(new BasicNameValuePair("operation", "query"));
-        params.add(new BasicNameValuePair("username", 
proxyClientConfig.getUserName()));
-
-        log.info("Begin to get configure from manager {}, param is {}", url, 
params);
-        CloseableHttpClient httpClient;
-        HttpParams myParams = new BasicHttpParams();
-        HttpConnectionParams.setConnectionTimeout(myParams, 
proxyClientConfig.getManagerConnectionTimeout());
-        HttpConnectionParams.setSoTimeout(myParams, 
proxyClientConfig.getManagerSocketTimeout());
-        if (proxyClientConfig.isRequestByHttp()) {
-            httpClient = new DefaultHttpClient(myParams);
-        } else {
-            try {
-                ArrayList<Header> headers = new ArrayList<>();
-                for (BasicNameValuePair paramItem : params) {
-                    headers.add(new BasicHeader(paramItem.getName(), 
paramItem.getValue()));
-                }
-                RequestConfig requestConfig = RequestConfig.custom()
-                        
.setConnectTimeout(10000).setSocketTimeout(30000).build();
-                SSLContext sslContext = SSLContexts.custom().build();
-                SSLConnectionSocketFactory sslsf = new 
SSLConnectionSocketFactory(sslContext,
-                        new String[]{"TLSv1"}, null,
-                        
SSLConnectionSocketFactory.getDefaultHostnameVerifier());
-                httpClient = HttpClients.custom().setDefaultHeaders(headers)
-                        
.setDefaultRequestConfig(requestConfig).setSSLSocketFactory(sslsf).build();
-            } catch (Throwable t) {
-                log.error("Create Https client failed: ", t);
-                return null;
-            }
-        }
-
-        HttpPost httpPost = null;
-        try {
-            httpPost = new HttpPost(url);
-            if (proxyClientConfig.isNeedAuthentication()) {
-                long timestamp = System.currentTimeMillis();
-                int nonce = new 
SecureRandom(String.valueOf(timestamp).getBytes()).nextInt(Integer.MAX_VALUE);
-                httpPost.setHeader(BasicAuth.BASIC_AUTH_HEADER,
-                        
IpUtils.getAuthorizenInfo(proxyClientConfig.getUserName(),
-                                proxyClientConfig.getSecretKey(), timestamp, 
nonce));
-            }
-            httpPost.setEntity(new UrlEncodedFormEntity(params));
-            HttpResponse response = httpClient.execute(httpPost);
-            String returnStr = EntityUtils.toString(response.getEntity());
-            if (StringUtils.isNotBlank(returnStr) && 
response.getStatusLine().getStatusCode() == 200) {
-                log.info("Get configure from manager is " + returnStr);
-                JsonParser jsonParser = new JsonParser();
-                JsonObject jb = jsonParser.parse(returnStr).getAsJsonObject();
-                if (jb == null) {
-                    log.warn("ServiceDiscovery updated manager ip failed, 
returnStr = {} jb is "
-                            + "null ", returnStr, jb);
-                    return null;
-                }
-                JsonArray retData = jb.get("data").getAsJsonArray();
-                List<String> managerIpList = new ArrayList<>();
-                for (JsonElement datum : retData) {
-                    JsonObject record = datum.getAsJsonObject();
-                    managerIpList.add(record.get("ip").getAsString());
-                }
-                if (managerIpList.isEmpty()) {
-                    return null;
-                }
-                String strIPs = String.join(",", managerIpList);
-                log.info("ServiceDiscovery updated manager ip success, ip : " 
+ strIPs + ", retStr : " + returnStr);
-                return strIPs;
-            }
-            return null;
-        } catch (Throwable t) {
-            log.error("Connect Manager error: ", t);
-            return null;
-        } finally {
-            if (httpPost != null) {
-                httpPost.releaseConnection();
-            }
-            if (httpClient != null) {
-                httpClient.getConnectionManager().shutdown();
-            }
-        }
-    }
-
-    /**
-     * Get Inlong-Manager IP list from local path
-     */
-    public static String getLocalManagerIpList(String localPath) {
-        log.info("ServiceDiscovery start loading config from file {} ...", 
localPath);
-        String newestIp = null;
-        try {
-            File managerIpListFile = new File(localPath);
-            if (!managerIpListFile.exists()) {
-                log.info("ServiceDiscovery not found local groupIdInfo file 
from {}", localPath);
-                return null;
-            }
-            byte[] serialized = 
FileUtils.readFileToByteArray(managerIpListFile);
-            if (serialized == null) {
-                return null;
-            }
-            newestIp = new String(serialized, StandardCharsets.UTF_8);
-            log.info("ServiceDiscovery get manager ip list from local success, 
result is: {}", newestIp);
-        } catch (IOException e) {
-            log.error("ServiceDiscovery load manager config error: ", e);
-        }
-
-        return newestIp;
-    }
-
-    /**
-     * Update Inlong-Manager info to local file
-     */
-    public static void updateManagerInfo2Local(String storeString, String 
path) {
-        if (StringUtils.isBlank(storeString)) {
-            log.warn("ServiceDiscovery updateTdmInfo2Local error, configMap is 
empty or managerIpList is blank");
-            return;
-        }
-        File localPath = new File(path);
-        if (!localPath.getParentFile().exists()) {
-            localPath.getParentFile().mkdirs();
-        }
-
-        try (BufferedWriter writer = new BufferedWriter(
-                new OutputStreamWriter(new FileOutputStream(localPath), 
StandardCharsets.UTF_8))) {
-            writer.write(storeString);
-            writer.flush();
-        } catch (IOException e) {
-            log.error("ServiceDiscovery save manager config error: ", e);
-        }
-    }
-
-}

Reply via email to