This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 6915519ac2 [INLONG-11475][SDK] Remove the timeout parameter in the 
MessageSender class functions (#11476)
6915519ac2 is described below

commit 6915519ac2fa755bb35408399dcdc992c3c6af85
Author: Goson Zhang <4675...@qq.com>
AuthorDate: Mon Nov 11 08:58:52 2024 +0800

    [INLONG-11475][SDK] Remove the timeout parameter in the MessageSender class 
functions (#11476)
    
    Co-authored-by: gosonzhang <gosonzh...@tencent.com>
---
 .../inlong/agent/core/AgentStatusManager.java      |  3 +-
 .../inlong/agent/core/FileStaticManager.java       |  3 +-
 .../apache/inlong/agent/core/HeartbeatManager.java |  1 +
 .../inlong/sdk/dataproxy/ConfigConstants.java      | 15 ++--
 .../inlong/sdk/dataproxy/DefaultMessageSender.java | 96 ++++++++++++----------
 .../apache/inlong/sdk/dataproxy/MessageSender.java | 23 ++----
 .../inlong/sdk/dataproxy/ProxyClientConfig.java    | 52 ++++++++----
 .../sdk/dataproxy/example/SendMsgThread.java       |  4 +-
 .../sdk/dataproxy/example/TcpClientExample.java    |  4 +-
 .../inlong/sdk/dataproxy/network/NettyClient.java  |  5 +-
 .../inlong/sdk/dataproxy/network/Sender.java       | 16 ++--
 .../sdk/dataproxy/pb/PbProtocolMessageSender.java  | 74 +++++++----------
 12 files changed, 148 insertions(+), 148 deletions(-)

diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentStatusManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentStatusManager.java
index 307c79f1da..7292c1a577 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentStatusManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentStatusManager.java
@@ -42,7 +42,6 @@ import java.math.RoundingMode;
 import java.nio.charset.StandardCharsets;
 import java.text.SimpleDateFormat;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static 
org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_NAME;
@@ -168,7 +167,7 @@ public class AgentStatusManager {
                 INLONG_AGENT_SYSTEM,
                 INLONG_AGENT_STATUS,
                 AgentUtils.getCurrentTime(),
-                "", 30, TimeUnit.SECONDS);
+                "");
         if (ret != SendResult.OK) {
             LOGGER.error("send status failed: ret {}", ret);
         }
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/FileStaticManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/FileStaticManager.java
index abda6a2eab..774dee4247 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/FileStaticManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/FileStaticManager.java
@@ -34,7 +34,6 @@ import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
 
 import static 
org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_NAME;
 import static 
org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_TAG;
@@ -135,7 +134,7 @@ public class FileStaticManager {
                     INLONG_AGENT_SYSTEM,
                     INLONG_FILE_STATIC,
                     AgentUtils.getCurrentTime(),
-                    "", 30, TimeUnit.SECONDS);
+                    "");
             if (ret != SendResult.OK) {
                 LOGGER.error("send static failed: ret {}", ret);
             }
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
index d436a32636..ead9439114 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
@@ -201,6 +201,7 @@ public class HeartbeatManager extends AbstractDaemon 
implements AbstractHeartbea
             
proxyClientConfig.setAliveConnections(CommonConstants.DEFAULT_PROXY_ALIVE_CONNECTION_NUM);
             
proxyClientConfig.setIoThreadNum(CommonConstants.DEFAULT_PROXY_CLIENT_IO_THREAD_NUM);
             proxyClientConfig.setProtocolType(ProtocolType.TCP);
+            proxyClientConfig.setRequestTimeoutMs(30000L);
             ThreadFactory SHARED_FACTORY = new 
DefaultThreadFactory("agent-sender-manager-heartbeat",
                     Thread.currentThread().isDaemon());
             sender = new DefaultMessageSender(proxyClientConfig, 
SHARED_FACTORY);
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java
index 40d71d2859..26f8d131b4 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java
@@ -17,8 +17,6 @@
 
 package org.apache.inlong.sdk.dataproxy;
 
-import java.util.concurrent.TimeUnit;
-
 public class ConfigConstants {
 
     public static final String PROXY_SDK_VERSION = "1.2.11";
@@ -49,12 +47,13 @@ public class ConfigConstants {
 
     public static final int MAX_LINE_CNT = 30;
 
-    /* Default connection,connect timeout in milliseconds. */
-    public static final long DEFAULT_CONNECT_TIMEOUT_MILLIS =
-            TimeUnit.MILLISECONDS.convert(40, TimeUnit.SECONDS);
-
-    public static final long DEFAULT_REQUEST_TIMEOUT_MILLIS =
-            TimeUnit.MILLISECONDS.convert(40, TimeUnit.SECONDS);
+    // connection timeout in milliseconds
+    public static final long VAL_DEF_CONNECT_TIMEOUT_MS = 20000L;
+    public static final long VAL_MIN_CONNECT_TIMEOUT_MS = 1L;
+    public static final long VAL_DEF_CONNECT_CLOSE_DELAY_MS = 500L;
+    // request timeout in milliseconds
+    public static final long VAL_DEF_REQUEST_TIMEOUT_MS = 10000L;
+    public static final long VAL_MIN_REQUEST_TIMEOUT_MS = 1L;
 
     public static final int DEFAULT_SEND_BUFFER_SIZE = 16777216;
     public static final int DEFAULT_RECEIVE_BUFFER_SIZE = 16777216;
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
index 9e2c8c06b5..153c43b8db 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
@@ -230,9 +230,8 @@ public class DefaultMessageSender implements MessageSender {
         return sendIndexResult;
     }
 
-    public SendResult sendMessage(byte[] body, String groupId, String 
streamId, long dt, String msgUUID,
-            long timeout, TimeUnit timeUnit) {
-        return sendMessage(body, groupId, streamId, dt, msgUUID, timeout, 
timeUnit, false);
+    public SendResult sendMessage(byte[] body, String groupId, String 
streamId, long dt, String msgUUID) {
+        return sendMessage(body, groupId, streamId, dt, msgUUID, false);
     }
 
     /**
@@ -243,13 +242,11 @@ public class DefaultMessageSender implements 
MessageSender {
      * @param streamId streamId
      * @param dt data report timestamp
      * @param msgUUID msg uuid
-     * @param timeout
-     * @param timeUnit
      * @param isProxySend true: dataproxy doesn't return response message 
until data is sent to MQ
      * @return SendResult.OK means success
      */
-    public SendResult sendMessage(byte[] body, String groupId, String 
streamId, long dt, String msgUUID,
-            long timeout, TimeUnit timeUnit, boolean isProxySend) {
+    public SendResult sendMessage(byte[] body, String groupId,
+            String streamId, long dt, String msgUUID, boolean isProxySend) {
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
             return SendResult.INVALID_ATTRIBUTES;
@@ -271,8 +268,7 @@ public class DefaultMessageSender implements MessageSender {
                     new EncodeObject(Collections.singletonList(body), msgtype, 
isCompressEnd, isReport,
                             isGroupIdTransfer, dt / 1000, 
idGenerator.getNextInt(), groupId, streamId, proxySend);
             encodeObject.setSupportLF(isSupportLF);
-            Function<Sender, SendResult> sendOperation = (sender) -> 
sender.syncSendMessage(encodeObject, msgUUID,
-                    timeout, timeUnit);
+            Function<Sender, SendResult> sendOperation = (sender) -> 
sender.syncSendMessage(encodeObject, msgUUID);
             return attemptSendMessage(sendOperation);
         } else if (msgtype == 3 || msgtype == 5) {
             if (isProxySend) {
@@ -286,13 +282,13 @@ public class DefaultMessageSender implements 
MessageSender {
                         "groupId=" + groupId + "&streamId=" + streamId + 
"&dt=" + finalDt + "&cp=snappy"
                                 + finalProxySend,
                         idGenerator.getNextId(), this.getMsgtype(),
-                        true, groupId), msgUUID, timeout, timeUnit);
+                        true, groupId), msgUUID);
             } else {
                 sendOperation = (sender) -> sender.syncSendMessage(new 
EncodeObject(Collections.singletonList(body),
                         "groupId=" + groupId + "&streamId=" + streamId + 
"&dt=" + finalDt
                                 + finalProxySend,
                         idGenerator.getNextId(), this.getMsgtype(),
-                        false, groupId), msgUUID, timeout, timeUnit);
+                        false, groupId), msgUUID);
 
             }
             return attemptSendMessage(sendOperation);
@@ -302,8 +298,8 @@ public class DefaultMessageSender implements MessageSender {
     }
 
     public SendResult sendMessage(byte[] body, String groupId, String 
streamId, long dt, String msgUUID,
-            long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) 
{
-        return sendMessage(body, groupId, streamId, dt, msgUUID, timeout, 
timeUnit, extraAttrMap, false);
+            Map<String, String> extraAttrMap) {
+        return sendMessage(body, groupId, streamId, dt, msgUUID, extraAttrMap, 
false);
     }
 
     /**
@@ -314,14 +310,12 @@ public class DefaultMessageSender implements 
MessageSender {
      * @param streamId streamId
      * @param dt data report timestamp
      * @param msgUUID msg uuid
-     * @param timeout
-     * @param timeUnit
      * @param extraAttrMap extra attributes
      * @param isProxySend true: dataproxy doesn't return response message 
until data is sent to MQ
      * @return SendResult.OK means success
      */
     public SendResult sendMessage(byte[] body, String groupId, String 
streamId, long dt, String msgUUID,
-            long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap, 
boolean isProxySend) {
+            Map<String, String> extraAttrMap, boolean isProxySend) {
 
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt) || 
!ProxyUtils.isAttrKeysValid(extraAttrMap)) {
@@ -345,8 +339,7 @@ public class DefaultMessageSender implements MessageSender {
                             isGroupIdTransfer, dt / 1000,
                             idGenerator.getNextInt(), groupId, streamId, 
attrs.toString());
             encodeObject.setSupportLF(isSupportLF);
-            Function<Sender, SendResult> sendOperation = (sender) -> 
sender.syncSendMessage(encodeObject, msgUUID,
-                    timeout, timeUnit);
+            Function<Sender, SendResult> sendOperation = (sender) -> 
sender.syncSendMessage(encodeObject, msgUUID);
             return attemptSendMessage(sendOperation);
         } else if (msgtype == 3 || msgtype == 5) {
             
attrs.append("&groupId=").append(groupId).append("&streamId=").append(streamId).append("&dt=").append(dt);
@@ -355,13 +348,14 @@ public class DefaultMessageSender implements 
MessageSender {
                 Function<Sender, SendResult> sendOperation = (sender) -> 
sender.syncSendMessage(
                         new EncodeObject(Collections.singletonList(body),
                                 attrs.toString(), idGenerator.getNextId(), 
this.getMsgtype(), true, groupId),
-                        msgUUID, timeout, timeUnit);
+                        msgUUID);
                 return attemptSendMessage(sendOperation);
             } else {
                 Function<Sender, SendResult> sendOperation = (sender) -> 
sender.syncSendMessage(
                         new EncodeObject(Collections.singletonList(body),
-                                attrs.toString(), idGenerator.getNextId(), 
this.getMsgtype(), false, groupId),
-                        msgUUID, timeout, timeUnit);
+                                attrs.toString(), idGenerator.getNextId(),
+                                this.getMsgtype(), false, groupId),
+                        msgUUID);
                 return attemptSendMessage(sendOperation);
             }
         }
@@ -369,9 +363,8 @@ public class DefaultMessageSender implements MessageSender {
 
     }
 
-    public SendResult sendMessage(List<byte[]> bodyList, String groupId, 
String streamId, long dt, String msgUUID,
-            long timeout, TimeUnit timeUnit) {
-        return sendMessage(bodyList, groupId, streamId, dt, msgUUID, timeout, 
timeUnit, false);
+    public SendResult sendMessage(List<byte[]> bodyList, String groupId, 
String streamId, long dt, String msgUUID) {
+        return sendMessage(bodyList, groupId, streamId, dt, msgUUID, false);
     }
 
     /**
@@ -382,13 +375,11 @@ public class DefaultMessageSender implements 
MessageSender {
      * @param streamId streamId
      * @param dt data report timestamp
      * @param msgUUID msg uuid
-     * @param timeout
-     * @param timeUnit
      * @param isProxySend true: dataproxy doesn't return response message 
until data is sent to MQ
      * @return SendResult.OK means success
      */
-    public SendResult sendMessage(List<byte[]> bodyList, String groupId, 
String streamId, long dt, String msgUUID,
-            long timeout, TimeUnit timeUnit, boolean isProxySend) {
+    public SendResult sendMessage(List<byte[]> bodyList,
+            String groupId, String streamId, long dt, String msgUUID, boolean 
isProxySend) {
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)) {
             return SendResult.INVALID_ATTRIBUTES;
@@ -408,8 +399,7 @@ public class DefaultMessageSender implements MessageSender {
                     isGroupIdTransfer, dt / 1000,
                     idGenerator.getNextInt(), groupId, streamId, proxySend);
             encodeObject.setSupportLF(isSupportLF);
-            Function<Sender, SendResult> sendOperation = (sender) -> 
sender.syncSendMessage(encodeObject, msgUUID,
-                    timeout, timeUnit);
+            Function<Sender, SendResult> sendOperation = (sender) -> 
sender.syncSendMessage(encodeObject, msgUUID);
             return attemptSendMessage(sendOperation);
         } else if (msgtype == 3 || msgtype == 5) {
             if (isProxySend) {
@@ -422,14 +412,12 @@ public class DefaultMessageSender implements 
MessageSender {
                 sendOperation = (sender) -> sender.syncSendMessage(new 
EncodeObject(bodyList,
                         "groupId=" + groupId + "&streamId=" + streamId + 
"&dt=" + finalDt + "&cp=snappy" + "&cnt="
                                 + bodyList.size() + finalProxySend,
-                        idGenerator.getNextId(), this.getMsgtype(),
-                        true, groupId), msgUUID, timeout, timeUnit);
+                        idGenerator.getNextId(), this.getMsgtype(), true, 
groupId), msgUUID);
             } else {
                 sendOperation = (sender) -> sender.syncSendMessage(new 
EncodeObject(bodyList,
                         "groupId=" + groupId + "&streamId=" + streamId + 
"&dt=" + finalDt + "&cnt=" + bodyList.size()
                                 + finalProxySend,
-                        idGenerator.getNextId(), this.getMsgtype(),
-                        false, groupId), msgUUID, timeout, timeUnit);
+                        idGenerator.getNextId(), this.getMsgtype(), false, 
groupId), msgUUID);
             }
             return attemptSendMessage(sendOperation);
         }
@@ -437,8 +425,32 @@ public class DefaultMessageSender implements MessageSender 
{
     }
 
     public SendResult sendMessage(List<byte[]> bodyList, String groupId, 
String streamId, long dt,
-            String msgUUID, long timeout, TimeUnit timeUnit, Map<String, 
String> extraAttrMap) {
-        return sendMessage(bodyList, groupId, streamId, dt, msgUUID, timeout, 
timeUnit, extraAttrMap, false);
+            String msgUUID, Map<String, String> extraAttrMap) {
+        return sendMessage(bodyList, groupId, streamId, dt, msgUUID, 
extraAttrMap, false);
+    }
+
+    @Override
+    public void asyncSendMessage(SendMessageCallback callback, byte[] body, 
String groupId, String streamId, long dt,
+            String msgUUID, Map<String, String> extraAttrMap) throws 
ProxysdkException {
+
+    }
+
+    @Override
+    public void asyncSendMessage(SendMessageCallback callback, byte[] body, 
String groupId, String streamId, long dt,
+            String msgUUID) throws ProxysdkException {
+
+    }
+
+    @Override
+    public void asyncSendMessage(SendMessageCallback callback, List<byte[]> 
bodyList, String groupId, String streamId,
+            long dt, String msgUUID) throws ProxysdkException {
+
+    }
+
+    @Override
+    public void asyncSendMessage(SendMessageCallback callback, List<byte[]> 
bodyList, String groupId, String streamId,
+            long dt, String msgUUID, Map<String, String> extraAttrMap) throws 
ProxysdkException {
+
     }
 
     /**
@@ -449,14 +461,12 @@ public class DefaultMessageSender implements 
MessageSender {
      * @param streamId streamId
      * @param dt data report timestamp
      * @param msgUUID msg uuid
-     * @param timeout
-     * @param timeUnit
      * @param extraAttrMap extra attributes
      * @param isProxySend true: dataproxy doesn't return response message 
until data is sent to MQ
      * @return SendResult.OK means success
      */
     public SendResult sendMessage(List<byte[]> bodyList, String groupId, 
String streamId, long dt,
-            String msgUUID, long timeout, TimeUnit timeUnit, Map<String, 
String> extraAttrMap, boolean isProxySend) {
+            String msgUUID, Map<String, String> extraAttrMap, boolean 
isProxySend) {
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt) || 
!ProxyUtils.isAttrKeysValid(
                 extraAttrMap)) {
@@ -476,8 +486,7 @@ public class DefaultMessageSender implements MessageSender {
                     isGroupIdTransfer, dt / 1000,
                     idGenerator.getNextInt(), groupId, streamId, 
attrs.toString());
             encodeObject.setSupportLF(isSupportLF);
-            Function<Sender, SendResult> sendOperation = (sender) -> 
sender.syncSendMessage(encodeObject, msgUUID,
-                    timeout, timeUnit);
+            Function<Sender, SendResult> sendOperation = (sender) -> 
sender.syncSendMessage(encodeObject, msgUUID);
             return attemptSendMessage(sendOperation);
         } else if (msgtype == 3 || msgtype == 5) {
             
attrs.append("&groupId=").append(groupId).append("&streamId=").append(streamId)
@@ -486,13 +495,12 @@ public class DefaultMessageSender implements 
MessageSender {
                 attrs.append("&cp=snappy");
                 Function<Sender, SendResult> sendOperation =
                         (sender) -> sender.syncSendMessage(new 
EncodeObject(bodyList, attrs.toString(),
-                                idGenerator.getNextId(), this.getMsgtype(), 
true, groupId), msgUUID, timeout, timeUnit);
+                                idGenerator.getNextId(), this.getMsgtype(), 
true, groupId), msgUUID);
                 return attemptSendMessage(sendOperation);
             } else {
                 Function<Sender, SendResult> sendOperation =
                         (sender) -> sender.syncSendMessage(new 
EncodeObject(bodyList, attrs.toString(),
-                                idGenerator.getNextId(), this.getMsgtype(), 
false, groupId), msgUUID, timeout,
-                                timeUnit);
+                                idGenerator.getNextId(), this.getMsgtype(), 
false, groupId), msgUUID);
                 return attemptSendMessage(sendOperation);
             }
         }
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java
index 1b18096229..e980e65974 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java
@@ -23,7 +23,6 @@ import 
org.apache.inlong.sdk.dataproxy.network.ProxysdkException;
 
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 
 public interface MessageSender {
 
@@ -33,8 +32,7 @@ public interface MessageSender {
      * @param body The data will be sent
      *             
      */
-    SendResult sendMessage(byte[] body, String groupId, String streamId, long 
dt, String msgUUID,
-            long timeout, TimeUnit timeUnit);
+    SendResult sendMessage(byte[] body, String groupId, String streamId, long 
dt, String msgUUID);
 
     /**
      * This method provides a synchronized  function which you want to send 
data without packing
@@ -45,8 +43,8 @@ public interface MessageSender {
      * @param extraAttrMap The attributes you want to add,
      *                     and each element of extraAttrMap contains a pair 
like attrKey,attrValue
      */
-    SendResult sendMessage(byte[] body, String groupId, String streamId, long 
dt, String msgUUID,
-            long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap);
+    SendResult sendMessage(byte[] body, String groupId,
+            String streamId, long dt, String msgUUID, Map<String, String> 
extraAttrMap);
 
     /**
      * This method provides a synchronized  function which you want to send 
data  with packing
@@ -54,8 +52,7 @@ public interface MessageSender {
      *
      * @param bodyList The data will be sent,which is a collection consisting 
of byte arrays
      */
-    SendResult sendMessage(List<byte[]> bodyList, String groupId, String 
streamId, long dt, String msgUUID,
-            long timeout, TimeUnit timeUnit);
+    SendResult sendMessage(List<byte[]> bodyList, String groupId, String 
streamId, long dt, String msgUUID);
 
     /**
      * This method provides a synchronized  function which you want to send 
data with packing
@@ -66,8 +63,8 @@ public interface MessageSender {
      * @param extraAttrMap The attributes you want to add,
      *                     and each element of extraAttrMap contains a pair 
like attrKey,attrValue
      */
-    SendResult sendMessage(List<byte[]> bodyList, String groupId, String 
streamId, long dt, String msgUUID,
-            long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap);
+    SendResult sendMessage(List<byte[]> bodyList, String groupId,
+            String streamId, long dt, String msgUUID, Map<String, String> 
extraAttrMap);
 
     /**
      * This method provides a synchronized  function which you want to send 
data without packing
@@ -80,7 +77,6 @@ public interface MessageSender {
      */
     void asyncSendMessage(SendMessageCallback callback,
             byte[] body, String groupId, String streamId, long dt, String 
msgUUID,
-            long timeout, TimeUnit timeUnit,
             Map<String, String> extraAttrMap) throws ProxysdkException;
 
     /**
@@ -91,8 +87,7 @@ public interface MessageSender {
      * @param body     The data will be sent
      */
     void asyncSendMessage(SendMessageCallback callback,
-            byte[] body, String groupId, String streamId, long dt, String 
msgUUID,
-            long timeout, TimeUnit timeUnit) throws ProxysdkException;
+            byte[] body, String groupId, String streamId, long dt, String 
msgUUID) throws ProxysdkException;
 
     /**
      * This method provides an asynchronized  function which you want to send 
data  with packing
@@ -101,8 +96,7 @@ public interface MessageSender {
      * @param bodyList The data will be sent,which is a collection consisting 
of byte arrays
      */
     void asyncSendMessage(SendMessageCallback callback,
-            List<byte[]> bodyList, String groupId, String streamId, long dt, 
String msgUUID,
-            long timeout, TimeUnit timeUnit) throws ProxysdkException;
+            List<byte[]> bodyList, String groupId, String streamId, long dt, 
String msgUUID) throws ProxysdkException;
 
     /**
      * This method provides an asynchronized  function which you want to send 
data with packing
@@ -115,7 +109,6 @@ public interface MessageSender {
      */
     void asyncSendMessage(SendMessageCallback callback,
             List<byte[]> bodyList, String groupId, String streamId, long dt, 
String msgUUID,
-            long timeout, TimeUnit timeUnit,
             Map<String, String> extraAttrMap) throws ProxysdkException;
 
     /**
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
index c3253805cb..3338d866c2 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
@@ -59,14 +59,18 @@ public class ProxyClientConfig {
     private MetricConfig metricConfig = new MetricConfig();
 
     private int managerConnectionTimeout = 10000;
+    // http socket timeout in milliseconds
+    private int managerSocketTimeout = 30 * 1000;
+
     private boolean readProxyIPFromLocal = false;
-    /**
-     * Default connection, handshake, and initial request timeout in 
milliseconds.
-     */
-    private long connectTimeoutMillis;
-    private long requestTimeoutMillis;
 
-    private int managerSocketTimeout = 30 * 1000;
+    // connect timeout in milliseconds
+    private long connectTimeoutMs = ConfigConstants.VAL_DEF_CONNECT_TIMEOUT_MS;
+    // request timeout in milliseconds
+    private long requestTimeoutMs = ConfigConstants.VAL_DEF_REQUEST_TIMEOUT_MS;
+    // connect close wait period in milliseconds
+    private long conCloseWaitPeriodMs =
+            ConfigConstants.VAL_DEF_REQUEST_TIMEOUT_MS + 
ConfigConstants.VAL_DEF_CONNECT_CLOSE_DELAY_MS;
 
     // configuration for http client
     // whether discard old metric when cache is full.
@@ -117,8 +121,6 @@ public class ProxyClientConfig {
         this.proxyUpdateIntervalMinutes = 
ConfigConstants.PROXY_UPDATE_INTERVAL_MINUTES;
         this.proxyHttpUpdateIntervalMinutes = 
ConfigConstants.PROXY_HTTP_UPDATE_INTERVAL_MINUTES;
         this.proxyUpdateMaxRetry = ConfigConstants.PROXY_UPDATE_MAX_RETRY;
-        this.connectTimeoutMillis = 
ConfigConstants.DEFAULT_CONNECT_TIMEOUT_MILLIS;
-        
this.setRequestTimeoutMillis(ConfigConstants.DEFAULT_REQUEST_TIMEOUT_MILLIS);
         this.authSecretId = authSecretId;
         this.authSecretKey = authSecretKey;
         this.loadBalance = loadBalance;
@@ -148,8 +150,6 @@ public class ProxyClientConfig {
         this.proxyUpdateIntervalMinutes = 
ConfigConstants.PROXY_UPDATE_INTERVAL_MINUTES;
         this.proxyHttpUpdateIntervalMinutes = 
ConfigConstants.PROXY_HTTP_UPDATE_INTERVAL_MINUTES;
         this.proxyUpdateMaxRetry = ConfigConstants.PROXY_UPDATE_MAX_RETRY;
-        this.connectTimeoutMillis = 
ConfigConstants.DEFAULT_CONNECT_TIMEOUT_MILLIS;
-        
this.setRequestTimeoutMillis(ConfigConstants.DEFAULT_REQUEST_TIMEOUT_MILLIS);
         this.authSecretId = authSecretId;
         this.authSecretKey = authSecretKey;
         this.loadBalance = loadBalance;
@@ -299,20 +299,36 @@ public class ProxyClientConfig {
         this.proxyUpdateMaxRetry = proxyUpdateMaxRetry;
     }
 
-    public long getConnectTimeoutMillis() {
-        return connectTimeoutMillis;
+    public long getConnectTimeoutMs() {
+        return connectTimeoutMs;
+    }
+
+    public void setConnectTimeoutMs(long connectTimeoutMs) {
+        if (connectTimeoutMs >= ConfigConstants.VAL_MIN_CONNECT_TIMEOUT_MS) {
+            this.connectTimeoutMs = connectTimeoutMs;
+        }
     }
 
-    public void setConnectTimeoutMillis(long connectTimeoutMillis) {
-        this.connectTimeoutMillis = connectTimeoutMillis;
+    public long getRequestTimeoutMs() {
+        return requestTimeoutMs;
     }
 
-    public long getRequestTimeoutMillis() {
-        return requestTimeoutMillis;
+    public void setRequestTimeoutMs(long requestTimeoutMs) {
+        if (requestTimeoutMs >= ConfigConstants.VAL_MIN_REQUEST_TIMEOUT_MS) {
+            this.requestTimeoutMs = requestTimeoutMs;
+            this.conCloseWaitPeriodMs =
+                    this.requestTimeoutMs + 
ConfigConstants.VAL_DEF_CONNECT_CLOSE_DELAY_MS;
+        }
     }
 
-    public void setRequestTimeoutMillis(long requestTimeoutMillis) {
-        this.requestTimeoutMillis = requestTimeoutMillis;
+    public long getConCloseWaitPeriodMs() {
+        return conCloseWaitPeriodMs;
+    }
+
+    public void setConCloseWaitPeriodMs(long conCloseWaitPeriodMs) {
+        if (conCloseWaitPeriodMs >= 0) {
+            this.conCloseWaitPeriodMs = conCloseWaitPeriodMs;
+        }
     }
 
     public String getRsaPubKeyUrl() {
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/SendMsgThread.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/SendMsgThread.java
index 4c51696be0..4658bb1a05 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/SendMsgThread.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/SendMsgThread.java
@@ -26,7 +26,6 @@ import org.slf4j.LoggerFactory;
 import java.io.BufferedReader;
 import java.io.FileReader;
 import java.io.IOException;
-import java.util.concurrent.TimeUnit;
 
 public class SendMsgThread extends Thread {
 
@@ -49,8 +48,7 @@ public class SendMsgThread extends Thread {
 
                 long startTime = System.currentTimeMillis();
                 SendResult result = 
messageSender.sendMessage("hhhh".getBytes("utf8"),
-                        "b_test", "n_test1", 0, 
String.valueOf(System.currentTimeMillis()), 1,
-                        TimeUnit.MILLISECONDS);
+                        "b_test", "n_test1", 0, 
String.valueOf(System.currentTimeMillis()));
                 long endTime = System.currentTimeMillis();
                 if (result == result.OK) {
                     logger.info("this msg is ok time {}", endTime - startTime);
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java
index 55b6cf6d99..eda90bdbca 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java
@@ -27,7 +27,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.UnsupportedEncodingException;
-import java.util.concurrent.TimeUnit;
 
 public class TcpClientExample {
 
@@ -75,6 +74,7 @@ public class TcpClientExample {
             }
             dataProxyConfig.setReadProxyIPFromLocal(isReadProxyIPFromLocal);
             dataProxyConfig.setProtocolType(ProtocolType.TCP);
+            dataProxyConfig.setRequestTimeoutMs(20000L);
             messageSender = 
DefaultMessageSender.generateSenderByClusterId(dataProxyConfig);
             messageSender.setMsgtype(msgType);
         } catch (Exception e) {
@@ -88,7 +88,7 @@ public class TcpClientExample {
         SendResult result = null;
         try {
             result = sender.sendMessage(messageBody.getBytes("utf8"), 
inlongGroupId, inlongStreamId,
-                    0, String.valueOf(dt), 20, TimeUnit.SECONDS);
+                    0, String.valueOf(dt));
 
         } catch (UnsupportedEncodingException e) {
             e.printStackTrace();
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/NettyClient.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/NettyClient.java
index cd38e5f895..f529730384 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/NettyClient.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/NettyClient.java
@@ -89,7 +89,7 @@ public class NettyClient {
 
         try {
             // Wait until the connection is built.
-            awaitLatch.await(configure.getConnectTimeoutMillis(),
+            awaitLatch.await(configure.getConnectTimeoutMs(),
                     TimeUnit.MILLISECONDS);
         } catch (Exception e) {
             logger.error("create connect exception! {}", e.getMessage());
@@ -123,8 +123,7 @@ public class NettyClient {
                     }
                 });
                 // Wait until the connection is close.
-                awaitLatch.await(configure.getRequestTimeoutMillis(),
-                        TimeUnit.MILLISECONDS);
+                awaitLatch.await(configure.getConCloseWaitPeriodMs(), 
TimeUnit.MILLISECONDS);
                 // Return if close this connection fail.
                 if (!future.isSuccess()) {
                     ret = false;
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
index 50b3105b56..01ac56a53b 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
@@ -173,8 +173,9 @@ public class Sender {
         currentBufferSize.decrementAndGet();
     }
 
-    private SendResult syncSendInternalMessage(NettyClient client, 
EncodeObject encodeObject, String msgUUID,
-            long timeout, TimeUnit timeUnit) throws ExecutionException, 
InterruptedException, TimeoutException {
+    private SendResult syncSendInternalMessage(NettyClient client,
+            EncodeObject encodeObject, String msgUUID)
+            throws ExecutionException, InterruptedException, TimeoutException {
         if (client == null) {
             return SendResult.NO_CONNECTION;
         }
@@ -204,11 +205,12 @@ public class Sender {
             encodeObject.setEncryptEntry(false, null, null);
         }
         encodeObject.setMsgUUID(msgUUID);
-        SyncMessageCallable callable = new SyncMessageCallable(client, 
encodeObject, timeout, timeUnit);
+        SyncMessageCallable callable = new SyncMessageCallable(client, 
encodeObject,
+                configure.getRequestTimeoutMs(), TimeUnit.MILLISECONDS);
         syncCallables.put(encodeObject.getMessageId(), callable);
 
         Future<SendResult> future = threadPool.submit(callable);
-        return future.get(timeout, timeUnit);
+        return future.get(configure.getRequestTimeoutMs(), 
TimeUnit.MILLISECONDS);
     }
 
     /**
@@ -217,11 +219,9 @@ public class Sender {
      *
      * @param encodeObject
      * @param msgUUID
-     * @param timeout
-     * @param timeUnit
      * @return
      */
-    public SendResult syncSendMessage(EncodeObject encodeObject, String 
msgUUID, long timeout, TimeUnit timeUnit) {
+    public SendResult syncSendMessage(EncodeObject encodeObject, String 
msgUUID) {
         if (configure.isEnableMetric()) {
             metricWorker.recordNumByKey(encodeObject.getMessageId(), 
encodeObject.getGroupId(),
                     encodeObject.getStreamId(), Utils.getLocalIp(), 
encodeObject.getDt(),
@@ -230,7 +230,7 @@ public class Sender {
         NettyClient client = clientMgr.getClient(clientMgr.getLoadBalance(), 
encodeObject);
         SendResult message = null;
         try {
-            message = syncSendInternalMessage(client, encodeObject, msgUUID, 
timeout, timeUnit);
+            message = syncSendInternalMessage(client, encodeObject, msgUUID);
         } catch (InterruptedException e) {
             // TODO Auto-generated catch block
             LOGGER.error("send message error {} ", getExceptionStack(e));
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/PbProtocolMessageSender.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/PbProtocolMessageSender.java
index f0bd45d13e..fc584f5a5a 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/PbProtocolMessageSender.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/PbProtocolMessageSender.java
@@ -46,13 +46,16 @@ import java.util.concurrent.atomic.AtomicReference;
 public class PbProtocolMessageSender implements MessageSender, Configurable {
 
     public static final Logger LOG = 
LoggerFactory.getLogger(PbProtocolMessageSender.class);
-
+    private static final String KEY_REQUEST_TIMEOUT_MS = "request.timeout.ms";
+    private static final long VAL_DEF_REQUEST_TIMEOUT_MS = 20000L;
+    private static final long VAL_MIN_REQUEST_TIMEOUT_MS = 1L;
     private String name;
     private String localIp;
     private LifecycleState lifecycleState;
     private Context context;
     private BufferQueueChannel channel;
     private ProxySdkSink sink;
+    private long requestTimeoutMs = VAL_DEF_REQUEST_TIMEOUT_MS;
 
     /**
      * Constructor
@@ -101,6 +104,10 @@ public class PbProtocolMessageSender implements 
MessageSender, Configurable {
         this.sink.setName(name + "-sink");
         this.sink.configure(context);
         this.sink.setChannel(channel);
+        long tmpTimeoutMs = context.getLong(KEY_REQUEST_TIMEOUT_MS, 
VAL_DEF_REQUEST_TIMEOUT_MS);
+        if (tmpTimeoutMs >= VAL_MIN_REQUEST_TIMEOUT_MS) {
+            this.requestTimeoutMs = tmpTimeoutMs;
+        }
     }
 
     /**
@@ -179,14 +186,11 @@ public class PbProtocolMessageSender implements 
MessageSender, Configurable {
      * @param  streamId
      * @param  dt
      * @param  msgUUID
-     * @param  timeout
-     * @param  timeUnit
      * @return          SendResult
      */
     @Override
-    public SendResult sendMessage(byte[] body, String groupId, String 
streamId, long dt, String msgUUID, long timeout,
-            TimeUnit timeUnit) {
-        return this.sendMessage(body, groupId, streamId, dt, msgUUID, timeout, 
timeUnit, null);
+    public SendResult sendMessage(byte[] body, String groupId, String 
streamId, long dt, String msgUUID) {
+        return this.sendMessage(body, groupId, streamId, dt, msgUUID, null);
     }
 
     /**
@@ -197,14 +201,12 @@ public class PbProtocolMessageSender implements 
MessageSender, Configurable {
      * @param  streamId
      * @param  dt
      * @param  msgUUID
-     * @param  timeout
-     * @param  timeUnit
      * @param  extraAttrMap
      * @return              SendResult
      */
     @Override
-    public SendResult sendMessage(byte[] body, String groupId, String 
streamId, long dt, String msgUUID, long timeout,
-            TimeUnit timeUnit, Map<String, String> extraAttrMap) {
+    public SendResult sendMessage(byte[] body, String groupId,
+            String streamId, long dt, String msgUUID, Map<String, String> 
extraAttrMap) {
         // prepare
         SdkEvent sdkEvent = new SdkEvent();
         sdkEvent.setInlongGroupId(groupId);
@@ -237,7 +239,7 @@ public class PbProtocolMessageSender implements 
MessageSender, Configurable {
         this.put(profile);
         // wait
         try {
-            boolean success = latch.await(timeout, timeUnit);
+            boolean success = latch.await(requestTimeoutMs, 
TimeUnit.MILLISECONDS);
             if (!success) {
                 refResult.set(SendResult.TIMEOUT);
             }
@@ -256,14 +258,11 @@ public class PbProtocolMessageSender implements 
MessageSender, Configurable {
      * @param  streamId
      * @param  dt
      * @param  msgUUID
-     * @param  timeout
-     * @param  timeUnit
      * @return          SendResult
      */
     @Override
-    public SendResult sendMessage(List<byte[]> bodyList, String groupId, 
String streamId, long dt, String msgUUID,
-            long timeout, TimeUnit timeUnit) {
-        return this.sendMessage(bodyList, groupId, streamId, dt, msgUUID, 
timeout, timeUnit, null);
+    public SendResult sendMessage(List<byte[]> bodyList, String groupId, 
String streamId, long dt, String msgUUID) {
+        return this.sendMessage(bodyList, groupId, streamId, dt, msgUUID, 
null);
     }
 
     /**
@@ -274,14 +273,12 @@ public class PbProtocolMessageSender implements 
MessageSender, Configurable {
      * @param  streamId
      * @param  dt
      * @param  msgUUID
-     * @param  timeout
-     * @param  timeUnit
      * @param  extraAttrMap
      * @return              SendResult
      */
     @Override
-    public SendResult sendMessage(List<byte[]> bodyList, String groupId, 
String streamId, long dt, String msgUUID,
-            long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) 
{
+    public SendResult sendMessage(List<byte[]> bodyList,
+            String groupId, String streamId, long dt, String msgUUID, 
Map<String, String> extraAttrMap) {
         final AtomicReference<SendResult> refResult = new AtomicReference<>();
         CountDownLatch latch = new CountDownLatch(bodyList.size());
         // prepare
@@ -318,7 +315,7 @@ public class PbProtocolMessageSender implements 
MessageSender, Configurable {
         this.putAll(events);
         // wait
         try {
-            boolean success = latch.await(timeout, timeUnit);
+            boolean success = latch.await(requestTimeoutMs, 
TimeUnit.MILLISECONDS);
             if (!success) {
                 refResult.set(SendResult.TIMEOUT);
             }
@@ -338,14 +335,12 @@ public class PbProtocolMessageSender implements 
MessageSender, Configurable {
      * @param  streamId
      * @param  dt
      * @param  msgUUID
-     * @param  timeout
-     * @param  timeUnit
      * @param  extraAttrMap
      * @throws ProxysdkException
      */
     @Override
     public void asyncSendMessage(SendMessageCallback callback, byte[] body, 
String groupId, String streamId, long dt,
-            String msgUUID, long timeout, TimeUnit timeUnit, Map<String, 
String> extraAttrMap)
+            String msgUUID, Map<String, String> extraAttrMap)
             throws ProxysdkException {
         SdkEvent sdkEvent = new SdkEvent();
         sdkEvent.setInlongGroupId(groupId);
@@ -369,14 +364,12 @@ public class PbProtocolMessageSender implements 
MessageSender, Configurable {
      * @param  streamId
      * @param  dt
      * @param  msgUUID
-     * @param  timeout
-     * @param  timeUnit
      * @throws ProxysdkException
      */
     @Override
-    public void asyncSendMessage(SendMessageCallback callback, byte[] body, 
String groupId, String streamId, long dt,
-            String msgUUID, long timeout, TimeUnit timeUnit) throws 
ProxysdkException {
-        this.asyncSendMessage(callback, body, groupId, streamId, dt, msgUUID, 
timeout, timeUnit, null);
+    public void asyncSendMessage(SendMessageCallback callback, byte[] body,
+            String groupId, String streamId, long dt, String msgUUID) throws 
ProxysdkException {
+        this.asyncSendMessage(callback, body, groupId, streamId, dt, msgUUID, 
null);
     }
 
     /**
@@ -388,14 +381,12 @@ public class PbProtocolMessageSender implements 
MessageSender, Configurable {
      * @param  streamId
      * @param  dt
      * @param  msgUUID
-     * @param  timeout
-     * @param  timeUnit
      * @throws ProxysdkException
      */
     @Override
-    public void asyncSendMessage(SendMessageCallback callback, List<byte[]> 
bodyList, String groupId, String streamId,
-            long dt, String msgUUID, long timeout, TimeUnit timeUnit) throws 
ProxysdkException {
-        this.asyncSendMessage(callback, bodyList, groupId, streamId, dt, 
msgUUID, timeout, timeUnit, null);
+    public void asyncSendMessage(SendMessageCallback callback, List<byte[]> 
bodyList,
+            String groupId, String streamId, long dt, String msgUUID) throws 
ProxysdkException {
+        this.asyncSendMessage(callback, bodyList, groupId, streamId, dt, 
msgUUID, null);
     }
 
     /**
@@ -407,15 +398,13 @@ public class PbProtocolMessageSender implements 
MessageSender, Configurable {
      * @param  streamId
      * @param  dt
      * @param  msgUUID
-     * @param  timeout
-     * @param  timeUnit
      * @param  extraAttrMap
      * @throws ProxysdkException
      */
     @Override
-    public void asyncSendMessage(SendMessageCallback callback, List<byte[]> 
bodyList, String groupId, String streamId,
-            long dt, String msgUUID, long timeout, TimeUnit timeUnit, 
Map<String, String> extraAttrMap)
-            throws ProxysdkException {
+    public void asyncSendMessage(SendMessageCallback callback,
+            List<byte[]> bodyList, String groupId, String streamId, long dt, 
String msgUUID,
+            Map<String, String> extraAttrMap) throws ProxysdkException {
         List<CallbackProfile> events = new ArrayList<>(bodyList.size());
         for (byte[] body : bodyList) {
             SdkEvent sdkEvent = new SdkEvent();
@@ -445,8 +434,7 @@ public class PbProtocolMessageSender implements 
MessageSender, Configurable {
     @Override
     public void asyncSendMessage(String inlongGroupId, String inlongStreamId, 
byte[] body, SendMessageCallback callback)
             throws ProxysdkException {
-        this.asyncSendMessage(callback, body, inlongGroupId, inlongStreamId, 
System.currentTimeMillis(), null, 0L, null,
-                null);
+        this.asyncSendMessage(callback, body, inlongGroupId, inlongStreamId, 
System.currentTimeMillis(), null, null);
     }
 
     /**
@@ -461,8 +449,8 @@ public class PbProtocolMessageSender implements 
MessageSender, Configurable {
     @Override
     public void asyncSendMessage(String inlongGroupId, String inlongStreamId, 
List<byte[]> bodyList,
             SendMessageCallback callback) throws ProxysdkException {
-        this.asyncSendMessage(callback, bodyList, inlongGroupId, 
inlongStreamId, System.currentTimeMillis(), null, 0L,
-                null, null);
+        this.asyncSendMessage(callback, bodyList, inlongGroupId, 
inlongStreamId, System.currentTimeMillis(), null,
+                null);
     }
 
 }


Reply via email to