This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new bfbeae27e1 [INLONG-11698][SDK] Optimize TCP encode and decode implementation (#11699) bfbeae27e1 is described below commit bfbeae27e125edd8b5849ffbf4cfa415193be5a3 Author: Goson Zhang <4675...@qq.com> AuthorDate: Tue Jan 21 16:39:09 2025 +0800 [INLONG-11698][SDK] Optimize TCP encode and decode implementation (#11699) * [INLONG-11698][SDK] Optimize TCP encode and decode implementation * [INLONG-11698][SDK] Optimize TCP encode and decode implementation --------- Co-authored-by: gosonzhang <gosonzh...@tencent.com> --- .../inlong/sdk/dataproxy/common/ErrorCode.java | 19 + .../sdk/dataproxy/network/tcp/TcpCallFuture.java | 135 ++++++ .../sdk/dataproxy/network/tcp/TcpNettyClient.java | 480 +++++++++++++++++++++ .../dataproxy/network/tcp/codec/DecodeObject.java | 144 +++++++ .../dataproxy/network/tcp/codec/EncodeObject.java | 188 ++++++++ .../network/tcp/codec/ProtocolDecoder.java | 114 +++++ .../network/tcp/codec/ProtocolEncoder.java | 121 ++++++ 7 files changed, 1201 insertions(+) diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ErrorCode.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ErrorCode.java index 5f4cd0cdc8..d2cb671a58 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ErrorCode.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ErrorCode.java @@ -62,6 +62,25 @@ public enum ErrorCode { META_FIELD_VALUE_ILLEGAL(54, "Meta field value illegal"), // + CONNECTION_UNAVAILABLE(111, "Connection unavailable"), + CONNECTION_BREAK(112, "Connection break"), + CONNECTION_UNWRITABLE(113, "Connection unwritable"), + CONNECTION_WRITE_EXCEPTION(114, "Connection write exception"), + DUPLICATED_MESSAGE_ID(115, "Duplicated message id"), + SEND_WAIT_INTERRUPT(116, "Send wait interrupted"), + // + SEND_WAIT_TIMEOUT(121, "Send wait timeout"), + SEND_ON_EXCEPTION(122, "Send on exception"), + + // dataproxy return failure + DP_SINK_SERVICE_UNREADY(151, "DataProxy sink service unready"), + DP_INVALID_ATTRS(152, "DataProxy return invalid attributes"), + DP_EMPTY_BODY(153, "DataProxy return empty body"), + DP_BODY_EXCEED_MAX_LEN(154, "DataProxy return body length over max"), + DP_UNCONFIGURED_GROUPID_OR_STREAMID(155, "DataProxy return unconfigured groupId or streamId"), + // + DP_RECEIVE_FAILURE(160, "DataProxy return message receive failure"), + UNKNOWN_ERROR(9999, "Unknown error"); public static ErrorCode valueOf(int value) { diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpCallFuture.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpCallFuture.java new file mode 100644 index 0000000000..90132018a7 --- /dev/null +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpCallFuture.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.dataproxy.network.tcp; + +import org.apache.inlong.sdk.dataproxy.common.ErrorCode; +import org.apache.inlong.sdk.dataproxy.common.ProcessResult; +import org.apache.inlong.sdk.dataproxy.network.tcp.codec.EncodeObject; +import org.apache.inlong.sdk.dataproxy.sender.MsgSendCallback; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * TCP Call Future class + * + * a future implementation for tcp RPCs. + */ +public class TcpCallFuture implements MsgSendCallback { + + private final int messageId; + private final String groupId; + private final String streamId; + private final int msgCnt; + private final long rtTime; + private final String clientAddr; + private final long chanTerm; + private final String chanStr; + private final MsgSendCallback callback; + private final CountDownLatch latch = new CountDownLatch(1); + private final boolean isAsyncCall; + private ProcessResult result = null; + private Throwable error = null; + + public TcpCallFuture(EncodeObject encObject, + String clientAddr, long chanTerm, String chanStr, MsgSendCallback callback) { + this.messageId = encObject.getMessageId(); + this.groupId = encObject.getGroupId(); + this.streamId = encObject.getStreamId(); + this.rtTime = encObject.getRtms(); + this.msgCnt = encObject.getMsgCnt(); + this.clientAddr = clientAddr; + this.chanTerm = chanTerm; + this.chanStr = chanStr; + this.callback = callback; + this.isAsyncCall = (callback != null); + } + + @Override + public void onMessageAck(ProcessResult result) { + this.result = result; + latch.countDown(); + if (isAsyncCall) { + callback.onMessageAck(result); + } + } + + @Override + public void onException(Throwable ex) { + this.error = ex; + latch.countDown(); + if (isAsyncCall) { + callback.onException(error); + } + } + + public boolean get(ProcessResult processResult, long timeout, TimeUnit unit) { + try { + if (latch.await(timeout, unit)) { + if (error != null) { + return processResult.setFailResult(ErrorCode.SEND_ON_EXCEPTION, error.getMessage()); + } + return processResult.setFailResult(result); + } else { + return processResult.setFailResult(ErrorCode.SEND_WAIT_TIMEOUT); + } + } catch (Throwable ex) { + if (ex instanceof InterruptedException) { + return processResult.setFailResult(ErrorCode.SEND_WAIT_INTERRUPT); + } else { + return processResult.setFailResult(ErrorCode.UNKNOWN_ERROR, ex.getMessage()); + } + } + } + + public int getMessageId() { + return messageId; + } + + public String getGroupId() { + return groupId; + } + + public String getStreamId() { + return streamId; + } + + public int getMsgCnt() { + return msgCnt; + } + + public long getRtTime() { + return rtTime; + } + + public String getClientAddr() { + return clientAddr; + } + + public String getChanStr() { + return chanStr; + } + + public long getChanTerm() { + return chanTerm; + } + + public boolean isAsyncCall() { + return isAsyncCall; + } +} diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpNettyClient.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpNettyClient.java new file mode 100644 index 0000000000..29899e6f58 --- /dev/null +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpNettyClient.java @@ -0,0 +1,480 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.dataproxy.network.tcp; + +import org.apache.inlong.common.msg.MsgType; +import org.apache.inlong.sdk.dataproxy.common.ErrorCode; +import org.apache.inlong.sdk.dataproxy.common.ProcessResult; +import org.apache.inlong.sdk.dataproxy.common.ProxyClientConfig; +import org.apache.inlong.sdk.dataproxy.common.SdkConsts; +import org.apache.inlong.sdk.dataproxy.config.HostInfo; +import org.apache.inlong.sdk.dataproxy.network.tcp.codec.EncodeObject; +import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig; +import org.apache.inlong.sdk.dataproxy.utils.AuthzUtils; +import org.apache.inlong.sdk.dataproxy.utils.LogCounter; +import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetSocketAddress; +import java.security.SecureRandom; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * TCP Netty client class + * + * Used to manage TCP netty client, including connection, closing, keep-alive, sending status check, etc. + */ +public class TcpNettyClient { + + private final static int CLIENT_FAIL_CONNECT_WAIT_CNT = 3; + private static final Logger logger = LoggerFactory.getLogger(TcpNettyClient.class); + private static final LogCounter conExptCnt = new LogCounter(10, 100000, 60 * 1000L); + private static final LogCounter hbExptCnt = new LogCounter(10, 100000, 60 * 1000L); + private final static int CLIENT_STATUS_INIT = -1; + private final static int CLIENT_STATUS_READY = 0; + private final static int CLIENT_STATUS_FROZEN = 1; + private final static int CLIENT_STATUS_DEAD = 2; + private final static int CLIENT_STATUS_BUSY = 3; + + private final String senderId; + private final TcpMsgSenderConfig tcpConfig; + private final Bootstrap bootstrap; + private final HostInfo hostInfo; + private final AtomicInteger conStatus = new AtomicInteger(CLIENT_STATUS_INIT); + private final AtomicLong channelTermId = new AtomicLong(0); + private final AtomicInteger clientUsingCnt = new AtomicInteger(0); + private final AtomicInteger msgSentCnt = new AtomicInteger(0); + private final AtomicInteger msgInflightCnt = new AtomicInteger(0); + private final AtomicLong chanInvalidTime = new AtomicLong(0); + private final AtomicInteger conFailCnt = new AtomicInteger(0); + private final AtomicLong lstConFailTime = new AtomicLong(0); + private final AtomicInteger chanSyncTimeoutCnt = new AtomicInteger(0); + private final AtomicLong chanFstBusyTime = new AtomicLong(0); + private final ReentrantReadWriteLock rw = new ReentrantReadWriteLock(); + private Channel channel = null; + private String channelStr = ""; + private int lstRoundSentCnt = -1; + private int clientIdleRounds = 0; + private long fstIdleTime = 0; + + public TcpNettyClient(String senderId, + Bootstrap bootstrap, HostInfo hostInfo, TcpMsgSenderConfig tcpConfig) { + this.conStatus.set(CLIENT_STATUS_INIT); + this.hostInfo = hostInfo; + this.tcpConfig = tcpConfig; + this.senderId = senderId; + this.bootstrap = bootstrap; + } + + public boolean connect(boolean needPrint, long termId) { + // Initial status + this.conStatus.set(CLIENT_STATUS_INIT); + long curTime = System.currentTimeMillis(); + final CountDownLatch awaitLatch = new CountDownLatch(1); + // Build connect to server + ChannelFuture future = bootstrap.connect( + new InetSocketAddress(hostInfo.getHostName(), hostInfo.getPortNumber())); + future.addListener(new ChannelFutureListener() { + + public void operationComplete(ChannelFuture arg0) throws Exception { + awaitLatch.countDown(); + } + }); + try { + // Wait until the connection is built. + awaitLatch.await(tcpConfig.getConnectTimeoutMs(), TimeUnit.MILLISECONDS); + } catch (Throwable ex) { + if (conExptCnt.shouldPrint()) { + logger.warn("NettyClient({}) connect to {} exception", + senderId, hostInfo.getReferenceName(), ex); + } + return false; + } + // Return if no connection is built. + if (!future.isSuccess()) { + this.conFailCnt.getAndIncrement(); + this.lstConFailTime.set(System.currentTimeMillis()); + if (conExptCnt.shouldPrint()) { + logger.warn("NettyClient({}) connect to {} failure, wast {}ms", + senderId, hostInfo.getReferenceName(), (System.currentTimeMillis() - curTime)); + } + return false; + } + this.channelTermId.set(termId); + this.channel = future.channel(); + this.channelStr = this.channel.toString(); + this.conFailCnt.set(0); + this.msgSentCnt.set(0); + this.chanSyncTimeoutCnt.set(0); + this.msgInflightCnt.set(0); + this.conStatus.set(CLIENT_STATUS_READY); + if (needPrint) { + logger.info("NettyClient({}) connect to {} success, wast {}ms", + senderId, channel.toString(), (System.currentTimeMillis() - curTime)); + } + return true; + } + + public boolean close(boolean needPrint) { + this.conStatus.set(CLIENT_STATUS_DEAD); + long curTime = System.currentTimeMillis(); + this.chanInvalidTime.set(curTime); + final CountDownLatch awaitLatch = new CountDownLatch(1); + boolean ret = true; + String channelStr = ""; + try { + if (channel == null) { + channelStr = hostInfo.getReferenceName(); + } else { + channelStr = channel.toString(); + ChannelFuture future = channel.close(); + future.addListener(new ChannelFutureListener() { + + public void operationComplete(ChannelFuture arg0) throws Exception { + awaitLatch.countDown(); + } + }); + // Wait until the connection is close. + awaitLatch.await(tcpConfig.getConCloseWaitPeriodMs(), TimeUnit.MILLISECONDS); + // Return if close this connection fail. + if (!future.isSuccess()) { + ret = false; + } + } + } catch (Throwable ex) { + if (conExptCnt.shouldPrint()) { + logger.warn("NettyClient({}) close {} exception", senderId, channelStr, ex); + } + ret = false; + } finally { + this.channel = null; + this.channelStr = ""; + this.msgInflightCnt.set(0); + } + if (needPrint) { + if (ret) { + logger.info("NettyClient({}) close {} success, wast {}ms", + this.senderId, channelStr, (System.currentTimeMillis() - curTime)); + } else { + logger.info("NettyClient({}) close {} failure, wast {}ms", + this.senderId, channelStr, (System.currentTimeMillis() - curTime)); + } + } + return ret; + } + + public boolean reconnect(boolean needPrint, long termId) { + long curTime = System.currentTimeMillis(); + if ((this.conFailCnt.get() >= CLIENT_FAIL_CONNECT_WAIT_CNT) + && (curTime - lstConFailTime.get() < this.tcpConfig.getReconFailWaitMs())) { + return false; + } + int curStatus = this.conStatus.get(); + if (curStatus == CLIENT_STATUS_READY) { + return true; + } else if (curStatus == CLIENT_STATUS_BUSY) { + if (curTime - this.chanInvalidTime.get() < this.tcpConfig.getBusyReconnectWaitMs()) { + return false; + } + } else if (curStatus == CLIENT_STATUS_FROZEN) { + if (curTime - this.chanInvalidTime.get() < this.tcpConfig.getFrozenReconnectWaitMs()) { + return false; + } + } + rw.writeLock().lock(); + try { + if (this.conStatus.get() == CLIENT_STATUS_READY) { + return true; + } + curTime = System.currentTimeMillis(); + this.close(false); + if (this.connect(false, termId)) { + if (needPrint) { + logger.info("NettyClient({}) re-connect to {} success, wast {}ms", + senderId, this.channel.toString(), System.currentTimeMillis() - curTime); + } + return true; + } else { + if (needPrint) { + logger.info("NettyClient({}) re-connect to {} failure", + senderId, hostInfo.getReferenceName()); + } + return false; + } + } finally { + rw.writeLock().unlock(); + } + } + + public int incClientUsingCnt() { + return clientUsingCnt.incrementAndGet(); + } + + public int getClientUsingCnt() { + return clientUsingCnt.get(); + } + + public int decClientUsingCnt() { + return clientUsingCnt.decrementAndGet(); + } + + public boolean write(long termId, EncodeObject encodeObject, ProcessResult procResult) { + if (this.conStatus.get() != CLIENT_STATUS_READY) { + return procResult.setFailResult(ErrorCode.CONNECTION_UNAVAILABLE); + } + this.rw.readLock().lock(); + if (this.conStatus.get() != CLIENT_STATUS_READY) { + return procResult.setFailResult(ErrorCode.CONNECTION_UNAVAILABLE); + } + if (this.channelTermId.get() != termId) { + return procResult.setFailResult(ErrorCode.CONNECTION_BREAK); + } + if (!(this.channel.isOpen() + && this.channel.isActive() && this.channel.isWritable())) { + return procResult.setFailResult(ErrorCode.CONNECTION_UNWRITABLE); + } + try { + this.msgSentCnt.incrementAndGet(); + this.channel.writeAndFlush(encodeObject); + this.msgInflightCnt.incrementAndGet(); + } catch (Throwable ex) { + if (conExptCnt.shouldPrint()) { + logger.warn("NettyClient({}) write {} exception", + this.senderId, this.channel.toString(), ex); + } + return procResult.setFailResult(ErrorCode.CONNECTION_WRITE_EXCEPTION, ex.getMessage()); + } finally { + this.rw.readLock().unlock(); + } + return procResult.setSuccess(); + } + + public void setFrozen(long termId) { + if (this.channelTermId.get() != termId + || this.conStatus.get() != CLIENT_STATUS_READY) { + return; + } + boolean changed = false; + rw.readLock().lock(); + try { + if (this.channelTermId.get() != termId) { + return; + } + int curStatus = this.conStatus.get(); + if (curStatus == CLIENT_STATUS_READY) { + this.conStatus.compareAndSet(curStatus, CLIENT_STATUS_FROZEN); + this.chanInvalidTime.set(System.currentTimeMillis()); + changed = true; + } + } finally { + rw.readLock().unlock(); + } + if (changed) { + logger.warn("NettyClient({}) set {} frozen!", senderId, hostInfo.getReferenceName()); + } + } + + public void setBusy(long termId) { + if (this.channelTermId.get() != termId + || this.conStatus.get() != CLIENT_STATUS_READY) { + return; + } + boolean changed = false; + long befTime; + int curTimeoutCnt; + long curTime = System.currentTimeMillis(); + rw.readLock().lock(); + try { + if (this.channelTermId.get() != termId) { + return; + } + befTime = this.chanFstBusyTime.get(); + if (curTime - befTime >= tcpConfig.getSyncMsgTimeoutChkDurMs()) { + if (this.chanFstBusyTime.compareAndSet(befTime, curTime)) { + this.chanSyncTimeoutCnt.set(0); + } + } + curTimeoutCnt = this.chanSyncTimeoutCnt.incrementAndGet(); + if (tcpConfig.getMaxAllowedSyncMsgTimeoutCnt() >= 0 + && curTimeoutCnt < tcpConfig.getMaxAllowedSyncMsgTimeoutCnt()) { + return; + } + int curStatus = this.conStatus.get(); + if (curStatus == CLIENT_STATUS_READY) { + this.conStatus.compareAndSet(curStatus, CLIENT_STATUS_BUSY); + this.chanInvalidTime.set(System.currentTimeMillis()); + changed = true; + } + } finally { + rw.readLock().unlock(); + } + if (changed) { + logger.warn("NettyClient({}) set {} busy!", senderId, hostInfo.getReferenceName()); + } + } + + public boolean isActive() { + if (this.conStatus.get() != CLIENT_STATUS_READY) { + return false; + } + rw.readLock().lock(); + try { + return ((this.conStatus.get() == CLIENT_STATUS_READY) + && channel != null && channel.isOpen() && channel.isActive()); + } finally { + rw.readLock().unlock(); + } + } + + public void sendHeartBeatMsg(ProcessResult procResult) { + if (!isActive()) { + logger.warn("NettyClient({}) to {} hb inActive", + this.senderId, hostInfo.getReferenceName()); + return; + } + if (!channel.isWritable()) { + if (hbExptCnt.shouldPrint()) { + logger.warn("NettyClient({}) to {} hb write_over_water", this.senderId, channelStr); + } + return; + } + EncodeObject encodeObject = buildHeartBeatMsg(this.senderId, tcpConfig); + if (encodeObject == null) { + if (hbExptCnt.shouldPrint()) { + logger.warn("NettyClient({}) to {} hb failure:{}!", + this.senderId, channelStr, procResult.getErrMsg()); + } + } + try { + write(channelTermId.get(), encodeObject, procResult); + } catch (Throwable ex) { + if (hbExptCnt.shouldPrint()) { + logger.warn("NettyClient({}) send to {} hb exception ", + this.senderId, channelStr, ex); + } + } + procResult.setSuccess(); + } + + public boolean isIdleClient(long curTime) { + int curSentCnt = this.msgSentCnt.get(); + if (curSentCnt != this.lstRoundSentCnt) { + this.lstRoundSentCnt = curSentCnt; + this.clientIdleRounds = 0; + return false; + } + if (this.clientIdleRounds++ == 0) { + this.fstIdleTime = curTime; + return false; + } + return curTime - this.fstIdleTime >= 30000L; + } + + public String getClientAddr() { + return hostInfo.getReferenceName(); + } + + public Channel getChannel() { + return channel; + } + + public String getChanStr() { + return channelStr; + } + + public void decInFlightMsgCnt(long termId) { + if (this.channelTermId.get() != termId) { + return; + } + this.msgInflightCnt.decrementAndGet(); + } + + public int getMsgInflightCnt() { + return msgInflightCnt.get(); + } + + public boolean isConFailNodes() { + return (conFailCnt.get() >= CLIENT_FAIL_CONNECT_WAIT_CNT); + } + + public long getChanTermId() { + return channelTermId.get(); + } + + public long getChanInvalidTime() { + return chanInvalidTime.get(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + TcpNettyClient other = (TcpNettyClient) obj; + if (channel == null) { + return other.channel == null; + } else { + return channel.equals(other.channel); + } + } + + private EncodeObject buildHeartBeatMsg(String senderId, ProxyClientConfig configure) { + EncodeObject encObject = new EncodeObject(null, null, + MsgType.MSG_BIN_HEARTBEAT, System.currentTimeMillis()); + encObject.setMessageIdInfo(0); + int intMsgType = encObject.getMsgType().getValue(); + Map<String, String> newAttrs = new HashMap<>(); + if (configure.isEnableReportAuthz()) { + intMsgType |= SdkConsts.FLAG_ALLOW_AUTH; + long timestamp = System.currentTimeMillis(); + int nonce = new SecureRandom(String.valueOf(timestamp).getBytes()).nextInt(Integer.MAX_VALUE); + String signature = AuthzUtils.generateSignature( + configure.getRptUserName(), timestamp, nonce, configure.getRptSecretKey()); + if (StringUtils.isBlank(signature)) { + return null; + } + newAttrs.put("_userName", configure.getRptUserName()); + newAttrs.put("_clientIP", ProxyUtils.getLocalIp()); + newAttrs.put("_signature", signature); + newAttrs.put("_timeStamp", String.valueOf(timestamp)); + newAttrs.put("_nonce", String.valueOf(nonce)); + } + encObject.setAttrInfo(intMsgType, false, null, newAttrs); + return encObject; + } + +} diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/DecodeObject.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/DecodeObject.java new file mode 100644 index 0000000000..669f8330de --- /dev/null +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/DecodeObject.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.dataproxy.network.tcp.codec; + +import org.apache.inlong.common.enums.DataProxyErrCode; +import org.apache.inlong.common.msg.AttributeConstants; +import org.apache.inlong.common.msg.MsgType; +import org.apache.inlong.sdk.dataproxy.common.ErrorCode; +import org.apache.inlong.sdk.dataproxy.common.ProcessResult; + +import com.google.common.base.Splitter; +import org.apache.commons.lang3.StringUtils; + +import java.util.HashMap; +import java.util.Map; + +/** + * Decode Object class + * + * Used to carry the decoded information of the response + */ +public class DecodeObject { + + private static final Splitter.MapSplitter MAP_SPLITTER = + Splitter.on(AttributeConstants.SEPARATOR).trimResults() + .withKeyValueSeparator(AttributeConstants.KEY_VALUE_SEPARATOR); + + private final MsgType msgType; + private int messageId; + private String dpIp; + private ProcessResult procResult; + private String addErrMsg; + private Map<String, String> retAttr; + + public DecodeObject(MsgType msgType, String attributes) { + this.msgType = msgType; + handleAttr(attributes); + } + + public DecodeObject(MsgType msgType, int messageId, String attributes) { + this.msgType = msgType; + this.messageId = messageId; + handleAttr(attributes); + } + + public MsgType getMsgType() { + return msgType; + } + + public int getMessageId() { + return messageId; + } + + public String getDpIp() { + return dpIp; + } + + public ProcessResult getSendResult() { + return procResult; + } + + public String getAddErrMsg() { + return addErrMsg; + } + + public Map<String, String> getRetAttr() { + return retAttr; + } + + private void handleAttr(String attributes) { + if (StringUtils.isBlank(attributes)) { + return; + } + retAttr = new HashMap<>(MAP_SPLITTER.split(attributes)); + if (retAttr.containsKey(AttributeConstants.MESSAGE_ID)) { + this.messageId = Integer.parseInt(retAttr.get(AttributeConstants.MESSAGE_ID)); + } + dpIp = retAttr.get(AttributeConstants.MESSAGE_DP_IP); + + String errCode = retAttr.get(AttributeConstants.MESSAGE_PROCESS_ERRCODE); + // errCode is empty or equals 0 -> success + if (StringUtils.isBlank(errCode) || Integer.parseInt(errCode) == 0) { + this.procResult = new ProcessResult(ErrorCode.OK); + } else { + // get errMsg + this.addErrMsg = retAttr.get(AttributeConstants.MESSAGE_PROCESS_ERRMSG); + if (StringUtils.isBlank(addErrMsg)) { + this.addErrMsg = DataProxyErrCode.valueOf(Integer.parseInt(errCode)).getErrMsg(); + } + // sendResult + this.procResult = convertToSendResult(Integer.parseInt(errCode)); + } + } + + private ProcessResult convertToSendResult(int errCode) { + DataProxyErrCode dpErrCode = DataProxyErrCode.valueOf(errCode); + switch (dpErrCode) { + case SINK_SERVICE_UNREADY: + return new ProcessResult(ErrorCode.DP_SINK_SERVICE_UNREADY); + + case MISS_REQUIRED_GROUPID_ARGUMENT: + case MISS_REQUIRED_STREAMID_ARGUMENT: + case MISS_REQUIRED_DT_ARGUMENT: + case UNSUPPORTED_EXTEND_FIELD_VALUE: + return new ProcessResult(ErrorCode.DP_INVALID_ATTRS, String.valueOf(dpErrCode)); + + case MISS_REQUIRED_BODY_ARGUMENT: + case EMPTY_MSG: + return new ProcessResult(ErrorCode.DP_EMPTY_BODY, String.valueOf(dpErrCode)); + + case BODY_EXCEED_MAX_LEN: + return new ProcessResult(ErrorCode.DP_BODY_EXCEED_MAX_LEN); + + case UNCONFIGURED_GROUPID_OR_STREAMID: + return new ProcessResult(ErrorCode.DP_UNCONFIGURED_GROUPID_OR_STREAMID); + + case PUT_EVENT_TO_CHANNEL_FAILURE: + case NO_AVAILABLE_PRODUCER: + case PRODUCER_IS_NULL: + case SEND_REQUEST_TO_MQ_FAILURE: + case MQ_RETURN_ERROR: + case DUPLICATED_MESSAGE: + return new ProcessResult(ErrorCode.DP_RECEIVE_FAILURE, String.valueOf(dpErrCode)); + + default: + return new ProcessResult(ErrorCode.UNKNOWN_ERROR, String.valueOf(dpErrCode)); + } + } +} diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/EncodeObject.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/EncodeObject.java new file mode 100644 index 0000000000..a32227c1af --- /dev/null +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/EncodeObject.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.dataproxy.network.tcp.codec; + +import org.apache.inlong.common.msg.AttributeConstants; +import org.apache.inlong.common.msg.MsgType; + +import com.google.common.base.Joiner; + +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; + +/** + * Encode Object class + * + * Used to encapsulate the reported event information to be sent + */ +public class EncodeObject { + + private static final Joiner.MapJoiner mapJoiner = Joiner.on(AttributeConstants.SEPARATOR) + .withKeyValueSeparator(AttributeConstants.KEY_VALUE_SEPARATOR); + + private final MsgType msgType; + private int intMsgType; + private final String groupId; + private final String streamId; + private final long dtMs; + private final long rtms; + private int messageId; + private int msgCnt = 0; + private int extField = 0; + private int attrDataLength = 0; + private byte[] attrData = null; + private int bodyDataLength = 0; + private byte[] bodyData = null; + private int groupIdNum = 0; + private int streamIdNum = 0; + // + private final Map<String, String> attrMap = new HashMap<>(); + private boolean compress; + private byte[] aesKey; + + public EncodeObject(String groupId, String streamId, MsgType msgType, long dtMs) { + this.groupId = groupId; + this.streamId = streamId; + this.msgType = msgType; + this.intMsgType = this.msgType.getValue(); + this.rtms = System.currentTimeMillis(); + if (this.msgType == MsgType.MSG_BIN_MULTI_BODY) { + this.dtMs = dtMs / 1000; + } else { + this.dtMs = dtMs; + } + } + + public void setGroupAndStreamId2Num(int groupIdNum, int streamIdNum) { + this.groupIdNum = groupIdNum; + this.streamIdNum = streamIdNum; + } + + public void setExtField(int extField) { + this.extField = extField; + } + + public void setAttrInfo(int intMsgType, boolean isCompress, byte[] aesKey, Map<String, String> tgtAttrs) { + this.intMsgType = intMsgType; + this.compress = isCompress; + this.aesKey = aesKey; + if (tgtAttrs != null && !tgtAttrs.isEmpty()) { + for (Map.Entry<String, String> entry : tgtAttrs.entrySet()) { + if (entry == null || entry.getKey() == null) { + continue; + } + this.attrMap.put(entry.getKey(), entry.getValue()); + } + String preAttrStr = mapJoiner.join(this.attrMap); + this.attrData = preAttrStr.getBytes(StandardCharsets.UTF_8); + this.attrDataLength = this.attrData.length; + } + } + + public void setMessageIdInfo(int messageId) { + this.messageId = messageId; + if (msgType == MsgType.MSG_ACK_SERVICE + || msgType == MsgType.MSG_MULTI_BODY) { + this.attrMap.put(AttributeConstants.MESSAGE_ID, String.valueOf(this.messageId)); + String preAttrStr = mapJoiner.join(this.attrMap); + this.attrData = preAttrStr.getBytes(StandardCharsets.UTF_8); + this.attrDataLength = this.attrData.length; + } + } + + public void setBodyData(int msgCnt, byte[] bodyBytes) { + this.msgCnt = msgCnt; + this.bodyData = bodyBytes; + if (this.bodyData != null) { + this.bodyDataLength = this.bodyData.length; + } + } + + public MsgType getMsgType() { + return msgType; + } + + public int getIntMsgType() { + return intMsgType; + } + + public int getMessageId() { + return messageId; + } + + public String getGroupId() { + return groupId; + } + + public String getStreamId() { + return streamId; + } + + public long getRtms() { + return rtms; + } + + public int getStreamIdNum() { + return streamIdNum; + } + + public int getGroupIdNum() { + return groupIdNum; + } + + public boolean isCompress() { + return compress; + } + + public byte[] getAesKey() { + return aesKey; + } + + public int getBodyDataLength() { + return bodyDataLength; + } + + public byte[] getBodyData() { + return bodyData; + } + + public int getAttrDataLength() { + return attrDataLength; + } + + public byte[] getAttrData() { + return attrData; + } + + public int getExtField() { + return extField; + } + + public int getMsgSize() { + return attrDataLength + bodyDataLength; + } + + public int getMsgCnt() { + return msgCnt; + } + + public long getDtMs() { + return dtMs; + } +} diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/ProtocolDecoder.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/ProtocolDecoder.java new file mode 100644 index 0000000000..2132c72747 --- /dev/null +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/ProtocolDecoder.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.dataproxy.network.tcp.codec; + +import org.apache.inlong.common.msg.MsgType; +import org.apache.inlong.sdk.dataproxy.utils.LogCounter; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToMessageDecoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.StandardCharsets; +import java.util.List; + +/** + * TCP protocol decoder class + * + * Used to decode the response package returned from DataProxy + */ +public class ProtocolDecoder extends MessageToMessageDecoder<ByteBuf> { + + private static final Logger logger = LoggerFactory.getLogger(ProtocolDecoder.class); + private static final LogCounter decExptCounter = new LogCounter(10, 200000, 60 * 1000L); + + @Override + protected void decode(ChannelHandlerContext ctx, + ByteBuf buffer, List<Object> out) throws Exception { + buffer.markReaderIndex(); + // totallen + int totalLen = buffer.readInt(); + if (totalLen != buffer.readableBytes()) { + if (decExptCounter.shouldPrint()) { + logger.error("Length not equal, totalLen={},readableBytes={},from={}", + totalLen, buffer.readableBytes(), ctx.channel()); + } + buffer.resetReaderIndex(); + throw new Exception("totalLen is not equal readableBytes.total"); + } + // msgtype + int msgType = buffer.readByte() & 0x1f; + + if (msgType == 4) { + if (logger.isDebugEnabled()) { + logger.debug("debug decode"); + } + } else if (msgType == 3 | msgType == 5) { + // bodylen + int bodyLength = buffer.readInt(); + if (bodyLength >= totalLen) { + if (decExptCounter.shouldPrint()) { + logger.error("bodyLen greater than totalLen, totalLen={},bodyLen={},from={}", + totalLen, bodyLength, ctx.channel()); + } + buffer.resetReaderIndex(); + throw new Exception("bodyLen is greater than totalLen.totalLen"); + } + byte[] bodyBytes; + if (bodyLength > 0) { + bodyBytes = new byte[bodyLength]; + buffer.readBytes(bodyBytes); + } + // attrlen + String attrInfo = ""; + int attrLength = buffer.readInt(); + if (attrLength > 0) { + byte[] attrBytes = new byte[attrLength]; + buffer.readBytes(attrBytes); + attrInfo = new String(attrBytes, StandardCharsets.UTF_8); + } + out.add(new DecodeObject(MsgType.valueOf(msgType), attrInfo)); + } else if (msgType == 7) { + int seqId = buffer.readInt(); + int attrLen = buffer.readShort(); + String attrInfo = ""; + if (attrLen > 0) { + byte[] attrBytes = new byte[attrLen]; + buffer.readBytes(attrBytes); + attrInfo = new String(attrBytes, StandardCharsets.UTF_8); + } + buffer.readShort(); + out.add(new DecodeObject(MsgType.valueOf(msgType), seqId, attrInfo)); + } else if (msgType == 8) { + // dataTime(4) + body_ver(1) + body_len(4) + body + attr_len(2) + attr + magic(2) + buffer.skipBytes(4 + 1 + 4); // skip datatime, body_ver and body_len + final short load = buffer.readShort(); // read from body + int attrLen = buffer.readShort(); + String attrInfo = ""; + if (attrLen > 0) { + byte[] attrBytes = new byte[attrLen]; + buffer.readBytes(attrBytes); + attrInfo = new String(attrBytes, StandardCharsets.UTF_8); + } + buffer.skipBytes(2); // skip magic + out.add(new DecodeObject(MsgType.MSG_BIN_HEARTBEAT, attrInfo)); + } + } +} diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/ProtocolEncoder.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/ProtocolEncoder.java new file mode 100644 index 0000000000..d6cafdb85b --- /dev/null +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/ProtocolEncoder.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.dataproxy.network.tcp.codec; + +import org.apache.inlong.common.msg.MsgType; +import org.apache.inlong.sdk.dataproxy.utils.LogCounter; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToMessageEncoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +/** + * TCP protocol encoder class + * + * Used to encode the request package sent to DataProxy + */ +public class ProtocolEncoder extends MessageToMessageEncoder<EncodeObject> { + + private static final Logger logger = LoggerFactory.getLogger(ProtocolEncoder.class); + private static final LogCounter exptCounter = new LogCounter(10, 100000, 60 * 1000L); + + @Override + protected void encode(ChannelHandlerContext ctx, + EncodeObject encObject, List<Object> out) throws Exception { + ByteBuf buf = null; + int totalLength; + try { + if (encObject.getMsgType() == MsgType.MSG_ACK_SERVICE) { + totalLength = 1 + 4 + 4 + encObject.getMsgSize(); + buf = ByteBufAllocator.DEFAULT.buffer(4 + totalLength); + buf.writeInt(totalLength); + buf.writeByte(encObject.getIntMsgType()); + buf.writeInt(encObject.getBodyDataLength()); + if (encObject.getBodyDataLength() > 0) { + buf.writeBytes(encObject.getBodyData()); + } + buf.writeInt(encObject.getAttrDataLength()); + if (encObject.getAttrDataLength() > 0) { + buf.writeBytes(encObject.getAttrData()); + } + } else if (encObject.getMsgType() == MsgType.MSG_MULTI_BODY) { + totalLength = 1 + 4 + 4 + encObject.getMsgSize(); + buf = ByteBufAllocator.DEFAULT.buffer(4 + totalLength); + buf.writeInt(totalLength); + buf.writeByte(encObject.getIntMsgType()); + buf.writeInt(encObject.getBodyDataLength()); + if (encObject.getBodyDataLength() > 0) { + buf.writeBytes(encObject.getBodyData()); + } + buf.writeInt(encObject.getAttrDataLength()); + if (encObject.getAttrDataLength() > 0) { + buf.writeBytes(encObject.getAttrData()); + } + } else if (encObject.getMsgType() == MsgType.MSG_BIN_MULTI_BODY) { + totalLength = 1 + 2 + 2 + 2 + 4 + 2 + 4 + 4 + 2 + 2 + encObject.getMsgSize(); + buf = ByteBufAllocator.DEFAULT.buffer(4 + totalLength); + buf.writeInt(totalLength); + buf.writeByte(encObject.getIntMsgType()); + buf.writeShort(encObject.getGroupIdNum()); + buf.writeShort(encObject.getStreamIdNum()); + buf.writeShort(encObject.getExtField()); + buf.writeInt((int) encObject.getDtMs()); + buf.writeShort(encObject.getMsgCnt()); + buf.writeInt(encObject.getMessageId()); + buf.writeInt(encObject.getBodyDataLength()); + if (encObject.getBodyDataLength() > 0) { + buf.writeBytes(encObject.getBodyData()); + } + buf.writeShort(encObject.getAttrDataLength()); + if (encObject.getAttrDataLength() > 0) { + buf.writeBytes(encObject.getAttrData()); + } + buf.writeShort(0xee01); + } else if (encObject.getMsgType() == MsgType.MSG_BIN_HEARTBEAT) { + totalLength = 1 + 4 + 1 + 4 + 2 + encObject.getAttrDataLength() + 2; + buf = ByteBufAllocator.DEFAULT.buffer(4 + totalLength); + buf.writeInt(totalLength); + buf.writeByte(encObject.getIntMsgType()); + buf.writeInt((int) encObject.getDtMs()); + buf.writeByte(2); + buf.writeInt(0); + buf.writeShort(encObject.getAttrDataLength()); + if (encObject.getAttrDataLength() > 0) { + buf.writeBytes(encObject.getAttrData()); + } + buf.writeShort(0xee01); + } + } catch (Throwable ex) { + if (exptCounter.shouldPrint()) { + logger.warn("ProtocolEncoder encode({}) message failure", encObject.getMsgType(), ex); + } + } + if (buf != null) { + out.add(buf); + } else { + if (exptCounter.shouldPrint()) { + logger.warn("ProtocolEncoder write({}) buffer is null!", encObject.getMsgType()); + } + } + } +}