This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new fd6cfba554 [INLONG-11727][SDK] Replace the Sender used in the 
agent-plugins module with TcpMsgSender (#11728)
fd6cfba554 is described below

commit fd6cfba554b99eb0f424f4e97550ffee23ce8990
Author: Goson Zhang <4675...@qq.com>
AuthorDate: Sat Feb 8 16:12:20 2025 +0800

    [INLONG-11727][SDK] Replace the Sender used in the agent-plugins module 
with TcpMsgSender (#11728)
    
    * [INLONG-11727][SDK] Replace the Sender used in the agent-plugins module 
with TcpMsgSender
    
    * fix ut test fails
    
    ---------
    
    Co-authored-by: gosonzhang <gosonzh...@tencent.com>
---
 .../apache/inlong/agent/core/HeartbeatManager.java |  7 +++
 .../plugin/sinks/filecollect/SenderManager.java    | 55 ++++++++++++++--------
 .../sinks/filecollect/TestSenderManager.java       | 13 ++---
 .../dataproxy/network/tcp/codec/DecodeObject.java  |  1 +
 .../sdk/dataproxy/ProxyClientConfigTest.java       |  1 +
 5 files changed, 51 insertions(+), 26 deletions(-)

diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
index 1599de55f7..50282be586 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
@@ -28,6 +28,8 @@ import org.apache.inlong.common.enums.ComponentTypeEnum;
 import org.apache.inlong.common.enums.NodeSrvStatus;
 import org.apache.inlong.common.heartbeat.AbstractHeartbeatManager;
 import org.apache.inlong.common.heartbeat.HeartbeatMsg;
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
 import org.apache.inlong.sdk.dataproxy.sender.tcp.InLongTcpMsgSender;
 import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSender;
 import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
@@ -206,6 +208,11 @@ public class HeartbeatManager extends AbstractDaemon 
implements AbstractHeartbea
             ThreadFactory SHARED_FACTORY = new 
DefaultThreadFactory("agent-sender-manager-heartbeat",
                     Thread.currentThread().isDaemon());
             sender = new InLongTcpMsgSender(proxyClientConfig, SHARED_FACTORY);
+            // start sender object
+            ProcessResult procResult = new ProcessResult();
+            if (!sender.start(procResult)) {
+                throw new ProxySdkException("Sender start failure, " + 
procResult);
+            }
         } catch (Throwable ex) {
             LOGGER.error("heartbeat manager create sdk failed: ", ex);
         }
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
index b2303feafe..34c1b67f70 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
@@ -30,10 +30,13 @@ import org.apache.inlong.agent.plugin.message.SequentialID;
 import org.apache.inlong.agent.utils.AgentUtils;
 import org.apache.inlong.agent.utils.ThreadUtils;
 import org.apache.inlong.common.metric.MetricRegister;
-import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
-import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
-import org.apache.inlong.sdk.dataproxy.common.SendResult;
+import org.apache.inlong.common.msg.MsgType;
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
 import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
+import org.apache.inlong.sdk.dataproxy.sender.MsgSendCallback;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.InLongTcpMsgSender;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpEventInfo;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSender;
 import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
 
 import io.netty.util.concurrent.DefaultThreadFactory;
@@ -71,7 +74,7 @@ public class SenderManager {
     private static final SequentialID SEQUENTIAL_ID = 
SequentialID.getInstance();
     public static final int RESEND_QUEUE_WAIT_MS = 10;
     // cache for group and sender list, share the map cross agent lifecycle.
-    private DefaultMessageSender sender;
+    private TcpMsgSender sender;
     private LinkedBlockingQueue<AgentSenderCallback> resendQueue;
     private static final ThreadPoolExecutor EXECUTOR_SERVICE = new 
ThreadPoolExecutor(
             0, Integer.MAX_VALUE,
@@ -200,20 +203,22 @@ public class SenderManager {
     private void createMessageSender() throws Exception {
         TcpMsgSenderConfig proxyClientConfig = new TcpMsgSenderConfig(
                 managerAddr, inlongGroupId, authSecretId, authSecretKey);
-        proxyClientConfig.setTotalAsyncCallbackSize(totalAsyncBufSize);
+        proxyClientConfig.setSendBufferSize(totalAsyncBufSize);
         proxyClientConfig.setAliveConnections(aliveConnectionNum);
         proxyClientConfig.setRequestTimeoutMs(maxSenderTimeout * 1000L);
-
         proxyClientConfig.setNettyWorkerThreadNum(ioThreadNum);
         proxyClientConfig.setEnableEpollBusyWait(enableBusyWait);
-
+        proxyClientConfig.setSdkMsgType(MsgType.valueOf(msgType));
+        proxyClientConfig.setEnableDataCompress(isCompress);
         SHARED_FACTORY = new DefaultThreadFactory("agent-sender-manager-" + 
sourcePath,
                 Thread.currentThread().isDaemon());
-
-        DefaultMessageSender sender = new 
DefaultMessageSender(proxyClientConfig, SHARED_FACTORY);
-        sender.setMsgtype(msgType);
-        sender.setCompress(isCompress);
-        this.sender = sender;
+        // build sender object
+        this.sender = new InLongTcpMsgSender(proxyClientConfig, 
SHARED_FACTORY);
+        ProcessResult procResult = new ProcessResult();
+        // start sender object
+        if (!sender.start(procResult)) {
+            throw new ProxySdkException("Start sender failure, " + procResult);
+        }
     }
 
     public void sendBatch(SenderMessage message) {
@@ -230,7 +235,7 @@ public class SenderManager {
      */
     private void sendBatchWithRetryCount(SenderMessage message, int retry) {
         boolean suc = false;
-        while (!suc) {
+        while (!suc && !shutdown) {
             try {
                 AgentSenderCallback cb = new AgentSenderCallback(message, 
retry);
                 AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TRY_SEND, 
message.getGroupId(),
@@ -267,11 +272,21 @@ public class SenderManager {
         }
     }
 
-    private void asyncSendByMessageSender(SendMessageCallback cb,
+    private void asyncSendByMessageSender(MsgSendCallback cb,
             List<byte[]> bodyList, String groupId, String streamId, long 
dataTime, String msgUUID,
-            Map<String, String> extraAttrMap, boolean isProxySend) throws 
ProxySdkException {
-        sender.asyncSendMessage(cb, bodyList, groupId,
-                streamId, dataTime, msgUUID, extraAttrMap, isProxySend);
+            Map<String, String> extraAttrMap, boolean isProxySend) throws 
Exception {
+        boolean isSuccess;
+        ProcessResult procResult = new ProcessResult();
+        if (isProxySend) {
+            isSuccess = sender.asyncSendMsgWithSinkAck(new TcpEventInfo(
+                    groupId, streamId, dataTime, msgUUID, extraAttrMap, 
bodyList), cb, procResult);
+        } else {
+            isSuccess = sender.asyncSendMessage(new TcpEventInfo(
+                    groupId, streamId, dataTime, msgUUID, extraAttrMap, 
bodyList), cb, procResult);
+        }
+        if (!isSuccess) {
+            throw new ProxySdkException("Send message failure, " + procResult);
+        }
     }
 
     /**
@@ -330,7 +345,7 @@ public class SenderManager {
     /**
      * sender callback
      */
-    private class AgentSenderCallback implements SendMessageCallback {
+    private class AgentSenderCallback implements MsgSendCallback {
 
         private final int retry;
         private final SenderMessage message;
@@ -343,13 +358,13 @@ public class SenderManager {
         }
 
         @Override
-        public void onMessageAck(SendResult result) {
+        public void onMessageAck(ProcessResult result) {
             String groupId = message.getGroupId();
             String streamId = message.getStreamId();
             String taskId = message.getTaskId();
             String instanceId = message.getInstanceId();
             long dataTime = message.getDataTime();
-            if (result != null && result.equals(SendResult.OK)) {
+            if (result.isSuccess()) {
                 message.getOffsetAckList().forEach(ack -> ack.setHasAck(true));
                 getMetricItem(groupId, 
streamId).pluginSendSuccessCount.addAndGet(msgCnt);
                 AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, 
groupId, streamId,
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
index 508e21588f..4e068f5930 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
@@ -27,8 +27,9 @@ import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
 import org.apache.inlong.agent.plugin.task.logcollection.local.FileDataUtils;
 import org.apache.inlong.agent.utils.AgentUtils;
 import org.apache.inlong.common.enums.TaskStateEnum;
-import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
-import org.apache.inlong.sdk.dataproxy.common.SendResult;
+import org.apache.inlong.sdk.dataproxy.common.ErrorCode;
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+import org.apache.inlong.sdk.dataproxy.sender.MsgSendCallback;
 
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -84,14 +85,14 @@ public class TestSenderManager {
 
     @Test
     public void testNormalAck() {
-        List<SendMessageCallback> cbList = new ArrayList<>();
+        List<MsgSendCallback> cbList = new ArrayList<>();
         try {
             profile.set(TaskConstants.INODE_INFO, 
FileDataUtils.getInodeInfo(profile.getInstanceId()));
             SenderManager senderManager = PowerMockito.spy(new 
SenderManager(profile, "inlongGroupId", "sourceName"));
             PowerMockito.doNothing().when(senderManager, 
"createMessageSender");
 
             PowerMockito.doAnswer(invocation -> {
-                SendMessageCallback cb = invocation.getArgument(0);
+                MsgSendCallback cb = invocation.getArgument(0);
                 cbList.add(cb);
                 return null;
             }).when(senderManager, "asyncSendByMessageSender", Mockito.any(),
@@ -115,11 +116,11 @@ public class TestSenderManager {
             }
             Assert.assertTrue(cbList.size() == 10);
             for (int i = 0; i < 5; i++) {
-                cbList.get(4 - i).onMessageAck(SendResult.OK);
+                cbList.get(4 - i).onMessageAck(new 
ProcessResult(ErrorCode.OK));
             }
             Assert.assertTrue(calHasAckCount(ackInfoListTotal) == 5);
             for (int i = 5; i < 10; i++) {
-                cbList.get(i).onMessageAck(SendResult.OK);
+                cbList.get(i).onMessageAck(new ProcessResult(ErrorCode.OK));
                 AgentUtils.silenceSleepInMs(10);
             }
             
Assert.assertTrue(String.valueOf(calHasAckCount(ackInfoListTotal)), 
calHasAckCount(ackInfoListTotal) == 10);
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/DecodeObject.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/DecodeObject.java
index 669f8330de..d9b24b0aae 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/DecodeObject.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/DecodeObject.java
@@ -84,6 +84,7 @@ public class DecodeObject {
 
     private void handleAttr(String attributes) {
         if (StringUtils.isBlank(attributes)) {
+            this.procResult = new ProcessResult(ErrorCode.OK);
             return;
         }
         retAttr = new HashMap<>(MAP_SPLITTER.split(attributes));
diff --git 
a/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfigTest.java
 
b/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfigTest.java
index 0173651169..68a08a00b9 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfigTest.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfigTest.java
@@ -29,6 +29,7 @@ public class ProxyClientConfigTest {
     public void testManagerConfig() throws Exception {
         HttpMsgSenderConfig httpConfig = new HttpMsgSenderConfig(
                 "http://127.0.0.1:800";, "test_id", "secretId", "secretKey");
+        httpConfig.setHttpAsyncRptPoolConfig(30, 20);
         HttpMsgSenderConfig httpConfig1 = httpConfig.clone();
         Assert.assertEquals(httpConfig, httpConfig1);
         httpConfig1.setRegionName("sz");

Reply via email to