This is an automated email from the ASF dual-hosted git repository. aloyszhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 00bb1d2e2d [INLONG-10464][SDK] InlongSDK support retry sending when failed (#11144) 00bb1d2e2d is described below commit 00bb1d2e2dbb375c63e3c0b96e3c190e27dbf6f6 Author: emptyOVO <118812562+empty...@users.noreply.github.com> AuthorDate: Wed Oct 9 12:46:00 2024 +0800 [INLONG-10464][SDK] InlongSDK support retry sending when failed (#11144) --- .../inlong/sdk/dataproxy/ConfigConstants.java | 2 + .../inlong/sdk/dataproxy/DefaultMessageSender.java | 126 ++++++++++++++++----- .../inlong/sdk/dataproxy/ProxyClientConfig.java | 8 ++ 3 files changed, 105 insertions(+), 31 deletions(-) diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java index d7bd34f948..45b7b4056d 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java @@ -74,4 +74,6 @@ public class ConfigConstants { public static String HTTP = "http://"; public static String HTTPS = "https://"; + public static int DEFAULT_SENDER_MAX_ATTEMPT = 1; + } diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java index 7b655b29fa..8a833a69cb 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java @@ -43,6 +43,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; public class DefaultMessageSender implements MessageSender { @@ -65,6 +66,7 @@ public class DefaultMessageSender implements MessageSender { private boolean isReport = false; private boolean isSupportLF = false; private int cpsSize = ConfigConstants.COMPRESS_SIZE; + private final int senderMaxAttempt; public DefaultMessageSender(ProxyClientConfig configure) throws Exception { this(configure, null); @@ -75,6 +77,7 @@ public class DefaultMessageSender implements MessageSender { sender = new Sender(configure, selfDefineFactory); groupId = configure.getInlongGroupId(); indexCol = new IndexCollectThread(storeIndex); + senderMaxAttempt = configure.getSenderMaxAttempt(); indexCol.start(); } @@ -191,11 +194,39 @@ public class DefaultMessageSender implements MessageSender { return ConfigConstants.PROXY_SDK_VERSION; } + private SendResult attemptSendMessage(Function<Sender, SendResult> sendOperation) { + int attempts = 0; + SendResult sendResult = null; + while (attempts < this.senderMaxAttempt) { + sendResult = sendOperation.apply(sender); + if (sendResult != null && sendResult.equals(SendResult.OK)) { + return sendResult; + } + attempts++; + } + return sendResult; + } + + private String attemptSendMessageIndex(Function<Sender, String> sendOperation) { + int attempts = 0; + String sendIndexResult = null; + while (attempts < this.senderMaxAttempt) { + sendIndexResult = sendOperation.apply(sender); + if (sendIndexResult != null && sendIndexResult.startsWith(SendResult.OK.toString())) { + return sendIndexResult; + } + attempts++; + } + return sendIndexResult; + } + @Deprecated public SendResult sendMessage(byte[] body, String attributes, String msgUUID, long timeout, TimeUnit timeUnit) { - return sender.syncSendMessage(new EncodeObject(body, attributes, - idGenerator.getNextId()), msgUUID, timeout, timeUnit); + Function<Sender, SendResult> sendOperation = + (sender) -> sender.syncSendMessage(new EncodeObject(body, attributes, idGenerator.getNextId()), msgUUID, + timeout, timeUnit); + return attemptSendMessage(sendOperation); } public SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID, @@ -235,20 +266,31 @@ public class DefaultMessageSender implements MessageSender { EncodeObject encodeObject = new EncodeObject(body, msgtype, isCompressEnd, isReport, isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(), groupId, streamId, proxySend); encodeObject.setSupportLF(isSupportLF); - return sender.syncSendMessage(encodeObject, msgUUID, timeout, timeUnit); + Function<Sender, SendResult> sendOperation = (sender) -> sender.syncSendMessage(encodeObject, msgUUID, + timeout, timeUnit); + return attemptSendMessage(sendOperation); } else if (msgtype == 3 || msgtype == 5) { if (isProxySend) { proxySend = "&" + proxySend; } + final String finalProxySend = proxySend; + final long finalDt = dt; + Function<Sender, SendResult> sendOperation; if (isCompressEnd) { - return sender.syncSendMessage(new EncodeObject(body, "groupId=" + groupId + "&streamId=" - + streamId + "&dt=" + dt + "&cp=snappy" + proxySend, idGenerator.getNextId(), - this.getMsgtype(), true, groupId), msgUUID, timeout, timeUnit); + sendOperation = (sender) -> sender.syncSendMessage(new EncodeObject(body, + "groupId=" + groupId + "&streamId=" + streamId + "&dt=" + finalDt + "&cp=snappy" + + finalProxySend, + idGenerator.getNextId(), this.getMsgtype(), + true, groupId), msgUUID, timeout, timeUnit); } else { - return sender.syncSendMessage(new EncodeObject(body, - "groupId=" + groupId + "&streamId=" + streamId + "&dt=" + dt + proxySend, - idGenerator.getNextId(), this.getMsgtype(), false, groupId), msgUUID, timeout, timeUnit); + sendOperation = (sender) -> sender.syncSendMessage(new EncodeObject(body, + "groupId=" + groupId + "&streamId=" + streamId + "&dt=" + finalDt + + finalProxySend, + idGenerator.getNextId(), this.getMsgtype(), + false, groupId), msgUUID, timeout, timeUnit); + } + return attemptSendMessage(sendOperation); } return null; @@ -294,18 +336,22 @@ public class DefaultMessageSender implements MessageSender { isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(), groupId, streamId, attrs.toString()); encodeObject.setSupportLF(isSupportLF); - return sender.syncSendMessage(encodeObject, msgUUID, timeout, timeUnit); + Function<Sender, SendResult> sendOperation = (sender) -> sender.syncSendMessage(encodeObject, msgUUID, + timeout, timeUnit); + return attemptSendMessage(sendOperation); } else if (msgtype == 3 || msgtype == 5) { attrs.append("&groupId=").append(groupId).append("&streamId=").append(streamId).append("&dt=").append(dt); if (isCompressEnd) { attrs.append("&cp=snappy"); - return sender.syncSendMessage(new EncodeObject(body, attrs.toString(), - idGenerator.getNextId(), this.getMsgtype(), true, groupId), + Function<Sender, SendResult> sendOperation = (sender) -> sender.syncSendMessage(new EncodeObject(body, + attrs.toString(), idGenerator.getNextId(), this.getMsgtype(), true, groupId), msgUUID, timeout, timeUnit); + return attemptSendMessage(sendOperation); } else { - return sender.syncSendMessage(new EncodeObject(body, attrs.toString(), - idGenerator.getNextId(), this.getMsgtype(), false, groupId), msgUUID, - timeout, timeUnit); + Function<Sender, SendResult> sendOperation = (sender) -> sender.syncSendMessage(new EncodeObject(body, + attrs.toString(), idGenerator.getNextId(), this.getMsgtype(), false, groupId), + msgUUID, timeout, timeUnit); + return attemptSendMessage(sendOperation); } } return null; @@ -348,21 +394,30 @@ public class DefaultMessageSender implements MessageSender { isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(), groupId, streamId, proxySend); encodeObject.setSupportLF(isSupportLF); - return sender.syncSendMessage(encodeObject, msgUUID, timeout, timeUnit); + Function<Sender, SendResult> sendOperation = (sender) -> sender.syncSendMessage(encodeObject, msgUUID, + timeout, timeUnit); + return attemptSendMessage(sendOperation); } else if (msgtype == 3 || msgtype == 5) { if (isProxySend) { proxySend = "&" + proxySend; } + final long finalDt = dt; + final String finalProxySend = proxySend; + Function<Sender, SendResult> sendOperation; if (isCompress) { - return sender.syncSendMessage(new EncodeObject(bodyList, "groupId=" + groupId + "&streamId=" + streamId - + "&dt=" + dt + "&cp=snappy" + "&cnt=" + bodyList.size() + proxySend, - idGenerator.getNextId(), this.getMsgtype(), true, groupId), msgUUID, timeout, timeUnit); + sendOperation = (sender) -> sender.syncSendMessage(new EncodeObject(bodyList, + "groupId=" + groupId + "&streamId=" + streamId + "&dt=" + finalDt + "&cp=snappy" + "&cnt=" + + bodyList.size() + finalProxySend, + idGenerator.getNextId(), this.getMsgtype(), + true, groupId), msgUUID, timeout, timeUnit); } else { - return sender.syncSendMessage(new EncodeObject(bodyList, "groupId=" + groupId + "&streamId=" + streamId - + "&dt=" + dt + "&cnt=" + bodyList.size() + proxySend, idGenerator.getNextId(), - this.getMsgtype(), + sendOperation = (sender) -> sender.syncSendMessage(new EncodeObject(bodyList, + "groupId=" + groupId + "&streamId=" + streamId + "&dt=" + finalDt + "&cnt=" + bodyList.size() + + finalProxySend, + idGenerator.getNextId(), this.getMsgtype(), false, groupId), msgUUID, timeout, timeUnit); } + return attemptSendMessage(sendOperation); } return null; } @@ -404,19 +459,24 @@ public class DefaultMessageSender implements MessageSender { isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(), groupId, streamId, attrs.toString()); encodeObject.setSupportLF(isSupportLF); - return sender.syncSendMessage(encodeObject, msgUUID, timeout, timeUnit); + Function<Sender, SendResult> sendOperation = (sender) -> sender.syncSendMessage(encodeObject, msgUUID, + timeout, timeUnit); + return attemptSendMessage(sendOperation); } else if (msgtype == 3 || msgtype == 5) { attrs.append("&groupId=").append(groupId).append("&streamId=").append(streamId) .append("&dt=").append(dt).append("&cnt=").append(bodyList.size()); if (isCompress) { attrs.append("&cp=snappy"); - return sender.syncSendMessage(new EncodeObject(bodyList, attrs.toString(), - idGenerator.getNextId(), this.getMsgtype(), true, groupId), - msgUUID, timeout, timeUnit); + Function<Sender, SendResult> sendOperation = + (sender) -> sender.syncSendMessage(new EncodeObject(bodyList, attrs.toString(), + idGenerator.getNextId(), this.getMsgtype(), true, groupId), msgUUID, timeout, timeUnit); + return attemptSendMessage(sendOperation); } else { - return sender.syncSendMessage(new EncodeObject(bodyList, attrs.toString(), - idGenerator.getNextId(), this.getMsgtype(), false, groupId), - msgUUID, timeout, timeUnit); + Function<Sender, SendResult> sendOperation = + (sender) -> sender.syncSendMessage(new EncodeObject(bodyList, attrs.toString(), + idGenerator.getNextId(), this.getMsgtype(), false, groupId), msgUUID, timeout, + timeUnit); + return attemptSendMessage(sendOperation); } } return null; @@ -807,7 +867,9 @@ public class DefaultMessageSender implements MessageSender { isReport, isGroupIdTransfer, dt / 1000, sid, groupId, streamId, attrs.toString(), "data", ""); encodeObject.setSupportLF(isSupportLF); - return sender.syncSendMessageIndex(encodeObject, msgUUID, timeout, timeUnit); + Function<Sender, String> sendOperation = (sender) -> sender.syncSendMessageIndex(encodeObject, msgUUID, + timeout, timeUnit); + return attemptSendMessageIndex(sendOperation); } return null; } @@ -822,7 +884,9 @@ public class DefaultMessageSender implements MessageSender { if (msgtype == 7 || msgtype == 8) { EncodeObject encodeObject = new EncodeObject(body, msgtype, false, isReport, isGroupIdTransfer, dt / 1000, sid, groupId, streamId, "", messageKey, ip); - return sender.syncSendMessageIndex(encodeObject, msgUUID, timeout, timeUnit); + Function<Sender, String> sendOperation = (sender) -> sender.syncSendMessageIndex(encodeObject, msgUUID, + timeout, timeUnit); + return attemptSendMessageIndex(sendOperation); } return null; } diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java index 05b439d7f1..f866b4b76d 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java @@ -100,6 +100,7 @@ public class ProxyClientConfig { private LoadBalance loadBalance; private int maxRetry; + private int senderMaxAttempt = ConfigConstants.DEFAULT_SENDER_MAX_ATTEMPT; /* pay attention to the last url parameter ip */ public ProxyClientConfig(String localHost, boolean requestByHttp, String managerIp, @@ -548,4 +549,11 @@ public class ProxyClientConfig { public void setMaxRetry(int maxRetry) { this.maxRetry = maxRetry; } + public int getSenderMaxAttempt() { + return senderMaxAttempt; + } + + public void setSenderAttempt(int senderMaxAttempt) { + this.senderMaxAttempt = senderMaxAttempt; + } }