This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new bfbeae27e1 [INLONG-11698][SDK] Optimize TCP encode and decode 
implementation (#11699)
bfbeae27e1 is described below

commit bfbeae27e125edd8b5849ffbf4cfa415193be5a3
Author: Goson Zhang <4675...@qq.com>
AuthorDate: Tue Jan 21 16:39:09 2025 +0800

    [INLONG-11698][SDK] Optimize TCP encode and decode implementation (#11699)
    
    * [INLONG-11698][SDK] Optimize TCP encode and decode implementation
    
    * [INLONG-11698][SDK] Optimize TCP encode and decode implementation
    
    ---------
    
    Co-authored-by: gosonzhang <gosonzh...@tencent.com>
---
 .../inlong/sdk/dataproxy/common/ErrorCode.java     |  19 +
 .../sdk/dataproxy/network/tcp/TcpCallFuture.java   | 135 ++++++
 .../sdk/dataproxy/network/tcp/TcpNettyClient.java  | 480 +++++++++++++++++++++
 .../dataproxy/network/tcp/codec/DecodeObject.java  | 144 +++++++
 .../dataproxy/network/tcp/codec/EncodeObject.java  | 188 ++++++++
 .../network/tcp/codec/ProtocolDecoder.java         | 114 +++++
 .../network/tcp/codec/ProtocolEncoder.java         | 121 ++++++
 7 files changed, 1201 insertions(+)

diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ErrorCode.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ErrorCode.java
index 5f4cd0cdc8..d2cb671a58 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ErrorCode.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ErrorCode.java
@@ -62,6 +62,25 @@ public enum ErrorCode {
     META_FIELD_VALUE_ILLEGAL(54, "Meta field value illegal"),
 
     //
+    CONNECTION_UNAVAILABLE(111, "Connection unavailable"),
+    CONNECTION_BREAK(112, "Connection break"),
+    CONNECTION_UNWRITABLE(113, "Connection unwritable"),
+    CONNECTION_WRITE_EXCEPTION(114, "Connection write exception"),
+    DUPLICATED_MESSAGE_ID(115, "Duplicated message id"),
+    SEND_WAIT_INTERRUPT(116, "Send wait interrupted"),
+    //
+    SEND_WAIT_TIMEOUT(121, "Send wait timeout"),
+    SEND_ON_EXCEPTION(122, "Send on exception"),
+
+    // dataproxy return failure
+    DP_SINK_SERVICE_UNREADY(151, "DataProxy sink service unready"),
+    DP_INVALID_ATTRS(152, "DataProxy return invalid attributes"),
+    DP_EMPTY_BODY(153, "DataProxy return empty body"),
+    DP_BODY_EXCEED_MAX_LEN(154, "DataProxy return body length over max"),
+    DP_UNCONFIGURED_GROUPID_OR_STREAMID(155, "DataProxy return unconfigured 
groupId or streamId"),
+    //
+    DP_RECEIVE_FAILURE(160, "DataProxy return message receive failure"),
+
     UNKNOWN_ERROR(9999, "Unknown error");
 
     public static ErrorCode valueOf(int value) {
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpCallFuture.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpCallFuture.java
new file mode 100644
index 0000000000..90132018a7
--- /dev/null
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpCallFuture.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.network.tcp;
+
+import org.apache.inlong.sdk.dataproxy.common.ErrorCode;
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+import org.apache.inlong.sdk.dataproxy.network.tcp.codec.EncodeObject;
+import org.apache.inlong.sdk.dataproxy.sender.MsgSendCallback;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * TCP Call Future class
+ *
+ * a future implementation for tcp RPCs.
+ */
+public class TcpCallFuture implements MsgSendCallback {
+
+    private final int messageId;
+    private final String groupId;
+    private final String streamId;
+    private final int msgCnt;
+    private final long rtTime;
+    private final String clientAddr;
+    private final long chanTerm;
+    private final String chanStr;
+    private final MsgSendCallback callback;
+    private final CountDownLatch latch = new CountDownLatch(1);
+    private final boolean isAsyncCall;
+    private ProcessResult result = null;
+    private Throwable error = null;
+
+    public TcpCallFuture(EncodeObject encObject,
+            String clientAddr, long chanTerm, String chanStr, MsgSendCallback 
callback) {
+        this.messageId = encObject.getMessageId();
+        this.groupId = encObject.getGroupId();
+        this.streamId = encObject.getStreamId();
+        this.rtTime = encObject.getRtms();
+        this.msgCnt = encObject.getMsgCnt();
+        this.clientAddr = clientAddr;
+        this.chanTerm = chanTerm;
+        this.chanStr = chanStr;
+        this.callback = callback;
+        this.isAsyncCall = (callback != null);
+    }
+
+    @Override
+    public void onMessageAck(ProcessResult result) {
+        this.result = result;
+        latch.countDown();
+        if (isAsyncCall) {
+            callback.onMessageAck(result);
+        }
+    }
+
+    @Override
+    public void onException(Throwable ex) {
+        this.error = ex;
+        latch.countDown();
+        if (isAsyncCall) {
+            callback.onException(error);
+        }
+    }
+
+    public boolean get(ProcessResult processResult, long timeout, TimeUnit 
unit) {
+        try {
+            if (latch.await(timeout, unit)) {
+                if (error != null) {
+                    return 
processResult.setFailResult(ErrorCode.SEND_ON_EXCEPTION, error.getMessage());
+                }
+                return processResult.setFailResult(result);
+            } else {
+                return 
processResult.setFailResult(ErrorCode.SEND_WAIT_TIMEOUT);
+            }
+        } catch (Throwable ex) {
+            if (ex instanceof InterruptedException) {
+                return 
processResult.setFailResult(ErrorCode.SEND_WAIT_INTERRUPT);
+            } else {
+                return processResult.setFailResult(ErrorCode.UNKNOWN_ERROR, 
ex.getMessage());
+            }
+        }
+    }
+
+    public int getMessageId() {
+        return messageId;
+    }
+
+    public String getGroupId() {
+        return groupId;
+    }
+
+    public String getStreamId() {
+        return streamId;
+    }
+
+    public int getMsgCnt() {
+        return msgCnt;
+    }
+
+    public long getRtTime() {
+        return rtTime;
+    }
+
+    public String getClientAddr() {
+        return clientAddr;
+    }
+
+    public String getChanStr() {
+        return chanStr;
+    }
+
+    public long getChanTerm() {
+        return chanTerm;
+    }
+
+    public boolean isAsyncCall() {
+        return isAsyncCall;
+    }
+}
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpNettyClient.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpNettyClient.java
new file mode 100644
index 0000000000..29899e6f58
--- /dev/null
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpNettyClient.java
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.network.tcp;
+
+import org.apache.inlong.common.msg.MsgType;
+import org.apache.inlong.sdk.dataproxy.common.ErrorCode;
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+import org.apache.inlong.sdk.dataproxy.common.ProxyClientConfig;
+import org.apache.inlong.sdk.dataproxy.common.SdkConsts;
+import org.apache.inlong.sdk.dataproxy.config.HostInfo;
+import org.apache.inlong.sdk.dataproxy.network.tcp.codec.EncodeObject;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
+import org.apache.inlong.sdk.dataproxy.utils.AuthzUtils;
+import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
+import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.security.SecureRandom;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * TCP Netty client class
+ *
+ * Used to manage TCP netty client, including connection, closing, keep-alive, 
sending status check, etc.
+ */
+public class TcpNettyClient {
+
+    private final static int CLIENT_FAIL_CONNECT_WAIT_CNT = 3;
+    private static final Logger logger = 
LoggerFactory.getLogger(TcpNettyClient.class);
+    private static final LogCounter conExptCnt = new LogCounter(10, 100000, 60 
* 1000L);
+    private static final LogCounter hbExptCnt = new LogCounter(10, 100000, 60 
* 1000L);
+    private final static int CLIENT_STATUS_INIT = -1;
+    private final static int CLIENT_STATUS_READY = 0;
+    private final static int CLIENT_STATUS_FROZEN = 1;
+    private final static int CLIENT_STATUS_DEAD = 2;
+    private final static int CLIENT_STATUS_BUSY = 3;
+
+    private final String senderId;
+    private final TcpMsgSenderConfig tcpConfig;
+    private final Bootstrap bootstrap;
+    private final HostInfo hostInfo;
+    private final AtomicInteger conStatus = new 
AtomicInteger(CLIENT_STATUS_INIT);
+    private final AtomicLong channelTermId = new AtomicLong(0);
+    private final AtomicInteger clientUsingCnt = new AtomicInteger(0);
+    private final AtomicInteger msgSentCnt = new AtomicInteger(0);
+    private final AtomicInteger msgInflightCnt = new AtomicInteger(0);
+    private final AtomicLong chanInvalidTime = new AtomicLong(0);
+    private final AtomicInteger conFailCnt = new AtomicInteger(0);
+    private final AtomicLong lstConFailTime = new AtomicLong(0);
+    private final AtomicInteger chanSyncTimeoutCnt = new AtomicInteger(0);
+    private final AtomicLong chanFstBusyTime = new AtomicLong(0);
+    private final ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
+    private Channel channel = null;
+    private String channelStr = "";
+    private int lstRoundSentCnt = -1;
+    private int clientIdleRounds = 0;
+    private long fstIdleTime = 0;
+
+    public TcpNettyClient(String senderId,
+            Bootstrap bootstrap, HostInfo hostInfo, TcpMsgSenderConfig 
tcpConfig) {
+        this.conStatus.set(CLIENT_STATUS_INIT);
+        this.hostInfo = hostInfo;
+        this.tcpConfig = tcpConfig;
+        this.senderId = senderId;
+        this.bootstrap = bootstrap;
+    }
+
+    public boolean connect(boolean needPrint, long termId) {
+        // Initial status
+        this.conStatus.set(CLIENT_STATUS_INIT);
+        long curTime = System.currentTimeMillis();
+        final CountDownLatch awaitLatch = new CountDownLatch(1);
+        // Build connect to server
+        ChannelFuture future = bootstrap.connect(
+                new InetSocketAddress(hostInfo.getHostName(), 
hostInfo.getPortNumber()));
+        future.addListener(new ChannelFutureListener() {
+
+            public void operationComplete(ChannelFuture arg0) throws Exception 
{
+                awaitLatch.countDown();
+            }
+        });
+        try {
+            // Wait until the connection is built.
+            awaitLatch.await(tcpConfig.getConnectTimeoutMs(), 
TimeUnit.MILLISECONDS);
+        } catch (Throwable ex) {
+            if (conExptCnt.shouldPrint()) {
+                logger.warn("NettyClient({}) connect to {} exception",
+                        senderId, hostInfo.getReferenceName(), ex);
+            }
+            return false;
+        }
+        // Return if no connection is built.
+        if (!future.isSuccess()) {
+            this.conFailCnt.getAndIncrement();
+            this.lstConFailTime.set(System.currentTimeMillis());
+            if (conExptCnt.shouldPrint()) {
+                logger.warn("NettyClient({}) connect to {} failure, wast {}ms",
+                        senderId, hostInfo.getReferenceName(), 
(System.currentTimeMillis() - curTime));
+            }
+            return false;
+        }
+        this.channelTermId.set(termId);
+        this.channel = future.channel();
+        this.channelStr = this.channel.toString();
+        this.conFailCnt.set(0);
+        this.msgSentCnt.set(0);
+        this.chanSyncTimeoutCnt.set(0);
+        this.msgInflightCnt.set(0);
+        this.conStatus.set(CLIENT_STATUS_READY);
+        if (needPrint) {
+            logger.info("NettyClient({}) connect to {} success, wast {}ms",
+                    senderId, channel.toString(), (System.currentTimeMillis() 
- curTime));
+        }
+        return true;
+    }
+
+    public boolean close(boolean needPrint) {
+        this.conStatus.set(CLIENT_STATUS_DEAD);
+        long curTime = System.currentTimeMillis();
+        this.chanInvalidTime.set(curTime);
+        final CountDownLatch awaitLatch = new CountDownLatch(1);
+        boolean ret = true;
+        String channelStr = "";
+        try {
+            if (channel == null) {
+                channelStr = hostInfo.getReferenceName();
+            } else {
+                channelStr = channel.toString();
+                ChannelFuture future = channel.close();
+                future.addListener(new ChannelFutureListener() {
+
+                    public void operationComplete(ChannelFuture arg0) throws 
Exception {
+                        awaitLatch.countDown();
+                    }
+                });
+                // Wait until the connection is close.
+                awaitLatch.await(tcpConfig.getConCloseWaitPeriodMs(), 
TimeUnit.MILLISECONDS);
+                // Return if close this connection fail.
+                if (!future.isSuccess()) {
+                    ret = false;
+                }
+            }
+        } catch (Throwable ex) {
+            if (conExptCnt.shouldPrint()) {
+                logger.warn("NettyClient({}) close {} exception", senderId, 
channelStr, ex);
+            }
+            ret = false;
+        } finally {
+            this.channel = null;
+            this.channelStr = "";
+            this.msgInflightCnt.set(0);
+        }
+        if (needPrint) {
+            if (ret) {
+                logger.info("NettyClient({}) close {} success, wast {}ms",
+                        this.senderId, channelStr, (System.currentTimeMillis() 
- curTime));
+            } else {
+                logger.info("NettyClient({}) close {} failure, wast {}ms",
+                        this.senderId, channelStr, (System.currentTimeMillis() 
- curTime));
+            }
+        }
+        return ret;
+    }
+
+    public boolean reconnect(boolean needPrint, long termId) {
+        long curTime = System.currentTimeMillis();
+        if ((this.conFailCnt.get() >= CLIENT_FAIL_CONNECT_WAIT_CNT)
+                && (curTime - lstConFailTime.get() < 
this.tcpConfig.getReconFailWaitMs())) {
+            return false;
+        }
+        int curStatus = this.conStatus.get();
+        if (curStatus == CLIENT_STATUS_READY) {
+            return true;
+        } else if (curStatus == CLIENT_STATUS_BUSY) {
+            if (curTime - this.chanInvalidTime.get() < 
this.tcpConfig.getBusyReconnectWaitMs()) {
+                return false;
+            }
+        } else if (curStatus == CLIENT_STATUS_FROZEN) {
+            if (curTime - this.chanInvalidTime.get() < 
this.tcpConfig.getFrozenReconnectWaitMs()) {
+                return false;
+            }
+        }
+        rw.writeLock().lock();
+        try {
+            if (this.conStatus.get() == CLIENT_STATUS_READY) {
+                return true;
+            }
+            curTime = System.currentTimeMillis();
+            this.close(false);
+            if (this.connect(false, termId)) {
+                if (needPrint) {
+                    logger.info("NettyClient({}) re-connect to {} success, 
wast {}ms",
+                            senderId, this.channel.toString(), 
System.currentTimeMillis() - curTime);
+                }
+                return true;
+            } else {
+                if (needPrint) {
+                    logger.info("NettyClient({}) re-connect to {} failure",
+                            senderId, hostInfo.getReferenceName());
+                }
+                return false;
+            }
+        } finally {
+            rw.writeLock().unlock();
+        }
+    }
+
+    public int incClientUsingCnt() {
+        return clientUsingCnt.incrementAndGet();
+    }
+
+    public int getClientUsingCnt() {
+        return clientUsingCnt.get();
+    }
+
+    public int decClientUsingCnt() {
+        return clientUsingCnt.decrementAndGet();
+    }
+
+    public boolean write(long termId, EncodeObject encodeObject, ProcessResult 
procResult) {
+        if (this.conStatus.get() != CLIENT_STATUS_READY) {
+            return procResult.setFailResult(ErrorCode.CONNECTION_UNAVAILABLE);
+        }
+        this.rw.readLock().lock();
+        if (this.conStatus.get() != CLIENT_STATUS_READY) {
+            return procResult.setFailResult(ErrorCode.CONNECTION_UNAVAILABLE);
+        }
+        if (this.channelTermId.get() != termId) {
+            return procResult.setFailResult(ErrorCode.CONNECTION_BREAK);
+        }
+        if (!(this.channel.isOpen()
+                && this.channel.isActive() && this.channel.isWritable())) {
+            return procResult.setFailResult(ErrorCode.CONNECTION_UNWRITABLE);
+        }
+        try {
+            this.msgSentCnt.incrementAndGet();
+            this.channel.writeAndFlush(encodeObject);
+            this.msgInflightCnt.incrementAndGet();
+        } catch (Throwable ex) {
+            if (conExptCnt.shouldPrint()) {
+                logger.warn("NettyClient({}) write {} exception",
+                        this.senderId, this.channel.toString(), ex);
+            }
+            return 
procResult.setFailResult(ErrorCode.CONNECTION_WRITE_EXCEPTION, ex.getMessage());
+        } finally {
+            this.rw.readLock().unlock();
+        }
+        return procResult.setSuccess();
+    }
+
+    public void setFrozen(long termId) {
+        if (this.channelTermId.get() != termId
+                || this.conStatus.get() != CLIENT_STATUS_READY) {
+            return;
+        }
+        boolean changed = false;
+        rw.readLock().lock();
+        try {
+            if (this.channelTermId.get() != termId) {
+                return;
+            }
+            int curStatus = this.conStatus.get();
+            if (curStatus == CLIENT_STATUS_READY) {
+                this.conStatus.compareAndSet(curStatus, CLIENT_STATUS_FROZEN);
+                this.chanInvalidTime.set(System.currentTimeMillis());
+                changed = true;
+            }
+        } finally {
+            rw.readLock().unlock();
+        }
+        if (changed) {
+            logger.warn("NettyClient({}) set {} frozen!", senderId, 
hostInfo.getReferenceName());
+        }
+    }
+
+    public void setBusy(long termId) {
+        if (this.channelTermId.get() != termId
+                || this.conStatus.get() != CLIENT_STATUS_READY) {
+            return;
+        }
+        boolean changed = false;
+        long befTime;
+        int curTimeoutCnt;
+        long curTime = System.currentTimeMillis();
+        rw.readLock().lock();
+        try {
+            if (this.channelTermId.get() != termId) {
+                return;
+            }
+            befTime = this.chanFstBusyTime.get();
+            if (curTime - befTime >= tcpConfig.getSyncMsgTimeoutChkDurMs()) {
+                if (this.chanFstBusyTime.compareAndSet(befTime, curTime)) {
+                    this.chanSyncTimeoutCnt.set(0);
+                }
+            }
+            curTimeoutCnt = this.chanSyncTimeoutCnt.incrementAndGet();
+            if (tcpConfig.getMaxAllowedSyncMsgTimeoutCnt() >= 0
+                    && curTimeoutCnt < 
tcpConfig.getMaxAllowedSyncMsgTimeoutCnt()) {
+                return;
+            }
+            int curStatus = this.conStatus.get();
+            if (curStatus == CLIENT_STATUS_READY) {
+                this.conStatus.compareAndSet(curStatus, CLIENT_STATUS_BUSY);
+                this.chanInvalidTime.set(System.currentTimeMillis());
+                changed = true;
+            }
+        } finally {
+            rw.readLock().unlock();
+        }
+        if (changed) {
+            logger.warn("NettyClient({}) set {} busy!", senderId, 
hostInfo.getReferenceName());
+        }
+    }
+
+    public boolean isActive() {
+        if (this.conStatus.get() != CLIENT_STATUS_READY) {
+            return false;
+        }
+        rw.readLock().lock();
+        try {
+            return ((this.conStatus.get() == CLIENT_STATUS_READY)
+                    && channel != null && channel.isOpen() && 
channel.isActive());
+        } finally {
+            rw.readLock().unlock();
+        }
+    }
+
+    public void sendHeartBeatMsg(ProcessResult procResult) {
+        if (!isActive()) {
+            logger.warn("NettyClient({}) to {} hb inActive",
+                    this.senderId, hostInfo.getReferenceName());
+            return;
+        }
+        if (!channel.isWritable()) {
+            if (hbExptCnt.shouldPrint()) {
+                logger.warn("NettyClient({}) to {} hb write_over_water", 
this.senderId, channelStr);
+            }
+            return;
+        }
+        EncodeObject encodeObject = buildHeartBeatMsg(this.senderId, 
tcpConfig);
+        if (encodeObject == null) {
+            if (hbExptCnt.shouldPrint()) {
+                logger.warn("NettyClient({}) to {} hb failure:{}!",
+                        this.senderId, channelStr, procResult.getErrMsg());
+            }
+        }
+        try {
+            write(channelTermId.get(), encodeObject, procResult);
+        } catch (Throwable ex) {
+            if (hbExptCnt.shouldPrint()) {
+                logger.warn("NettyClient({}) send to {} hb exception ",
+                        this.senderId, channelStr, ex);
+            }
+        }
+        procResult.setSuccess();
+    }
+
+    public boolean isIdleClient(long curTime) {
+        int curSentCnt = this.msgSentCnt.get();
+        if (curSentCnt != this.lstRoundSentCnt) {
+            this.lstRoundSentCnt = curSentCnt;
+            this.clientIdleRounds = 0;
+            return false;
+        }
+        if (this.clientIdleRounds++ == 0) {
+            this.fstIdleTime = curTime;
+            return false;
+        }
+        return curTime - this.fstIdleTime >= 30000L;
+    }
+
+    public String getClientAddr() {
+        return hostInfo.getReferenceName();
+    }
+
+    public Channel getChannel() {
+        return channel;
+    }
+
+    public String getChanStr() {
+        return channelStr;
+    }
+
+    public void decInFlightMsgCnt(long termId) {
+        if (this.channelTermId.get() != termId) {
+            return;
+        }
+        this.msgInflightCnt.decrementAndGet();
+    }
+
+    public int getMsgInflightCnt() {
+        return msgInflightCnt.get();
+    }
+
+    public boolean isConFailNodes() {
+        return (conFailCnt.get() >= CLIENT_FAIL_CONNECT_WAIT_CNT);
+    }
+
+    public long getChanTermId() {
+        return channelTermId.get();
+    }
+
+    public long getChanInvalidTime() {
+        return chanInvalidTime.get();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (getClass() != obj.getClass()) {
+            return false;
+        }
+        TcpNettyClient other = (TcpNettyClient) obj;
+        if (channel == null) {
+            return other.channel == null;
+        } else {
+            return channel.equals(other.channel);
+        }
+    }
+
+    private EncodeObject buildHeartBeatMsg(String senderId, ProxyClientConfig 
configure) {
+        EncodeObject encObject = new EncodeObject(null, null,
+                MsgType.MSG_BIN_HEARTBEAT, System.currentTimeMillis());
+        encObject.setMessageIdInfo(0);
+        int intMsgType = encObject.getMsgType().getValue();
+        Map<String, String> newAttrs = new HashMap<>();
+        if (configure.isEnableReportAuthz()) {
+            intMsgType |= SdkConsts.FLAG_ALLOW_AUTH;
+            long timestamp = System.currentTimeMillis();
+            int nonce = new 
SecureRandom(String.valueOf(timestamp).getBytes()).nextInt(Integer.MAX_VALUE);
+            String signature = AuthzUtils.generateSignature(
+                    configure.getRptUserName(), timestamp, nonce, 
configure.getRptSecretKey());
+            if (StringUtils.isBlank(signature)) {
+                return null;
+            }
+            newAttrs.put("_userName", configure.getRptUserName());
+            newAttrs.put("_clientIP", ProxyUtils.getLocalIp());
+            newAttrs.put("_signature", signature);
+            newAttrs.put("_timeStamp", String.valueOf(timestamp));
+            newAttrs.put("_nonce", String.valueOf(nonce));
+        }
+        encObject.setAttrInfo(intMsgType, false, null, newAttrs);
+        return encObject;
+    }
+
+}
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/DecodeObject.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/DecodeObject.java
new file mode 100644
index 0000000000..669f8330de
--- /dev/null
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/DecodeObject.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.network.tcp.codec;
+
+import org.apache.inlong.common.enums.DataProxyErrCode;
+import org.apache.inlong.common.msg.AttributeConstants;
+import org.apache.inlong.common.msg.MsgType;
+import org.apache.inlong.sdk.dataproxy.common.ErrorCode;
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+
+import com.google.common.base.Splitter;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Decode Object class
+ *
+ * Used to carry the decoded information of the response
+ */
+public class DecodeObject {
+
+    private static final Splitter.MapSplitter MAP_SPLITTER =
+            Splitter.on(AttributeConstants.SEPARATOR).trimResults()
+                    
.withKeyValueSeparator(AttributeConstants.KEY_VALUE_SEPARATOR);
+
+    private final MsgType msgType;
+    private int messageId;
+    private String dpIp;
+    private ProcessResult procResult;
+    private String addErrMsg;
+    private Map<String, String> retAttr;
+
+    public DecodeObject(MsgType msgType, String attributes) {
+        this.msgType = msgType;
+        handleAttr(attributes);
+    }
+
+    public DecodeObject(MsgType msgType, int messageId, String attributes) {
+        this.msgType = msgType;
+        this.messageId = messageId;
+        handleAttr(attributes);
+    }
+
+    public MsgType getMsgType() {
+        return msgType;
+    }
+
+    public int getMessageId() {
+        return messageId;
+    }
+
+    public String getDpIp() {
+        return dpIp;
+    }
+
+    public ProcessResult getSendResult() {
+        return procResult;
+    }
+
+    public String getAddErrMsg() {
+        return addErrMsg;
+    }
+
+    public Map<String, String> getRetAttr() {
+        return retAttr;
+    }
+
+    private void handleAttr(String attributes) {
+        if (StringUtils.isBlank(attributes)) {
+            return;
+        }
+        retAttr = new HashMap<>(MAP_SPLITTER.split(attributes));
+        if (retAttr.containsKey(AttributeConstants.MESSAGE_ID)) {
+            this.messageId = 
Integer.parseInt(retAttr.get(AttributeConstants.MESSAGE_ID));
+        }
+        dpIp = retAttr.get(AttributeConstants.MESSAGE_DP_IP);
+
+        String errCode = 
retAttr.get(AttributeConstants.MESSAGE_PROCESS_ERRCODE);
+        // errCode is empty or equals 0 -> success
+        if (StringUtils.isBlank(errCode) || Integer.parseInt(errCode) == 0) {
+            this.procResult = new ProcessResult(ErrorCode.OK);
+        } else {
+            // get errMsg
+            this.addErrMsg = 
retAttr.get(AttributeConstants.MESSAGE_PROCESS_ERRMSG);
+            if (StringUtils.isBlank(addErrMsg)) {
+                this.addErrMsg = 
DataProxyErrCode.valueOf(Integer.parseInt(errCode)).getErrMsg();
+            }
+            // sendResult
+            this.procResult = convertToSendResult(Integer.parseInt(errCode));
+        }
+    }
+
+    private ProcessResult convertToSendResult(int errCode) {
+        DataProxyErrCode dpErrCode = DataProxyErrCode.valueOf(errCode);
+        switch (dpErrCode) {
+            case SINK_SERVICE_UNREADY:
+                return new ProcessResult(ErrorCode.DP_SINK_SERVICE_UNREADY);
+
+            case MISS_REQUIRED_GROUPID_ARGUMENT:
+            case MISS_REQUIRED_STREAMID_ARGUMENT:
+            case MISS_REQUIRED_DT_ARGUMENT:
+            case UNSUPPORTED_EXTEND_FIELD_VALUE:
+                return new ProcessResult(ErrorCode.DP_INVALID_ATTRS, 
String.valueOf(dpErrCode));
+
+            case MISS_REQUIRED_BODY_ARGUMENT:
+            case EMPTY_MSG:
+                return new ProcessResult(ErrorCode.DP_EMPTY_BODY, 
String.valueOf(dpErrCode));
+
+            case BODY_EXCEED_MAX_LEN:
+                return new ProcessResult(ErrorCode.DP_BODY_EXCEED_MAX_LEN);
+
+            case UNCONFIGURED_GROUPID_OR_STREAMID:
+                return new 
ProcessResult(ErrorCode.DP_UNCONFIGURED_GROUPID_OR_STREAMID);
+
+            case PUT_EVENT_TO_CHANNEL_FAILURE:
+            case NO_AVAILABLE_PRODUCER:
+            case PRODUCER_IS_NULL:
+            case SEND_REQUEST_TO_MQ_FAILURE:
+            case MQ_RETURN_ERROR:
+            case DUPLICATED_MESSAGE:
+                return new ProcessResult(ErrorCode.DP_RECEIVE_FAILURE, 
String.valueOf(dpErrCode));
+
+            default:
+                return new ProcessResult(ErrorCode.UNKNOWN_ERROR, 
String.valueOf(dpErrCode));
+        }
+    }
+}
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/EncodeObject.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/EncodeObject.java
new file mode 100644
index 0000000000..a32227c1af
--- /dev/null
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/EncodeObject.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.network.tcp.codec;
+
+import org.apache.inlong.common.msg.AttributeConstants;
+import org.apache.inlong.common.msg.MsgType;
+
+import com.google.common.base.Joiner;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Encode Object class
+ *
+ * Used to encapsulate the reported event information to be sent
+ */
+public class EncodeObject {
+
+    private static final Joiner.MapJoiner mapJoiner = 
Joiner.on(AttributeConstants.SEPARATOR)
+            .withKeyValueSeparator(AttributeConstants.KEY_VALUE_SEPARATOR);
+
+    private final MsgType msgType;
+    private int intMsgType;
+    private final String groupId;
+    private final String streamId;
+    private final long dtMs;
+    private final long rtms;
+    private int messageId;
+    private int msgCnt = 0;
+    private int extField = 0;
+    private int attrDataLength = 0;
+    private byte[] attrData = null;
+    private int bodyDataLength = 0;
+    private byte[] bodyData = null;
+    private int groupIdNum = 0;
+    private int streamIdNum = 0;
+    //
+    private final Map<String, String> attrMap = new HashMap<>();
+    private boolean compress;
+    private byte[] aesKey;
+
+    public EncodeObject(String groupId, String streamId, MsgType msgType, long 
dtMs) {
+        this.groupId = groupId;
+        this.streamId = streamId;
+        this.msgType = msgType;
+        this.intMsgType = this.msgType.getValue();
+        this.rtms = System.currentTimeMillis();
+        if (this.msgType == MsgType.MSG_BIN_MULTI_BODY) {
+            this.dtMs = dtMs / 1000;
+        } else {
+            this.dtMs = dtMs;
+        }
+    }
+
+    public void setGroupAndStreamId2Num(int groupIdNum, int streamIdNum) {
+        this.groupIdNum = groupIdNum;
+        this.streamIdNum = streamIdNum;
+    }
+
+    public void setExtField(int extField) {
+        this.extField = extField;
+    }
+
+    public void setAttrInfo(int intMsgType, boolean isCompress, byte[] aesKey, 
Map<String, String> tgtAttrs) {
+        this.intMsgType = intMsgType;
+        this.compress = isCompress;
+        this.aesKey = aesKey;
+        if (tgtAttrs != null && !tgtAttrs.isEmpty()) {
+            for (Map.Entry<String, String> entry : tgtAttrs.entrySet()) {
+                if (entry == null || entry.getKey() == null) {
+                    continue;
+                }
+                this.attrMap.put(entry.getKey(), entry.getValue());
+            }
+            String preAttrStr = mapJoiner.join(this.attrMap);
+            this.attrData = preAttrStr.getBytes(StandardCharsets.UTF_8);
+            this.attrDataLength = this.attrData.length;
+        }
+    }
+
+    public void setMessageIdInfo(int messageId) {
+        this.messageId = messageId;
+        if (msgType == MsgType.MSG_ACK_SERVICE
+                || msgType == MsgType.MSG_MULTI_BODY) {
+            this.attrMap.put(AttributeConstants.MESSAGE_ID, 
String.valueOf(this.messageId));
+            String preAttrStr = mapJoiner.join(this.attrMap);
+            this.attrData = preAttrStr.getBytes(StandardCharsets.UTF_8);
+            this.attrDataLength = this.attrData.length;
+        }
+    }
+
+    public void setBodyData(int msgCnt, byte[] bodyBytes) {
+        this.msgCnt = msgCnt;
+        this.bodyData = bodyBytes;
+        if (this.bodyData != null) {
+            this.bodyDataLength = this.bodyData.length;
+        }
+    }
+
+    public MsgType getMsgType() {
+        return msgType;
+    }
+
+    public int getIntMsgType() {
+        return intMsgType;
+    }
+
+    public int getMessageId() {
+        return messageId;
+    }
+
+    public String getGroupId() {
+        return groupId;
+    }
+
+    public String getStreamId() {
+        return streamId;
+    }
+
+    public long getRtms() {
+        return rtms;
+    }
+
+    public int getStreamIdNum() {
+        return streamIdNum;
+    }
+
+    public int getGroupIdNum() {
+        return groupIdNum;
+    }
+
+    public boolean isCompress() {
+        return compress;
+    }
+
+    public byte[] getAesKey() {
+        return aesKey;
+    }
+
+    public int getBodyDataLength() {
+        return bodyDataLength;
+    }
+
+    public byte[] getBodyData() {
+        return bodyData;
+    }
+
+    public int getAttrDataLength() {
+        return attrDataLength;
+    }
+
+    public byte[] getAttrData() {
+        return attrData;
+    }
+
+    public int getExtField() {
+        return extField;
+    }
+
+    public int getMsgSize() {
+        return attrDataLength + bodyDataLength;
+    }
+
+    public int getMsgCnt() {
+        return msgCnt;
+    }
+
+    public long getDtMs() {
+        return dtMs;
+    }
+}
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/ProtocolDecoder.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/ProtocolDecoder.java
new file mode 100644
index 0000000000..2132c72747
--- /dev/null
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/ProtocolDecoder.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.network.tcp.codec;
+
+import org.apache.inlong.common.msg.MsgType;
+import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToMessageDecoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+/**
+ * TCP protocol decoder class
+ *
+ * Used to decode the response package returned from DataProxy
+ */
+public class ProtocolDecoder extends MessageToMessageDecoder<ByteBuf> {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(ProtocolDecoder.class);
+    private static final LogCounter decExptCounter = new LogCounter(10, 
200000, 60 * 1000L);
+
+    @Override
+    protected void decode(ChannelHandlerContext ctx,
+            ByteBuf buffer, List<Object> out) throws Exception {
+        buffer.markReaderIndex();
+        // totallen
+        int totalLen = buffer.readInt();
+        if (totalLen != buffer.readableBytes()) {
+            if (decExptCounter.shouldPrint()) {
+                logger.error("Length not equal, 
totalLen={},readableBytes={},from={}",
+                        totalLen, buffer.readableBytes(), ctx.channel());
+            }
+            buffer.resetReaderIndex();
+            throw new Exception("totalLen is not equal readableBytes.total");
+        }
+        // msgtype
+        int msgType = buffer.readByte() & 0x1f;
+
+        if (msgType == 4) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("debug decode");
+            }
+        } else if (msgType == 3 | msgType == 5) {
+            // bodylen
+            int bodyLength = buffer.readInt();
+            if (bodyLength >= totalLen) {
+                if (decExptCounter.shouldPrint()) {
+                    logger.error("bodyLen greater than totalLen, 
totalLen={},bodyLen={},from={}",
+                            totalLen, bodyLength, ctx.channel());
+                }
+                buffer.resetReaderIndex();
+                throw new Exception("bodyLen is greater than 
totalLen.totalLen");
+            }
+            byte[] bodyBytes;
+            if (bodyLength > 0) {
+                bodyBytes = new byte[bodyLength];
+                buffer.readBytes(bodyBytes);
+            }
+            // attrlen
+            String attrInfo = "";
+            int attrLength = buffer.readInt();
+            if (attrLength > 0) {
+                byte[] attrBytes = new byte[attrLength];
+                buffer.readBytes(attrBytes);
+                attrInfo = new String(attrBytes, StandardCharsets.UTF_8);
+            }
+            out.add(new DecodeObject(MsgType.valueOf(msgType), attrInfo));
+        } else if (msgType == 7) {
+            int seqId = buffer.readInt();
+            int attrLen = buffer.readShort();
+            String attrInfo = "";
+            if (attrLen > 0) {
+                byte[] attrBytes = new byte[attrLen];
+                buffer.readBytes(attrBytes);
+                attrInfo = new String(attrBytes, StandardCharsets.UTF_8);
+            }
+            buffer.readShort();
+            out.add(new DecodeObject(MsgType.valueOf(msgType), seqId, 
attrInfo));
+        } else if (msgType == 8) {
+            // dataTime(4) + body_ver(1) + body_len(4) + body + attr_len(2) + 
attr + magic(2)
+            buffer.skipBytes(4 + 1 + 4); // skip datatime, body_ver and 
body_len
+            final short load = buffer.readShort(); // read from body
+            int attrLen = buffer.readShort();
+            String attrInfo = "";
+            if (attrLen > 0) {
+                byte[] attrBytes = new byte[attrLen];
+                buffer.readBytes(attrBytes);
+                attrInfo = new String(attrBytes, StandardCharsets.UTF_8);
+            }
+            buffer.skipBytes(2); // skip magic
+            out.add(new DecodeObject(MsgType.MSG_BIN_HEARTBEAT, attrInfo));
+        }
+    }
+}
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/ProtocolEncoder.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/ProtocolEncoder.java
new file mode 100644
index 0000000000..d6cafdb85b
--- /dev/null
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/ProtocolEncoder.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.network.tcp.codec;
+
+import org.apache.inlong.common.msg.MsgType;
+import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToMessageEncoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * TCP protocol encoder class
+ *
+ * Used to encode the request package sent to DataProxy
+ */
+public class ProtocolEncoder extends MessageToMessageEncoder<EncodeObject> {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(ProtocolEncoder.class);
+    private static final LogCounter exptCounter = new LogCounter(10, 100000, 
60 * 1000L);
+
+    @Override
+    protected void encode(ChannelHandlerContext ctx,
+            EncodeObject encObject, List<Object> out) throws Exception {
+        ByteBuf buf = null;
+        int totalLength;
+        try {
+            if (encObject.getMsgType() == MsgType.MSG_ACK_SERVICE) {
+                totalLength = 1 + 4 + 4 + encObject.getMsgSize();
+                buf = ByteBufAllocator.DEFAULT.buffer(4 + totalLength);
+                buf.writeInt(totalLength);
+                buf.writeByte(encObject.getIntMsgType());
+                buf.writeInt(encObject.getBodyDataLength());
+                if (encObject.getBodyDataLength() > 0) {
+                    buf.writeBytes(encObject.getBodyData());
+                }
+                buf.writeInt(encObject.getAttrDataLength());
+                if (encObject.getAttrDataLength() > 0) {
+                    buf.writeBytes(encObject.getAttrData());
+                }
+            } else if (encObject.getMsgType() == MsgType.MSG_MULTI_BODY) {
+                totalLength = 1 + 4 + 4 + encObject.getMsgSize();
+                buf = ByteBufAllocator.DEFAULT.buffer(4 + totalLength);
+                buf.writeInt(totalLength);
+                buf.writeByte(encObject.getIntMsgType());
+                buf.writeInt(encObject.getBodyDataLength());
+                if (encObject.getBodyDataLength() > 0) {
+                    buf.writeBytes(encObject.getBodyData());
+                }
+                buf.writeInt(encObject.getAttrDataLength());
+                if (encObject.getAttrDataLength() > 0) {
+                    buf.writeBytes(encObject.getAttrData());
+                }
+            } else if (encObject.getMsgType() == MsgType.MSG_BIN_MULTI_BODY) {
+                totalLength = 1 + 2 + 2 + 2 + 4 + 2 + 4 + 4 + 2 + 2 + 
encObject.getMsgSize();
+                buf = ByteBufAllocator.DEFAULT.buffer(4 + totalLength);
+                buf.writeInt(totalLength);
+                buf.writeByte(encObject.getIntMsgType());
+                buf.writeShort(encObject.getGroupIdNum());
+                buf.writeShort(encObject.getStreamIdNum());
+                buf.writeShort(encObject.getExtField());
+                buf.writeInt((int) encObject.getDtMs());
+                buf.writeShort(encObject.getMsgCnt());
+                buf.writeInt(encObject.getMessageId());
+                buf.writeInt(encObject.getBodyDataLength());
+                if (encObject.getBodyDataLength() > 0) {
+                    buf.writeBytes(encObject.getBodyData());
+                }
+                buf.writeShort(encObject.getAttrDataLength());
+                if (encObject.getAttrDataLength() > 0) {
+                    buf.writeBytes(encObject.getAttrData());
+                }
+                buf.writeShort(0xee01);
+            } else if (encObject.getMsgType() == MsgType.MSG_BIN_HEARTBEAT) {
+                totalLength = 1 + 4 + 1 + 4 + 2 + 
encObject.getAttrDataLength() + 2;
+                buf = ByteBufAllocator.DEFAULT.buffer(4 + totalLength);
+                buf.writeInt(totalLength);
+                buf.writeByte(encObject.getIntMsgType());
+                buf.writeInt((int) encObject.getDtMs());
+                buf.writeByte(2);
+                buf.writeInt(0);
+                buf.writeShort(encObject.getAttrDataLength());
+                if (encObject.getAttrDataLength() > 0) {
+                    buf.writeBytes(encObject.getAttrData());
+                }
+                buf.writeShort(0xee01);
+            }
+        } catch (Throwable ex) {
+            if (exptCounter.shouldPrint()) {
+                logger.warn("ProtocolEncoder encode({}) message failure", 
encObject.getMsgType(), ex);
+            }
+        }
+        if (buf != null) {
+            out.add(buf);
+        } else {
+            if (exptCounter.shouldPrint()) {
+                logger.warn("ProtocolEncoder write({}) buffer is null!", 
encObject.getMsgType());
+            }
+        }
+    }
+}

Reply via email to