This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 99dec0578b [INLONG-11469][SDK] Optimize the single message processing logic in the EncodeObject class (#11470) 99dec0578b is described below commit 99dec0578b1168d0eb61465cd6f473659f4f3ba1 Author: Goson Zhang <4675...@qq.com> AuthorDate: Fri Nov 8 14:53:25 2024 +0800 [INLONG-11469][SDK] Optimize the single message processing logic in the EncodeObject class (#11470) Co-authored-by: gosonzhang <gosonzh...@tencent.com> --- .../inlong/common/msg/AttributeConstants.java | 1 + .../inlong/sdk/dataproxy/DefaultMessageSender.java | 68 ++++----- .../apache/inlong/sdk/dataproxy/LoadBalance.java | 6 +- .../apache/inlong/sdk/dataproxy/MessageSender.java | 44 ++---- .../inlong/sdk/dataproxy/codec/EncodeObject.java | 71 +--------- .../sdk/dataproxy/codec/ProtocolDecoder.java | 9 +- .../sdk/dataproxy/codec/ProtocolEncoder.java | 154 ++++++++------------- .../sdk/dataproxy/config/EncryptConfigEntry.java | 3 +- .../sdk/dataproxy/example/UdpClientExample.java | 46 +++--- .../inlong/sdk/dataproxy/network/ClientMgr.java | 3 +- .../sdk/dataproxy/pb/PbProtocolMessageSender.java | 34 ----- .../sdk/dataproxy/threads/MetricWorkerThread.java | 5 +- .../inlong/sdk/dataproxy/utils/LogCounter.java | 47 +++++++ 13 files changed, 182 insertions(+), 309 deletions(-) diff --git a/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java b/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java index ceaeef3054..3402fc6455 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java +++ b/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java @@ -21,6 +21,7 @@ public interface AttributeConstants { String SEPARATOR = "&"; String KEY_VALUE_SEPARATOR = "="; + String LINE_FEED_SEP = "\n"; /** * group id diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java index 623e84d8c0..9e2c8c06b5 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java @@ -36,6 +36,7 @@ import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -229,15 +230,6 @@ public class DefaultMessageSender implements MessageSender { return sendIndexResult; } - @Deprecated - public SendResult sendMessage(byte[] body, String attributes, String msgUUID, - long timeout, TimeUnit timeUnit) { - Function<Sender, SendResult> sendOperation = - (sender) -> sender.syncSendMessage(new EncodeObject(body, attributes, idGenerator.getNextId()), msgUUID, - timeout, timeUnit); - return attemptSendMessage(sendOperation); - } - public SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID, long timeout, TimeUnit timeUnit) { return sendMessage(body, groupId, streamId, dt, msgUUID, timeout, timeUnit, false); @@ -275,8 +267,9 @@ public class DefaultMessageSender implements MessageSender { boolean isCompressEnd = (isCompress && (body.length > cpsSize)); if (msgtype == 7 || msgtype == 8) { - EncodeObject encodeObject = new EncodeObject(body, msgtype, isCompressEnd, isReport, - isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(), groupId, streamId, proxySend); + EncodeObject encodeObject = + new EncodeObject(Collections.singletonList(body), msgtype, isCompressEnd, isReport, + isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(), groupId, streamId, proxySend); encodeObject.setSupportLF(isSupportLF); Function<Sender, SendResult> sendOperation = (sender) -> sender.syncSendMessage(encodeObject, msgUUID, timeout, 
timeUnit); @@ -289,13 +282,13 @@ public class DefaultMessageSender implements MessageSender { final long finalDt = dt; Function<Sender, SendResult> sendOperation; if (isCompressEnd) { - sendOperation = (sender) -> sender.syncSendMessage(new EncodeObject(body, + sendOperation = (sender) -> sender.syncSendMessage(new EncodeObject(Collections.singletonList(body), "groupId=" + groupId + "&streamId=" + streamId + "&dt=" + finalDt + "&cp=snappy" + finalProxySend, idGenerator.getNextId(), this.getMsgtype(), true, groupId), msgUUID, timeout, timeUnit); } else { - sendOperation = (sender) -> sender.syncSendMessage(new EncodeObject(body, + sendOperation = (sender) -> sender.syncSendMessage(new EncodeObject(Collections.singletonList(body), "groupId=" + groupId + "&streamId=" + streamId + "&dt=" + finalDt + finalProxySend, idGenerator.getNextId(), this.getMsgtype(), @@ -347,9 +340,10 @@ public class DefaultMessageSender implements MessageSender { boolean isCompressEnd = (isCompress && (body.length > cpsSize)); if (msgtype == 7 || msgtype == 8) { - EncodeObject encodeObject = new EncodeObject(body, msgtype, isCompressEnd, isReport, - isGroupIdTransfer, dt / 1000, - idGenerator.getNextInt(), groupId, streamId, attrs.toString()); + EncodeObject encodeObject = + new EncodeObject(Collections.singletonList(body), msgtype, isCompressEnd, isReport, + isGroupIdTransfer, dt / 1000, + idGenerator.getNextInt(), groupId, streamId, attrs.toString()); encodeObject.setSupportLF(isSupportLF); Function<Sender, SendResult> sendOperation = (sender) -> sender.syncSendMessage(encodeObject, msgUUID, timeout, timeUnit); @@ -358,13 +352,15 @@ public class DefaultMessageSender implements MessageSender { attrs.append("&groupId=").append(groupId).append("&streamId=").append(streamId).append("&dt=").append(dt); if (isCompressEnd) { attrs.append("&cp=snappy"); - Function<Sender, SendResult> sendOperation = (sender) -> sender.syncSendMessage(new EncodeObject(body, - attrs.toString(), 
idGenerator.getNextId(), this.getMsgtype(), true, groupId), + Function<Sender, SendResult> sendOperation = (sender) -> sender.syncSendMessage( + new EncodeObject(Collections.singletonList(body), + attrs.toString(), idGenerator.getNextId(), this.getMsgtype(), true, groupId), msgUUID, timeout, timeUnit); return attemptSendMessage(sendOperation); } else { - Function<Sender, SendResult> sendOperation = (sender) -> sender.syncSendMessage(new EncodeObject(body, - attrs.toString(), idGenerator.getNextId(), this.getMsgtype(), false, groupId), + Function<Sender, SendResult> sendOperation = (sender) -> sender.syncSendMessage( + new EncodeObject(Collections.singletonList(body), + attrs.toString(), idGenerator.getNextId(), this.getMsgtype(), false, groupId), msgUUID, timeout, timeUnit); return attemptSendMessage(sendOperation); } @@ -503,13 +499,6 @@ public class DefaultMessageSender implements MessageSender { return null; } - @Deprecated - public void asyncSendMessage(SendMessageCallback callback, byte[] body, String attributes, - String msgUUID, long timeout, TimeUnit timeUnit) throws ProxysdkException { - sender.asyncSendMessage(new EncodeObject(body, attributes, idGenerator.getNextId()), - callback, msgUUID, timeout, timeUnit); - } - public void asyncSendMessage(SendMessageCallback callback, byte[] body, String groupId, String streamId, long dt, String msgUUID, long timeout, TimeUnit timeUnit) throws ProxysdkException { asyncSendMessage(callback, body, groupId, streamId, dt, msgUUID, timeout, timeUnit, false); @@ -546,9 +535,10 @@ public class DefaultMessageSender implements MessageSender { } boolean isCompressEnd = (isCompress && (body.length > cpsSize)); if (msgtype == 7 || msgtype == 8) { - EncodeObject encodeObject = new EncodeObject(body, this.getMsgtype(), isCompressEnd, isReport, - isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(), - groupId, streamId, proxySend); + EncodeObject encodeObject = + new EncodeObject(Collections.singletonList(body), 
this.getMsgtype(), isCompressEnd, isReport, + isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(), + groupId, streamId, proxySend); encodeObject.setSupportLF(isSupportLF); sender.asyncSendMessage(encodeObject, callback, msgUUID, timeout, timeUnit); } else if (msgtype == 3 || msgtype == 5) { @@ -556,13 +546,13 @@ public class DefaultMessageSender implements MessageSender { if (isProxySend) { proxySend = "&" + proxySend; } - sender.asyncSendMessage(new EncodeObject(body, "groupId=" + sender.asyncSendMessage(new EncodeObject(Collections.singletonList(body), "groupId=" + groupId + "&streamId=" + streamId + "&dt=" + dt + "&cp=snappy" + proxySend, idGenerator.getNextId(), this.getMsgtype(), true, groupId), callback, msgUUID, timeout, timeUnit); } else { sender.asyncSendMessage( - new EncodeObject(body, "groupId=" + groupId + "&streamId=" + new EncodeObject(Collections.singletonList(body), "groupId=" + groupId + "&streamId=" + streamId + "&dt=" + dt + proxySend, idGenerator.getNextId(), this.getMsgtype(), false, groupId), callback, @@ -611,21 +601,23 @@ public class DefaultMessageSender implements MessageSender { boolean isCompressEnd = (isCompress && (body.length > cpsSize)); if (msgtype == 7 || msgtype == 8) { - EncodeObject encodeObject = new EncodeObject(body, this.getMsgtype(), isCompressEnd, - isReport, isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(), - groupId, streamId, attrs.toString()); + EncodeObject encodeObject = + new EncodeObject(Collections.singletonList(body), this.getMsgtype(), isCompressEnd, + isReport, isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(), + groupId, streamId, attrs.toString()); encodeObject.setSupportLF(isSupportLF); sender.asyncSendMessage(encodeObject, callback, msgUUID, timeout, timeUnit); } else if (msgtype == 3 || msgtype == 5) { attrs.append("&groupId=").append(groupId).append("&streamId=").append(streamId).append("&dt=").append(dt); if (isCompressEnd) { attrs.append("&cp=snappy"); - sender.asyncSendMessage(new 
EncodeObject(body, attrs.toString(), + sender.asyncSendMessage(new EncodeObject(Collections.singletonList(body), attrs.toString(), idGenerator.getNextId(), this.getMsgtype(), true, groupId), callback, msgUUID, timeout, timeUnit); } else { - sender.asyncSendMessage(new EncodeObject(body, attrs.toString(), idGenerator.getNextId(), - this.getMsgtype(), false, groupId), + sender.asyncSendMessage( + new EncodeObject(Collections.singletonList(body), attrs.toString(), idGenerator.getNextId(), + this.getMsgtype(), false, groupId), callback, msgUUID, timeout, timeUnit); } } diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/LoadBalance.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/LoadBalance.java index 9ee1726c52..e71838ae76 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/LoadBalance.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/LoadBalance.java @@ -25,10 +25,10 @@ public enum LoadBalance { WEIGHT_RANDOM("weight random", 3), WEIGHT_ROBIN("weight robin", 4); - private String name; - private int index; + private final String name; + private final int index; - private LoadBalance(String name, int index) { + LoadBalance(String name, int index) { this.name = name; this.index = index; } diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java index 155031bee6..1b18096229 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java @@ -27,25 +27,13 @@ import java.util.concurrent.TimeUnit; public interface MessageSender { - /** - * This method provides a synchronized function which you want to send data - * with extra attributes except groupId,streamId,dt,etc - * This method 
is deprecated,we suggest you don't use it. - * - * @param body The data will be sent - * @param attributes The attributes you want to add - */ - @Deprecated - public SendResult sendMessage(byte[] body, String attributes, String msgUUID, - long timeout, TimeUnit timeUnit); - /** * This method provides a synchronized function which you want to send data without packing * * @param body The data will be sent * */ - public SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID, + SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID, long timeout, TimeUnit timeUnit); /** @@ -57,7 +45,7 @@ public interface MessageSender { * @param extraAttrMap The attributes you want to add, * and each element of extraAttrMap contains a pair like attrKey,attrValue */ - public SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID, + SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID, long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap); /** @@ -66,7 +54,7 @@ public interface MessageSender { * * @param bodyList The data will be sent,which is a collection consisting of byte arrays */ - public SendResult sendMessage(List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID, + SendResult sendMessage(List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID, long timeout, TimeUnit timeUnit); /** @@ -78,23 +66,9 @@ public interface MessageSender { * @param extraAttrMap The attributes you want to add, * and each element of extraAttrMap contains a pair like attrKey,attrValue */ - public SendResult sendMessage(List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID, + SendResult sendMessage(List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID, long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap); - /** - * This method 
provides an asynchronized function which you want to send data - * with extra attributes except groupId,streamId,dt,etc - * This method is deprecated,we suggest you don't use it. - * - * - * @param body The data will be sent - * @param attributes The attributes you want to add - */ - @Deprecated - public void asyncSendMessage(SendMessageCallback callback, - byte[] body, String attributes, String msgUUID, - long timeout, TimeUnit timeUnit) throws ProxysdkException; - /** * This method provides a synchronized function which you want to send data without packing * with extra attributes except groupId,streamId,dt,etc @@ -104,7 +78,7 @@ public interface MessageSender { * @param extraAttrMap The attributes you want to add, * and each element of extraAttrMap contains a pair like attrKey,attrValue */ - public void asyncSendMessage(SendMessageCallback callback, + void asyncSendMessage(SendMessageCallback callback, byte[] body, String groupId, String streamId, long dt, String msgUUID, long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) throws ProxysdkException; @@ -116,7 +90,7 @@ public interface MessageSender { * @param callback The implementation of callback function * @param body The data will be sent */ - public void asyncSendMessage(SendMessageCallback callback, + void asyncSendMessage(SendMessageCallback callback, byte[] body, String groupId, String streamId, long dt, String msgUUID, long timeout, TimeUnit timeUnit) throws ProxysdkException; @@ -126,7 +100,7 @@ public interface MessageSender { * * @param bodyList The data will be sent,which is a collection consisting of byte arrays */ - public void asyncSendMessage(SendMessageCallback callback, + void asyncSendMessage(SendMessageCallback callback, List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID, long timeout, TimeUnit timeUnit) throws ProxysdkException; @@ -139,7 +113,7 @@ public interface MessageSender { * @param extraAttrMap The attributes you want to add, and each * 
element of extraAttrMap contains a pair like attrKey,attrValue */ - public void asyncSendMessage(SendMessageCallback callback, + void asyncSendMessage(SendMessageCallback callback, List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID, long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) throws ProxysdkException; @@ -170,5 +144,5 @@ public interface MessageSender { void asyncSendMessage(String inlongGroupId, String inlongStreamId, List<byte[]> bodyList, SendMessageCallback callback) throws ProxysdkException; - public void close(); + void close(); } diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/EncodeObject.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/EncodeObject.java index e875d01e77..a89fef4fe9 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/EncodeObject.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/EncodeObject.java @@ -35,7 +35,6 @@ public class EncodeObject { private static final Splitter.MapSplitter MAP_SPLITTER = Splitter.on(AttributeConstants.SEPARATOR).trimResults() .withKeyValueSeparator(AttributeConstants.KEY_VALUE_SEPARATOR); - private byte[] bodyBytes; private String attributes; private String messageId; private int msgtype; @@ -74,33 +73,12 @@ public class EncodeObject { } /* Used by de_serialization. 
*/ - public EncodeObject(byte[] bodyBytes, String attributes) { - this.bodyBytes = bodyBytes; + public EncodeObject(List<byte[]> bodyList, String attributes) { + this.bodylist = bodyList; this.attributes = attributes; handleAttr(attributes); } - /* Used by serialization.But never used */ - // old version:we need add message id by attr - public EncodeObject(byte[] bodyBytes, String attributes, String messageId) { - this.bodyBytes = bodyBytes; - this.messageId = messageId; - this.attributes = attributes + "&messageId=" + messageId; - addRTMS(MsgType.MSG_COMMON_SERVICE.getValue()); - } - - // used for bytes initializtion,msgtype=3/5 - public EncodeObject(byte[] bodyBytes, String attributes, String messageId, - int msgtype, boolean isCompress, final String groupId) { - this.bodyBytes = bodyBytes; - this.messageId = messageId; - this.attributes = attributes + "&messageId=" + messageId; - this.msgtype = msgtype; - this.groupId = groupId; - this.isCompress = isCompress; - addRTMS(msgtype); - } - // used for bodylist initializtion,msgtype=3/5 public EncodeObject(List<byte[]> bodyList, String attributes, String messageId, int msgtype, boolean isCompress, final String groupId) { @@ -113,23 +91,6 @@ public class EncodeObject { addRTMS(msgtype); } - // used for bytes initializtion,msgtype=7/8 - public EncodeObject(byte[] bodyBytes, int msgtype, boolean isCompress, boolean isReport, - boolean isGroupIdTransfer, long dt, long seqId, String groupId, - String streamId, String commonattr) { - this.bodyBytes = bodyBytes; - this.msgtype = msgtype; - this.isCompress = isCompress; - this.isReport = isReport; - this.dt = dt; - this.isGroupIdTransfer = isGroupIdTransfer; - this.commonattr = commonattr; - this.messageId = String.valueOf(seqId); - this.groupId = groupId; - this.streamId = streamId; - addRTMS(msgtype); - } - // used for bodylist initializtion,msgtype=7/8 public EncodeObject(List<byte[]> bodyList, int msgtype, boolean isCompress, boolean isReport, boolean isGroupIdTransfer, 
long dt, @@ -147,26 +108,6 @@ public class EncodeObject { addRTMS(msgtype); } - // file agent, used for bytes initializtion,msgtype=7/8 - public EncodeObject(byte[] bodyBytes, int msgtype, boolean isCompress, - boolean isReport, boolean isGroupIdTransfer, long dt, - long seqId, String groupId, String streamId, String commonattr, - String messageKey, String proxyIp) { - this.bodyBytes = bodyBytes; - this.msgtype = msgtype; - this.isCompress = isCompress; - this.isReport = isReport; - this.dt = dt; - this.isGroupIdTransfer = isGroupIdTransfer; - this.commonattr = commonattr; - this.messageId = String.valueOf(seqId); - this.groupId = groupId; - this.streamId = streamId; - this.messageKey = messageKey; - this.proxyIp = proxyIp; - addRTMS(msgtype); - } - // file agent, used for bodylist initializtion,msgtype=7/8 public EncodeObject(List<byte[]> bodyList, int msgtype, boolean isCompress, boolean isReport, boolean isGroupIdTransfer, long dt, @@ -395,14 +336,6 @@ public class EncodeObject { this.msgtype = msgtype; } - public byte[] getBodyBytes() { - return bodyBytes; - } - - public void setBodyBytes(byte[] bodyBytes) { - this.bodyBytes = bodyBytes; - } - public String getAttributes() { return attributes; } diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolDecoder.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolDecoder.java index 63476e7a50..2038a8b8d6 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolDecoder.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolDecoder.java @@ -24,6 +24,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.nio.charset.StandardCharsets; +import java.util.Collections; import java.util.List; public class ProtocolDecoder extends MessageToMessageDecoder<ByteBuf> { @@ -71,7 +72,13 @@ public class ProtocolDecoder extends 
MessageToMessageDecoder<ByteBuf> { attrBytes = new byte[attrLength]; buffer.readBytes(attrBytes); } - EncodeObject object = new EncodeObject(bodyBytes, new String(attrBytes, StandardCharsets.UTF_8)); + EncodeObject object; + if (bodyBytes == null) { + object = new EncodeObject(new String(attrBytes, StandardCharsets.UTF_8)); + } else { + object = new EncodeObject(Collections.singletonList(bodyBytes), + new String(attrBytes, StandardCharsets.UTF_8)); + } object.setMsgtype(5); out.add(object); } else if (msgType == 7) { diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolEncoder.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolEncoder.java index fe6dec9733..ecc1e1de91 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolEncoder.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolEncoder.java @@ -17,10 +17,12 @@ package org.apache.inlong.sdk.dataproxy.codec; +import org.apache.inlong.common.msg.AttributeConstants; import org.apache.inlong.sdk.dataproxy.config.EncryptConfigEntry; import org.apache.inlong.sdk.dataproxy.config.EncryptInfo; import org.apache.inlong.sdk.dataproxy.network.Utils; import org.apache.inlong.sdk.dataproxy.utils.EncryptUtil; +import org.apache.inlong.sdk.dataproxy.utils.LogCounter; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; @@ -31,11 +33,10 @@ import org.slf4j.LoggerFactory; import org.xerial.snappy.Snappy; import java.io.ByteArrayOutputStream; -import java.io.IOException; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.security.SecureRandom; -import java.util.Iterator; import java.util.List; import static org.apache.inlong.sdk.dataproxy.ConfigConstants.FLAG_ALLOW_AUTH; @@ -44,30 +45,26 @@ import static 
org.apache.inlong.sdk.dataproxy.ConfigConstants.FLAG_ALLOW_ENCRYPT public class ProtocolEncoder extends MessageToMessageEncoder<EncodeObject> { - private static final Logger logger = LoggerFactory - .getLogger(ProtocolEncoder.class); + private static final Logger logger = LoggerFactory.getLogger(ProtocolEncoder.class); + private static final LogCounter exptCounter = new LogCounter(10, 100000, 60 * 1000L); protected void encode(ChannelHandlerContext ctx, EncodeObject message, List<Object> out) throws Exception { ByteBuf buf = null; try { - EncodeObject object = message; - if (object.getMsgtype() == 3) { - buf = writeToBuf3(object); + if (message.getMsgtype() == 3) { + buf = writeToBuf3(message); + } else if (message.getMsgtype() == 5) { + buf = writeToBuf5(message); + } else if (message.getMsgtype() == 7) { + buf = writeToBuf7(message); + } else if (message.getMsgtype() == 8) { + buf = writeToBuf8(message); } - if (object.getMsgtype() == 5) { - buf = writeToBuf5(object); + } catch (Throwable ex) { + if (exptCounter.shouldPrint()) { + logger.error("ProtocolEncoder encode message failure", ex); } - - if (object.getMsgtype() == 7) { - buf = writeToBuf7(object); - } - if (object.getMsgtype() == 8) { - buf = writeToBuf8(object); - } - } catch (Exception e) { - logger.error("{}", e.getMessage()); - e.printStackTrace(); } if (buf != null) { out.add(buf); @@ -113,8 +110,10 @@ public class ProtocolEncoder extends MessageToMessageEncoder<EncodeObject> { buf.writeBytes(endAttr.getBytes("utf8")); } buf.writeShort(0xee01); - } catch (Exception e) { - logger.error(e.getMessage()); + } catch (Throwable ex) { + if (exptCounter.shouldPrint()) { + logger.error("Write type8 data exception", ex); + } } return buf; } @@ -176,7 +175,7 @@ public class ProtocolEncoder extends MessageToMessageEncoder<EncodeObject> { buf.writeInt((int) object.getDt()); buf.writeShort(cnt); - buf.writeInt(Integer.valueOf(object.getMessageId())); + buf.writeInt(Integer.parseInt(object.getMessageId())); 
buf.writeInt(body.length); buf.writeBytes(body); @@ -195,53 +194,41 @@ public class ProtocolEncoder extends MessageToMessageEncoder<EncodeObject> { byte[] body = null; int cnt = 1; - if (object.getBodylist() != null && object.getBodylist().size() != 0) { + if (object.getBodylist() != null && !object.getBodylist().isEmpty()) { if (object.getCnt() > 0) { cnt = object.getCnt(); } else { cnt = object.getBodylist().size(); } - ByteArrayOutputStream out = new ByteArrayOutputStream(); - Iterator<byte[]> iter = object.getBodylist().iterator(); - if (object.isSupportLF()) { + int totalCnt = 0; ByteArrayOutputStream data = new ByteArrayOutputStream(); - int len = object.getBodylist().size(); - for (int i = 0; i < len - 1; i++) { - data.write(object.getBodylist().get(i)); - data.write("\n".getBytes("utf8")); + for (byte[] entry : object.getBodylist()) { + if (totalCnt++ > 0) { + data.write("\n".getBytes("utf8")); + } + data.write(entry); } - data.write(object.getBodylist().get(len - 1)); - ByteBuffer databuffer = ByteBuffer.allocate(4); - databuffer.putInt(data.toByteArray().length); - out.write(databuffer.array()); + ByteBuffer dataBuffer = ByteBuffer.allocate(4); + dataBuffer.putInt(data.toByteArray().length); + out.write(dataBuffer.array()); out.write(data.toByteArray()); } else { - while (iter.hasNext()) { - byte[] entry = iter.next(); - ByteBuffer databuffer = ByteBuffer.allocate(4); - databuffer.putInt(entry.length); - out.write(databuffer.array()); + for (byte[] entry : object.getBodylist()) { + ByteBuffer dataBuffer = ByteBuffer.allocate(4); + dataBuffer.putInt(entry.length); + out.write(dataBuffer.array()); out.write(entry); } } body = out.toByteArray(); } - // send single message one time - if (object.getBodyBytes() != null && object.getBodyBytes().length != 0) { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - - ByteBuffer databuffer = ByteBuffer.allocate(4); - databuffer.putInt(object.getBodyBytes().length); - out.write(databuffer.array()); - 
out.write(object.getBodyBytes()); - body = out.toByteArray(); - } - buf = constructBody(body, object, totalLength, cnt); - } catch (Exception e) { - logger.error("writeToBuf7 has {}", e); + } catch (Throwable ex) { + if (exptCounter.shouldPrint()) { + logger.error("Write type7 data exception", ex); + } } return buf; } @@ -253,11 +240,9 @@ public class ProtocolEncoder extends MessageToMessageEncoder<EncodeObject> { byte[] body = null; // send multiple messages one time - if (object.getBodylist() != null && object.getBodylist().size() != 0) { + if (object.getBodylist() != null && !object.getBodylist().isEmpty()) { ByteArrayOutputStream out = new ByteArrayOutputStream(); - Iterator<byte[]> iter = object.getBodylist().iterator(); - while (iter.hasNext()) { - byte[] entry = iter.next(); + for (byte[] entry : object.getBodylist()) { ByteBuffer byteBuffer = ByteBuffer.allocate(4); byteBuffer.putInt(entry.length); out.write(byteBuffer.array()); @@ -265,15 +250,6 @@ public class ProtocolEncoder extends MessageToMessageEncoder<EncodeObject> { } body = out.toByteArray(); } - // send single message one time - if (object.getBodyBytes() != null && object.getBodyBytes().length != 0) { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - ByteBuffer byteBuffer = ByteBuffer.allocate(4); - byteBuffer.putInt(object.getBodyBytes().length); - out.write(byteBuffer.array()); - out.write(object.getBodyBytes()); - body = out.toByteArray(); - } if (body != null) { String msgAttrs = object.getAttributes(); if (object.isCompress()) { @@ -312,25 +288,14 @@ public class ProtocolEncoder extends MessageToMessageEncoder<EncodeObject> { buf.writeInt(msgAttrs.getBytes("utf8").length); buf.writeBytes(msgAttrs.getBytes("utf8")); } - } catch (Exception e) { - logger.error("{}", e.getMessage()); - e.printStackTrace(); + } catch (Throwable ex) { + if (exptCounter.shouldPrint()) { + logger.error("Write type5 data exception", ex); + } } return buf; } - /* - * private ChannelBuffer 
writeToBuf4(EncodeObject object) { ChannelBuffer buf = ChannelBuffers.dynamicBuffer(); try - * { int totalLength = 1 + 4 + 4; byte[] body = null; - * - * //send single message one time if (object.getBodyBytes() != null && object.getBodyBytes().length != 0) { body = - * object.getBodyBytes(); } totalLength = totalLength + body.length + - * object.getAttributes().getBytes("utf8").length; buf.writeInt(totalLength); buf.writeByte(4); - * buf.writeInt(body.length); buf.writeBytes(body); buf.writeInt(object.getAttributes().getBytes().length); - * buf.writeBytes(object.getAttributes().getBytes()); } catch (Exception e) { logger.error(e.getMessage()); } return - * buf; } - */ - private ByteBuf writeToBuf3(EncodeObject object) { ByteBuf buf = null; try { @@ -338,20 +303,17 @@ public class ProtocolEncoder extends MessageToMessageEncoder<EncodeObject> { byte[] body = null; // send multiple messages one time - if (object.getBodylist() != null && object.getBodylist().size() != 0) { + if (object.getBodylist() != null && !object.getBodylist().isEmpty()) { + int totalCnt = 0; ByteArrayOutputStream out = new ByteArrayOutputStream(); - Iterator<byte[]> iter = object.getBodylist().iterator(); - while (iter.hasNext()) { - byte[] entry = iter.next(); + for (byte[] entry : object.getBodylist()) { + if (totalCnt++ > 0) { + out.write(AttributeConstants.LINE_FEED_SEP.getBytes(StandardCharsets.UTF_8)); + } out.write(entry); - out.write("\n".getBytes("utf8")); } body = out.toByteArray(); } - // send single message one time - if (object.getBodyBytes() != null && object.getBodyBytes().length != 0) { - body = object.getBodyBytes(); - } if (body != null) { String msgAttrs = object.getAttributes(); if (object.isCompress()) { @@ -390,9 +352,10 @@ public class ProtocolEncoder extends MessageToMessageEncoder<EncodeObject> { buf.writeInt(msgAttrs.getBytes("utf8").length); buf.writeBytes(msgAttrs.getBytes("utf8")); } - } catch (Exception e) { - logger.error("{}", e.getMessage()); - 
e.printStackTrace(); + } catch (Throwable ex) { + if (exptCounter.shouldPrint()) { + logger.error("Write type3 data exception", ex); + } } return buf; } @@ -407,9 +370,10 @@ public class ProtocolEncoder extends MessageToMessageEncoder<EncodeObject> { tmpData, 0); body = new byte[len]; System.arraycopy(tmpData, 0, body, 0, len); - } catch (IOException e) { - logger.error("{}", e.getMessage()); - e.printStackTrace(); + } catch (Throwable ex) { + if (exptCounter.shouldPrint()) { + logger.error("Compress data exception", ex); + } } return body; } diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/EncryptConfigEntry.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/EncryptConfigEntry.java index f27506fe22..6acfe09d8a 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/EncryptConfigEntry.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/EncryptConfigEntry.java @@ -35,7 +35,7 @@ public class EncryptConfigEntry implements java.io.Serializable { private String pubKey; private byte[] aesKey; private String rsaEncryptedKey; - private AtomicLong lastUpdateTime = new AtomicLong(0); + private final AtomicLong lastUpdateTime = new AtomicLong(0); public EncryptConfigEntry(final String userName, final String version, final String pubKey) { this.userName = userName; @@ -43,7 +43,6 @@ public class EncryptConfigEntry implements java.io.Serializable { this.pubKey = pubKey; this.aesKey = null; this.rsaEncryptedKey = null; - // this.rsaKey = EncryptUtil.loadPublicKeyByText(pubKey); } public String getVersion() { diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/UdpClientExample.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/UdpClientExample.java index 26e490fc37..863f197353 100644 --- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/UdpClientExample.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/UdpClientExample.java @@ -45,7 +45,7 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.security.SecureRandom; -import java.util.Iterator; +import java.util.Collections; import java.util.concurrent.TimeUnit; import static org.apache.inlong.sdk.dataproxy.ConfigConstants.FLAG_ALLOW_COMPRESS; @@ -116,9 +116,8 @@ public class UdpClientExample { boolean isGroupIdTransfer, long dt, long seqId, String groupId, String streamId, String attr) throws UnsupportedEncodingException { EncodeObject encodeObject = - new EncodeObject(getRandomString(5).getBytes("UTF-8"), msgType, - isCompress, - isReport, isGroupIdTransfer, dt, seqId, groupId, streamId, attr); + new EncodeObject(Collections.singletonList(getRandomString(5).getBytes("UTF-8")), + msgType, isCompress, isReport, isGroupIdTransfer, dt, seqId, groupId, streamId, attr); return encodeObject; } @@ -142,48 +141,37 @@ public class UdpClientExample { byte[] body = null; int cnt = 1; - if (object.getBodylist() != null && object.getBodylist().size() != 0) { + if (object.getBodylist() != null && !object.getBodylist().isEmpty()) { if (object.getCnt() > 0) { cnt = object.getCnt(); } else { cnt = object.getBodylist().size(); } ByteArrayOutputStream out = new ByteArrayOutputStream(); - Iterator<byte[]> iter = object.getBodylist().iterator(); if (object.isSupportLF()) { + int totalCnt = 0; ByteArrayOutputStream data = new ByteArrayOutputStream(); - int len = object.getBodylist().size(); - for (int i = 0; i < len - 1; i++) { - data.write(object.getBodylist().get(i)); - data.write("\n".getBytes("utf8")); + for (byte[] entry : object.getBodylist()) { + if (totalCnt++ > 0) { + data.write("\n".getBytes("utf8")); + } + data.write(entry); } - data.write(object.getBodylist().get(len - 1)); - 
ByteBuffer databuffer = ByteBuffer.allocate(4); - databuffer.putInt(data.toByteArray().length); - out.write(databuffer.array()); + ByteBuffer dataBuffer = ByteBuffer.allocate(4); + dataBuffer.putInt(data.toByteArray().length); + out.write(dataBuffer.array()); out.write(data.toByteArray()); } else { - while (iter.hasNext()) { - byte[] entry = iter.next(); - ByteBuffer databuffer = ByteBuffer.allocate(4); - databuffer.putInt(entry.length); - out.write(databuffer.array()); + for (byte[] entry : object.getBodylist()) { + ByteBuffer dataBuffer = ByteBuffer.allocate(4); + dataBuffer.putInt(entry.length); + out.write(dataBuffer.array()); out.write(entry); } } body = out.toByteArray(); } - if (object.getBodyBytes() != null && object.getBodyBytes().length != 0) { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - - ByteBuffer databuffer = ByteBuffer.allocate(4); - databuffer.putInt(object.getBodyBytes().length); - out.write(databuffer.array()); - out.write(object.getBodyBytes()); - body = out.toByteArray(); - } - if (body != null) { if (object.isCompress()) { body = processCompress(body); diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java index c891788465..e1412a9936 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ClientMgr.java @@ -847,7 +847,8 @@ public class ClientMgr { } logger.debug("active host to send heartbeat! 
{}", hostInfo.getReferenceName()); String hbMsg = "heartbeat:" + hostInfo.getHostName(); - EncodeObject encodeObject = new EncodeObject(hbMsg.getBytes(StandardCharsets.UTF_8), + EncodeObject encodeObject = new EncodeObject( + Collections.singletonList(hbMsg.getBytes(StandardCharsets.UTF_8)), 8, false, false, false, System.currentTimeMillis() / 1000, 1, "", "", ""); try { if (configure.isNeedAuthentication()) { diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/PbProtocolMessageSender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/PbProtocolMessageSender.java index d9817b7a03..f0bd45d13e 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/PbProtocolMessageSender.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/PbProtocolMessageSender.java @@ -171,22 +171,6 @@ public class PbProtocolMessageSender implements MessageSender, Configurable { return context; } - /** - * sendMessage - * - * @param body - * @param attributes - * @param msgUUID - * @param timeout - * @param timeUnit - * @return SendResult - * @deprecated - */ - @Override - public SendResult sendMessage(byte[] body, String attributes, String msgUUID, long timeout, TimeUnit timeUnit) { - return SendResult.INVALID_ATTRIBUTES; - } - /** * sendMessage * @@ -345,24 +329,6 @@ public class PbProtocolMessageSender implements MessageSender, Configurable { return refResult.get(); } - /** - * asyncSendMessage - * - * @param callback - * @param body - * @param attributes - * @param msgUUID - * @param timeout - * @param timeUnit - * @throws ProxysdkException - * @deprecated - */ - @Override - public void asyncSendMessage(SendMessageCallback callback, byte[] body, String attributes, String msgUUID, - long timeout, TimeUnit timeUnit) throws ProxysdkException { - throw new ProxysdkException("Not support"); - } - /** * asyncSendMessage * diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java index e735850360..ed62ccb4b6 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java @@ -32,6 +32,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Closeable; +import java.util.Collections; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -195,7 +196,7 @@ public class MetricWorkerThread extends Thread implements Closeable { sender.asyncSendMessage(encodeObject, callBack, String.valueOf(System.currentTimeMillis()), 20, TimeUnit.SECONDS); } else { - logger.error("Send metric failure: {} {}", encodeObject.getBodyBytes(), encodeObject.getBodylist()); + logger.error("Send metric failure: {}", encodeObject.getBodylist()); } } catch (Throwable ex) { logger.warn("Send metric throw exception", ex); @@ -204,7 +205,7 @@ public class MetricWorkerThread extends Thread implements Closeable { } private void sendSingleLine(String line, String streamId, long dtTime) { - EncodeObject encodeObject = new EncodeObject(line.getBytes(), 7, + EncodeObject encodeObject = new EncodeObject(Collections.singletonList(line.getBytes()), 7, false, false, false, dtTime, idGenerator.getNextInt(), metricConfig.getMetricGroupId(), streamId, "", "", Utils.getLocalIp()); diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/LogCounter.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/LogCounter.java new file mode 100644 index 0000000000..edb14c62b2 --- /dev/null +++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/LogCounter.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.utils;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class LogCounter {
+
+    private final AtomicLong counter = new AtomicLong(0);
+
+    private long start = 10L;
+    private long control = 100000L;
+    private long reset = 60 * 1000L;
+
+    private AtomicLong lastLogTime = new AtomicLong(System.currentTimeMillis());
+
+    public LogCounter(long start, long control, long reset) {
+        this.start = start;
+        this.control = control;
+        this.reset = reset;
+    }
+
+    public boolean shouldPrint() {
+        long curTime = lastLogTime.get();
+        if (System.currentTimeMillis() - curTime > reset) {
+            if (lastLogTime.compareAndSet(curTime, System.currentTimeMillis())) {
+                counter.set(0);
+            }
+        }
+        return counter.incrementAndGet() <= start || counter.get() % control == 0;
+    }
+}