This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 7ef33f5a5 [INLONG-5101][DataProxy] Optimize load balancing for
DataProxy (#5595)
7ef33f5a5 is described below
commit 7ef33f5a586433a4a7ac3f1f1f1e4e28250a7a7d
Author: rhizoma-atractylodis
<[email protected]>
AuthorDate: Fri Sep 2 14:17:26 2022 +0800
[INLONG-5101][DataProxy] Optimize load balancing for DataProxy (#5595)
---
.../inlong/sdk/dataproxy/ConfigConstants.java | 3 +
.../inlong/sdk/dataproxy/DefaultMessageSender.java | 126 ++++----
.../apache/inlong/sdk/dataproxy/LoadBalance.java | 45 +++
.../inlong/sdk/dataproxy/ProxyClientConfig.java | 47 ++-
.../sdk/dataproxy/config/ProxyConfigEntry.java | 3 +-
.../sdk/dataproxy/config/ProxyConfigManager.java | 26 +-
.../inlong/sdk/dataproxy/network/ClientMgr.java | 350 ++++++++++++++-------
.../inlong/sdk/dataproxy/network/HashRing.java | 111 +++++++
.../inlong/sdk/dataproxy/network/NettyClient.java | 18 +-
.../inlong/sdk/dataproxy/network/Sender.java | 41 ++-
.../sdk/dataproxy/utils/ConsistencyHashUtil.java | 36 +++
11 files changed, 583 insertions(+), 223 deletions(-)
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java
index a5e826c47..67fa7a2a8 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java
@@ -68,5 +68,8 @@ public class ConfigConstants {
public static final int FLAG_ALLOW_COMPRESS = 1 << 5;
public static final String MANAGER_DATAPROXY_API =
"/inlong/manager/openapi/dataproxy/getIpList/";
+ public static LoadBalance DEFAULT_LOAD_BALANCE = LoadBalance.ROBIN;
+ public static int DEFAULT_VIRTUAL_NODE = 1000;
+ public static int DEFAULT_RANDOM_MAX_RETRY = 1000;
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
index e5e3496cd..2dc1c2ca5 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
@@ -95,12 +95,12 @@ public class DefaultMessageSender implements MessageSender {
/**
* generate by cluster id
*
- * @param configure - sender
+ * @param configure - sender
* @param selfDefineFactory - sender factory
* @return - sender
*/
public static DefaultMessageSender
generateSenderByClusterId(ProxyClientConfig configure,
- ThreadFactory selfDefineFactory) throws Exception {
+ ThreadFactory
selfDefineFactory) throws Exception {
ProxyConfigManager proxyConfigManager = new
ProxyConfigManager(configure,
Utils.getLocalIp(), null);
proxyConfigManager.setGroupId(configure.getGroupId());
@@ -188,13 +188,13 @@ public class DefaultMessageSender implements
MessageSender {
@Deprecated
public SendResult sendMessage(byte[] body, String attributes, String
msgUUID,
- long timeout, TimeUnit timeUnit) {
+ long timeout, TimeUnit timeUnit) {
return sender.syncSendMessage(new EncodeObject(body, attributes,
idGenerator.getNextId()), msgUUID, timeout, timeUnit);
}
public SendResult sendMessage(byte[] body, String groupId, String
streamId, long dt, String msgUUID,
- long timeout, TimeUnit timeUnit) {
+ long timeout, TimeUnit timeUnit) {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
return SendResult.INVALID_ATTRIBUTES;
@@ -210,13 +210,13 @@ public class DefaultMessageSender implements
MessageSender {
return sender.syncSendMessage(encodeObject, msgUUID, timeout,
timeUnit);
} else if (msgtype == 3 || msgtype == 5) {
if (isCompressEnd) {
- return sender.syncSendMessage(new EncodeObject(body,
"groupId=" + groupId
- + "&streamId=" + streamId + "&dt=" + dt + "&cp=snappy",
- idGenerator.getNextId(), this.getMsgtype(), true,
groupId), msgUUID, timeout, timeUnit);
+ return sender.syncSendMessage(new EncodeObject(body,
"groupId=" + groupId + "&streamId="
+ + streamId + "&dt=" + dt + "&cp=snappy",
idGenerator.getNextId(), this.getMsgtype(),
+ true, groupId), msgUUID, timeout, timeUnit);
} else {
- return sender.syncSendMessage(new EncodeObject(body,
"groupId=" + groupId
- + "&streamId=" + streamId + "&dt=" + dt,
- idGenerator.getNextId(), this.getMsgtype(), false,
groupId), msgUUID, timeout, timeUnit);
+ return sender.syncSendMessage(new EncodeObject(body,
"groupId=" + groupId + "&streamId="
+ + streamId + "&dt=" + dt,
idGenerator.getNextId(), this.getMsgtype(), false, groupId),
+ msgUUID, timeout, timeUnit);
}
}
@@ -224,7 +224,7 @@ public class DefaultMessageSender implements MessageSender {
}
public SendResult sendMessage(byte[] body, String groupId, String
streamId, long dt, String msgUUID,
- long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap)
{
+ long timeout, TimeUnit timeUnit, Map<String,
String> extraAttrMap) {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt) ||
!ProxyUtils.isAttrKeysValid(extraAttrMap)) {
@@ -247,10 +247,12 @@ public class DefaultMessageSender implements
MessageSender {
if (isCompressEnd) {
attrs.append("&cp=snappy");
return sender.syncSendMessage(new EncodeObject(body,
attrs.toString(),
- idGenerator.getNextId(), this.getMsgtype(), true,
groupId), msgUUID, timeout, timeUnit);
+ idGenerator.getNextId(), this.getMsgtype(),
true, groupId),
+ msgUUID, timeout, timeUnit);
} else {
return sender.syncSendMessage(new EncodeObject(body,
attrs.toString(),
- idGenerator.getNextId(), this.getMsgtype(), false,
groupId), msgUUID, timeout, timeUnit);
+ idGenerator.getNextId(), this.getMsgtype(),
false, groupId), msgUUID,
+ timeout, timeUnit);
}
}
return null;
@@ -258,7 +260,7 @@ public class DefaultMessageSender implements MessageSender {
}
public SendResult sendMessage(List<byte[]> bodyList, String groupId,
String streamId, long dt, String msgUUID,
- long timeout, TimeUnit timeUnit) {
+ long timeout, TimeUnit timeUnit) {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)) {
return SendResult.INVALID_ATTRIBUTES;
@@ -274,19 +276,19 @@ public class DefaultMessageSender implements
MessageSender {
} else if (msgtype == 3 || msgtype == 5) {
if (isCompress) {
return sender.syncSendMessage(new EncodeObject(bodyList,
"groupId=" + groupId + "&streamId=" + streamId
- + "&dt=" + dt + "&cp=snappy" + "&cnt=" +
bodyList.size(),
- idGenerator.getNextId(), this.getMsgtype(), true,
groupId), msgUUID, timeout, timeUnit);
+ + "&dt=" + dt + "&cp=snappy" + "&cnt=" +
bodyList.size(), idGenerator.getNextId(),
+ this.getMsgtype(), true, groupId), msgUUID, timeout,
timeUnit);
} else {
return sender.syncSendMessage(new EncodeObject(bodyList,
"groupId=" + groupId + "&streamId=" + streamId
- + "&dt=" + dt + "&cnt=" + bodyList.size(),
- idGenerator.getNextId(), this.getMsgtype(), false,
groupId), msgUUID, timeout, timeUnit);
+ + "&dt=" + dt + "&cnt=" + bodyList.size(),
idGenerator.getNextId(), this.getMsgtype(),
+ false, groupId), msgUUID, timeout, timeUnit);
}
}
return null;
}
- public SendResult sendMessage(List<byte[]> bodyList, String groupId,
String streamId, long dt, String msgUUID,
- long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap)
{
+ public SendResult sendMessage(List<byte[]> bodyList, String groupId,
String streamId, long dt,
+ String msgUUID, long timeout, TimeUnit
timeUnit, Map<String, String> extraAttrMap) {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)
|| !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
@@ -307,25 +309,27 @@ public class DefaultMessageSender implements
MessageSender {
if (isCompress) {
attrs.append("&cp=snappy");
return sender.syncSendMessage(new EncodeObject(bodyList,
attrs.toString(),
- idGenerator.getNextId(), this.getMsgtype(), true,
groupId), msgUUID, timeout, timeUnit);
+ idGenerator.getNextId(), this.getMsgtype(),
true, groupId),
+ msgUUID, timeout, timeUnit);
} else {
return sender.syncSendMessage(new EncodeObject(bodyList,
attrs.toString(),
- idGenerator.getNextId(), this.getMsgtype(), false,
groupId), msgUUID, timeout, timeUnit);
+ idGenerator.getNextId(), this.getMsgtype(),
false, groupId),
+ msgUUID, timeout, timeUnit);
}
}
return null;
}
@Deprecated
- public void asyncSendMessage(SendMessageCallback callback, byte[] body,
String attributes, String msgUUID,
- long timeout, TimeUnit timeUnit) throws ProxysdkException {
+ public void asyncSendMessage(SendMessageCallback callback, byte[] body,
String attributes,
+ String msgUUID, long timeout, TimeUnit
timeUnit) throws ProxysdkException {
sender.asyncSendMessage(new EncodeObject(body, attributes,
idGenerator.getNextId()),
callback, msgUUID, timeout, timeUnit);
}
public void asyncSendMessage(SendMessageCallback callback, byte[] body,
- String groupId, String streamId, long dt, String msgUUID,
- long timeout, TimeUnit timeUnit) throws ProxysdkException {
+ String groupId, String streamId, long dt,
String msgUUID,
+ long timeout, TimeUnit timeUnit) throws
ProxysdkException {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
throw new
ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
@@ -356,10 +360,9 @@ public class DefaultMessageSender implements MessageSender
{
}
- public void asyncSendMessage(SendMessageCallback callback,
- byte[] body, String groupId, String streamId, long dt, String
msgUUID,
- long timeout, TimeUnit timeUnit,
- Map<String, String> extraAttrMap) throws ProxysdkException {
+ public void asyncSendMessage(SendMessageCallback callback, byte[] body,
String groupId, String streamId,
+ long dt, String msgUUID, long timeout,
TimeUnit timeUnit,
+ Map<String, String> extraAttrMap) throws
ProxysdkException {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt) ||
!ProxyUtils.isAttrKeysValid(extraAttrMap)) {
throw new
ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
@@ -390,8 +393,8 @@ public class DefaultMessageSender implements MessageSender {
}
public void asyncSendMessage(SendMessageCallback callback, List<byte[]>
bodyList,
- String groupId, String streamId, long dt, String msgUUID,
- long timeout, TimeUnit timeUnit) throws ProxysdkException {
+ String groupId, String streamId, long dt,
String msgUUID,
+ long timeout, TimeUnit timeUnit) throws
ProxysdkException {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)) {
throw new
ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
@@ -407,23 +410,21 @@ public class DefaultMessageSender implements
MessageSender {
if (isCompress) {
sender.asyncSendMessage(
new EncodeObject(bodyList, "groupId=" + groupId +
"&streamId=" + streamId
- + "&dt=" + dt + "&cp=snappy" + "&cnt=" +
bodyList.size(),
- idGenerator.getNextId(), this.getMsgtype(),
- true, groupId), callback, msgUUID, timeout,
timeUnit);
+ + "&dt=" + dt + "&cp=snappy" + "&cnt=" +
bodyList.size(), idGenerator.getNextId(),
+ this.getMsgtype(), true, groupId), callback,
msgUUID, timeout, timeUnit);
} else {
sender.asyncSendMessage(
- new EncodeObject(bodyList, "groupId=" + groupId +
"&streamId="
- + streamId + "&dt=" + dt + "&cnt=" +
bodyList.size(),
- idGenerator.getNextId(), this.getMsgtype(),
+ new EncodeObject(bodyList, "groupId=" + groupId +
"&streamId=" + streamId + "&dt=" + dt
+ + "&cnt=" + bodyList.size(),
idGenerator.getNextId(), this.getMsgtype(),
false, groupId), callback, msgUUID, timeout,
timeUnit);
}
}
}
public void asyncSendMessage(SendMessageCallback callback,
- List<byte[]> bodyList, String groupId, String streamId, long dt,
String msgUUID,
- long timeout, TimeUnit timeUnit,
- Map<String, String> extraAttrMap) throws ProxysdkException {
+ List<byte[]> bodyList, String groupId, String
streamId, long dt, String msgUUID,
+ long timeout, TimeUnit timeUnit,
+ Map<String, String> extraAttrMap) throws
ProxysdkException {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)
|| !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
@@ -465,7 +466,7 @@ public class DefaultMessageSender implements MessageSender {
*/
@Override
public void asyncSendMessage(String inlongGroupId, String inlongStreamId,
byte[] body,
- SendMessageCallback callback) throws ProxysdkException {
+ SendMessageCallback callback) throws
ProxysdkException {
this.asyncSendMessage(callback, body, inlongGroupId, inlongStreamId,
System.currentTimeMillis(),
idGenerator.getNextId(), DEFAULT_SEND_TIMEOUT,
DEFAULT_SEND_TIMEUNIT);
}
@@ -481,7 +482,7 @@ public class DefaultMessageSender implements MessageSender {
*/
@Override
public void asyncSendMessage(String inlongGroupId, String inlongStreamId,
List<byte[]> bodyList,
- SendMessageCallback callback) throws ProxysdkException {
+ SendMessageCallback callback) throws
ProxysdkException {
this.asyncSendMessage(callback, bodyList, inlongGroupId,
inlongStreamId, System.currentTimeMillis(),
idGenerator.getNextId(), DEFAULT_SEND_TIMEOUT,
DEFAULT_SEND_TIMEUNIT);
}
@@ -501,9 +502,9 @@ public class DefaultMessageSender implements MessageSender {
}
public void asyncsendMessageData(FileCallback callback, List<byte[]>
bodyList, String groupId,
- String streamId, long dt, int sid, boolean isSupportLF, String
msgUUID,
- long timeout, TimeUnit timeUnit,
- Map<String, String> extraAttrMap) throws ProxysdkException {
+ String streamId, long dt, int sid,
boolean isSupportLF, String msgUUID,
+ long timeout, TimeUnit timeUnit,
+ Map<String, String> extraAttrMap) throws
ProxysdkException {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)
|| !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
@@ -523,8 +524,8 @@ public class DefaultMessageSender implements MessageSender {
}
private void asyncSendMetric(FileCallback callback, byte[] body, String
groupId,
- String streamId, long dt, int sid, String ip, String msgUUID,
- long timeout, TimeUnit timeUnit, String messageKey) throws
ProxysdkException {
+ String streamId, long dt, int sid, String ip,
String msgUUID,
+ long timeout, TimeUnit timeUnit, String
messageKey) throws ProxysdkException {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
throw new
ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
@@ -538,20 +539,22 @@ public class DefaultMessageSender implements
MessageSender {
}
public void asyncsendMessageProxy(FileCallback callback, byte[] body,
String groupId, String streamId,
- long dt, int sid, String ip, String msgUUID,
- long timeout, TimeUnit timeUnit) throws ProxysdkException {
- asyncSendMetric(callback, body, groupId, streamId, dt, sid, ip,
msgUUID, timeout, timeUnit, "minute");
+ long dt, int sid, String ip, String
msgUUID,
+ long timeout, TimeUnit timeUnit) throws
ProxysdkException {
+ asyncSendMetric(callback, body, groupId, streamId, dt, sid, ip,
msgUUID, timeout,
+ timeUnit, "minute");
}
public void asyncsendMessageFile(FileCallback callback, byte[] body,
String groupId,
- String streamId, long dt, int sid, String msgUUID,
- long timeout, TimeUnit timeUnit) throws ProxysdkException {
- asyncSendMetric(callback, body, groupId, streamId, dt, sid, "",
msgUUID, timeout, timeUnit, "file");
+ String streamId, long dt, int sid, String
msgUUID,
+ long timeout, TimeUnit timeUnit) throws
ProxysdkException {
+ asyncSendMetric(callback, body, groupId, streamId, dt, sid, "",
msgUUID, timeout, timeUnit,
+ "file");
}
public String sendMessageData(List<byte[]> bodyList, String groupId,
- String streamId, long dt, int sid, boolean isSupportLF, String
msgUUID,
- long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap)
{
+ String streamId, long dt, int sid, boolean
isSupportLF, String msgUUID,
+ long timeout, TimeUnit timeUnit, Map<String,
String> extraAttrMap) {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)
|| !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
@@ -571,8 +574,9 @@ public class DefaultMessageSender implements MessageSender {
return null;
}
- private String sendMetric(byte[] body, String groupId, String streamId,
long dt, int sid, String ip, String msgUUID,
- long timeout, TimeUnit timeUnit, String messageKey) {
+ private String sendMetric(byte[] body, String groupId, String streamId,
long dt, int sid, String ip,
+ String msgUUID,
+ long timeout, TimeUnit timeUnit, String
messageKey) {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
return SendResult.INVALID_ATTRIBUTES.toString();
@@ -586,13 +590,13 @@ public class DefaultMessageSender implements
MessageSender {
}
public String sendMessageProxy(byte[] body, String groupId, String
streamId,
- long dt, int sid, String ip, String msgUUID,
- long timeout, TimeUnit timeUnit) {
+ long dt, int sid, String ip, String msgUUID,
+ long timeout, TimeUnit timeUnit) {
return sendMetric(body, groupId, streamId, dt, sid, ip, msgUUID,
timeout, timeUnit, "minute");
}
public String sendMessageFile(byte[] body, String groupId, String
streamId, long dt, int sid, String msgUUID,
- long timeout, TimeUnit timeUnit) {
+ long timeout, TimeUnit timeUnit) {
return sendMetric(body, groupId, streamId, dt, sid, "", msgUUID,
timeout, timeUnit, "file");
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/LoadBalance.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/LoadBalance.java
new file mode 100644
index 000000000..ff239288e
--- /dev/null
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/LoadBalance.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy;
+
+public enum LoadBalance {
+
+ RANDOM("random", 0),
+ ROBIN("robin", 1),
+ CONSISTENCY_HASH("consistency hash", 2),
+ WEIGHT_RANDOM("weight random", 3),
+ WEIGHT_ROBIN("weight robin", 4);
+
+ private String name;
+ private int index;
+
+ private LoadBalance(String name, int index) {
+ this.name = name;
+ this.index = index;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public int getIndex() {
+ return index;
+ }
+
+}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
index 30bedcdb7..964c43be3 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
@@ -88,17 +88,22 @@ public class ProxyClientConfig {
private int metricIntervalInMs = 60 * 1000;
// max cache time for proxy config.
private long maxProxyCacheTimeInMs = 30 * 60 * 1000;
-
// metric groupId
private String metricGroupId = "inlong_sla_metric";
private int ioThreadNum = Runtime.getRuntime().availableProcessors();
private boolean enableBusyWait = false;
+ private int virtualNode;
+
+ private LoadBalance loadBalance;
+
+ private int maxRetry;
+
/*pay attention to the last url parameter ip*/
public ProxyClientConfig(String localHost, boolean isLocalVisit, String
managerIp,
- int managerPort, String groupId, String netTag, String
authSecretId, String authSecretKey)
- throws ProxysdkException {
+ int managerPort, String groupId, String netTag,
String authSecretId, String authSecretKey,
+ LoadBalance loadBalance, int virtualNode, int
maxRetry) throws ProxysdkException {
if (Utils.isBlank(localHost)) {
throw new ProxysdkException("localHost is blank!");
}
@@ -126,6 +131,16 @@ public class ProxyClientConfig {
this.setRequestTimeoutMillis(ConfigConstants.DEFAULT_SEND_BUFFER_SIZE);
this.authSecretId = authSecretId;
this.authSecretKey = authSecretKey;
+ this.loadBalance = loadBalance;
+ this.virtualNode = virtualNode;
+ this.maxRetry = maxRetry;
+ }
+
+ public ProxyClientConfig(String localHost, boolean isLocalVisit, String
managerIp, int managerPort, String groupId,
+ String netTag, String authSecretId, String
authSecretKey) throws ProxysdkException {
+ this(localHost, isLocalVisit, managerIp, managerPort, groupId, netTag,
authSecretId, authSecretKey,
+ ConfigConstants.DEFAULT_LOAD_BALANCE,
ConfigConstants.DEFAULT_VIRTUAL_NODE,
+ ConfigConstants.DEFAULT_RANDOM_MAX_RETRY);
}
public String getTlsServerCertFilePathAndName() {
@@ -289,7 +304,7 @@ public class ProxyClientConfig {
}
public void setAuthenticationInfo(boolean needAuthentication, boolean
needDataEncry,
- final String userName, final String secretKey) {
+ final String userName, final String
secretKey) {
this.needAuthentication = needAuthentication;
this.isNeedDataEncry = needDataEncry;
if (this.needAuthentication || this.isNeedDataEncry) {
@@ -459,4 +474,28 @@ public class ProxyClientConfig {
public void setEnableBusyWait(boolean enableBusyWait) {
this.enableBusyWait = enableBusyWait;
}
+
+ public int getVirtualNode() {
+ return virtualNode;
+ }
+
+ public void setVirtualNode(int virtualNode) {
+ this.virtualNode = virtualNode;
+ }
+
+ public LoadBalance getLoadBalance() {
+ return loadBalance;
+ }
+
+ public void setLoadBalance(LoadBalance loadBalance) {
+ this.loadBalance = loadBalance;
+ }
+
+ public int getMaxRetry() {
+ return maxRetry;
+ }
+
+ public void setMaxRetry(int maxRetry) {
+ this.maxRetry = maxRetry;
+ }
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigEntry.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigEntry.java
index c975fafa4..5e82b4558 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigEntry.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigEntry.java
@@ -81,8 +81,7 @@ public class ProxyConfigEntry implements java.io.Serializable
{
@Override
public String toString() {
return "ProxyConfigEntry [hostMap=" + hostMap + ", load=" + load + ",
size=" + size + ", isInterVisit="
- + isInterVisit + ", groupId=" + groupId
- + ", switch=" + switchStat + "]";
+ + isInterVisit + ", groupId=" + groupId + ", switch=" +
switchStat + "]";
}
public int getClusterId() {
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
index a69b91478..f0f3acc7f 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
@@ -45,12 +45,13 @@ import org.apache.http.params.HttpConnectionParams;
import org.apache.http.params.HttpParams;
import org.apache.http.ssl.SSLContexts;
import org.apache.http.util.EntityUtils;
-import org.apache.inlong.common.util.BasicAuth;
import org.apache.inlong.common.pojo.dataproxy.DataProxyNodeInfo;
import org.apache.inlong.common.pojo.dataproxy.DataProxyNodeResponse;
+import org.apache.inlong.common.util.BasicAuth;
import org.apache.inlong.sdk.dataproxy.ConfigConstants;
import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
import org.apache.inlong.sdk.dataproxy.network.ClientMgr;
+import org.apache.inlong.sdk.dataproxy.network.HashRing;
import org.apache.inlong.sdk.dataproxy.network.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,22 +59,22 @@ import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext;
import java.io.File;
import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.FileReader;
-import java.io.FileWriter;
import java.io.ObjectInputStream;
+import java.io.FileOutputStream;
import java.io.ObjectOutputStream;
+import java.io.FileWriter;
+import java.io.FileReader;
import java.io.UnsupportedEncodingException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
+import java.util.Random;
import java.util.List;
+import java.util.ArrayList;
import java.util.Map;
-import java.util.Random;
+import java.util.HashMap;
+import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -93,6 +94,7 @@ public class ProxyConfigManager extends Thread {
private final ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
private final JsonParser jsonParser = new JsonParser();
private final Gson gson = new Gson();
+ private final HashRing hashRing = HashRing.getInstance();
private List<HostInfo> proxyInfoList = new ArrayList<HostInfo>();
/*the status of the cluster.if this value is changed,we need rechoose
three proxy*/
private int oldStat = 0;
@@ -106,6 +108,7 @@ public class ProxyConfigManager extends Thread {
this.clientConfig = configure;
this.localIP = localIP;
this.clientManager = clientManager;
+ this.hashRing.setVirtualNode(configure.getVirtualNode());
}
public String getGroupId() {
@@ -297,6 +300,7 @@ public class ProxyConfigManager extends Thread {
}
}
compareProxyList(proxyEntry);
+
}
/**
@@ -340,6 +344,7 @@ public class ProxyConfigManager extends Thread {
newProxyInfoList.clear();
LOGGER.info("proxy IP list doesn't change, load {}",
proxyEntry.getLoad());
}
+ updateHashRing(proxyInfoList);
} else {
LOGGER.error("proxyEntry's size is zero");
}
@@ -819,4 +824,9 @@ public class ProxyConfigManager extends Thread {
}
return localManagerIps;
}
+
+ public void updateHashRing(List<HostInfo> newHosts) {
+ this.hashRing.updateNode(newHosts);
+ LOGGER.info("update hash ring {}", hashRing.getVirtualNode2RealNode());
+ }
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
index dee9e8e3c..d01074307 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
@@ -24,25 +24,28 @@ import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import org.apache.inlong.sdk.dataproxy.ConfigConstants;
+import org.apache.inlong.sdk.dataproxy.LoadBalance;
import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
import org.apache.inlong.sdk.dataproxy.config.EncryptConfigEntry;
import org.apache.inlong.sdk.dataproxy.config.HostInfo;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
+import org.apache.inlong.sdk.dataproxy.utils.ConsistencyHashUtil;
import org.apache.inlong.sdk.dataproxy.utils.EventLoopUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.util.List;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
+import java.util.Map;
import java.util.HashMap;
+import java.util.Random;
+import java.util.Collections;
import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import java.util.Comparator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;
@@ -51,22 +54,27 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ClientMgr {
private static final Logger logger = LoggerFactory
.getLogger(ClientMgr.class);
-
+ private static final int[] weight = {
+ 1, 1, 1, 1, 1,
+ 2, 2, 2, 2, 2,
+ 3, 3, 3, 3, 3,
+ 6, 6, 6, 6, 6,
+ 12, 12, 12, 12, 12,
+ 48, 96, 192, 384, 1000};
private final Map<HostInfo, NettyClient> clientMapData = new
ConcurrentHashMap<>();
-
private final ConcurrentHashMap<HostInfo, NettyClient> clientMapHB = new
ConcurrentHashMap<>();
// clientMapData + clientMapHB = clientMap
private final ConcurrentHashMap<HostInfo, NettyClient> clientMap = new
ConcurrentHashMap<>();
-
private final ConcurrentHashMap<HostInfo, AtomicLong> lastBadHostMap = new
ConcurrentHashMap<>();
-
// clientList is the valueSet of clientMapData
private final ArrayList<NettyClient> clientList = new ArrayList<>();
- private List<HostInfo> proxyInfoList = new ArrayList<>();
-
private final Map<HostInfo, int[]> channelLoadMapData = new
ConcurrentHashMap<>();
private final Map<HostInfo, int[]> channelLoadMapHB = new
ConcurrentHashMap<>();
-
+ /**
+ * Lock to protect FSNamesystem.
+ */
+ private final ReentrantReadWriteLock fsLock = new
ReentrantReadWriteLock(true);
+ private List<HostInfo> proxyInfoList = new ArrayList<>();
private Bootstrap bootstrap;
private int currentIndex = 0;
private ProxyClientConfig configure;
@@ -75,24 +83,72 @@ public class ClientMgr {
private int realSize;
private SendHBThread sendHBThread;
private ProxyConfigManager ipManager;
-
private int groupIdNum = 0;
private String groupId = "";
private Map<String, Integer> streamIdMap = new HashMap<String, Integer>();
+ // private static final int total_weight = 240;
private int loadThreshold;
private int loadCycle = 0;
- private static final int[] weight = {
- 1, 1, 1, 1, 1,
- 2, 2, 2, 2, 2,
- 3, 3, 3, 3, 3,
- 6, 6, 6, 6, 6,
- 12, 12, 12, 12, 12,
- 48, 96, 192, 384, 1000};
-// private static final int total_weight = 240;
+ private LoadBalance loadBalance;
+
+ public ClientMgr(ProxyClientConfig configure, Sender sender) throws
Exception {
+ this(configure, sender, null);
+ }
+
/**
- * Lock to protect FSNamesystem.
+ * Build up the connection between the server and client.
*/
- private final ReentrantReadWriteLock fsLock = new
ReentrantReadWriteLock(true);
+ public ClientMgr(ProxyClientConfig configure, Sender sender, ThreadFactory
selfDefineFactory) throws Exception {
+ /* Initialize the bootstrap. */
+ if (selfDefineFactory == null) {
+ selfDefineFactory = new DefaultThreadFactory("agent-client-io",
+ Thread.currentThread().isDaemon());
+ }
+ EventLoopGroup eventLoopGroup =
EventLoopUtil.newEventLoopGroup(configure.getIoThreadNum(),
+ configure.isEnableBusyWait(), selfDefineFactory);
+ bootstrap = new Bootstrap();
+ bootstrap.group(eventLoopGroup);
+
bootstrap.channel(EventLoopUtil.getClientSocketChannelClass(eventLoopGroup));
+ bootstrap.option(ChannelOption.SO_RCVBUF,
ConfigConstants.DEFAULT_RECEIVE_BUFFER_SIZE);
+ bootstrap.option(ChannelOption.SO_SNDBUF,
ConfigConstants.DEFAULT_SEND_BUFFER_SIZE);
+ if (configure.getNetTag().equals("bobcat")) {
+ bootstrap.option(ChannelOption.IP_TOS, 96);
+ }
+ bootstrap.handler(new ClientPipelineFactory(this, sender));
+ /* ready to Start the thread which refreshes the proxy list. */
+ ipManager = new ProxyConfigManager(configure, Utils.getLocalIp(),
this);
+ ipManager.setName("proxyConfigManager");
+ if (configure.getGroupId() != null) {
+ ipManager.setGroupId(configure.getGroupId());
+ groupId = configure.getGroupId();
+ }
+
+ /*
+ * Request the IP before starting, so that we already have three
+ * connections.
+ */
+ this.configure = configure;
+ this.sender = sender;
+ this.aliveConnections = configure.getAliveConnections();
+ this.loadBalance = configure.getLoadBalance();
+
+ try {
+ ipManager.doProxyEntryQueryWork();
+ } catch (IOException e) {
+ e.printStackTrace();
+ logger.info(e.getMessage());
+ }
+ ipManager.setDaemon(true);
+ ipManager.start();
+
+ this.sendHBThread = new SendHBThread();
+ this.sendHBThread.setName("SendHBThread");
+ this.sendHBThread.start();
+ }
+
+ public LoadBalance getLoadBalance() {
+ return this.loadBalance;
+ }
public int getLoadThreshold() {
return loadThreshold;
@@ -134,6 +190,50 @@ public class ClientMgr {
return proxyInfoList;
}
+    /**
+     * Stores the given proxy list and refreshes the client connections from it.
+     *
+     * <p>The number of hosts handed to the connection update ({@code realSize}) is
+     * derived from {@code aliveConnections}: with a {@code loadThreshold} of 0 exactly
+     * that many hosts are used, otherwise up to 4 additional hosts are included.
+     * When the list does not hold more hosts than {@code aliveConnections}, both
+     * {@code realSize} and {@code aliveConnections} are reduced to the list size.
+     * Any exception raised during the update is logged and not rethrown.
+     *
+     * @param proxyInfoList the proxy hosts to use from now on
+     */
+    public void setProxyInfoList(List<HostInfo> proxyInfoList) {
+        /* Take the lock before entering try, so that finally never releases a lock that was not acquired. */
+        writeLock();
+        try {
+            /* Close and remove old client. */
+            this.proxyInfoList = proxyInfoList;
+            int available = proxyInfoList.size();
+
+            if (loadThreshold == 0) {
+                if (aliveConnections >= available) {
+                    realSize = available;
+                    aliveConnections = realSize;
+                    logger.error("there is no enough proxy to work!");
+                } else {
+                    realSize = aliveConnections;
+                }
+            } else {
+                if (aliveConnections >= available) {
+                    realSize = available;
+                    aliveConnections = realSize;
+                    logger.error("there is no idle proxy to choose for balancing!");
+                } else if ((aliveConnections + 4) > available) {
+                    realSize = available;
+                    logger.warn("there is only {} idle proxy to choose for balancing!",
+                            available - aliveConnections);
+                } else {
+                    realSize = aliveConnections + 4;
+                }
+            }
+
+            List<HostInfo> hostInfos = getRealHosts(proxyInfoList, realSize);
+
+            /* Refresh the current channel connections. */
+            updateAllConnection(hostInfos);
+
+            logger.info(
+                    "update all connection ,client map size {},client list size {}",
+                    clientMapData.size(), clientList.size());
+
+        } catch (Exception e) {
+            /* Log the throwable itself: getMessage() alone loses the stack trace and may be null. */
+            logger.error("failed to refresh the proxy connections", e);
+        } finally {
+            writeUnlock();
+        }
+    }
+
public int getAliveConnections() {
return aliveConnections;
}
@@ -174,60 +274,6 @@ public class ClientMgr {
return hasReadLock() || hasWriteLock();
}
- public ClientMgr(ProxyClientConfig configure, Sender sender) throws
Exception {
- this(configure, sender, null);
- }
-
- /**
- * Build up the connection between the server and client.
- */
- public ClientMgr(ProxyClientConfig configure, Sender sender, ThreadFactory
selfDefineFactory) throws Exception {
- /* Initialize the bootstrap. */
- if (selfDefineFactory == null) {
- selfDefineFactory = new DefaultThreadFactory("agent-client-io",
- Thread.currentThread().isDaemon());
- }
- EventLoopGroup eventLoopGroup =
EventLoopUtil.newEventLoopGroup(configure.getIoThreadNum(),
- configure.isEnableBusyWait(), selfDefineFactory);
- bootstrap = new Bootstrap();
- bootstrap.group(eventLoopGroup);
-
bootstrap.channel(EventLoopUtil.getClientSocketChannelClass(eventLoopGroup));
- bootstrap.option(ChannelOption.SO_RCVBUF,
ConfigConstants.DEFAULT_RECEIVE_BUFFER_SIZE);
- bootstrap.option(ChannelOption.SO_SNDBUF,
ConfigConstants.DEFAULT_SEND_BUFFER_SIZE);
- if (configure.getNetTag().equals("bobcat")) {
- bootstrap.option(ChannelOption.IP_TOS, 96);
- }
- bootstrap.handler(new ClientPipelineFactory(this, sender));
- /* ready to Start the thread which refreshes the proxy list. */
- ipManager = new ProxyConfigManager(configure, Utils.getLocalIp(),
this);
- ipManager.setName("proxyConfigManager");
- if (configure.getGroupId() != null) {
- ipManager.setGroupId(configure.getGroupId());
- groupId = configure.getGroupId();
- }
-
- /*
- * Request the IP before starting, so that we already have three
- * connections.
- */
- this.configure = configure;
- this.sender = sender;
- this.aliveConnections = configure.getAliveConnections();
-
- try {
- ipManager.doProxyEntryQueryWork();
- } catch (IOException e) {
- e.printStackTrace();
- logger.info(e.getMessage());
- }
- ipManager.setDaemon(true);
- ipManager.start();
-
- this.sendHBThread = new SendHBThread();
- this.sendHBThread.setName("SendHBThread");
- this.sendHBThread.start();
- }
-
public ProxyConfigEntry getGroupIdConfigureInfo() throws Exception {
return ipManager.getGroupIdConfigure();
}
@@ -348,6 +394,90 @@ public class ClientMgr {
return client;
}
+    /**
+     * Picks a client at random from the client list.
+     *
+     * <p>A draw is repeated while the drawn client is missing or inactive, for at most
+     * {@code configure.getMaxRetry()} draws (always at least one).
+     *
+     * @return an active client, or {@code null} when the list is empty or no active
+     *         client was drawn within the allowed number of attempts
+     */
+    public synchronized NettyClient getClientByRandom() {
+        if (clientList.isEmpty()) {
+            return null;
+        }
+        int currSize = clientList.size();
+        int maxRetry = this.configure.getMaxRetry();
+        /* Default seeding: a millisecond-clock seed repeats the same draw for calls within one millisecond. */
+        Random random = new Random();
+        do {
+            /*
+             * nextInt(bound) is always within [0, currSize). An unbounded nextInt() may be
+             * negative, which turned "randomId % currSize" into an invalid list index.
+             */
+            NettyClient client = clientList.get(random.nextInt(currSize));
+            if (client != null && client.isActive()) {
+                return client;
+            }
+            maxRetry--;
+        } while (maxRetry > 0);
+        return null;
+    }
+
+// public synchronized NettyClient getClientByLeastConnections() {}
+
+    /**
+     * Looks up the client for a message through the shared {@link HashRing}.
+     *
+     * <p>The message id is handed to the ring as-is: {@code HashRing.getNode} hashes its
+     * key itself, so hashing it here as well was redundant and added a second source
+     * of variation to the lookup.
+     *
+     * @param messageId the id used as the ring lookup key
+     * @return the client mapped to the located host, or {@code null} when the client list
+     *         is empty, the ring yields no host, or no client is mapped to that host
+     */
+    public synchronized NettyClient getClientByConsistencyHash(String messageId) {
+        if (clientList.isEmpty()) {
+            return null;
+        }
+        HostInfo info = HashRing.getInstance().getNode(messageId);
+        if (info == null) {
+            /* Do not pass a null key to the map lookup; report "no client" instead. */
+            return null;
+        }
+        return this.clientMap.get(info);
+    }
+
+    /**
+     * Scans every client once, starting after the round-robin cursor, and returns the
+     * active client with the highest weight.
+     *
+     * <p>On equal weights the first active client met in scan order wins. The cursor
+     * ({@code currentIndex}) is moved to the returned client, so the next call starts
+     * behind it and equal-weight clients are used in turn.
+     *
+     * @return the selected active client, or {@code null} when the list is empty or
+     *         holds no active client
+     */
+    public synchronized NettyClient getClientByWeightRoundRobin() {
+        if (clientList.isEmpty()) {
+            return null;
+        }
+        int currSize = clientList.size();
+        NettyClient selected = null;
+        int selectedIndex = currentIndex;
+        /* Double.MIN_VALUE is the smallest positive double, not a lower bound; weights may be negative. */
+        double maxWeight = Double.NEGATIVE_INFINITY;
+        int index = currentIndex;
+        for (int visited = 0; visited < currSize; visited++) {
+            index = (index + 1) % currSize;
+            NettyClient candidate = clientList.get(index);
+            if (candidate == null || !candidate.isActive()) {
+                continue;
+            }
+            double weight = candidate.getWeight();
+            /* The first active candidate is always taken, so a NaN or negative weight still yields a client. */
+            if (selected == null || weight > maxWeight) {
+                selected = candidate;
+                selectedIndex = index;
+                maxWeight = weight;
+            }
+        }
+        if (selected == null) {
+            return null;
+        }
+        currentIndex = selectedIndex;
+        return selected;
+    }
+
+// public synchronized NettyClient getClientByWeightLeastConnections(){}
+
+    /**
+     * Picks a client at random from the client list.
+     *
+     * <p>NOTE(review): despite the name, client weights are not consulted here; the draw
+     * is uniform over the list, exactly as the original selection loop was.
+     * A draw is repeated while the drawn client is missing or inactive, for at most
+     * {@code configure.getMaxRetry()} draws (always at least one).
+     *
+     * @return an active client, or {@code null} when the list is empty or no active
+     *         client was drawn within the allowed number of attempts
+     */
+    public synchronized NettyClient getClientByWeightRandom() {
+        if (clientList.isEmpty()) {
+            return null;
+        }
+        int currSize = clientList.size();
+        int maxRetry = this.configure.getMaxRetry();
+        /* Default seeding: a millisecond-clock seed repeats the same draw for calls within one millisecond. */
+        Random random = new Random();
+        do {
+            /* Bounded draw: the raw random int was previously reused as a list index and was almost always out of range. */
+            NettyClient client = clientList.get(random.nextInt(currSize));
+            if (client != null && client.isActive()) {
+                return client;
+            }
+            maxRetry--;
+        } while (maxRetry > 0);
+        return null;
+    }
+
public NettyClient getContainProxy(String proxyip) {
if (proxyip == null) {
return null;
@@ -854,50 +984,6 @@ public class ClientMgr {
}
- public void setProxyInfoList(List<HostInfo> proxyInfoList) {
- try {
- /* Close and remove old client. */
- writeLock();
- this.proxyInfoList = proxyInfoList;
-
- if (loadThreshold == 0) {
- if (aliveConnections >= proxyInfoList.size()) {
- realSize = proxyInfoList.size();
- aliveConnections = realSize;
- logger.error("there is no enough proxy to work!");
- } else {
- realSize = aliveConnections;
- }
- } else {
- if (aliveConnections >= proxyInfoList.size()) {
- realSize = proxyInfoList.size();
- aliveConnections = realSize;
- logger.error("there is no idle proxy to choose for
balancing!");
- } else if ((aliveConnections + 4) > proxyInfoList.size()) {
- realSize = proxyInfoList.size();
- logger.warn("there is only {} idle proxy to choose for
balancing!",
- proxyInfoList.size() - aliveConnections);
- } else {
- realSize = aliveConnections + 4;
- }
- }
-
- List<HostInfo> hostInfos = getRealHosts(proxyInfoList, realSize);
-
- /* Refresh the current channel connections. */
- updateAllConnection(hostInfos);
-
- logger.info(
- "update all connection ,client map size {},client list
size {}",
- clientMapData.size(), clientList.size());
-
- } catch (Exception e) {
- logger.error(e.getMessage());
- } finally {
- writeUnlock();
- }
- }
-
private List<HostInfo> getRealHosts(List<HostInfo> hostList, int realSize)
{
if (realSize > hostList.size()) {
return hostList;
@@ -911,10 +997,32 @@ public class ClientMgr {
return resultHosts;
}
+    /**
+     * Selects a client with the strategy named by {@code loadBalance}.
+     *
+     * @param loadBalance the strategy to dispatch on
+     * @param encodeObject the message being sent; only its message id is read, and only
+     *        for {@code CONSISTENCY_HASH}
+     * @return the client chosen by the matching strategy method, or {@code null} when the
+     *         strategy has no case here or the strategy method itself returns {@code null}
+     */
+    public NettyClient getClient(LoadBalance loadBalance, EncodeObject encodeObject) {
+        switch (loadBalance) {
+            case RANDOM:
+                return getClientByRandom();
+            case CONSISTENCY_HASH:
+                return getClientByConsistencyHash(encodeObject.getMessageId());
+            case ROBIN:
+                return getClientByRoundRobin();
+            case WEIGHT_ROBIN:
+                return getClientByWeightRoundRobin();
+            case WEIGHT_RANDOM:
+                return getClientByWeightRandom();
+            default:
+                return null;
+        }
+    }
+
private class SendHBThread extends Thread {
- private boolean bShutDown = false;
private final int[] random = {17, 19, 23, 31, 37};
+ private boolean bShutDown = false;
public SendHBThread() {
bShutDown = false;
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HashRing.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HashRing.java
new file mode 100644
index 000000000..00db8ae39
--- /dev/null
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HashRing.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.network;
+
+import org.apache.inlong.sdk.dataproxy.ConfigConstants;
+import org.apache.inlong.sdk.dataproxy.config.HostInfo;
+import org.apache.inlong.sdk.dataproxy.utils.ConsistencyHashUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.TreeMap;
+import java.util.SortedMap;
+import java.util.stream.Collectors;
+
+/**
+ * Process-wide consistent-hash ring that maps string keys to {@link HostInfo} nodes.
+ *
+ * <p>Each real node is placed on the ring {@code virtualNode} times. Ring positions and
+ * lookup keys are hashed with the same fixed seed, so a given key resolves to the same
+ * node for as long as the node set does not change. Lookups and updates synchronize on
+ * the ring instance.
+ */
+public class HashRing {
+    private static final Logger LOGGER = LoggerFactory.getLogger(HashRing.class);
+    /*
+     * Fixed seed shared by node placement, node removal and key lookup. A seed that
+     * differs between calls would scatter the same input over unrelated ring positions.
+     */
+    private static final int HASH_SEED = 0;
+    private static volatile HashRing instance;
+    private int virtualNode = ConfigConstants.DEFAULT_VIRTUAL_NODE;
+    private final TreeMap<String, HostInfo> virtualNode2RealNode;
+    private final List<HostInfo> nodeList;
+
+    private HashRing() {
+        this.virtualNode2RealNode = new TreeMap<>();
+        this.nodeList = new ArrayList<>();
+    }
+
+    /**
+     * Returns the shared ring, creating it on first use.
+     *
+     * @return the singleton instance
+     */
+    public static HashRing getInstance() {
+        if (instance == null) {
+            synchronized (HashRing.class) {
+                if (instance == null) {
+                    instance = new HashRing();
+                }
+            }
+        }
+        return instance;
+    }
+
+    /**
+     * Returns the live ring map (hash of virtual node name to real node). The map is
+     * not a copy; it is mutated by {@link #updateNode(List)}.
+     *
+     * @return the internal ring map
+     */
+    public TreeMap<String, HostInfo> getVirtualNode2RealNode() {
+        return virtualNode2RealNode;
+    }
+
+    /**
+     * Builds the name of one virtual node of a real node.
+     *
+     * @param node the real node
+     * @param index the virtual node index
+     * @return {@code "virtual&&" + index + "&&" + node}
+     */
+    public String node2VirtualNode(HostInfo node, int index) {
+        return "virtual&&" + index + "&&" + node.toString();
+    }
+
+    /**
+     * Sets how many virtual nodes are created per real node. Nodes already on the ring
+     * are not re-placed, and removal iterates over the current count, so the value
+     * should be set before any node is added.
+     *
+     * @param virtualNode the number of virtual nodes per real node
+     */
+    public void setVirtualNode(int virtualNode) {
+        this.virtualNode = virtualNode;
+    }
+
+    /**
+     * Locates the node responsible for a key: the first ring position at or after the
+     * key's hash, wrapping around to the first position of the ring.
+     *
+     * @param key the lookup key
+     * @return the responsible node, or {@code null} when the ring holds no node
+     */
+    public synchronized HostInfo getNode(String key) {
+        if (this.virtualNode2RealNode.isEmpty()) {
+            /* firstKey() throws on an empty map; report "no node" instead. */
+            return null;
+        }
+        String hash = ConsistencyHashUtil.hashMurMurHash(key, HASH_SEED);
+        String position = this.virtualNode2RealNode.ceilingKey(hash);
+        if (position == null) {
+            /* The hash lies behind the last position: wrap around. */
+            position = this.virtualNode2RealNode.firstKey();
+        }
+        HostInfo node = this.virtualNode2RealNode.get(position);
+        /* Called once per lookup, so keep this below info level. */
+        LOGGER.debug("{} located to {}", key, node);
+        return node;
+    }
+
+    /* Places the virtual nodes of the given new nodes on the ring and remembers the nodes. */
+    private synchronized void extendNode(List<HostInfo> nodes) {
+        this.nodeList.addAll(nodes);
+        /* Only the new nodes need placing; nodes already known keep their positions. */
+        for (HostInfo host : nodes) {
+            for (int i = 0; i < this.virtualNode; i++) {
+                String hash = ConsistencyHashUtil.hashMurMurHash(node2VirtualNode(host, i), HASH_SEED);
+                virtualNode2RealNode.put(hash, host);
+            }
+        }
+        LOGGER.info("append node list {}", nodes);
+    }
+
+    /* Takes the given nodes and their virtual nodes off the ring. */
+    private synchronized void removeNode(List<HostInfo> hosts) {
+        for (HostInfo host : hosts) {
+            this.nodeList.remove(host);
+            for (int i = 0; i < this.virtualNode; i++) {
+                String hash = ConsistencyHashUtil.hashMurMurHash(node2VirtualNode(host, i), HASH_SEED);
+                virtualNode2RealNode.remove(hash);
+            }
+        }
+        LOGGER.info("remove node list {}", hosts);
+    }
+
+    /**
+     * Brings the ring in line with the given node list: nodes not yet known are added,
+     * known nodes missing from the list are removed.
+     *
+     * @param nodes the complete list of nodes that should be on the ring
+     */
+    public synchronized void updateNode(List<HostInfo> nodes) {
+        List<HostInfo> newHosts = new ArrayList<>(nodes);
+        List<HostInfo> oldHosts = new ArrayList<>(this.nodeList);
+        List<HostInfo> append = newHosts.stream()
+                .filter(host -> !oldHosts.contains(host))
+                .collect(Collectors.toList());
+        List<HostInfo> remove = oldHosts.stream()
+                .filter(host -> !newHosts.contains(host))
+                .collect(Collectors.toList());
+        extendNode(append);
+        removeNode(remove);
+    }
+}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/NettyClient.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/NettyClient.java
index cc3e50410..4d32917de 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/NettyClient.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/NettyClient.java
@@ -18,20 +18,22 @@
package org.apache.inlong.sdk.dataproxy.network;
+import com.sun.management.OperatingSystemMXBean;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
-import java.net.InetSocketAddress;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
-
import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.lang.management.ManagementFactory;
+import java.net.InetSocketAddress;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
public class NettyClient {
private static final Logger logger =
LoggerFactory.getLogger(NettyClient.class);
@@ -213,4 +215,10 @@ public class NettyClient {
setState(ConnState.BUSY);
}
+    /**
+     * Returns the weight of this client for the weighted load-balancing strategies.
+     *
+     * <p>The value is the system load average reported by the platform operating-system
+     * MXBean of the running JVM; that API returns a negative value when the load average
+     * is not available.
+     *
+     * @return the system load average, or a negative value when it is unavailable
+     */
+    public double getWeight() {
+        /*
+         * getSystemLoadAverage() is declared on the standard java.lang.management
+         * interface, so no cast to the com.sun.management type is needed; such a cast
+         * can fail on JVMs that do not provide that implementation.
+         */
+        return ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage();
+    }
+
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
index 5f2c72ebc..49be1ce54 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
@@ -22,10 +22,10 @@ import io.netty.channel.Channel;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.sdk.dataproxy.FileCallback;
import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
-import org.apache.inlong.sdk.dataproxy.SendMessageCallback;
+import org.apache.inlong.sdk.dataproxy.FileCallback;
import org.apache.inlong.sdk.dataproxy.SendResult;
+import org.apache.inlong.sdk.dataproxy.SendMessageCallback;
import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
import org.apache.inlong.sdk.dataproxy.threads.MetricWorkerThread;
@@ -36,11 +36,11 @@ import org.slf4j.LoggerFactory;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
@@ -179,8 +179,8 @@ public class Sender {
}
private SendResult syncSendInternalMessage(NettyClient client,
- EncodeObject encodeObject, String msgUUID,
- long timeout, TimeUnit timeUnit)
+ EncodeObject encodeObject,
String msgUUID,
+ long timeout, TimeUnit timeUnit)
throws ExecutionException, InterruptedException, TimeoutException {
if (client == null) {
@@ -228,12 +228,10 @@ public class Sender {
* @param timeUnit
* @return
*/
- public SendResult syncSendMessage(EncodeObject encodeObject, String
msgUUID,
- long timeout, TimeUnit timeUnit) {
- metricWorker.recordNumByKey(encodeObject.getMessageId(),
- encodeObject.getGroupId(), encodeObject.getStreamId(),
+ public SendResult syncSendMessage(EncodeObject encodeObject, String
msgUUID, long timeout, TimeUnit timeUnit) {
+ metricWorker.recordNumByKey(encodeObject.getMessageId(),
encodeObject.getGroupId(), encodeObject.getStreamId(),
Utils.getLocalIp(), encodeObject.getDt(),
encodeObject.getPackageTime(), encodeObject.getRealCnt());
- NettyClient client = clientMgr.getClientByRoundRobin();
+ NettyClient client = clientMgr.getClient(clientMgr.getLoadBalance(),
encodeObject);
SendResult message = null;
try {
message = syncSendInternalMessage(client, encodeObject, msgUUID,
timeout, timeUnit);
@@ -281,9 +279,9 @@ public class Sender {
return message;
}
- private SendResult syncSendMessageIndexInternal(NettyClient client,
- EncodeObject encodeObject, String msgUUID, long timeout,
- TimeUnit timeUnit) throws ExecutionException,
InterruptedException, TimeoutException {
+ private SendResult syncSendMessageIndexInternal(NettyClient client,
EncodeObject encodeObject, String msgUUID,
+ long timeout, TimeUnit
timeUnit)
+ throws ExecutionException, InterruptedException, TimeoutException {
if (client == null || !client.isActive()) {
chooseProxy.remove(encodeObject.getMessageId());
client = clientMgr.getClientByRoundRobin();
@@ -329,8 +327,7 @@ public class Sender {
* @param timeUnit
* @return
*/
- public String syncSendMessageIndex(EncodeObject encodeObject, String
msgUUID, long timeout,
- TimeUnit timeUnit) {
+ public String syncSendMessageIndex(EncodeObject encodeObject, String
msgUUID, long timeout, TimeUnit timeUnit) {
try {
SendResult message = null;
NettyClient client = chooseProxy.get(encodeObject.getMessageId());
@@ -396,9 +393,8 @@ public class Sender {
* @param timeUnit
* @throws ProxysdkException
*/
- public void asyncSendMessageIndex(EncodeObject encodeObject,
- FileCallback callback, String msgUUID,
- long timeout, TimeUnit timeUnit) throws ProxysdkException {
+ public void asyncSendMessageIndex(EncodeObject encodeObject, FileCallback
callback, String msgUUID, long timeout,
+ TimeUnit timeUnit) throws
ProxysdkException {
NettyClient client = chooseProxy.get(encodeObject.getMessageId());
String proxyip = encodeObject.getProxyIp();
if (proxyip != null && proxyip.length() != 0) {
@@ -514,14 +510,14 @@ public class Sender {
* Following methods used by asynchronously message sending.
*/
public void asyncSendMessage(EncodeObject encodeObject,
SendMessageCallback callback, String msgUUID,
- long timeout, TimeUnit timeUnit) throws ProxysdkException {
+ long timeout, TimeUnit timeUnit) throws
ProxysdkException {
metricWorker.recordNumByKey(encodeObject.getMessageId(),
encodeObject.getGroupId(),
encodeObject.getStreamId(), Utils.getLocalIp(),
encodeObject.getPackageTime(),
encodeObject.getDt(), encodeObject.getRealCnt());
// send message package time
- NettyClient client = clientMgr.getClientByRoundRobin();
+ NettyClient client = clientMgr.getClient(clientMgr.getLoadBalance(),
encodeObject);
if (client == null) {
throw new ProxysdkException(SendResult.NO_CONNECTION.toString());
}
@@ -620,7 +616,8 @@ public class Sender {
continue;
}
if (isFile) {
- ((FileCallback)
queueObject.getCallback()).onMessageAck(SendResult.CONNECTION_BREAK.toString());
+ ((FileCallback) queueObject.getCallback())
+
.onMessageAck(SendResult.CONNECTION_BREAK.toString());
currentBufferSize.addAndGet(-queueObject.getSize());
} else {
queueObject.getCallback().onMessageAck(SendResult.CONNECTION_BREAK);
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ConsistencyHashUtil.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ConsistencyHashUtil.java
new file mode 100644
index 000000000..f5ff51c24
--- /dev/null
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ConsistencyHashUtil.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.utils;
+
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Random;
+
+/**
+ * Hash helpers for consistent hashing, based on the 128-bit murmur3 function.
+ */
+public class ConsistencyHashUtil {
+    /*
+     * Seed used when the caller supplies none. It has to be a constant: consistent
+     * hashing relies on equal keys producing equal hashes on every call.
+     */
+    private static final int DEFAULT_SEED = 0;
+
+    /**
+     * Hashes a key with murmur3 (128 bit) and the given seed.
+     *
+     * @param key the text to hash, encoded as UTF-8
+     * @param seed the murmur3 seed
+     * @return the string form of the resulting hash code
+     */
+    public static String hashMurMurHash(String key, int seed) {
+        HashFunction hashFunction = Hashing.murmur3_128(seed);
+        return hashFunction.hashString(key, StandardCharsets.UTF_8).toString();
+    }
+
+    /**
+     * Hashes a key with murmur3 (128 bit) and a fixed default seed, so that the same key
+     * always yields the same hash.
+     *
+     * @param key the text to hash, encoded as UTF-8
+     * @return the string form of the resulting hash code
+     */
+    public static String hashMurMurHash(String key) {
+        return hashMurMurHash(key, DEFAULT_SEED);
+    }
+}