This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new d6cd5ef93b [INLONG-11719][SDK] Replace the Sender object in the InlongSdkDirtySender class with TcpMsgSender (#11724) d6cd5ef93b is described below commit d6cd5ef93b5b989718bcc117829515b7c943d00e Author: Goson Zhang <4675...@qq.com> AuthorDate: Fri Feb 7 19:26:19 2025 +0800 [INLONG-11719][SDK] Replace the Sender object in the InlongSdkDirtySender class with TcpMsgSender (#11724) Co-authored-by: gosonzhang <gosonzh...@tencent.com> --- .../inlong/sdk/dirtydata/InlongSdkDirtySender.java | 43 ++++++++++++---------- 1 file changed, 23 insertions(+), 20 deletions(-) diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java index af7a62fb6d..195df99d9c 100644 --- a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java +++ b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java @@ -17,9 +17,11 @@ package org.apache.inlong.sdk.dirtydata; -import org.apache.inlong.sdk.dataproxy.DefaultMessageSender; -import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback; -import org.apache.inlong.sdk.dataproxy.common.SendResult; +import org.apache.inlong.sdk.dataproxy.MsgSenderSingleFactory; +import org.apache.inlong.sdk.dataproxy.common.ProcessResult; +import org.apache.inlong.sdk.dataproxy.sender.MsgSendCallback; +import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpEventInfo; +import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSender; import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig; import com.google.common.base.Preconditions; @@ -48,7 +50,8 @@ public class InlongSdkDirtySender { private boolean closed = false; private LinkedBlockingQueue<DirtyMessageWrapper> dirtyDataQueue; - private DefaultMessageSender sender; + private TcpMsgSender sender; + private MsgSenderSingleFactory messageSenderFactory; private Executor executor; public void init() throws Exception { @@ -57,14 +60,13 @@ public class InlongSdkDirtySender { Preconditions.checkNotNull(inlongManagerAddr, "inlongManagerAddr cannot be null"); Preconditions.checkNotNull(authId, "authId cannot be null"); Preconditions.checkNotNull(authKey, "authKey cannot be null"); - + // build sender configure TcpMsgSenderConfig proxyClientConfig = - new TcpMsgSenderConfig(true, + new TcpMsgSenderConfig(false, inlongManagerAddr, inlongManagerPort, inlongGroupId, authId, authKey); - proxyClientConfig.setOnlyUseLocalProxyConfig(false); - proxyClientConfig.setTotalAsyncCallbackSize(maxCallbackSize); - this.sender = DefaultMessageSender.generateSenderByClusterId(proxyClientConfig); - + // build sender factory + this.messageSenderFactory = new MsgSenderSingleFactory(); + this.sender = this.messageSenderFactory.genTcpSenderByClusterId(proxyClientConfig); this.dirtyDataQueue = new LinkedBlockingQueue<>(maxCallbackSize); this.executor = Executors.newSingleThreadExecutor(); executor.execute(this::doSendDirtyMessage); @@ -80,6 +82,7 @@ public class InlongSdkDirtySender { } private void doSendDirtyMessage() { + ProcessResult procResult = new ProcessResult(); while (!closed) { try { DirtyMessageWrapper messageWrapper = dirtyDataQueue.poll(); @@ -93,30 +96,30 @@ public class InlongSdkDirtySender { messageWrapper); continue; } - - sender.asyncSendMessage(inlongGroupId, inlongStreamId, - messageWrapper.format().getBytes(), new LogCallBack(messageWrapper)); - + if (!sender.asyncSendMessage(new TcpEventInfo(inlongGroupId, inlongStreamId, + System.currentTimeMillis(), null, messageWrapper.format().getBytes()), + new LogCallBack(messageWrapper), procResult)) { + dirtyDataQueue.offer(messageWrapper); + } } catch (Throwable t) { log.error("failed to send inlong dirty message", t); if (!ignoreErrors) { throw new RuntimeException("writing dirty message to inlong sdk failed", t); } } - } } public void close() { closed = true; dirtyDataQueue.clear(); - if (sender != null) { - sender.close(); + if (messageSenderFactory != null) { + messageSenderFactory.shutdownAll(); } } @Getter - class LogCallBack implements SendMessageCallback { + class LogCallBack implements MsgSendCallback { private final DirtyMessageWrapper wrapper; @@ -125,8 +128,8 @@ public class InlongSdkDirtySender { } @Override - public void onMessageAck(SendResult result) { - if (SendResult.OK != result) { + public void onMessageAck(ProcessResult result) { + if (!result.isSuccess()) { dirtyDataQueue.offer(wrapper); } }