This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 00bb1d2e2d [INLONG-10464][SDK] InlongSDK support retry sending when failed (#11144)
00bb1d2e2d is described below

commit 00bb1d2e2dbb375c63e3c0b96e3c190e27dbf6f6
Author: emptyOVO <118812562+empty...@users.noreply.github.com>
AuthorDate: Wed Oct 9 12:46:00 2024 +0800

    [INLONG-10464][SDK] InlongSDK support retry sending when failed (#11144)
---
 .../inlong/sdk/dataproxy/ConfigConstants.java      |   2 +
 .../inlong/sdk/dataproxy/DefaultMessageSender.java | 126 ++++++++++++++++-----
 .../inlong/sdk/dataproxy/ProxyClientConfig.java    |   8 ++
 3 files changed, 105 insertions(+), 31 deletions(-)

diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java
index d7bd34f948..45b7b4056d 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java
@@ -74,4 +74,6 @@ public class ConfigConstants {
     public static String HTTP = "http://";
     public static String HTTPS = "https://";
 
+    public static int DEFAULT_SENDER_MAX_ATTEMPT = 1;
+
 }
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
index 7b655b29fa..8a833a69cb 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
@@ -43,6 +43,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
 
 public class DefaultMessageSender implements MessageSender {
 
@@ -65,6 +66,7 @@ public class DefaultMessageSender implements MessageSender {
     private boolean isReport = false;
     private boolean isSupportLF = false;
     private int cpsSize = ConfigConstants.COMPRESS_SIZE;
+    private final int senderMaxAttempt;
 
     public DefaultMessageSender(ProxyClientConfig configure) throws Exception {
         this(configure, null);
@@ -75,6 +77,7 @@ public class DefaultMessageSender implements MessageSender {
         sender = new Sender(configure, selfDefineFactory);
         groupId = configure.getInlongGroupId();
         indexCol = new IndexCollectThread(storeIndex);
+        senderMaxAttempt = configure.getSenderMaxAttempt();
         indexCol.start();
 
     }
@@ -191,11 +194,39 @@ public class DefaultMessageSender implements 
MessageSender {
         return ConfigConstants.PROXY_SDK_VERSION;
     }
 
+    private SendResult attemptSendMessage(Function<Sender, SendResult> 
sendOperation) {
+        int attempts = 0;
+        SendResult sendResult = null;
+        while (attempts < this.senderMaxAttempt) {
+            sendResult = sendOperation.apply(sender);
+            if (sendResult != null && sendResult.equals(SendResult.OK)) {
+                return sendResult;
+            }
+            attempts++;
+        }
+        return sendResult;
+    }
+
+    private String attemptSendMessageIndex(Function<Sender, String> 
sendOperation) {
+        int attempts = 0;
+        String sendIndexResult = null;
+        while (attempts < this.senderMaxAttempt) {
+            sendIndexResult = sendOperation.apply(sender);
+            if (sendIndexResult != null && 
sendIndexResult.startsWith(SendResult.OK.toString())) {
+                return sendIndexResult;
+            }
+            attempts++;
+        }
+        return sendIndexResult;
+    }
+
     @Deprecated
     public SendResult sendMessage(byte[] body, String attributes, String 
msgUUID,
             long timeout, TimeUnit timeUnit) {
-        return sender.syncSendMessage(new EncodeObject(body, attributes,
-                idGenerator.getNextId()), msgUUID, timeout, timeUnit);
+        Function<Sender, SendResult> sendOperation =
+                (sender) -> sender.syncSendMessage(new EncodeObject(body, 
attributes, idGenerator.getNextId()), msgUUID,
+                        timeout, timeUnit);
+        return attemptSendMessage(sendOperation);
     }
 
     public SendResult sendMessage(byte[] body, String groupId, String 
streamId, long dt, String msgUUID,
@@ -235,20 +266,31 @@ public class DefaultMessageSender implements 
MessageSender {
             EncodeObject encodeObject = new EncodeObject(body, msgtype, 
isCompressEnd, isReport,
                     isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(), 
groupId, streamId, proxySend);
             encodeObject.setSupportLF(isSupportLF);
-            return sender.syncSendMessage(encodeObject, msgUUID, timeout, 
timeUnit);
+            Function<Sender, SendResult> sendOperation = (sender) -> 
sender.syncSendMessage(encodeObject, msgUUID,
+                    timeout, timeUnit);
+            return attemptSendMessage(sendOperation);
         } else if (msgtype == 3 || msgtype == 5) {
             if (isProxySend) {
                 proxySend = "&" + proxySend;
             }
+            final String finalProxySend = proxySend;
+            final long finalDt = dt;
+            Function<Sender, SendResult> sendOperation;
             if (isCompressEnd) {
-                return sender.syncSendMessage(new EncodeObject(body, 
"groupId=" + groupId + "&streamId="
-                        + streamId + "&dt=" + dt + "&cp=snappy" + proxySend, 
idGenerator.getNextId(),
-                        this.getMsgtype(), true, groupId), msgUUID, timeout, 
timeUnit);
+                sendOperation = (sender) -> sender.syncSendMessage(new 
EncodeObject(body,
+                        "groupId=" + groupId + "&streamId=" + streamId + 
"&dt=" + finalDt + "&cp=snappy"
+                                + finalProxySend,
+                        idGenerator.getNextId(), this.getMsgtype(),
+                        true, groupId), msgUUID, timeout, timeUnit);
             } else {
-                return sender.syncSendMessage(new EncodeObject(body,
-                        "groupId=" + groupId + "&streamId=" + streamId + 
"&dt=" + dt + proxySend,
-                        idGenerator.getNextId(), this.getMsgtype(), false, 
groupId), msgUUID, timeout, timeUnit);
+                sendOperation = (sender) -> sender.syncSendMessage(new 
EncodeObject(body,
+                        "groupId=" + groupId + "&streamId=" + streamId + 
"&dt=" + finalDt
+                                + finalProxySend,
+                        idGenerator.getNextId(), this.getMsgtype(),
+                        false, groupId), msgUUID, timeout, timeUnit);
+
             }
+            return attemptSendMessage(sendOperation);
         }
 
         return null;
@@ -294,18 +336,22 @@ public class DefaultMessageSender implements 
MessageSender {
                     isGroupIdTransfer, dt / 1000,
                     idGenerator.getNextInt(), groupId, streamId, 
attrs.toString());
             encodeObject.setSupportLF(isSupportLF);
-            return sender.syncSendMessage(encodeObject, msgUUID, timeout, 
timeUnit);
+            Function<Sender, SendResult> sendOperation = (sender) -> 
sender.syncSendMessage(encodeObject, msgUUID,
+                    timeout, timeUnit);
+            return attemptSendMessage(sendOperation);
         } else if (msgtype == 3 || msgtype == 5) {
             
attrs.append("&groupId=").append(groupId).append("&streamId=").append(streamId).append("&dt=").append(dt);
             if (isCompressEnd) {
                 attrs.append("&cp=snappy");
-                return sender.syncSendMessage(new EncodeObject(body, 
attrs.toString(),
-                        idGenerator.getNextId(), this.getMsgtype(), true, 
groupId),
+                Function<Sender, SendResult> sendOperation = (sender) -> 
sender.syncSendMessage(new EncodeObject(body,
+                        attrs.toString(), idGenerator.getNextId(), 
this.getMsgtype(), true, groupId),
                         msgUUID, timeout, timeUnit);
+                return attemptSendMessage(sendOperation);
             } else {
-                return sender.syncSendMessage(new EncodeObject(body, 
attrs.toString(),
-                        idGenerator.getNextId(), this.getMsgtype(), false, 
groupId), msgUUID,
-                        timeout, timeUnit);
+                Function<Sender, SendResult> sendOperation = (sender) -> 
sender.syncSendMessage(new EncodeObject(body,
+                        attrs.toString(), idGenerator.getNextId(), 
this.getMsgtype(), false, groupId),
+                        msgUUID, timeout, timeUnit);
+                return attemptSendMessage(sendOperation);
             }
         }
         return null;
@@ -348,21 +394,30 @@ public class DefaultMessageSender implements 
MessageSender {
                     isGroupIdTransfer, dt / 1000,
                     idGenerator.getNextInt(), groupId, streamId, proxySend);
             encodeObject.setSupportLF(isSupportLF);
-            return sender.syncSendMessage(encodeObject, msgUUID, timeout, 
timeUnit);
+            Function<Sender, SendResult> sendOperation = (sender) -> 
sender.syncSendMessage(encodeObject, msgUUID,
+                    timeout, timeUnit);
+            return attemptSendMessage(sendOperation);
         } else if (msgtype == 3 || msgtype == 5) {
             if (isProxySend) {
                 proxySend = "&" + proxySend;
             }
+            final long finalDt = dt;
+            final String finalProxySend = proxySend;
+            Function<Sender, SendResult> sendOperation;
             if (isCompress) {
-                return sender.syncSendMessage(new EncodeObject(bodyList, 
"groupId=" + groupId + "&streamId=" + streamId
-                        + "&dt=" + dt + "&cp=snappy" + "&cnt=" + 
bodyList.size() + proxySend,
-                        idGenerator.getNextId(), this.getMsgtype(), true, 
groupId), msgUUID, timeout, timeUnit);
+                sendOperation = (sender) -> sender.syncSendMessage(new 
EncodeObject(bodyList,
+                        "groupId=" + groupId + "&streamId=" + streamId + 
"&dt=" + finalDt + "&cp=snappy" + "&cnt="
+                                + bodyList.size() + finalProxySend,
+                        idGenerator.getNextId(), this.getMsgtype(),
+                        true, groupId), msgUUID, timeout, timeUnit);
             } else {
-                return sender.syncSendMessage(new EncodeObject(bodyList, 
"groupId=" + groupId + "&streamId=" + streamId
-                        + "&dt=" + dt + "&cnt=" + bodyList.size() + proxySend, 
idGenerator.getNextId(),
-                        this.getMsgtype(),
+                sendOperation = (sender) -> sender.syncSendMessage(new 
EncodeObject(bodyList,
+                        "groupId=" + groupId + "&streamId=" + streamId + 
"&dt=" + finalDt + "&cnt=" + bodyList.size()
+                                + finalProxySend,
+                        idGenerator.getNextId(), this.getMsgtype(),
                         false, groupId), msgUUID, timeout, timeUnit);
             }
+            return attemptSendMessage(sendOperation);
         }
         return null;
     }
@@ -404,19 +459,24 @@ public class DefaultMessageSender implements 
MessageSender {
                     isGroupIdTransfer, dt / 1000,
                     idGenerator.getNextInt(), groupId, streamId, 
attrs.toString());
             encodeObject.setSupportLF(isSupportLF);
-            return sender.syncSendMessage(encodeObject, msgUUID, timeout, 
timeUnit);
+            Function<Sender, SendResult> sendOperation = (sender) -> 
sender.syncSendMessage(encodeObject, msgUUID,
+                    timeout, timeUnit);
+            return attemptSendMessage(sendOperation);
         } else if (msgtype == 3 || msgtype == 5) {
             
attrs.append("&groupId=").append(groupId).append("&streamId=").append(streamId)
                     
.append("&dt=").append(dt).append("&cnt=").append(bodyList.size());
             if (isCompress) {
                 attrs.append("&cp=snappy");
-                return sender.syncSendMessage(new EncodeObject(bodyList, 
attrs.toString(),
-                        idGenerator.getNextId(), this.getMsgtype(), true, 
groupId),
-                        msgUUID, timeout, timeUnit);
+                Function<Sender, SendResult> sendOperation =
+                        (sender) -> sender.syncSendMessage(new 
EncodeObject(bodyList, attrs.toString(),
+                                idGenerator.getNextId(), this.getMsgtype(), 
true, groupId), msgUUID, timeout, timeUnit);
+                return attemptSendMessage(sendOperation);
             } else {
-                return sender.syncSendMessage(new EncodeObject(bodyList, 
attrs.toString(),
-                        idGenerator.getNextId(), this.getMsgtype(), false, 
groupId),
-                        msgUUID, timeout, timeUnit);
+                Function<Sender, SendResult> sendOperation =
+                        (sender) -> sender.syncSendMessage(new 
EncodeObject(bodyList, attrs.toString(),
+                                idGenerator.getNextId(), this.getMsgtype(), 
false, groupId), msgUUID, timeout,
+                                timeUnit);
+                return attemptSendMessage(sendOperation);
             }
         }
         return null;
@@ -807,7 +867,9 @@ public class DefaultMessageSender implements MessageSender {
                     isReport, isGroupIdTransfer, dt / 1000,
                     sid, groupId, streamId, attrs.toString(), "data", "");
             encodeObject.setSupportLF(isSupportLF);
-            return sender.syncSendMessageIndex(encodeObject, msgUUID, timeout, 
timeUnit);
+            Function<Sender, String> sendOperation = (sender) -> 
sender.syncSendMessageIndex(encodeObject, msgUUID,
+                    timeout, timeUnit);
+            return attemptSendMessageIndex(sendOperation);
         }
         return null;
     }
@@ -822,7 +884,9 @@ public class DefaultMessageSender implements MessageSender {
         if (msgtype == 7 || msgtype == 8) {
             EncodeObject encodeObject = new EncodeObject(body, msgtype, false, 
isReport,
                     isGroupIdTransfer, dt / 1000, sid, groupId, streamId, "", 
messageKey, ip);
-            return sender.syncSendMessageIndex(encodeObject, msgUUID, timeout, 
timeUnit);
+            Function<Sender, String> sendOperation = (sender) -> 
sender.syncSendMessageIndex(encodeObject, msgUUID,
+                    timeout, timeUnit);
+            return attemptSendMessageIndex(sendOperation);
         }
         return null;
     }
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
index 05b439d7f1..f866b4b76d 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
@@ -100,6 +100,7 @@ public class ProxyClientConfig {
     private LoadBalance loadBalance;
 
     private int maxRetry;
+    private int senderMaxAttempt = ConfigConstants.DEFAULT_SENDER_MAX_ATTEMPT;
 
     /* pay attention to the last url parameter ip */
     public ProxyClientConfig(String localHost, boolean requestByHttp, String 
managerIp,
@@ -548,4 +549,11 @@ public class ProxyClientConfig {
     public void setMaxRetry(int maxRetry) {
         this.maxRetry = maxRetry;
     }
+    public int getSenderMaxAttempt() {
+        return senderMaxAttempt;
+    }
+
+    public void setSenderAttempt(int senderMaxAttempt) {
+        this.senderMaxAttempt = senderMaxAttempt;
+    }
 }

Reply via email to