This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ef33f5a5 [INLONG-5101][DataProxy] Optimize load balancing for 
DataProxy (#5595)
7ef33f5a5 is described below

commit 7ef33f5a586433a4a7ac3f1f1f1e4e28250a7a7d
Author: rhizoma-atractylodis 
<[email protected]>
AuthorDate: Fri Sep 2 14:17:26 2022 +0800

    [INLONG-5101][DataProxy] Optimize load balancing for DataProxy (#5595)
---
 .../inlong/sdk/dataproxy/ConfigConstants.java      |   3 +
 .../inlong/sdk/dataproxy/DefaultMessageSender.java | 126 ++++----
 .../apache/inlong/sdk/dataproxy/LoadBalance.java   |  45 +++
 .../inlong/sdk/dataproxy/ProxyClientConfig.java    |  47 ++-
 .../sdk/dataproxy/config/ProxyConfigEntry.java     |   3 +-
 .../sdk/dataproxy/config/ProxyConfigManager.java   |  26 +-
 .../inlong/sdk/dataproxy/network/ClientMgr.java    | 350 ++++++++++++++-------
 .../inlong/sdk/dataproxy/network/HashRing.java     | 111 +++++++
 .../inlong/sdk/dataproxy/network/NettyClient.java  |  18 +-
 .../inlong/sdk/dataproxy/network/Sender.java       |  41 ++-
 .../sdk/dataproxy/utils/ConsistencyHashUtil.java   |  36 +++
 11 files changed, 583 insertions(+), 223 deletions(-)

diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java
index a5e826c47..67fa7a2a8 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java
@@ -68,5 +68,8 @@ public class ConfigConstants {
     public static final int FLAG_ALLOW_COMPRESS = 1 << 5;
 
     public static final String MANAGER_DATAPROXY_API = 
"/inlong/manager/openapi/dataproxy/getIpList/";
+    public static LoadBalance DEFAULT_LOAD_BALANCE = LoadBalance.ROBIN;
+    public static int DEFAULT_VIRTUAL_NODE = 1000;
+    public static int DEFAULT_RANDOM_MAX_RETRY = 1000;
 
 }
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
index e5e3496cd..2dc1c2ca5 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
@@ -95,12 +95,12 @@ public class DefaultMessageSender implements MessageSender {
     /**
      * generate by cluster id
      *
-     * @param configure - sender
+     * @param configure         - sender
      * @param selfDefineFactory - sender factory
      * @return - sender
      */
     public static DefaultMessageSender 
generateSenderByClusterId(ProxyClientConfig configure,
-            ThreadFactory selfDefineFactory) throws Exception {
+                                                                 ThreadFactory 
selfDefineFactory) throws Exception {
         ProxyConfigManager proxyConfigManager = new 
ProxyConfigManager(configure,
                 Utils.getLocalIp(), null);
         proxyConfigManager.setGroupId(configure.getGroupId());
@@ -188,13 +188,13 @@ public class DefaultMessageSender implements 
MessageSender {
 
     @Deprecated
     public SendResult sendMessage(byte[] body, String attributes, String 
msgUUID,
-            long timeout, TimeUnit timeUnit) {
+                                  long timeout, TimeUnit timeUnit) {
         return sender.syncSendMessage(new EncodeObject(body, attributes,
                 idGenerator.getNextId()), msgUUID, timeout, timeUnit);
     }
 
     public SendResult sendMessage(byte[] body, String groupId, String 
streamId, long dt, String msgUUID,
-            long timeout, TimeUnit timeUnit) {
+                                  long timeout, TimeUnit timeUnit) {
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
             return SendResult.INVALID_ATTRIBUTES;
@@ -210,13 +210,13 @@ public class DefaultMessageSender implements 
MessageSender {
             return sender.syncSendMessage(encodeObject, msgUUID, timeout, 
timeUnit);
         } else if (msgtype == 3 || msgtype == 5) {
             if (isCompressEnd) {
-                return sender.syncSendMessage(new EncodeObject(body, 
"groupId=" + groupId
-                        + "&streamId=" + streamId + "&dt=" + dt + "&cp=snappy",
-                        idGenerator.getNextId(), this.getMsgtype(), true, 
groupId), msgUUID, timeout, timeUnit);
+                return sender.syncSendMessage(new EncodeObject(body, 
"groupId=" + groupId + "&streamId="
+                        + streamId + "&dt=" + dt + "&cp=snappy", 
idGenerator.getNextId(), this.getMsgtype(),
+                        true, groupId), msgUUID, timeout, timeUnit);
             } else {
-                return sender.syncSendMessage(new EncodeObject(body, 
"groupId=" + groupId
-                        + "&streamId=" + streamId + "&dt=" + dt,
-                        idGenerator.getNextId(), this.getMsgtype(), false, 
groupId), msgUUID, timeout, timeUnit);
+                return sender.syncSendMessage(new EncodeObject(body, 
"groupId=" + groupId + "&streamId="
+                                + streamId + "&dt=" + dt, 
idGenerator.getNextId(), this.getMsgtype(), false, groupId),
+                        msgUUID, timeout, timeUnit);
             }
         }
 
@@ -224,7 +224,7 @@ public class DefaultMessageSender implements MessageSender {
     }
 
     public SendResult sendMessage(byte[] body, String groupId, String 
streamId, long dt, String msgUUID,
-            long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) 
{
+                                  long timeout, TimeUnit timeUnit, Map<String, 
String> extraAttrMap) {
 
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt) || 
!ProxyUtils.isAttrKeysValid(extraAttrMap)) {
@@ -247,10 +247,12 @@ public class DefaultMessageSender implements 
MessageSender {
             if (isCompressEnd) {
                 attrs.append("&cp=snappy");
                 return sender.syncSendMessage(new EncodeObject(body, 
attrs.toString(),
-                        idGenerator.getNextId(), this.getMsgtype(), true, 
groupId), msgUUID, timeout, timeUnit);
+                                idGenerator.getNextId(), this.getMsgtype(), 
true, groupId),
+                        msgUUID, timeout, timeUnit);
             } else {
                 return sender.syncSendMessage(new EncodeObject(body, 
attrs.toString(),
-                        idGenerator.getNextId(), this.getMsgtype(), false, 
groupId), msgUUID, timeout, timeUnit);
+                                idGenerator.getNextId(), this.getMsgtype(), 
false, groupId), msgUUID,
+                        timeout, timeUnit);
             }
         }
         return null;
@@ -258,7 +260,7 @@ public class DefaultMessageSender implements MessageSender {
     }
 
     public SendResult sendMessage(List<byte[]> bodyList, String groupId, 
String streamId, long dt, String msgUUID,
-            long timeout, TimeUnit timeUnit) {
+                                  long timeout, TimeUnit timeUnit) {
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)) {
             return SendResult.INVALID_ATTRIBUTES;
@@ -274,19 +276,19 @@ public class DefaultMessageSender implements 
MessageSender {
         } else if (msgtype == 3 || msgtype == 5) {
             if (isCompress) {
                 return sender.syncSendMessage(new EncodeObject(bodyList, 
"groupId=" + groupId + "&streamId=" + streamId
-                        + "&dt=" + dt + "&cp=snappy" + "&cnt=" + 
bodyList.size(),
-                        idGenerator.getNextId(), this.getMsgtype(), true, 
groupId), msgUUID, timeout, timeUnit);
+                        + "&dt=" + dt + "&cp=snappy" + "&cnt=" + 
bodyList.size(), idGenerator.getNextId(),
+                        this.getMsgtype(), true, groupId), msgUUID, timeout, 
timeUnit);
             } else {
                 return sender.syncSendMessage(new EncodeObject(bodyList, 
"groupId=" + groupId + "&streamId=" + streamId
-                        + "&dt=" + dt + "&cnt=" + bodyList.size(),
-                        idGenerator.getNextId(), this.getMsgtype(), false, 
groupId), msgUUID, timeout, timeUnit);
+                        + "&dt=" + dt + "&cnt=" + bodyList.size(), 
idGenerator.getNextId(), this.getMsgtype(),
+                        false, groupId), msgUUID, timeout, timeUnit);
             }
         }
         return null;
     }
 
-    public SendResult sendMessage(List<byte[]> bodyList, String groupId, 
String streamId, long dt, String msgUUID,
-            long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) 
{
+    public SendResult sendMessage(List<byte[]> bodyList, String groupId, 
String streamId, long dt,
+                                  String msgUUID, long timeout, TimeUnit 
timeUnit, Map<String, String> extraAttrMap) {
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)
                 || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
@@ -307,25 +309,27 @@ public class DefaultMessageSender implements 
MessageSender {
             if (isCompress) {
                 attrs.append("&cp=snappy");
                 return sender.syncSendMessage(new EncodeObject(bodyList, 
attrs.toString(),
-                        idGenerator.getNextId(), this.getMsgtype(), true, 
groupId), msgUUID, timeout, timeUnit);
+                                idGenerator.getNextId(), this.getMsgtype(), 
true, groupId),
+                        msgUUID, timeout, timeUnit);
             } else {
                 return sender.syncSendMessage(new EncodeObject(bodyList, 
attrs.toString(),
-                        idGenerator.getNextId(), this.getMsgtype(), false, 
groupId), msgUUID, timeout, timeUnit);
+                                idGenerator.getNextId(), this.getMsgtype(), 
false, groupId),
+                        msgUUID, timeout, timeUnit);
             }
         }
         return null;
     }
 
     @Deprecated
-    public void asyncSendMessage(SendMessageCallback callback, byte[] body, 
String attributes, String msgUUID,
-            long timeout, TimeUnit timeUnit) throws ProxysdkException {
+    public void asyncSendMessage(SendMessageCallback callback, byte[] body, 
String attributes,
+                                 String msgUUID, long timeout, TimeUnit 
timeUnit) throws ProxysdkException {
         sender.asyncSendMessage(new EncodeObject(body, attributes, 
idGenerator.getNextId()),
                 callback, msgUUID, timeout, timeUnit);
     }
 
     public void asyncSendMessage(SendMessageCallback callback, byte[] body,
-            String groupId, String streamId, long dt, String msgUUID,
-            long timeout, TimeUnit timeUnit) throws ProxysdkException {
+                                 String groupId, String streamId, long dt, 
String msgUUID,
+                                 long timeout, TimeUnit timeUnit) throws 
ProxysdkException {
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
             throw new 
ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
@@ -356,10 +360,9 @@ public class DefaultMessageSender implements MessageSender 
{
 
     }
 
-    public void asyncSendMessage(SendMessageCallback callback,
-            byte[] body, String groupId, String streamId, long dt, String 
msgUUID,
-            long timeout, TimeUnit timeUnit,
-            Map<String, String> extraAttrMap) throws ProxysdkException {
+    public void asyncSendMessage(SendMessageCallback callback, byte[] body, 
String groupId, String streamId,
+                                 long dt, String msgUUID, long timeout, 
TimeUnit timeUnit,
+                                 Map<String, String> extraAttrMap) throws 
ProxysdkException {
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt) || 
!ProxyUtils.isAttrKeysValid(extraAttrMap)) {
             throw new 
ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
@@ -390,8 +393,8 @@ public class DefaultMessageSender implements MessageSender {
     }
 
     public void asyncSendMessage(SendMessageCallback callback, List<byte[]> 
bodyList,
-            String groupId, String streamId, long dt, String msgUUID,
-            long timeout, TimeUnit timeUnit) throws ProxysdkException {
+                                 String groupId, String streamId, long dt, 
String msgUUID,
+                                 long timeout, TimeUnit timeUnit) throws 
ProxysdkException {
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)) {
             throw new 
ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
@@ -407,23 +410,21 @@ public class DefaultMessageSender implements 
MessageSender {
             if (isCompress) {
                 sender.asyncSendMessage(
                         new EncodeObject(bodyList, "groupId=" + groupId + 
"&streamId=" + streamId
-                                + "&dt=" + dt + "&cp=snappy" + "&cnt=" + 
bodyList.size(),
-                                idGenerator.getNextId(), this.getMsgtype(),
-                                true, groupId), callback, msgUUID, timeout, 
timeUnit);
+                                + "&dt=" + dt + "&cp=snappy" + "&cnt=" + 
bodyList.size(), idGenerator.getNextId(),
+                                this.getMsgtype(), true, groupId), callback, 
msgUUID, timeout, timeUnit);
             } else {
                 sender.asyncSendMessage(
-                        new EncodeObject(bodyList, "groupId=" + groupId + 
"&streamId="
-                                + streamId + "&dt=" + dt + "&cnt=" + 
bodyList.size(),
-                                idGenerator.getNextId(), this.getMsgtype(),
+                        new EncodeObject(bodyList, "groupId=" + groupId + 
"&streamId=" + streamId + "&dt=" + dt
+                                + "&cnt=" + bodyList.size(), 
idGenerator.getNextId(), this.getMsgtype(),
                                 false, groupId), callback, msgUUID, timeout, 
timeUnit);
             }
         }
     }
 
     public void asyncSendMessage(SendMessageCallback callback,
-            List<byte[]> bodyList, String groupId, String streamId, long dt, 
String msgUUID,
-            long timeout, TimeUnit timeUnit,
-            Map<String, String> extraAttrMap) throws ProxysdkException {
+                                 List<byte[]> bodyList, String groupId, String 
streamId, long dt, String msgUUID,
+                                 long timeout, TimeUnit timeUnit,
+                                 Map<String, String> extraAttrMap) throws 
ProxysdkException {
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)
                 || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
@@ -465,7 +466,7 @@ public class DefaultMessageSender implements MessageSender {
      */
     @Override
     public void asyncSendMessage(String inlongGroupId, String inlongStreamId, 
byte[] body,
-            SendMessageCallback callback) throws ProxysdkException {
+                                 SendMessageCallback callback) throws 
ProxysdkException {
         this.asyncSendMessage(callback, body, inlongGroupId, inlongStreamId, 
System.currentTimeMillis(),
                 idGenerator.getNextId(), DEFAULT_SEND_TIMEOUT, 
DEFAULT_SEND_TIMEUNIT);
     }
@@ -481,7 +482,7 @@ public class DefaultMessageSender implements MessageSender {
      */
     @Override
     public void asyncSendMessage(String inlongGroupId, String inlongStreamId, 
List<byte[]> bodyList,
-            SendMessageCallback callback) throws ProxysdkException {
+                                 SendMessageCallback callback) throws 
ProxysdkException {
         this.asyncSendMessage(callback, bodyList, inlongGroupId, 
inlongStreamId, System.currentTimeMillis(),
                 idGenerator.getNextId(), DEFAULT_SEND_TIMEOUT, 
DEFAULT_SEND_TIMEUNIT);
     }
@@ -501,9 +502,9 @@ public class DefaultMessageSender implements MessageSender {
     }
 
     public void asyncsendMessageData(FileCallback callback, List<byte[]> 
bodyList, String groupId,
-            String streamId, long dt, int sid, boolean isSupportLF, String 
msgUUID,
-            long timeout, TimeUnit timeUnit,
-            Map<String, String> extraAttrMap) throws ProxysdkException {
+                                     String streamId, long dt, int sid, 
boolean isSupportLF, String msgUUID,
+                                     long timeout, TimeUnit timeUnit,
+                                     Map<String, String> extraAttrMap) throws 
ProxysdkException {
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)
                 || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
@@ -523,8 +524,8 @@ public class DefaultMessageSender implements MessageSender {
     }
 
     private void asyncSendMetric(FileCallback callback, byte[] body, String 
groupId,
-            String streamId, long dt, int sid, String ip, String msgUUID,
-            long timeout, TimeUnit timeUnit, String messageKey) throws 
ProxysdkException {
+                                 String streamId, long dt, int sid, String ip, 
String msgUUID,
+                                 long timeout, TimeUnit timeUnit, String 
messageKey) throws ProxysdkException {
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
             throw new 
ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
@@ -538,20 +539,22 @@ public class DefaultMessageSender implements 
MessageSender {
     }
 
     public void asyncsendMessageProxy(FileCallback callback, byte[] body, 
String groupId, String streamId,
-            long dt, int sid, String ip, String msgUUID,
-            long timeout, TimeUnit timeUnit) throws ProxysdkException {
-        asyncSendMetric(callback, body, groupId, streamId, dt, sid, ip, 
msgUUID, timeout, timeUnit, "minute");
+                                      long dt, int sid, String ip, String 
msgUUID,
+                                      long timeout, TimeUnit timeUnit) throws 
ProxysdkException {
+        asyncSendMetric(callback, body, groupId, streamId, dt, sid, ip, 
msgUUID, timeout,
+                timeUnit, "minute");
     }
 
     public void asyncsendMessageFile(FileCallback callback, byte[] body, 
String groupId,
-            String streamId, long dt, int sid, String msgUUID,
-            long timeout, TimeUnit timeUnit) throws ProxysdkException {
-        asyncSendMetric(callback, body, groupId, streamId, dt, sid, "", 
msgUUID, timeout, timeUnit, "file");
+                                     String streamId, long dt, int sid, String 
msgUUID,
+                                     long timeout, TimeUnit timeUnit) throws 
ProxysdkException {
+        asyncSendMetric(callback, body, groupId, streamId, dt, sid, "", 
msgUUID, timeout, timeUnit,
+                "file");
     }
 
     public String sendMessageData(List<byte[]> bodyList, String groupId,
-            String streamId, long dt, int sid, boolean isSupportLF, String 
msgUUID,
-            long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) 
{
+                                  String streamId, long dt, int sid, boolean 
isSupportLF, String msgUUID,
+                                  long timeout, TimeUnit timeUnit, Map<String, 
String> extraAttrMap) {
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)
                 || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
@@ -571,8 +574,9 @@ public class DefaultMessageSender implements MessageSender {
         return null;
     }
 
-    private String sendMetric(byte[] body, String groupId, String streamId, 
long dt, int sid, String ip, String msgUUID,
-            long timeout, TimeUnit timeUnit, String messageKey) {
+    private String sendMetric(byte[] body, String groupId, String streamId, 
long dt, int sid, String ip,
+                              String msgUUID,
+                              long timeout, TimeUnit timeUnit, String 
messageKey) {
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
             return SendResult.INVALID_ATTRIBUTES.toString();
@@ -586,13 +590,13 @@ public class DefaultMessageSender implements 
MessageSender {
     }
 
     public String sendMessageProxy(byte[] body, String groupId, String 
streamId,
-            long dt, int sid, String ip, String msgUUID,
-            long timeout, TimeUnit timeUnit) {
+                                   long dt, int sid, String ip, String msgUUID,
+                                   long timeout, TimeUnit timeUnit) {
         return sendMetric(body, groupId, streamId, dt, sid, ip, msgUUID, 
timeout, timeUnit, "minute");
     }
 
     public String sendMessageFile(byte[] body, String groupId, String 
streamId, long dt, int sid, String msgUUID,
-            long timeout, TimeUnit timeUnit) {
+                                  long timeout, TimeUnit timeUnit) {
         return sendMetric(body, groupId, streamId, dt, sid, "", msgUUID, 
timeout, timeUnit, "file");
     }
 
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/LoadBalance.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/LoadBalance.java
new file mode 100644
index 000000000..ff239288e
--- /dev/null
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/LoadBalance.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy;
+
+public enum LoadBalance {
+
+    RANDOM("random", 0),
+    ROBIN("robin", 1),
+    CONSISTENCY_HASH("consistency hash", 2),
+    WEIGHT_RANDOM("weight random", 3),
+    WEIGHT_ROBIN("weight robin", 4);
+
+    private String name;
+    private int index;
+
+    private LoadBalance(String name, int index) {
+        this.name = name;
+        this.index = index;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public int getIndex() {
+        return index;
+    }
+
+}
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
index 30bedcdb7..964c43be3 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
@@ -88,17 +88,22 @@ public class ProxyClientConfig {
     private int metricIntervalInMs = 60 * 1000;
     // max cache time for proxy config.
     private long maxProxyCacheTimeInMs = 30 * 60 * 1000;
-
     // metric groupId
     private String metricGroupId = "inlong_sla_metric";
 
     private int ioThreadNum = Runtime.getRuntime().availableProcessors();
     private boolean enableBusyWait = false;
 
+    private int virtualNode;
+
+    private LoadBalance loadBalance;
+
+    private int maxRetry;
+
     /*pay attention to the last url parameter ip*/
     public ProxyClientConfig(String localHost, boolean isLocalVisit, String 
managerIp,
-            int managerPort, String groupId, String netTag, String 
authSecretId, String authSecretKey)
-            throws ProxysdkException {
+                             int managerPort, String groupId, String netTag, 
String authSecretId, String authSecretKey,
+                             LoadBalance loadBalance, int virtualNode, int 
maxRetry) throws ProxysdkException {
         if (Utils.isBlank(localHost)) {
             throw new ProxysdkException("localHost is blank!");
         }
@@ -126,6 +131,16 @@ public class ProxyClientConfig {
         this.setRequestTimeoutMillis(ConfigConstants.DEFAULT_SEND_BUFFER_SIZE);
         this.authSecretId = authSecretId;
         this.authSecretKey = authSecretKey;
+        this.loadBalance = loadBalance;
+        this.virtualNode = virtualNode;
+        this.maxRetry = maxRetry;
+    }
+
+    public ProxyClientConfig(String localHost, boolean isLocalVisit, String 
managerIp, int managerPort, String groupId,
+                             String netTag, String authSecretId, String 
authSecretKey) throws ProxysdkException {
+        this(localHost, isLocalVisit, managerIp, managerPort, groupId, netTag, 
authSecretId, authSecretKey,
+                ConfigConstants.DEFAULT_LOAD_BALANCE, 
ConfigConstants.DEFAULT_VIRTUAL_NODE,
+                ConfigConstants.DEFAULT_RANDOM_MAX_RETRY);
     }
 
     public String getTlsServerCertFilePathAndName() {
@@ -289,7 +304,7 @@ public class ProxyClientConfig {
     }
 
     public void setAuthenticationInfo(boolean needAuthentication, boolean 
needDataEncry,
-            final String userName, final String secretKey) {
+                                      final String userName, final String 
secretKey) {
         this.needAuthentication = needAuthentication;
         this.isNeedDataEncry = needDataEncry;
         if (this.needAuthentication || this.isNeedDataEncry) {
@@ -459,4 +474,28 @@ public class ProxyClientConfig {
     public void setEnableBusyWait(boolean enableBusyWait) {
         this.enableBusyWait = enableBusyWait;
     }
+
+    public int getVirtualNode() {
+        return virtualNode;
+    }
+
+    public void setVirtualNode(int virtualNode) {
+        this.virtualNode = virtualNode;
+    }
+
+    public LoadBalance getLoadBalance() {
+        return loadBalance;
+    }
+
+    public void setLoadBalance(LoadBalance loadBalance) {
+        this.loadBalance = loadBalance;
+    }
+
+    public int getMaxRetry() {
+        return maxRetry;
+    }
+
+    public void setMaxRetry(int maxRetry) {
+        this.maxRetry = maxRetry;
+    }
 }
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigEntry.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigEntry.java
index c975fafa4..5e82b4558 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigEntry.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigEntry.java
@@ -81,8 +81,7 @@ public class ProxyConfigEntry implements java.io.Serializable 
{
     @Override
     public String toString() {
         return "ProxyConfigEntry [hostMap=" + hostMap + ", load=" + load + ", 
size=" + size + ", isInterVisit="
-                + isInterVisit + ", groupId=" + groupId
-                + ", switch=" + switchStat + "]";
+                + isInterVisit + ", groupId=" + groupId + ", switch=" + 
switchStat + "]";
     }
 
     public int getClusterId() {
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
index a69b91478..f0f3acc7f 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
@@ -45,12 +45,13 @@ import org.apache.http.params.HttpConnectionParams;
 import org.apache.http.params.HttpParams;
 import org.apache.http.ssl.SSLContexts;
 import org.apache.http.util.EntityUtils;
-import org.apache.inlong.common.util.BasicAuth;
 import org.apache.inlong.common.pojo.dataproxy.DataProxyNodeInfo;
 import org.apache.inlong.common.pojo.dataproxy.DataProxyNodeResponse;
+import org.apache.inlong.common.util.BasicAuth;
 import org.apache.inlong.sdk.dataproxy.ConfigConstants;
 import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
 import org.apache.inlong.sdk.dataproxy.network.ClientMgr;
+import org.apache.inlong.sdk.dataproxy.network.HashRing;
 import org.apache.inlong.sdk.dataproxy.network.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,22 +59,22 @@ import org.slf4j.LoggerFactory;
 import javax.net.ssl.SSLContext;
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.FileReader;
-import java.io.FileWriter;
 import java.io.ObjectInputStream;
+import java.io.FileOutputStream;
 import java.io.ObjectOutputStream;
+import java.io.FileWriter;
+import java.io.FileReader;
 import java.io.UnsupportedEncodingException;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.security.KeyManagementException;
 import java.security.NoSuchAlgorithmException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
+import java.util.Random;
 import java.util.List;
+import java.util.ArrayList;
 import java.util.Map;
-import java.util.Random;
+import java.util.HashMap;
+import java.util.Collections;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -93,6 +94,7 @@ public class ProxyConfigManager extends Thread {
     private final ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
     private final JsonParser jsonParser = new JsonParser();
     private final Gson gson = new Gson();
+    private final HashRing hashRing = HashRing.getInstance();
     private List<HostInfo> proxyInfoList = new ArrayList<HostInfo>();
     /*the status of the cluster.if this value is changed,we need rechoose  
three proxy*/
     private int oldStat = 0;
@@ -106,6 +108,7 @@ public class ProxyConfigManager extends Thread {
         this.clientConfig = configure;
         this.localIP = localIP;
         this.clientManager = clientManager;
+        this.hashRing.setVirtualNode(configure.getVirtualNode());
     }
 
     public String getGroupId() {
@@ -297,6 +300,7 @@ public class ProxyConfigManager extends Thread {
             }
         }
         compareProxyList(proxyEntry);
+
     }
 
     /**
@@ -340,6 +344,7 @@ public class ProxyConfigManager extends Thread {
                     newProxyInfoList.clear();
                     LOGGER.info("proxy IP list doesn't change, load {}", 
proxyEntry.getLoad());
                 }
+                updateHashRing(proxyInfoList);
             } else {
                 LOGGER.error("proxyEntry's size is zero");
             }
@@ -819,4 +824,9 @@ public class ProxyConfigManager extends Thread {
         }
         return localManagerIps;
     }
+
+    public void updateHashRing(List<HostInfo> newHosts) {
+        this.hashRing.updateNode(newHosts);
+        LOGGER.info("update hash ring {}", hashRing.getVirtualNode2RealNode());
+    }
 }
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
index dee9e8e3c..d01074307 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
@@ -24,25 +24,28 @@ import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import org.apache.inlong.sdk.dataproxy.ConfigConstants;
+import org.apache.inlong.sdk.dataproxy.LoadBalance;
 import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
 import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
 import org.apache.inlong.sdk.dataproxy.config.EncryptConfigEntry;
 import org.apache.inlong.sdk.dataproxy.config.HostInfo;
 import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
 import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
+import org.apache.inlong.sdk.dataproxy.utils.ConsistencyHashUtil;
 import org.apache.inlong.sdk.dataproxy.utils.EventLoopUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.util.List;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
+import java.util.Map;
 import java.util.HashMap;
+import java.util.Random;
+import java.util.Collections;
 import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import java.util.Comparator;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicLong;
@@ -51,22 +54,27 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 public class ClientMgr {
     private static final Logger logger = LoggerFactory
             .getLogger(ClientMgr.class);
-
+    private static final int[] weight = {
+            1, 1, 1, 1, 1,
+            2, 2, 2, 2, 2,
+            3, 3, 3, 3, 3,
+            6, 6, 6, 6, 6,
+            12, 12, 12, 12, 12,
+            48, 96, 192, 384, 1000};
     private final Map<HostInfo, NettyClient> clientMapData = new 
ConcurrentHashMap<>();
-
     private final ConcurrentHashMap<HostInfo, NettyClient> clientMapHB = new 
ConcurrentHashMap<>();
     // clientMapData + clientMapHB = clientMap
     private final ConcurrentHashMap<HostInfo, NettyClient> clientMap = new 
ConcurrentHashMap<>();
-
     private final ConcurrentHashMap<HostInfo, AtomicLong> lastBadHostMap = new 
ConcurrentHashMap<>();
-
     // clientList is the valueSet of clientMapData
     private final ArrayList<NettyClient> clientList = new ArrayList<>();
-    private List<HostInfo> proxyInfoList = new ArrayList<>();
-
     private final Map<HostInfo, int[]> channelLoadMapData = new 
ConcurrentHashMap<>();
     private final Map<HostInfo, int[]> channelLoadMapHB = new 
ConcurrentHashMap<>();
-
+    /**
+     * Lock to protect FSNamesystem.
+     */
+    private final ReentrantReadWriteLock fsLock = new 
ReentrantReadWriteLock(true);
+    private List<HostInfo> proxyInfoList = new ArrayList<>();
     private Bootstrap bootstrap;
     private int currentIndex = 0;
     private ProxyClientConfig configure;
@@ -75,24 +83,72 @@ public class ClientMgr {
     private int realSize;
     private SendHBThread sendHBThread;
     private ProxyConfigManager ipManager;
-
     private int groupIdNum = 0;
     private String groupId = "";
     private Map<String, Integer> streamIdMap = new HashMap<String, Integer>();
+    //    private static final int total_weight = 240;
     private int loadThreshold;
     private int loadCycle = 0;
-    private static final int[] weight = {
-            1, 1, 1, 1, 1,
-            2, 2, 2, 2, 2,
-            3, 3, 3, 3, 3,
-            6, 6, 6, 6, 6,
-            12, 12, 12, 12, 12,
-            48, 96, 192, 384, 1000};
-//    private static final int total_weight = 240;
+    private LoadBalance loadBalance;
+
+    public ClientMgr(ProxyClientConfig configure, Sender sender) throws 
Exception {
+        this(configure, sender, null);
+    }
+
     /**
-     * Lock to protect FSNamesystem.
+     * Build up the connection between the server and client.
      */
-    private final ReentrantReadWriteLock fsLock = new 
ReentrantReadWriteLock(true);
+    public ClientMgr(ProxyClientConfig configure, Sender sender, ThreadFactory 
selfDefineFactory) throws Exception {
+        /* Initialize the bootstrap. */
+        if (selfDefineFactory == null) {
+            selfDefineFactory = new DefaultThreadFactory("agent-client-io",
+                    Thread.currentThread().isDaemon());
+        }
+        EventLoopGroup eventLoopGroup = 
EventLoopUtil.newEventLoopGroup(configure.getIoThreadNum(),
+                configure.isEnableBusyWait(), selfDefineFactory);
+        bootstrap = new Bootstrap();
+        bootstrap.group(eventLoopGroup);
+        
bootstrap.channel(EventLoopUtil.getClientSocketChannelClass(eventLoopGroup));
+        bootstrap.option(ChannelOption.SO_RCVBUF, 
ConfigConstants.DEFAULT_RECEIVE_BUFFER_SIZE);
+        bootstrap.option(ChannelOption.SO_SNDBUF, 
ConfigConstants.DEFAULT_SEND_BUFFER_SIZE);
+        if (configure.getNetTag().equals("bobcat")) {
+            bootstrap.option(ChannelOption.IP_TOS, 96);
+        }
+        bootstrap.handler(new ClientPipelineFactory(this, sender));
+        /* ready to Start the thread which refreshes the proxy list. */
+        ipManager = new ProxyConfigManager(configure, Utils.getLocalIp(), 
this);
+        ipManager.setName("proxyConfigManager");
+        if (configure.getGroupId() != null) {
+            ipManager.setGroupId(configure.getGroupId());
+            groupId = configure.getGroupId();
+        }
+
+        /*
+         * Request the IP before starting, so that we already have three
+         * connections.
+         */
+        this.configure = configure;
+        this.sender = sender;
+        this.aliveConnections = configure.getAliveConnections();
+        this.loadBalance = configure.getLoadBalance();
+
+        try {
+            ipManager.doProxyEntryQueryWork();
+        } catch (IOException e) {
+            e.printStackTrace();
+            logger.info(e.getMessage());
+        }
+        ipManager.setDaemon(true);
+        ipManager.start();
+
+        this.sendHBThread = new SendHBThread();
+        this.sendHBThread.setName("SendHBThread");
+        this.sendHBThread.start();
+    }
+
+    public LoadBalance getLoadBalance() {
+        return this.loadBalance;
+    }
 
     public int getLoadThreshold() {
         return loadThreshold;
@@ -134,6 +190,50 @@ public class ClientMgr {
         return proxyInfoList;
     }
 
+    public void setProxyInfoList(List<HostInfo> proxyInfoList) {
+        try {
+            /* Close and remove old client. */
+            writeLock();
+            this.proxyInfoList = proxyInfoList;
+
+            if (loadThreshold == 0) {
+                if (aliveConnections >= proxyInfoList.size()) {
+                    realSize = proxyInfoList.size();
+                    aliveConnections = realSize;
+                    logger.error("there is no enough proxy to work!");
+                } else {
+                    realSize = aliveConnections;
+                }
+            } else {
+                if (aliveConnections >= proxyInfoList.size()) {
+                    realSize = proxyInfoList.size();
+                    aliveConnections = realSize;
+                    logger.error("there is no idle proxy to choose for 
balancing!");
+                } else if ((aliveConnections + 4) > proxyInfoList.size()) {
+                    realSize = proxyInfoList.size();
+                    logger.warn("there is only {} idle proxy to choose for 
balancing!",
+                            proxyInfoList.size() - aliveConnections);
+                } else {
+                    realSize = aliveConnections + 4;
+                }
+            }
+
+            List<HostInfo> hostInfos = getRealHosts(proxyInfoList, realSize);
+
+            /* Refresh the current channel connections. */
+            updateAllConnection(hostInfos);
+
+            logger.info(
+                    "update all connection ,client map size {},client list 
size {}",
+                    clientMapData.size(), clientList.size());
+
+        } catch (Exception e) {
+            logger.error(e.getMessage());
+        } finally {
+            writeUnlock();
+        }
+    }
+
     public int getAliveConnections() {
         return aliveConnections;
     }
@@ -174,60 +274,6 @@ public class ClientMgr {
         return hasReadLock() || hasWriteLock();
     }
 
-    public ClientMgr(ProxyClientConfig configure, Sender sender) throws 
Exception {
-        this(configure, sender, null);
-    }
-    
-    /**
-     * Build up the connection between the server and client.
-     */
-    public ClientMgr(ProxyClientConfig configure, Sender sender, ThreadFactory 
selfDefineFactory) throws Exception {
-        /* Initialize the bootstrap. */
-        if (selfDefineFactory == null) {
-            selfDefineFactory = new DefaultThreadFactory("agent-client-io",
-                    Thread.currentThread().isDaemon());
-        }
-        EventLoopGroup eventLoopGroup = 
EventLoopUtil.newEventLoopGroup(configure.getIoThreadNum(),
-                configure.isEnableBusyWait(), selfDefineFactory);
-        bootstrap = new Bootstrap();
-        bootstrap.group(eventLoopGroup);
-        
bootstrap.channel(EventLoopUtil.getClientSocketChannelClass(eventLoopGroup));
-        bootstrap.option(ChannelOption.SO_RCVBUF, 
ConfigConstants.DEFAULT_RECEIVE_BUFFER_SIZE);
-        bootstrap.option(ChannelOption.SO_SNDBUF, 
ConfigConstants.DEFAULT_SEND_BUFFER_SIZE);
-        if (configure.getNetTag().equals("bobcat")) {
-            bootstrap.option(ChannelOption.IP_TOS, 96);
-        }
-        bootstrap.handler(new ClientPipelineFactory(this, sender));
-        /* ready to Start the thread which refreshes the proxy list. */
-        ipManager = new ProxyConfigManager(configure, Utils.getLocalIp(), 
this);
-        ipManager.setName("proxyConfigManager");
-        if (configure.getGroupId() != null) {
-            ipManager.setGroupId(configure.getGroupId());
-            groupId = configure.getGroupId();
-        }
-
-        /*
-         * Request the IP before starting, so that we already have three
-         * connections.
-         */
-        this.configure = configure;
-        this.sender = sender;
-        this.aliveConnections = configure.getAliveConnections();
-
-        try {
-            ipManager.doProxyEntryQueryWork();
-        } catch (IOException e) {
-            e.printStackTrace();
-            logger.info(e.getMessage());
-        }
-        ipManager.setDaemon(true);
-        ipManager.start();
-
-        this.sendHBThread = new SendHBThread();
-        this.sendHBThread.setName("SendHBThread");
-        this.sendHBThread.start();
-    }
-
     public ProxyConfigEntry getGroupIdConfigureInfo() throws Exception {
         return ipManager.getGroupIdConfigure();
     }
@@ -348,6 +394,90 @@ public class ClientMgr {
         return client;
     }
 
+    public synchronized NettyClient getClientByRandom() {
+        NettyClient client;
+        if (clientList.isEmpty()) {
+            return null;
+        }
+        int currSize = clientList.size();
+        int maxRetry = this.configure.getMaxRetry();
+        Random random = new Random(System.currentTimeMillis());
+        do {
+            int randomId = random.nextInt();
+            client = clientList.get(randomId % currSize);
+            if (client != null && client.isActive()) {
+                break;
+            }
+            maxRetry--;
+        } while (maxRetry > 0);
+        if (client == null || !client.isActive()) {
+            return null;
+        }
+        return client;
+    }
+
+//    public synchronized NettyClient getClientByLeastConnections() {}
+
+    public synchronized NettyClient getClientByConsistencyHash(String 
messageId) {
+        NettyClient client;
+        if (clientList.isEmpty()) {
+            return null;
+        }
+        String hash = ConsistencyHashUtil.hashMurMurHash(messageId);
+        HashRing cluster = HashRing.getInstance();
+        HostInfo info = cluster.getNode(hash);
+        client = this.clientMap.get(info);
+        return client;
+    }
+
+    public synchronized NettyClient getClientByWeightRoundRobin() {
+        NettyClient client = null;
+        double maxWeight = Double.MIN_VALUE;
+        int clientId = 0;
+        if (clientList.isEmpty()) {
+            return null;
+        }
+        int currSize = clientList.size();
+        for (int retryTime = 0; retryTime < currSize; retryTime++) {
+            currentIndex = (++currentIndex) % currSize;
+            client = clientList.get(currentIndex);
+            if (client != null && client.isActive() && client.getWeight() > 
maxWeight) {
+                clientId = currentIndex;
+            }
+        }
+        if (client == null || !client.isActive()) {
+            return null;
+        }
+        return clientList.get(clientId);
+    }
+
+//    public synchronized NettyClient getClientByWeightLeastConnections(){}
+
+    public synchronized NettyClient getClientByWeightRandom() {
+        NettyClient client;
+        double maxWeight = Double.MIN_VALUE;
+        int clientId = 0;
+        if (clientList.isEmpty()) {
+            return null;
+        }
+        int currSize = clientList.size();
+        int maxRetry = this.configure.getMaxRetry();
+        Random random = new Random(System.currentTimeMillis());
+        do {
+            int randomId = random.nextInt();
+            client = clientList.get(randomId % currSize);
+            if (client != null && client.isActive()) {
+                clientId = randomId;
+                break;
+            }
+            maxRetry--;
+        } while (maxRetry > 0);
+        if (client == null || !client.isActive()) {
+            return null;
+        }
+        return clientList.get(clientId);
+    }
+
     public NettyClient getContainProxy(String proxyip) {
         if (proxyip == null) {
             return null;
@@ -854,50 +984,6 @@ public class ClientMgr {
 
     }
 
-    public void setProxyInfoList(List<HostInfo> proxyInfoList) {
-        try {
-            /* Close and remove old client. */
-            writeLock();
-            this.proxyInfoList = proxyInfoList;
-
-            if (loadThreshold == 0) {
-                if (aliveConnections >= proxyInfoList.size()) {
-                    realSize = proxyInfoList.size();
-                    aliveConnections = realSize;
-                    logger.error("there is no enough proxy to work!");
-                } else {
-                    realSize = aliveConnections;
-                }
-            } else {
-                if (aliveConnections >= proxyInfoList.size()) {
-                    realSize = proxyInfoList.size();
-                    aliveConnections = realSize;
-                    logger.error("there is no idle proxy to choose for 
balancing!");
-                } else if ((aliveConnections + 4) > proxyInfoList.size()) {
-                    realSize = proxyInfoList.size();
-                    logger.warn("there is only {} idle proxy to choose for 
balancing!",
-                            proxyInfoList.size() - aliveConnections);
-                } else {
-                    realSize = aliveConnections + 4;
-                }
-            }
-
-            List<HostInfo> hostInfos = getRealHosts(proxyInfoList, realSize);
-
-            /* Refresh the current channel connections. */
-            updateAllConnection(hostInfos);
-
-            logger.info(
-                    "update all connection ,client map size {},client list 
size {}",
-                    clientMapData.size(), clientList.size());
-
-        } catch (Exception e) {
-            logger.error(e.getMessage());
-        } finally {
-            writeUnlock();
-        }
-    }
-
     private List<HostInfo> getRealHosts(List<HostInfo> hostList, int realSize) 
{
         if (realSize > hostList.size()) {
             return hostList;
@@ -911,10 +997,32 @@ public class ClientMgr {
         return resultHosts;
     }
 
+    public NettyClient getClient(LoadBalance loadBalance, EncodeObject 
encodeObject) {
+        NettyClient client = null;
+        switch (loadBalance) {
+            case RANDOM:
+                client = getClientByRandom();
+                break;
+            case CONSISTENCY_HASH:
+                client = 
getClientByConsistencyHash(encodeObject.getMessageId());
+                break;
+            case ROBIN:
+                client = getClientByRoundRobin();
+                break;
+            case WEIGHT_ROBIN:
+                client = getClientByWeightRoundRobin();
+                break;
+            case WEIGHT_RANDOM:
+                client = getClientByWeightRandom();
+                break;
+        }
+        return client;
+    }
+
     private class SendHBThread extends Thread {
 
-        private boolean bShutDown = false;
         private final int[] random = {17, 19, 23, 31, 37};
+        private boolean bShutDown = false;
 
         public SendHBThread() {
             bShutDown = false;
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HashRing.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HashRing.java
new file mode 100644
index 000000000..00db8ae39
--- /dev/null
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HashRing.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.network;
+
+import org.apache.inlong.sdk.dataproxy.ConfigConstants;
+import org.apache.inlong.sdk.dataproxy.config.HostInfo;
+import org.apache.inlong.sdk.dataproxy.utils.ConsistencyHashUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.TreeMap;
+import java.util.SortedMap;
+import java.util.stream.Collectors;
+
+public class HashRing {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(HashRing.class);
+    private static volatile HashRing instance;
+    private int virtualNode = ConfigConstants.DEFAULT_VIRTUAL_NODE;
+    private TreeMap<String, HostInfo> virtualNode2RealNode;
+    private List<HostInfo> nodeList;
+
+    private HashRing() {
+        this.virtualNode2RealNode = new TreeMap<>();
+        this.nodeList = new ArrayList<>();
+    }
+
+    public static HashRing getInstance() {
+        if (instance == null) {
+            synchronized (HashRing.class) {
+                if (instance == null) {
+                    instance = new HashRing();
+                }
+            }
+        }
+        return instance;
+    }
+
+    public TreeMap<String, HostInfo> getVirtualNode2RealNode() {
+        return virtualNode2RealNode;
+    }
+
+    public String node2VirtualNode(HostInfo node, int index) {
+        return "virtual&&" + index + "&&" + node.toString();
+    }
+
+    public void setVirtualNode(int virtualNode) {
+        this.virtualNode = virtualNode;
+    }
+
+    public HostInfo getNode(String key) {
+        String hash = ConsistencyHashUtil.hashMurMurHash(key);
+        SortedMap<String, HostInfo> tailMap = 
this.virtualNode2RealNode.tailMap(hash);
+        HostInfo node;
+        if (tailMap.isEmpty()) {
+            node = 
this.virtualNode2RealNode.get(this.virtualNode2RealNode.firstKey());
+        } else {
+            node = this.virtualNode2RealNode.get(tailMap.firstKey());
+        }
+        LOGGER.info("{} located to {}", key, node);
+        return node;
+    }
+
+    private synchronized void extendNode(List<HostInfo> nodes) {
+        this.nodeList.addAll(nodes);
+        for (HostInfo host : this.nodeList) {
+            for (int i = 0; i < this.virtualNode; i++) {
+                String key = node2VirtualNode(host, i);
+                String hash = ConsistencyHashUtil.hashMurMurHash(key);
+                virtualNode2RealNode.put(hash, host);
+            }
+        }
+        LOGGER.info("append node list {}", nodes);
+    }
+
+    private synchronized void removeNode(List<HostInfo> hosts) {
+        for (HostInfo host : hosts) {
+            this.nodeList.remove(host);
+            for (int i = 0; i < this.virtualNode; i++) {
+                String hash = 
ConsistencyHashUtil.hashMurMurHash(node2VirtualNode(host, i));
+                virtualNode2RealNode.remove(hash);
+            }
+        }
+        LOGGER.info("remove node list {}", hosts);
+    }
+
+    public synchronized void updateNode(List<HostInfo> nodes) {
+        List<HostInfo> newHosts = new ArrayList<>(nodes);
+        List<HostInfo> oldHosts = new ArrayList<>(this.nodeList);
+        List<HostInfo> append = newHosts.stream().filter(host -> 
!oldHosts.contains(host)).collect(Collectors.toList());
+        List<HostInfo> remove = oldHosts.stream().filter(host -> 
!newHosts.contains(host)).collect(Collectors.toList());
+        extendNode(append);
+        removeNode(remove);
+    }
+}
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/NettyClient.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/NettyClient.java
index cc3e50410..4d32917de 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/NettyClient.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/NettyClient.java
@@ -18,20 +18,22 @@
 
 package org.apache.inlong.sdk.dataproxy.network;
 
+import com.sun.management.OperatingSystemMXBean;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
-import java.net.InetSocketAddress;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
-
 import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
 import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.lang.management.ManagementFactory;
+import java.net.InetSocketAddress;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
 public class NettyClient {
     private static final Logger logger = 
LoggerFactory.getLogger(NettyClient.class);
 
@@ -213,4 +215,10 @@ public class NettyClient {
         setState(ConnState.BUSY);
     }
 
+    public double getWeight() {
+        OperatingSystemMXBean operatingSystemMXBean =
+                (OperatingSystemMXBean) 
ManagementFactory.getOperatingSystemMXBean();
+        return operatingSystemMXBean.getSystemLoadAverage();
+    }
+
 }
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
index 5f2c72ebc..49be1ce54 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
@@ -22,10 +22,10 @@ import io.netty.channel.Channel;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.ObjectUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.sdk.dataproxy.FileCallback;
 import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
-import org.apache.inlong.sdk.dataproxy.SendMessageCallback;
+import org.apache.inlong.sdk.dataproxy.FileCallback;
 import org.apache.inlong.sdk.dataproxy.SendResult;
+import org.apache.inlong.sdk.dataproxy.SendMessageCallback;
 import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
 import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
 import org.apache.inlong.sdk.dataproxy.threads.MetricWorkerThread;
@@ -36,11 +36,11 @@ import org.slf4j.LoggerFactory;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -179,8 +179,8 @@ public class Sender {
     }
 
     private SendResult syncSendInternalMessage(NettyClient client,
-            EncodeObject encodeObject, String msgUUID,
-            long timeout, TimeUnit timeUnit)
+                                               EncodeObject encodeObject, 
String msgUUID,
+                                               long timeout, TimeUnit timeUnit)
             throws ExecutionException, InterruptedException, TimeoutException {
 
         if (client == null) {
@@ -228,12 +228,10 @@ public class Sender {
      * @param timeUnit
      * @return
      */
-    public SendResult syncSendMessage(EncodeObject encodeObject, String 
msgUUID,
-            long timeout, TimeUnit timeUnit) {
-        metricWorker.recordNumByKey(encodeObject.getMessageId(),
-                encodeObject.getGroupId(), encodeObject.getStreamId(),
+    public SendResult syncSendMessage(EncodeObject encodeObject, String 
msgUUID, long timeout, TimeUnit timeUnit) {
+        metricWorker.recordNumByKey(encodeObject.getMessageId(), 
encodeObject.getGroupId(), encodeObject.getStreamId(),
                 Utils.getLocalIp(), encodeObject.getDt(), 
encodeObject.getPackageTime(), encodeObject.getRealCnt());
-        NettyClient client = clientMgr.getClientByRoundRobin();
+        NettyClient client = clientMgr.getClient(clientMgr.getLoadBalance(), 
encodeObject);
         SendResult message = null;
         try {
             message = syncSendInternalMessage(client, encodeObject, msgUUID, 
timeout, timeUnit);
@@ -281,9 +279,9 @@ public class Sender {
         return message;
     }
 
-    private SendResult syncSendMessageIndexInternal(NettyClient client,
-            EncodeObject encodeObject, String msgUUID, long timeout,
-            TimeUnit timeUnit) throws ExecutionException, 
InterruptedException, TimeoutException {
+    private SendResult syncSendMessageIndexInternal(NettyClient client, 
EncodeObject encodeObject, String msgUUID,
+                                                    long timeout, TimeUnit 
timeUnit)
+            throws ExecutionException, InterruptedException, TimeoutException {
         if (client == null || !client.isActive()) {
             chooseProxy.remove(encodeObject.getMessageId());
             client = clientMgr.getClientByRoundRobin();
@@ -329,8 +327,7 @@ public class Sender {
      * @param timeUnit
      * @return
      */
-    public String syncSendMessageIndex(EncodeObject encodeObject, String 
msgUUID, long timeout,
-            TimeUnit timeUnit) {
+    public String syncSendMessageIndex(EncodeObject encodeObject, String 
msgUUID, long timeout, TimeUnit timeUnit) {
         try {
             SendResult message = null;
             NettyClient client = chooseProxy.get(encodeObject.getMessageId());
@@ -396,9 +393,8 @@ public class Sender {
      * @param timeUnit
      * @throws ProxysdkException
      */
-    public void asyncSendMessageIndex(EncodeObject encodeObject,
-            FileCallback callback, String msgUUID,
-            long timeout, TimeUnit timeUnit) throws ProxysdkException {
+    public void asyncSendMessageIndex(EncodeObject encodeObject, FileCallback 
callback, String msgUUID, long timeout,
+                                      TimeUnit timeUnit) throws 
ProxysdkException {
         NettyClient client = chooseProxy.get(encodeObject.getMessageId());
         String proxyip = encodeObject.getProxyIp();
         if (proxyip != null && proxyip.length() != 0) {
@@ -514,14 +510,14 @@ public class Sender {
      * Following methods used by asynchronously message sending.
      */
     public void asyncSendMessage(EncodeObject encodeObject, 
SendMessageCallback callback, String msgUUID,
-            long timeout, TimeUnit timeUnit) throws ProxysdkException {
+                                 long timeout, TimeUnit timeUnit) throws 
ProxysdkException {
         metricWorker.recordNumByKey(encodeObject.getMessageId(), 
encodeObject.getGroupId(),
                 encodeObject.getStreamId(), Utils.getLocalIp(), 
encodeObject.getPackageTime(),
                 encodeObject.getDt(), encodeObject.getRealCnt());
 
         // send message package time
 
-        NettyClient client = clientMgr.getClientByRoundRobin();
+        NettyClient client = clientMgr.getClient(clientMgr.getLoadBalance(), 
encodeObject);
         if (client == null) {
             throw new ProxysdkException(SendResult.NO_CONNECTION.toString());
         }
@@ -620,7 +616,8 @@ public class Sender {
                         continue;
                     }
                     if (isFile) {
-                        ((FileCallback) 
queueObject.getCallback()).onMessageAck(SendResult.CONNECTION_BREAK.toString());
+                        ((FileCallback) queueObject.getCallback())
+                                
.onMessageAck(SendResult.CONNECTION_BREAK.toString());
                         currentBufferSize.addAndGet(-queueObject.getSize());
                     } else {
                         
queueObject.getCallback().onMessageAck(SendResult.CONNECTION_BREAK);
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ConsistencyHashUtil.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ConsistencyHashUtil.java
new file mode 100644
index 000000000..f5ff51c24
--- /dev/null
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ConsistencyHashUtil.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.utils;
+
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Random;
+
+public class ConsistencyHashUtil {
+    public static String hashMurMurHash(String key, int seed) {
+        HashFunction hashFunction = Hashing.murmur3_128(seed);
+        return hashFunction.hashString(key, StandardCharsets.UTF_8).toString();
+    }
+
+    public static String hashMurMurHash(String key) {
+        Random random = new Random(System.currentTimeMillis());
+        return hashMurMurHash(key, random.nextInt());
+    }
+}

Reply via email to