This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new fd6cfba554 [INLONG-11727][SDK] Replace the Sender used in the agent-plugins module with TcpMsgSender (#11728) fd6cfba554 is described below commit fd6cfba554b99eb0f424f4e97550ffee23ce8990 Author: Goson Zhang <4675...@qq.com> AuthorDate: Sat Feb 8 16:12:20 2025 +0800 [INLONG-11727][SDK] Replace the Sender used in the agent-plugins module with TcpMsgSender (#11728) * [INLONG-11727][SDK] Replace the Sender used in the agent-plugins module with TcpMsgSender * fix ut test fails --------- Co-authored-by: gosonzhang <gosonzh...@tencent.com> --- .../apache/inlong/agent/core/HeartbeatManager.java | 7 +++ .../plugin/sinks/filecollect/SenderManager.java | 55 ++++++++++++++-------- .../sinks/filecollect/TestSenderManager.java | 13 ++--- .../dataproxy/network/tcp/codec/DecodeObject.java | 1 + .../sdk/dataproxy/ProxyClientConfigTest.java | 1 + 5 files changed, 51 insertions(+), 26 deletions(-) diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java index 1599de55f7..50282be586 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java @@ -28,6 +28,8 @@ import org.apache.inlong.common.enums.ComponentTypeEnum; import org.apache.inlong.common.enums.NodeSrvStatus; import org.apache.inlong.common.heartbeat.AbstractHeartbeatManager; import org.apache.inlong.common.heartbeat.HeartbeatMsg; +import org.apache.inlong.sdk.dataproxy.common.ProcessResult; +import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException; import org.apache.inlong.sdk.dataproxy.sender.tcp.InLongTcpMsgSender; import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSender; import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig; @@ -206,6 +208,11 @@ public class HeartbeatManager extends AbstractDaemon implements AbstractHeartbea ThreadFactory SHARED_FACTORY = new DefaultThreadFactory("agent-sender-manager-heartbeat", Thread.currentThread().isDaemon()); sender = new InLongTcpMsgSender(proxyClientConfig, SHARED_FACTORY); + // start sender object + ProcessResult procResult = new ProcessResult(); + if (!sender.start(procResult)) { + throw new ProxySdkException("Sender start failure, " + procResult); + } } catch (Throwable ex) { LOGGER.error("heartbeat manager create sdk failed: ", ex); } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java index b2303feafe..34c1b67f70 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java @@ -30,10 +30,13 @@ import org.apache.inlong.agent.plugin.message.SequentialID; import org.apache.inlong.agent.utils.AgentUtils; import org.apache.inlong.agent.utils.ThreadUtils; import org.apache.inlong.common.metric.MetricRegister; -import org.apache.inlong.sdk.dataproxy.DefaultMessageSender; -import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback; -import org.apache.inlong.sdk.dataproxy.common.SendResult; +import org.apache.inlong.common.msg.MsgType; +import org.apache.inlong.sdk.dataproxy.common.ProcessResult; import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException; +import org.apache.inlong.sdk.dataproxy.sender.MsgSendCallback; +import org.apache.inlong.sdk.dataproxy.sender.tcp.InLongTcpMsgSender; +import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpEventInfo; +import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSender; import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig; import io.netty.util.concurrent.DefaultThreadFactory; @@ -71,7 +74,7 @@ public class SenderManager { private static final SequentialID SEQUENTIAL_ID = SequentialID.getInstance(); public static final int RESEND_QUEUE_WAIT_MS = 10; // cache for group and sender list, share the map cross agent lifecycle. - private DefaultMessageSender sender; + private TcpMsgSender sender; private LinkedBlockingQueue<AgentSenderCallback> resendQueue; private static final ThreadPoolExecutor EXECUTOR_SERVICE = new ThreadPoolExecutor( 0, Integer.MAX_VALUE, @@ -200,20 +203,22 @@ public class SenderManager { private void createMessageSender() throws Exception { TcpMsgSenderConfig proxyClientConfig = new TcpMsgSenderConfig( managerAddr, inlongGroupId, authSecretId, authSecretKey); - proxyClientConfig.setTotalAsyncCallbackSize(totalAsyncBufSize); + proxyClientConfig.setSendBufferSize(totalAsyncBufSize); proxyClientConfig.setAliveConnections(aliveConnectionNum); proxyClientConfig.setRequestTimeoutMs(maxSenderTimeout * 1000L); - proxyClientConfig.setNettyWorkerThreadNum(ioThreadNum); proxyClientConfig.setEnableEpollBusyWait(enableBusyWait); - + proxyClientConfig.setSdkMsgType(MsgType.valueOf(msgType)); + proxyClientConfig.setEnableDataCompress(isCompress); SHARED_FACTORY = new DefaultThreadFactory("agent-sender-manager-" + sourcePath, Thread.currentThread().isDaemon()); - - DefaultMessageSender sender = new DefaultMessageSender(proxyClientConfig, SHARED_FACTORY); - sender.setMsgtype(msgType); - sender.setCompress(isCompress); - this.sender = sender; + // build sender object + this.sender = new InLongTcpMsgSender(proxyClientConfig, SHARED_FACTORY); + ProcessResult procResult = new ProcessResult(); + // start sender object + if (!sender.start(procResult)) { + throw new ProxySdkException("Start sender failure, " + procResult); + } } public void sendBatch(SenderMessage message) { @@ -230,7 +235,7 @@ public class SenderManager { */ private void sendBatchWithRetryCount(SenderMessage message, int retry) { boolean suc = false; - while (!suc) { + while (!suc && !shutdown) { try { AgentSenderCallback cb = new AgentSenderCallback(message, retry); AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TRY_SEND, message.getGroupId(), @@ -267,11 +272,21 @@ public class SenderManager { } } - private void asyncSendByMessageSender(SendMessageCallback cb, + private void asyncSendByMessageSender(MsgSendCallback cb, List<byte[]> bodyList, String groupId, String streamId, long dataTime, String msgUUID, - Map<String, String> extraAttrMap, boolean isProxySend) throws ProxySdkException { - sender.asyncSendMessage(cb, bodyList, groupId, - streamId, dataTime, msgUUID, extraAttrMap, isProxySend); + Map<String, String> extraAttrMap, boolean isProxySend) throws Exception { + boolean isSuccess; + ProcessResult procResult = new ProcessResult(); + if (isProxySend) { + isSuccess = sender.asyncSendMsgWithSinkAck(new TcpEventInfo( + groupId, streamId, dataTime, msgUUID, extraAttrMap, bodyList), cb, procResult); + } else { + isSuccess = sender.asyncSendMessage(new TcpEventInfo( + groupId, streamId, dataTime, msgUUID, extraAttrMap, bodyList), cb, procResult); + } + if (!isSuccess) { + throw new ProxySdkException("Send message failure, " + procResult); + } } /** @@ -330,7 +345,7 @@ public class SenderManager { /** * sender callback */ - private class AgentSenderCallback implements SendMessageCallback { + private class AgentSenderCallback implements MsgSendCallback { private final int retry; private final SenderMessage message; @@ -343,13 +358,13 @@ public class SenderManager { } @Override - public void onMessageAck(SendResult result) { + public void onMessageAck(ProcessResult result) { String groupId = message.getGroupId(); String streamId = message.getStreamId(); String taskId = message.getTaskId(); String instanceId = message.getInstanceId(); long dataTime = message.getDataTime(); - if (result != null && result.equals(SendResult.OK)) { + if (result.isSuccess()) { message.getOffsetAckList().forEach(ack -> ack.setHasAck(true)); getMetricItem(groupId, streamId).pluginSendSuccessCount.addAndGet(msgCnt); AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, groupId, streamId, diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java index 508e21588f..4e068f5930 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java @@ -27,8 +27,9 @@ import org.apache.inlong.agent.plugin.AgentBaseTestsHelper; import org.apache.inlong.agent.plugin.task.logcollection.local.FileDataUtils; import org.apache.inlong.agent.utils.AgentUtils; import org.apache.inlong.common.enums.TaskStateEnum; -import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback; -import org.apache.inlong.sdk.dataproxy.common.SendResult; +import org.apache.inlong.sdk.dataproxy.common.ErrorCode; +import org.apache.inlong.sdk.dataproxy.common.ProcessResult; +import org.apache.inlong.sdk.dataproxy.sender.MsgSendCallback; import org.junit.AfterClass; import org.junit.Assert; @@ -84,14 +85,14 @@ public class TestSenderManager { @Test public void testNormalAck() { - List<SendMessageCallback> cbList = new ArrayList<>(); + List<MsgSendCallback> cbList = new ArrayList<>(); try { profile.set(TaskConstants.INODE_INFO, FileDataUtils.getInodeInfo(profile.getInstanceId())); SenderManager senderManager = PowerMockito.spy(new SenderManager(profile, "inlongGroupId", "sourceName")); PowerMockito.doNothing().when(senderManager, "createMessageSender"); PowerMockito.doAnswer(invocation -> { - SendMessageCallback cb = invocation.getArgument(0); + MsgSendCallback cb = invocation.getArgument(0); cbList.add(cb); return null; }).when(senderManager, "asyncSendByMessageSender", Mockito.any(), @@ -115,11 +116,11 @@ public class TestSenderManager { } Assert.assertTrue(cbList.size() == 10); for (int i = 0; i < 5; i++) { - cbList.get(4 - i).onMessageAck(SendResult.OK); + cbList.get(4 - i).onMessageAck(new ProcessResult(ErrorCode.OK)); } Assert.assertTrue(calHasAckCount(ackInfoListTotal) == 5); for (int i = 5; i < 10; i++) { - cbList.get(i).onMessageAck(SendResult.OK); + cbList.get(i).onMessageAck(new ProcessResult(ErrorCode.OK)); AgentUtils.silenceSleepInMs(10); } Assert.assertTrue(String.valueOf(calHasAckCount(ackInfoListTotal)), calHasAckCount(ackInfoListTotal) == 10); diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/DecodeObject.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/DecodeObject.java index 669f8330de..d9b24b0aae 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/DecodeObject.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/DecodeObject.java @@ -84,6 +84,7 @@ public class DecodeObject { private void handleAttr(String attributes) { if (StringUtils.isBlank(attributes)) { + this.procResult = new ProcessResult(ErrorCode.OK); return; } retAttr = new HashMap<>(MAP_SPLITTER.split(attributes)); diff --git a/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfigTest.java b/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfigTest.java index 0173651169..68a08a00b9 100644 --- a/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfigTest.java +++ b/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfigTest.java @@ -29,6 +29,7 @@ public class ProxyClientConfigTest { public void testManagerConfig() throws Exception { HttpMsgSenderConfig httpConfig = new HttpMsgSenderConfig( "http://127.0.0.1:800", "test_id", "secretId", "secretKey"); + httpConfig.setHttpAsyncRptPoolConfig(30, 20); HttpMsgSenderConfig httpConfig1 = httpConfig.clone(); Assert.assertEquals(httpConfig, httpConfig1); httpConfig1.setRegionName("sz");