This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new d6cd5ef93b [INLONG-11719][SDK] Replace the Sender object in the InlongSdkDirtySender class with TcpMsgSender (#11724)
d6cd5ef93b is described below

commit d6cd5ef93b5b989718bcc117829515b7c943d00e
Author: Goson Zhang <4675...@qq.com>
AuthorDate: Fri Feb 7 19:26:19 2025 +0800

    [INLONG-11719][SDK] Replace the Sender object in the InlongSdkDirtySender class with TcpMsgSender (#11724)
    
    Co-authored-by: gosonzhang <gosonzh...@tencent.com>
---
 .../inlong/sdk/dirtydata/InlongSdkDirtySender.java | 43 ++++++++++++----------
 1 file changed, 23 insertions(+), 20 deletions(-)

diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java
index af7a62fb6d..195df99d9c 100644
--- a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java
+++ b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java
@@ -17,9 +17,11 @@
 
 package org.apache.inlong.sdk.dirtydata;
 
-import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
-import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
-import org.apache.inlong.sdk.dataproxy.common.SendResult;
+import org.apache.inlong.sdk.dataproxy.MsgSenderSingleFactory;
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+import org.apache.inlong.sdk.dataproxy.sender.MsgSendCallback;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpEventInfo;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSender;
 import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
 
 import com.google.common.base.Preconditions;
@@ -48,7 +50,8 @@ public class InlongSdkDirtySender {
     private boolean closed = false;
 
     private LinkedBlockingQueue<DirtyMessageWrapper> dirtyDataQueue;
-    private DefaultMessageSender sender;
+    private TcpMsgSender sender;
+    private MsgSenderSingleFactory messageSenderFactory;
     private Executor executor;
 
     public void init() throws Exception {
@@ -57,14 +60,13 @@ public class InlongSdkDirtySender {
         Preconditions.checkNotNull(inlongManagerAddr, "inlongManagerAddr 
cannot be null");
         Preconditions.checkNotNull(authId, "authId cannot be null");
         Preconditions.checkNotNull(authKey, "authKey cannot be null");
-
+        // build sender configure
         TcpMsgSenderConfig proxyClientConfig =
-                new TcpMsgSenderConfig(true,
+                new TcpMsgSenderConfig(false,
                         inlongManagerAddr, inlongManagerPort, inlongGroupId, 
authId, authKey);
-        proxyClientConfig.setOnlyUseLocalProxyConfig(false);
-        proxyClientConfig.setTotalAsyncCallbackSize(maxCallbackSize);
-        this.sender = 
DefaultMessageSender.generateSenderByClusterId(proxyClientConfig);
-
+        // build sender factory
+        this.messageSenderFactory = new MsgSenderSingleFactory();
+        this.sender = 
this.messageSenderFactory.genTcpSenderByClusterId(proxyClientConfig);
         this.dirtyDataQueue = new LinkedBlockingQueue<>(maxCallbackSize);
         this.executor = Executors.newSingleThreadExecutor();
         executor.execute(this::doSendDirtyMessage);
@@ -80,6 +82,7 @@ public class InlongSdkDirtySender {
     }
 
     private void doSendDirtyMessage() {
+        ProcessResult procResult = new ProcessResult();
         while (!closed) {
             try {
                 DirtyMessageWrapper messageWrapper = dirtyDataQueue.poll();
@@ -93,30 +96,30 @@ public class InlongSdkDirtySender {
                             messageWrapper);
                     continue;
                 }
-
-                sender.asyncSendMessage(inlongGroupId, inlongStreamId,
-                        messageWrapper.format().getBytes(), new 
LogCallBack(messageWrapper));
-
+                if (!sender.asyncSendMessage(new TcpEventInfo(inlongGroupId, 
inlongStreamId,
+                        System.currentTimeMillis(), null, 
messageWrapper.format().getBytes()),
+                        new LogCallBack(messageWrapper), procResult)) {
+                    dirtyDataQueue.offer(messageWrapper);
+                }
             } catch (Throwable t) {
                 log.error("failed to send inlong dirty message", t);
                 if (!ignoreErrors) {
                     throw new RuntimeException("writing dirty message to 
inlong sdk failed", t);
                 }
             }
-
         }
     }
 
     public void close() {
         closed = true;
         dirtyDataQueue.clear();
-        if (sender != null) {
-            sender.close();
+        if (messageSenderFactory != null) {
+            messageSenderFactory.shutdownAll();
         }
     }
 
     @Getter
-    class LogCallBack implements SendMessageCallback {
+    class LogCallBack implements MsgSendCallback {
 
         private final DirtyMessageWrapper wrapper;
 
@@ -125,8 +128,8 @@ public class InlongSdkDirtySender {
         }
 
         @Override
-        public void onMessageAck(SendResult result) {
-            if (SendResult.OK != result) {
+        public void onMessageAck(ProcessResult result) {
+            if (!result.isSuccess()) {
                 dirtyDataQueue.offer(wrapper);
             }
         }

Reply via email to