This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
     new c3807e6fd [INLONG-6550][SDK] Fix the decode heartbeat response error (#6551)
c3807e6fd is described below

commit c3807e6fdcf9245d59f4ed5451cb09f16593d2cf
Author: xueyingzhang <86780714+poc...@users.noreply.github.com>
AuthorDate: Tue Nov 15 19:24:19 2022 +0800

    [INLONG-6550][SDK] Fix the decode heartbeat response error (#6551)
---
 .../inlong/sdk/dataproxy/codec/EncodeObject.java    |  6 +-----
 .../inlong/sdk/dataproxy/codec/ProtocolDecoder.java | 18 ++++++++++++++----
 2 files changed, 15 insertions(+), 9 deletions(-)

diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/EncodeObject.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/EncodeObject.java
index 8634621cf..fd8ab8c8c 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/EncodeObject.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/EncodeObject.java
@@ -67,11 +67,7 @@ public class EncodeObject {
     private String errMsg;
     private String dpIp;
 
-    /* Used by de_serialization. msgtype=8*/
-    public EncodeObject() {
-    }
-
-    /* Used by de_serialization. msgtype=7*/
+    /* Used by de_serialization. msgtype=7/8 */
     public EncodeObject(String attributes) {
         handleAttr(attributes);
     }
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolDecoder.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolDecoder.java
index 7bf9bd883..af829f8ef 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolDecoder.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolDecoder.java
@@ -93,11 +93,21 @@ public class ProtocolDecoder extends MessageToMessageDecoder<ByteBuf> {
 
             out.add(object);
         } else if (msgType == 8) {
-            int attrlen = buffer.getShort(4 + 1 + 4 + 1 + 4 + 2);
-            buffer.skipBytes(13 + attrlen + 2);
-            EncodeObject object = new EncodeObject();
+            // dataTime(4) + body_ver(1) + body_len(4) + body + attr_len(2) + attr + magic(2)
+            buffer.skipBytes(4 + 1 + 4); // skip datatime, body_ver and body_len
+            final short load = buffer.readShort(); // read from body
+            int attrLen = buffer.readShort();
+            byte[] attrBytes = null;
+            if (attrLen > 0) {
+                attrBytes = new byte[attrLen];
+                buffer.readBytes(attrBytes);
+            }
+            buffer.skipBytes(2); // skip magic
+
+            String attrs = (attrBytes == null ? "" : new String(attrBytes, StandardCharsets.UTF_8));
+            EncodeObject object = new EncodeObject(attrs);
             object.setMsgtype(8);
-            object.setLoad(buffer.getShort(4 + 1 + 4 + 1 + 4));
+            object.setLoad(load);
             out.add(object);
         }
     }