This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 99dec0578b [INLONG-11469][SDK] Optimize the single message processing logic in the EncodeObject class (#11470)
99dec0578b is described below

commit 99dec0578b1168d0eb61465cd6f473659f4f3ba1
Author: Goson Zhang <4675...@qq.com>
AuthorDate: Fri Nov 8 14:53:25 2024 +0800

    [INLONG-11469][SDK] Optimize the single message processing logic in the EncodeObject class (#11470)
    
    Co-authored-by: gosonzhang <gosonzh...@tencent.com>
---
 .../inlong/common/msg/AttributeConstants.java      |   1 +
 .../inlong/sdk/dataproxy/DefaultMessageSender.java |  68 ++++-----
 .../apache/inlong/sdk/dataproxy/LoadBalance.java   |   6 +-
 .../apache/inlong/sdk/dataproxy/MessageSender.java |  44 ++----
 .../inlong/sdk/dataproxy/codec/EncodeObject.java   |  71 +---------
 .../sdk/dataproxy/codec/ProtocolDecoder.java       |   9 +-
 .../sdk/dataproxy/codec/ProtocolEncoder.java       | 154 ++++++++-------------
 .../sdk/dataproxy/config/EncryptConfigEntry.java   |   3 +-
 .../sdk/dataproxy/example/UdpClientExample.java    |  46 +++---
 .../inlong/sdk/dataproxy/network/ClientMgr.java    |   3 +-
 .../sdk/dataproxy/pb/PbProtocolMessageSender.java  |  34 -----
 .../sdk/dataproxy/threads/MetricWorkerThread.java  |   5 +-
 .../inlong/sdk/dataproxy/utils/LogCounter.java     |  47 +++++++
 13 files changed, 182 insertions(+), 309 deletions(-)

diff --git a/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java b/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java
index ceaeef3054..3402fc6455 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java
@@ -21,6 +21,7 @@ public interface AttributeConstants {
 
     String SEPARATOR = "&";
     String KEY_VALUE_SEPARATOR = "=";
+    String LINE_FEED_SEP = "\n";
 
     /**
      * group id
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
index 623e84d8c0..9e2c8c06b5 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
@@ -36,6 +36,7 @@ import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -229,15 +230,6 @@ public class DefaultMessageSender implements MessageSender 
{
         return sendIndexResult;
     }
 
-    @Deprecated
-    public SendResult sendMessage(byte[] body, String attributes, String 
msgUUID,
-            long timeout, TimeUnit timeUnit) {
-        Function<Sender, SendResult> sendOperation =
-                (sender) -> sender.syncSendMessage(new EncodeObject(body, 
attributes, idGenerator.getNextId()), msgUUID,
-                        timeout, timeUnit);
-        return attemptSendMessage(sendOperation);
-    }
-
     public SendResult sendMessage(byte[] body, String groupId, String 
streamId, long dt, String msgUUID,
             long timeout, TimeUnit timeUnit) {
         return sendMessage(body, groupId, streamId, dt, msgUUID, timeout, 
timeUnit, false);
@@ -275,8 +267,9 @@ public class DefaultMessageSender implements MessageSender {
         boolean isCompressEnd = (isCompress && (body.length > cpsSize));
 
         if (msgtype == 7 || msgtype == 8) {
-            EncodeObject encodeObject = new EncodeObject(body, msgtype, 
isCompressEnd, isReport,
-                    isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(), 
groupId, streamId, proxySend);
+            EncodeObject encodeObject =
+                    new EncodeObject(Collections.singletonList(body), msgtype, 
isCompressEnd, isReport,
+                            isGroupIdTransfer, dt / 1000, 
idGenerator.getNextInt(), groupId, streamId, proxySend);
             encodeObject.setSupportLF(isSupportLF);
             Function<Sender, SendResult> sendOperation = (sender) -> 
sender.syncSendMessage(encodeObject, msgUUID,
                     timeout, timeUnit);
@@ -289,13 +282,13 @@ public class DefaultMessageSender implements 
MessageSender {
             final long finalDt = dt;
             Function<Sender, SendResult> sendOperation;
             if (isCompressEnd) {
-                sendOperation = (sender) -> sender.syncSendMessage(new 
EncodeObject(body,
+                sendOperation = (sender) -> sender.syncSendMessage(new 
EncodeObject(Collections.singletonList(body),
                         "groupId=" + groupId + "&streamId=" + streamId + 
"&dt=" + finalDt + "&cp=snappy"
                                 + finalProxySend,
                         idGenerator.getNextId(), this.getMsgtype(),
                         true, groupId), msgUUID, timeout, timeUnit);
             } else {
-                sendOperation = (sender) -> sender.syncSendMessage(new 
EncodeObject(body,
+                sendOperation = (sender) -> sender.syncSendMessage(new 
EncodeObject(Collections.singletonList(body),
                         "groupId=" + groupId + "&streamId=" + streamId + 
"&dt=" + finalDt
                                 + finalProxySend,
                         idGenerator.getNextId(), this.getMsgtype(),
@@ -347,9 +340,10 @@ public class DefaultMessageSender implements MessageSender 
{
         boolean isCompressEnd = (isCompress && (body.length > cpsSize));
 
         if (msgtype == 7 || msgtype == 8) {
-            EncodeObject encodeObject = new EncodeObject(body, msgtype, 
isCompressEnd, isReport,
-                    isGroupIdTransfer, dt / 1000,
-                    idGenerator.getNextInt(), groupId, streamId, 
attrs.toString());
+            EncodeObject encodeObject =
+                    new EncodeObject(Collections.singletonList(body), msgtype, 
isCompressEnd, isReport,
+                            isGroupIdTransfer, dt / 1000,
+                            idGenerator.getNextInt(), groupId, streamId, 
attrs.toString());
             encodeObject.setSupportLF(isSupportLF);
             Function<Sender, SendResult> sendOperation = (sender) -> 
sender.syncSendMessage(encodeObject, msgUUID,
                     timeout, timeUnit);
@@ -358,13 +352,15 @@ public class DefaultMessageSender implements 
MessageSender {
             
attrs.append("&groupId=").append(groupId).append("&streamId=").append(streamId).append("&dt=").append(dt);
             if (isCompressEnd) {
                 attrs.append("&cp=snappy");
-                Function<Sender, SendResult> sendOperation = (sender) -> 
sender.syncSendMessage(new EncodeObject(body,
-                        attrs.toString(), idGenerator.getNextId(), 
this.getMsgtype(), true, groupId),
+                Function<Sender, SendResult> sendOperation = (sender) -> 
sender.syncSendMessage(
+                        new EncodeObject(Collections.singletonList(body),
+                                attrs.toString(), idGenerator.getNextId(), 
this.getMsgtype(), true, groupId),
                         msgUUID, timeout, timeUnit);
                 return attemptSendMessage(sendOperation);
             } else {
-                Function<Sender, SendResult> sendOperation = (sender) -> 
sender.syncSendMessage(new EncodeObject(body,
-                        attrs.toString(), idGenerator.getNextId(), 
this.getMsgtype(), false, groupId),
+                Function<Sender, SendResult> sendOperation = (sender) -> 
sender.syncSendMessage(
+                        new EncodeObject(Collections.singletonList(body),
+                                attrs.toString(), idGenerator.getNextId(), 
this.getMsgtype(), false, groupId),
                         msgUUID, timeout, timeUnit);
                 return attemptSendMessage(sendOperation);
             }
@@ -503,13 +499,6 @@ public class DefaultMessageSender implements MessageSender 
{
         return null;
     }
 
-    @Deprecated
-    public void asyncSendMessage(SendMessageCallback callback, byte[] body, 
String attributes,
-            String msgUUID, long timeout, TimeUnit timeUnit) throws 
ProxysdkException {
-        sender.asyncSendMessage(new EncodeObject(body, attributes, 
idGenerator.getNextId()),
-                callback, msgUUID, timeout, timeUnit);
-    }
-
     public void asyncSendMessage(SendMessageCallback callback, byte[] body, 
String groupId, String streamId, long dt,
             String msgUUID, long timeout, TimeUnit timeUnit) throws 
ProxysdkException {
         asyncSendMessage(callback, body, groupId, streamId, dt, msgUUID, 
timeout, timeUnit, false);
@@ -546,9 +535,10 @@ public class DefaultMessageSender implements MessageSender 
{
         }
         boolean isCompressEnd = (isCompress && (body.length > cpsSize));
         if (msgtype == 7 || msgtype == 8) {
-            EncodeObject encodeObject = new EncodeObject(body, 
this.getMsgtype(), isCompressEnd, isReport,
-                    isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(),
-                    groupId, streamId, proxySend);
+            EncodeObject encodeObject =
+                    new EncodeObject(Collections.singletonList(body), 
this.getMsgtype(), isCompressEnd, isReport,
+                            isGroupIdTransfer, dt / 1000, 
idGenerator.getNextInt(),
+                            groupId, streamId, proxySend);
             encodeObject.setSupportLF(isSupportLF);
             sender.asyncSendMessage(encodeObject, callback, msgUUID, timeout, 
timeUnit);
         } else if (msgtype == 3 || msgtype == 5) {
@@ -556,13 +546,13 @@ public class DefaultMessageSender implements 
MessageSender {
                 if (isProxySend) {
                     proxySend = "&" + proxySend;
                 }
-                sender.asyncSendMessage(new EncodeObject(body, "groupId="
+                sender.asyncSendMessage(new 
EncodeObject(Collections.singletonList(body), "groupId="
                         + groupId + "&streamId=" + streamId + "&dt=" + dt + 
"&cp=snappy" + proxySend,
                         idGenerator.getNextId(), this.getMsgtype(), true, 
groupId),
                         callback, msgUUID, timeout, timeUnit);
             } else {
                 sender.asyncSendMessage(
-                        new EncodeObject(body, "groupId=" + groupId + 
"&streamId="
+                        new EncodeObject(Collections.singletonList(body), 
"groupId=" + groupId + "&streamId="
                                 + streamId + "&dt=" + dt + proxySend, 
idGenerator.getNextId(),
                                 this.getMsgtype(), false, groupId),
                         callback,
@@ -611,21 +601,23 @@ public class DefaultMessageSender implements 
MessageSender {
 
         boolean isCompressEnd = (isCompress && (body.length > cpsSize));
         if (msgtype == 7 || msgtype == 8) {
-            EncodeObject encodeObject = new EncodeObject(body, 
this.getMsgtype(), isCompressEnd,
-                    isReport, isGroupIdTransfer, dt / 1000, 
idGenerator.getNextInt(),
-                    groupId, streamId, attrs.toString());
+            EncodeObject encodeObject =
+                    new EncodeObject(Collections.singletonList(body), 
this.getMsgtype(), isCompressEnd,
+                            isReport, isGroupIdTransfer, dt / 1000, 
idGenerator.getNextInt(),
+                            groupId, streamId, attrs.toString());
             encodeObject.setSupportLF(isSupportLF);
             sender.asyncSendMessage(encodeObject, callback, msgUUID, timeout, 
timeUnit);
         } else if (msgtype == 3 || msgtype == 5) {
             
attrs.append("&groupId=").append(groupId).append("&streamId=").append(streamId).append("&dt=").append(dt);
             if (isCompressEnd) {
                 attrs.append("&cp=snappy");
-                sender.asyncSendMessage(new EncodeObject(body, 
attrs.toString(),
+                sender.asyncSendMessage(new 
EncodeObject(Collections.singletonList(body), attrs.toString(),
                         idGenerator.getNextId(), this.getMsgtype(), true, 
groupId),
                         callback, msgUUID, timeout, timeUnit);
             } else {
-                sender.asyncSendMessage(new EncodeObject(body, 
attrs.toString(), idGenerator.getNextId(),
-                        this.getMsgtype(), false, groupId),
+                sender.asyncSendMessage(
+                        new EncodeObject(Collections.singletonList(body), 
attrs.toString(), idGenerator.getNextId(),
+                                this.getMsgtype(), false, groupId),
                         callback, msgUUID, timeout, timeUnit);
             }
         }
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/LoadBalance.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/LoadBalance.java
index 9ee1726c52..e71838ae76 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/LoadBalance.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/LoadBalance.java
@@ -25,10 +25,10 @@ public enum LoadBalance {
     WEIGHT_RANDOM("weight random", 3),
     WEIGHT_ROBIN("weight robin", 4);
 
-    private String name;
-    private int index;
+    private final String name;
+    private final int index;
 
-    private LoadBalance(String name, int index) {
+    LoadBalance(String name, int index) {
         this.name = name;
         this.index = index;
     }
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java
index 155031bee6..1b18096229 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java
@@ -27,25 +27,13 @@ import java.util.concurrent.TimeUnit;
 
 public interface MessageSender {
 
-    /**
-     * This method provides a synchronized function which you want to send data
-     * with extra attributes except  groupId,streamId,dt,etc
-     * This method is deprecated,we suggest you don't use it.
-     *
-     * @param body       The data will be sent
-     * @param attributes The attributes you want to add
-     */
-    @Deprecated
-    public SendResult sendMessage(byte[] body, String attributes, String 
msgUUID,
-            long timeout, TimeUnit timeUnit);
-
     /**
      * This method provides a synchronized  function which you want to send 
data  without packing
      *
      * @param body The data will be sent
      *             
      */
-    public SendResult sendMessage(byte[] body, String groupId, String 
streamId, long dt, String msgUUID,
+    SendResult sendMessage(byte[] body, String groupId, String streamId, long 
dt, String msgUUID,
             long timeout, TimeUnit timeUnit);
 
     /**
@@ -57,7 +45,7 @@ public interface MessageSender {
      * @param extraAttrMap The attributes you want to add,
      *                     and each element of extraAttrMap contains a pair 
like attrKey,attrValue
      */
-    public SendResult sendMessage(byte[] body, String groupId, String 
streamId, long dt, String msgUUID,
+    SendResult sendMessage(byte[] body, String groupId, String streamId, long 
dt, String msgUUID,
             long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap);
 
     /**
@@ -66,7 +54,7 @@ public interface MessageSender {
      *
      * @param bodyList The data will be sent,which is a collection consisting 
of byte arrays
      */
-    public SendResult sendMessage(List<byte[]> bodyList, String groupId, 
String streamId, long dt, String msgUUID,
+    SendResult sendMessage(List<byte[]> bodyList, String groupId, String 
streamId, long dt, String msgUUID,
             long timeout, TimeUnit timeUnit);
 
     /**
@@ -78,23 +66,9 @@ public interface MessageSender {
      * @param extraAttrMap The attributes you want to add,
      *                     and each element of extraAttrMap contains a pair 
like attrKey,attrValue
      */
-    public SendResult sendMessage(List<byte[]> bodyList, String groupId, 
String streamId, long dt, String msgUUID,
+    SendResult sendMessage(List<byte[]> bodyList, String groupId, String 
streamId, long dt, String msgUUID,
             long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap);
 
-    /**
-     * This method provides an asynchronized  function which you want to send 
data
-     * with extra attributes except  groupId,streamId,dt,etc
-     * This method is deprecated,we suggest you don't use it.
-     * 
-     *
-     * @param body       The data will be sent
-     * @param attributes The attributes you want to add
-     */
-    @Deprecated
-    public void asyncSendMessage(SendMessageCallback callback,
-            byte[] body, String attributes, String msgUUID,
-            long timeout, TimeUnit timeUnit) throws ProxysdkException;
-
     /**
      * This method provides a synchronized  function which you want to send 
data without packing
      * with extra attributes except  groupId,streamId,dt,etc
@@ -104,7 +78,7 @@ public interface MessageSender {
      * @param extraAttrMap The attributes you want to add,
      *                     and each element of extraAttrMap contains a pair 
like attrKey,attrValue
      */
-    public void asyncSendMessage(SendMessageCallback callback,
+    void asyncSendMessage(SendMessageCallback callback,
             byte[] body, String groupId, String streamId, long dt, String 
msgUUID,
             long timeout, TimeUnit timeUnit,
             Map<String, String> extraAttrMap) throws ProxysdkException;
@@ -116,7 +90,7 @@ public interface MessageSender {
      * @param callback The implementation of callback function
      * @param body     The data will be sent
      */
-    public void asyncSendMessage(SendMessageCallback callback,
+    void asyncSendMessage(SendMessageCallback callback,
             byte[] body, String groupId, String streamId, long dt, String 
msgUUID,
             long timeout, TimeUnit timeUnit) throws ProxysdkException;
 
@@ -126,7 +100,7 @@ public interface MessageSender {
      *
      * @param bodyList The data will be sent,which is a collection consisting 
of byte arrays
      */
-    public void asyncSendMessage(SendMessageCallback callback,
+    void asyncSendMessage(SendMessageCallback callback,
             List<byte[]> bodyList, String groupId, String streamId, long dt, 
String msgUUID,
             long timeout, TimeUnit timeUnit) throws ProxysdkException;
 
@@ -139,7 +113,7 @@ public interface MessageSender {
      * @param extraAttrMap The attributes you want to add, and each
      *                     element of extraAttrMap contains a pair like 
attrKey,attrValue
      */
-    public void asyncSendMessage(SendMessageCallback callback,
+    void asyncSendMessage(SendMessageCallback callback,
             List<byte[]> bodyList, String groupId, String streamId, long dt, 
String msgUUID,
             long timeout, TimeUnit timeUnit,
             Map<String, String> extraAttrMap) throws ProxysdkException;
@@ -170,5 +144,5 @@ public interface MessageSender {
     void asyncSendMessage(String inlongGroupId, String inlongStreamId, 
List<byte[]> bodyList,
             SendMessageCallback callback) throws ProxysdkException;
 
-    public void close();
+    void close();
 }
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/EncodeObject.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/EncodeObject.java
index e875d01e77..a89fef4fe9 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/EncodeObject.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/EncodeObject.java
@@ -35,7 +35,6 @@ public class EncodeObject {
     private static final Splitter.MapSplitter MAP_SPLITTER = 
Splitter.on(AttributeConstants.SEPARATOR).trimResults()
             .withKeyValueSeparator(AttributeConstants.KEY_VALUE_SEPARATOR);
 
-    private byte[] bodyBytes;
     private String attributes;
     private String messageId;
     private int msgtype;
@@ -74,33 +73,12 @@ public class EncodeObject {
     }
 
     /* Used by de_serialization. */
-    public EncodeObject(byte[] bodyBytes, String attributes) {
-        this.bodyBytes = bodyBytes;
+    public EncodeObject(List<byte[]> bodyList, String attributes) {
+        this.bodylist = bodyList;
         this.attributes = attributes;
         handleAttr(attributes);
     }
 
-    /* Used by serialization.But never used */
-    // old version:we need add message id by attr
-    public EncodeObject(byte[] bodyBytes, String attributes, String messageId) 
{
-        this.bodyBytes = bodyBytes;
-        this.messageId = messageId;
-        this.attributes = attributes + "&messageId=" + messageId;
-        addRTMS(MsgType.MSG_COMMON_SERVICE.getValue());
-    }
-
-    // used for bytes initializtion,msgtype=3/5
-    public EncodeObject(byte[] bodyBytes, String attributes, String messageId,
-            int msgtype, boolean isCompress, final String groupId) {
-        this.bodyBytes = bodyBytes;
-        this.messageId = messageId;
-        this.attributes = attributes + "&messageId=" + messageId;
-        this.msgtype = msgtype;
-        this.groupId = groupId;
-        this.isCompress = isCompress;
-        addRTMS(msgtype);
-    }
-
     // used for bodylist initializtion,msgtype=3/5
     public EncodeObject(List<byte[]> bodyList, String attributes, String 
messageId,
             int msgtype, boolean isCompress, final String groupId) {
@@ -113,23 +91,6 @@ public class EncodeObject {
         addRTMS(msgtype);
     }
 
-    // used for bytes initializtion,msgtype=7/8
-    public EncodeObject(byte[] bodyBytes, int msgtype, boolean isCompress, 
boolean isReport,
-            boolean isGroupIdTransfer, long dt, long seqId, String groupId,
-            String streamId, String commonattr) {
-        this.bodyBytes = bodyBytes;
-        this.msgtype = msgtype;
-        this.isCompress = isCompress;
-        this.isReport = isReport;
-        this.dt = dt;
-        this.isGroupIdTransfer = isGroupIdTransfer;
-        this.commonattr = commonattr;
-        this.messageId = String.valueOf(seqId);
-        this.groupId = groupId;
-        this.streamId = streamId;
-        addRTMS(msgtype);
-    }
-
     // used for bodylist initializtion,msgtype=7/8
     public EncodeObject(List<byte[]> bodyList, int msgtype, boolean isCompress,
             boolean isReport, boolean isGroupIdTransfer, long dt,
@@ -147,26 +108,6 @@ public class EncodeObject {
         addRTMS(msgtype);
     }
 
-    // file agent, used for bytes initializtion,msgtype=7/8
-    public EncodeObject(byte[] bodyBytes, int msgtype, boolean isCompress,
-            boolean isReport, boolean isGroupIdTransfer, long dt,
-            long seqId, String groupId, String streamId, String commonattr,
-            String messageKey, String proxyIp) {
-        this.bodyBytes = bodyBytes;
-        this.msgtype = msgtype;
-        this.isCompress = isCompress;
-        this.isReport = isReport;
-        this.dt = dt;
-        this.isGroupIdTransfer = isGroupIdTransfer;
-        this.commonattr = commonattr;
-        this.messageId = String.valueOf(seqId);
-        this.groupId = groupId;
-        this.streamId = streamId;
-        this.messageKey = messageKey;
-        this.proxyIp = proxyIp;
-        addRTMS(msgtype);
-    }
-
     // file agent, used for bodylist initializtion,msgtype=7/8
     public EncodeObject(List<byte[]> bodyList, int msgtype, boolean isCompress,
             boolean isReport, boolean isGroupIdTransfer, long dt,
@@ -395,14 +336,6 @@ public class EncodeObject {
         this.msgtype = msgtype;
     }
 
-    public byte[] getBodyBytes() {
-        return bodyBytes;
-    }
-
-    public void setBodyBytes(byte[] bodyBytes) {
-        this.bodyBytes = bodyBytes;
-    }
-
     public String getAttributes() {
         return attributes;
     }
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolDecoder.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolDecoder.java
index 63476e7a50..2038a8b8d6 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolDecoder.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolDecoder.java
@@ -24,6 +24,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.nio.charset.StandardCharsets;
+import java.util.Collections;
 import java.util.List;
 
 public class ProtocolDecoder extends MessageToMessageDecoder<ByteBuf> {
@@ -71,7 +72,13 @@ public class ProtocolDecoder extends 
MessageToMessageDecoder<ByteBuf> {
                 attrBytes = new byte[attrLength];
                 buffer.readBytes(attrBytes);
             }
-            EncodeObject object = new EncodeObject(bodyBytes, new 
String(attrBytes, StandardCharsets.UTF_8));
+            EncodeObject object;
+            if (bodyBytes == null) {
+                object = new EncodeObject(new String(attrBytes, 
StandardCharsets.UTF_8));
+            } else {
+                object = new EncodeObject(Collections.singletonList(bodyBytes),
+                        new String(attrBytes, StandardCharsets.UTF_8));
+            }
             object.setMsgtype(5);
             out.add(object);
         } else if (msgType == 7) {
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolEncoder.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolEncoder.java
index fe6dec9733..ecc1e1de91 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolEncoder.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolEncoder.java
@@ -17,10 +17,12 @@
 
 package org.apache.inlong.sdk.dataproxy.codec;
 
+import org.apache.inlong.common.msg.AttributeConstants;
 import org.apache.inlong.sdk.dataproxy.config.EncryptConfigEntry;
 import org.apache.inlong.sdk.dataproxy.config.EncryptInfo;
 import org.apache.inlong.sdk.dataproxy.network.Utils;
 import org.apache.inlong.sdk.dataproxy.utils.EncryptUtil;
+import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
@@ -31,11 +33,10 @@ import org.slf4j.LoggerFactory;
 import org.xerial.snappy.Snappy;
 
 import java.io.ByteArrayOutputStream;
-import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.security.SecureRandom;
-import java.util.Iterator;
 import java.util.List;
 
 import static org.apache.inlong.sdk.dataproxy.ConfigConstants.FLAG_ALLOW_AUTH;
@@ -44,30 +45,26 @@ import static 
org.apache.inlong.sdk.dataproxy.ConfigConstants.FLAG_ALLOW_ENCRYPT
 
 public class ProtocolEncoder extends MessageToMessageEncoder<EncodeObject> {
 
-    private static final Logger logger = LoggerFactory
-            .getLogger(ProtocolEncoder.class);
+    private static final Logger logger = 
LoggerFactory.getLogger(ProtocolEncoder.class);
+    private static final LogCounter exptCounter = new LogCounter(10, 100000, 
60 * 1000L);
 
     protected void encode(ChannelHandlerContext ctx,
             EncodeObject message, List<Object> out) throws Exception {
         ByteBuf buf = null;
         try {
-            EncodeObject object = message;
-            if (object.getMsgtype() == 3) {
-                buf = writeToBuf3(object);
+            if (message.getMsgtype() == 3) {
+                buf = writeToBuf3(message);
+            } else if (message.getMsgtype() == 5) {
+                buf = writeToBuf5(message);
+            } else if (message.getMsgtype() == 7) {
+                buf = writeToBuf7(message);
+            } else if (message.getMsgtype() == 8) {
+                buf = writeToBuf8(message);
             }
-            if (object.getMsgtype() == 5) {
-                buf = writeToBuf5(object);
+        } catch (Throwable ex) {
+            if (exptCounter.shouldPrint()) {
+                logger.error("ProtocolEncoder encode message failure", ex);
             }
-
-            if (object.getMsgtype() == 7) {
-                buf = writeToBuf7(object);
-            }
-            if (object.getMsgtype() == 8) {
-                buf = writeToBuf8(object);
-            }
-        } catch (Exception e) {
-            logger.error("{}", e.getMessage());
-            e.printStackTrace();
         }
         if (buf != null) {
             out.add(buf);
@@ -113,8 +110,10 @@ public class ProtocolEncoder extends 
MessageToMessageEncoder<EncodeObject> {
                 buf.writeBytes(endAttr.getBytes("utf8"));
             }
             buf.writeShort(0xee01);
-        } catch (Exception e) {
-            logger.error(e.getMessage());
+        } catch (Throwable ex) {
+            if (exptCounter.shouldPrint()) {
+                logger.error("Write type8 data exception", ex);
+            }
         }
         return buf;
     }
@@ -176,7 +175,7 @@ public class ProtocolEncoder extends 
MessageToMessageEncoder<EncodeObject> {
             buf.writeInt((int) object.getDt());
 
             buf.writeShort(cnt);
-            buf.writeInt(Integer.valueOf(object.getMessageId()));
+            buf.writeInt(Integer.parseInt(object.getMessageId()));
 
             buf.writeInt(body.length);
             buf.writeBytes(body);
@@ -195,53 +194,41 @@ public class ProtocolEncoder extends 
MessageToMessageEncoder<EncodeObject> {
             byte[] body = null;
             int cnt = 1;
 
-            if (object.getBodylist() != null && object.getBodylist().size() != 
0) {
+            if (object.getBodylist() != null && 
!object.getBodylist().isEmpty()) {
                 if (object.getCnt() > 0) {
                     cnt = object.getCnt();
                 } else {
                     cnt = object.getBodylist().size();
                 }
-
                 ByteArrayOutputStream out = new ByteArrayOutputStream();
-                Iterator<byte[]> iter = object.getBodylist().iterator();
-
                 if (object.isSupportLF()) {
+                    int totalCnt = 0;
                     ByteArrayOutputStream data = new ByteArrayOutputStream();
-                    int len = object.getBodylist().size();
-                    for (int i = 0; i < len - 1; i++) {
-                        data.write(object.getBodylist().get(i));
-                        data.write("\n".getBytes("utf8"));
+                    for (byte[] entry : object.getBodylist()) {
+                        if (totalCnt++ > 0) {
+                            data.write("\n".getBytes("utf8"));
+                        }
+                        data.write(entry);
                     }
-                    data.write(object.getBodylist().get(len - 1));
-                    ByteBuffer databuffer = ByteBuffer.allocate(4);
-                    databuffer.putInt(data.toByteArray().length);
-                    out.write(databuffer.array());
+                    ByteBuffer dataBuffer = ByteBuffer.allocate(4);
+                    dataBuffer.putInt(data.toByteArray().length);
+                    out.write(dataBuffer.array());
                     out.write(data.toByteArray());
                 } else {
-                    while (iter.hasNext()) {
-                        byte[] entry = iter.next();
-                        ByteBuffer databuffer = ByteBuffer.allocate(4);
-                        databuffer.putInt(entry.length);
-                        out.write(databuffer.array());
+                    for (byte[] entry : object.getBodylist()) {
+                        ByteBuffer dataBuffer = ByteBuffer.allocate(4);
+                        dataBuffer.putInt(entry.length);
+                        out.write(dataBuffer.array());
                         out.write(entry);
                     }
                 }
                 body = out.toByteArray();
             }
-            // send single message one time
-            if (object.getBodyBytes() != null && object.getBodyBytes().length 
!= 0) {
-                ByteArrayOutputStream out = new ByteArrayOutputStream();
-
-                ByteBuffer databuffer = ByteBuffer.allocate(4);
-                databuffer.putInt(object.getBodyBytes().length);
-                out.write(databuffer.array());
-                out.write(object.getBodyBytes());
-                body = out.toByteArray();
-            }
-
             buf = constructBody(body, object, totalLength, cnt);
-        } catch (Exception e) {
-            logger.error("writeToBuf7 has {}", e);
+        } catch (Throwable ex) {
+            if (exptCounter.shouldPrint()) {
+                logger.error("Write type7 data exception", ex);
+            }
         }
         return buf;
     }
@@ -253,11 +240,9 @@ public class ProtocolEncoder extends 
MessageToMessageEncoder<EncodeObject> {
             byte[] body = null;
 
             // send multiple messages one time
-            if (object.getBodylist() != null && object.getBodylist().size() != 
0) {
+            if (object.getBodylist() != null && 
!object.getBodylist().isEmpty()) {
                 ByteArrayOutputStream out = new ByteArrayOutputStream();
-                Iterator<byte[]> iter = object.getBodylist().iterator();
-                while (iter.hasNext()) {
-                    byte[] entry = iter.next();
+                for (byte[] entry : object.getBodylist()) {
                     ByteBuffer byteBuffer = ByteBuffer.allocate(4);
                     byteBuffer.putInt(entry.length);
                     out.write(byteBuffer.array());
@@ -265,15 +250,6 @@ public class ProtocolEncoder extends 
MessageToMessageEncoder<EncodeObject> {
                 }
                 body = out.toByteArray();
             }
-            // send single message one time
-            if (object.getBodyBytes() != null && object.getBodyBytes().length 
!= 0) {
-                ByteArrayOutputStream out = new ByteArrayOutputStream();
-                ByteBuffer byteBuffer = ByteBuffer.allocate(4);
-                byteBuffer.putInt(object.getBodyBytes().length);
-                out.write(byteBuffer.array());
-                out.write(object.getBodyBytes());
-                body = out.toByteArray();
-            }
             if (body != null) {
                 String msgAttrs = object.getAttributes();
                 if (object.isCompress()) {
@@ -312,25 +288,14 @@ public class ProtocolEncoder extends 
MessageToMessageEncoder<EncodeObject> {
                 buf.writeInt(msgAttrs.getBytes("utf8").length);
                 buf.writeBytes(msgAttrs.getBytes("utf8"));
             }
-        } catch (Exception e) {
-            logger.error("{}", e.getMessage());
-            e.printStackTrace();
+        } catch (Throwable ex) {
+            if (exptCounter.shouldPrint()) {
+                logger.error("Write type5 data exception", ex);
+            }
         }
         return buf;
     }
 
-    /*
-     * private ChannelBuffer writeToBuf4(EncodeObject object) { ChannelBuffer 
buf = ChannelBuffers.dynamicBuffer(); try
-     * { int totalLength = 1 + 4 + 4; byte[] body = null;
-     * 
-     * //send single message one time if (object.getBodyBytes() != null && 
object.getBodyBytes().length != 0) { body =
-     * object.getBodyBytes(); } totalLength = totalLength + body.length +
-     * object.getAttributes().getBytes("utf8").length; 
buf.writeInt(totalLength); buf.writeByte(4);
-     * buf.writeInt(body.length); buf.writeBytes(body); 
buf.writeInt(object.getAttributes().getBytes().length);
-     * buf.writeBytes(object.getAttributes().getBytes()); } catch (Exception 
e) { logger.error(e.getMessage()); } return
-     * buf; }
-     */
-
     private ByteBuf writeToBuf3(EncodeObject object) {
         ByteBuf buf = null;
         try {
@@ -338,20 +303,17 @@ public class ProtocolEncoder extends 
MessageToMessageEncoder<EncodeObject> {
             byte[] body = null;
 
             // send multiple messages one time
-            if (object.getBodylist() != null && object.getBodylist().size() != 
0) {
+            if (object.getBodylist() != null && 
!object.getBodylist().isEmpty()) {
+                int totalCnt = 0;
                 ByteArrayOutputStream out = new ByteArrayOutputStream();
-                Iterator<byte[]> iter = object.getBodylist().iterator();
-                while (iter.hasNext()) {
-                    byte[] entry = iter.next();
+                for (byte[] entry : object.getBodylist()) {
+                    if (totalCnt++ > 0) {
+                        
out.write(AttributeConstants.LINE_FEED_SEP.getBytes(StandardCharsets.UTF_8));
+                    }
                     out.write(entry);
-                    out.write("\n".getBytes("utf8"));
                 }
                 body = out.toByteArray();
             }
-            // send single message one time
-            if (object.getBodyBytes() != null && object.getBodyBytes().length 
!= 0) {
-                body = object.getBodyBytes();
-            }
             if (body != null) {
                 String msgAttrs = object.getAttributes();
                 if (object.isCompress()) {
@@ -390,9 +352,10 @@ public class ProtocolEncoder extends 
MessageToMessageEncoder<EncodeObject> {
                 buf.writeInt(msgAttrs.getBytes("utf8").length);
                 buf.writeBytes(msgAttrs.getBytes("utf8"));
             }
-        } catch (Exception e) {
-            logger.error("{}", e.getMessage());
-            e.printStackTrace();
+        } catch (Throwable ex) {
+            if (exptCounter.shouldPrint()) {
+                logger.error("Write type3 data exception", ex);
+            }
         }
         return buf;
     }
@@ -407,9 +370,10 @@ public class ProtocolEncoder extends 
MessageToMessageEncoder<EncodeObject> {
                     tmpData, 0);
             body = new byte[len];
             System.arraycopy(tmpData, 0, body, 0, len);
-        } catch (IOException e) {
-            logger.error("{}", e.getMessage());
-            e.printStackTrace();
+        } catch (Throwable ex) {
+            if (exptCounter.shouldPrint()) {
+                logger.error("Compress data exception", ex);
+            }
         }
         return body;
     }
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/EncryptConfigEntry.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/EncryptConfigEntry.java
index f27506fe22..6acfe09d8a 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/EncryptConfigEntry.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/EncryptConfigEntry.java
@@ -35,7 +35,7 @@ public class EncryptConfigEntry implements 
java.io.Serializable {
     private String pubKey;
     private byte[] aesKey;
     private String rsaEncryptedKey;
-    private AtomicLong lastUpdateTime = new AtomicLong(0);
+    private final AtomicLong lastUpdateTime = new AtomicLong(0);
 
     public EncryptConfigEntry(final String userName, final String version, 
final String pubKey) {
         this.userName = userName;
@@ -43,7 +43,6 @@ public class EncryptConfigEntry implements 
java.io.Serializable {
         this.pubKey = pubKey;
         this.aesKey = null;
         this.rsaEncryptedKey = null;
-        // this.rsaKey = EncryptUtil.loadPublicKeyByText(pubKey);
     }
 
     public String getVersion() {
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/UdpClientExample.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/UdpClientExample.java
index 26e490fc37..863f197353 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/UdpClientExample.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/UdpClientExample.java
@@ -45,7 +45,7 @@ import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.security.SecureRandom;
-import java.util.Iterator;
+import java.util.Collections;
 import java.util.concurrent.TimeUnit;
 
 import static 
org.apache.inlong.sdk.dataproxy.ConfigConstants.FLAG_ALLOW_COMPRESS;
@@ -116,9 +116,8 @@ public class UdpClientExample {
             boolean isGroupIdTransfer, long dt, long seqId, String groupId, 
String streamId,
             String attr) throws UnsupportedEncodingException {
         EncodeObject encodeObject =
-                new EncodeObject(getRandomString(5).getBytes("UTF-8"), msgType,
-                        isCompress,
-                        isReport, isGroupIdTransfer, dt, seqId, groupId, 
streamId, attr);
+                new 
EncodeObject(Collections.singletonList(getRandomString(5).getBytes("UTF-8")),
+                        msgType, isCompress, isReport, isGroupIdTransfer, dt, 
seqId, groupId, streamId, attr);
         return encodeObject;
     }
 
@@ -142,48 +141,37 @@ public class UdpClientExample {
             byte[] body = null;
             int cnt = 1;
 
-            if (object.getBodylist() != null && object.getBodylist().size() != 
0) {
+            if (object.getBodylist() != null && 
!object.getBodylist().isEmpty()) {
                 if (object.getCnt() > 0) {
                     cnt = object.getCnt();
                 } else {
                     cnt = object.getBodylist().size();
                 }
                 ByteArrayOutputStream out = new ByteArrayOutputStream();
-                Iterator<byte[]> iter = object.getBodylist().iterator();
 
                 if (object.isSupportLF()) {
+                    int totalCnt = 0;
                     ByteArrayOutputStream data = new ByteArrayOutputStream();
-                    int len = object.getBodylist().size();
-                    for (int i = 0; i < len - 1; i++) {
-                        data.write(object.getBodylist().get(i));
-                        data.write("\n".getBytes("utf8"));
+                    for (byte[] entry : object.getBodylist()) {
+                        if (totalCnt++ > 0) {
+                            data.write("\n".getBytes("utf8"));
+                        }
+                        data.write(entry);
                     }
-                    data.write(object.getBodylist().get(len - 1));
-                    ByteBuffer databuffer = ByteBuffer.allocate(4);
-                    databuffer.putInt(data.toByteArray().length);
-                    out.write(databuffer.array());
+                    ByteBuffer dataBuffer = ByteBuffer.allocate(4);
+                    dataBuffer.putInt(data.toByteArray().length);
+                    out.write(dataBuffer.array());
                     out.write(data.toByteArray());
                 } else {
-                    while (iter.hasNext()) {
-                        byte[] entry = iter.next();
-                        ByteBuffer databuffer = ByteBuffer.allocate(4);
-                        databuffer.putInt(entry.length);
-                        out.write(databuffer.array());
+                    for (byte[] entry : object.getBodylist()) {
+                        ByteBuffer dataBuffer = ByteBuffer.allocate(4);
+                        dataBuffer.putInt(entry.length);
+                        out.write(dataBuffer.array());
                         out.write(entry);
                     }
                 }
                 body = out.toByteArray();
             }
-            if (object.getBodyBytes() != null && object.getBodyBytes().length 
!= 0) {
-                ByteArrayOutputStream out = new ByteArrayOutputStream();
-
-                ByteBuffer databuffer = ByteBuffer.allocate(4);
-                databuffer.putInt(object.getBodyBytes().length);
-                out.write(databuffer.array());
-                out.write(object.getBodyBytes());
-                body = out.toByteArray();
-            }
-
             if (body != null) {
                 if (object.isCompress()) {
                     body = processCompress(body);
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
index c891788465..e1412a9936 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java
@@ -847,7 +847,8 @@ public class ClientMgr {
         }
         logger.debug("active host to send heartbeat! {}", 
hostInfo.getReferenceName());
         String hbMsg = "heartbeat:" + hostInfo.getHostName();
-        EncodeObject encodeObject = new 
EncodeObject(hbMsg.getBytes(StandardCharsets.UTF_8),
+        EncodeObject encodeObject = new EncodeObject(
+                
Collections.singletonList(hbMsg.getBytes(StandardCharsets.UTF_8)),
                 8, false, false, false, System.currentTimeMillis() / 1000, 1, 
"", "", "");
         try {
             if (configure.isNeedAuthentication()) {
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/PbProtocolMessageSender.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/PbProtocolMessageSender.java
index d9817b7a03..f0bd45d13e 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/PbProtocolMessageSender.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/PbProtocolMessageSender.java
@@ -171,22 +171,6 @@ public class PbProtocolMessageSender implements 
MessageSender, Configurable {
         return context;
     }
 
-    /**
-     * sendMessage
-     * 
-     * @param      body
-     * @param      attributes
-     * @param      msgUUID
-     * @param      timeout
-     * @param      timeUnit
-     * @return                SendResult
-     * @deprecated
-     */
-    @Override
-    public SendResult sendMessage(byte[] body, String attributes, String 
msgUUID, long timeout, TimeUnit timeUnit) {
-        return SendResult.INVALID_ATTRIBUTES;
-    }
-
     /**
      * sendMessage
      * 
@@ -345,24 +329,6 @@ public class PbProtocolMessageSender implements 
MessageSender, Configurable {
         return refResult.get();
     }
 
-    /**
-     * asyncSendMessage
-     * 
-     * @param      callback
-     * @param      body
-     * @param      attributes
-     * @param      msgUUID
-     * @param      timeout
-     * @param      timeUnit
-     * @throws     ProxysdkException
-     * @deprecated
-     */
-    @Override
-    public void asyncSendMessage(SendMessageCallback callback, byte[] body, 
String attributes, String msgUUID,
-            long timeout, TimeUnit timeUnit) throws ProxysdkException {
-        throw new ProxysdkException("Not support");
-    }
-
     /**
      * asyncSendMessage
      * 
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java
index e735850360..ed62ccb4b6 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java
@@ -32,6 +32,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
+import java.util.Collections;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -195,7 +196,7 @@ public class MetricWorkerThread extends Thread implements 
Closeable {
                 sender.asyncSendMessage(encodeObject, callBack,
                         String.valueOf(System.currentTimeMillis()), 20, 
TimeUnit.SECONDS);
             } else {
-                logger.error("Send metric failure: {} {}", 
encodeObject.getBodyBytes(), encodeObject.getBodylist());
+                logger.error("Send metric failure: {}", 
encodeObject.getBodylist());
             }
         } catch (Throwable ex) {
             logger.warn("Send metric throw exception", ex);
@@ -204,7 +205,7 @@ public class MetricWorkerThread extends Thread implements 
Closeable {
     }
 
     private void sendSingleLine(String line, String streamId, long dtTime) {
-        EncodeObject encodeObject = new EncodeObject(line.getBytes(), 7,
+        EncodeObject encodeObject = new 
EncodeObject(Collections.singletonList(line.getBytes()), 7,
                 false, false, false,
                 dtTime, idGenerator.getNextInt(),
                 metricConfig.getMetricGroupId(), streamId, "", "", 
Utils.getLocalIp());
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/LogCounter.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/LogCounter.java
new file mode 100644
index 0000000000..edb14c62b2
--- /dev/null
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/LogCounter.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.utils;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class LogCounter {
+
+    private final AtomicLong counter = new AtomicLong(0);
+
+    private long start = 10L;
+    private long control = 100000L;
+    private long reset = 60 * 1000L;
+
+    private AtomicLong lastLogTime = new 
AtomicLong(System.currentTimeMillis());
+
+    public LogCounter(long start, long control, long reset) {
+        this.start = start;
+        this.control = control;
+        this.reset = reset;
+    }
+
+    public boolean shouldPrint() {
+        long curTime = lastLogTime.get();
+        if (System.currentTimeMillis() - curTime > reset) {
+            if (lastLogTime.compareAndSet(curTime, 
System.currentTimeMillis())) {
+                counter.set(0);
+            }
+        }
+        return counter.incrementAndGet() <= start || counter.get() % control 
== 0;
+    }
+}

Reply via email to