This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch branch-1.4 in repository https://gitbox.apache.org/repos/asf/inlong.git
commit ce8f194f72d67d7218bf7734bf832d9df3e093f5 Author: xueyingzhang <86780714+poc...@users.noreply.github.com> AuthorDate: Tue Nov 8 15:47:47 2022 +0800 [INLONG-6417][SDK] Support proxy-send mode (#6437) --- .../inlong/sdk/dataproxy/DefaultMessageSender.java | 376 +++++++++++++++++---- .../inlong/sdk/dataproxy/utils/ProxyUtils.java | 3 +- 2 files changed, 306 insertions(+), 73 deletions(-) diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java index 2dc1c2ca5..4cba1a3ea 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java @@ -18,6 +18,7 @@ package org.apache.inlong.sdk.dataproxy; +import org.apache.inlong.common.msg.AttributeConstants; import org.apache.inlong.sdk.dataproxy.codec.EncodeObject; import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry; import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager; @@ -95,12 +96,12 @@ public class DefaultMessageSender implements MessageSender { /** * generate by cluster id * - * @param configure - sender + * @param configure - sender * @param selfDefineFactory - sender factory * @return - sender */ public static DefaultMessageSender generateSenderByClusterId(ProxyClientConfig configure, - ThreadFactory selfDefineFactory) throws Exception { + ThreadFactory selfDefineFactory) throws Exception { ProxyConfigManager proxyConfigManager = new ProxyConfigManager(configure, Utils.getLocalIp(), null); proxyConfigManager.setGroupId(configure.getGroupId()); @@ -188,35 +189,61 @@ public class DefaultMessageSender implements MessageSender { @Deprecated public SendResult sendMessage(byte[] body, String attributes, String msgUUID, - long timeout, TimeUnit timeUnit) { + long timeout, TimeUnit timeUnit) { return sender.syncSendMessage(new EncodeObject(body, attributes, idGenerator.getNextId()), msgUUID, timeout, timeUnit); } public SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID, - long timeout, TimeUnit timeUnit) { + long timeout, TimeUnit timeUnit) { + return sendMessage(body, groupId, streamId, dt, msgUUID, timeout, timeUnit, false); + } + + /** + * ync send single message + * + * @param body message data + * @param groupId groupId + * @param streamId streamId + * @param dt data report timestamp + * @param msgUUID msg uuid + * @param timeout + * @param timeUnit + * @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ + * @return SendResult.OK means success + */ + public SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID, + long timeout, TimeUnit timeUnit, boolean isProxySend) { dt = ProxyUtils.covertZeroDt(dt); if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) { return SendResult.INVALID_ATTRIBUTES; } addIndexCnt(groupId, streamId, 1); + String proxySend = ""; + if (isProxySend) { + proxySend = AttributeConstants.MESSAGE_PROXY_SEND + "=true"; + } + boolean isCompressEnd = (isCompress && (body.length > cpsSize)); if (msgtype == 7 || msgtype == 8) { EncodeObject encodeObject = new EncodeObject(body, msgtype, isCompressEnd, isReport, - isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(), groupId, streamId, ""); + isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(), groupId, streamId, proxySend); encodeObject.setSupportLF(isSupportLF); return sender.syncSendMessage(encodeObject, msgUUID, timeout, timeUnit); } else if (msgtype == 3 || msgtype == 5) { + if (isProxySend) { + proxySend = "&" + proxySend; + } if (isCompressEnd) { return sender.syncSendMessage(new EncodeObject(body, "groupId=" + groupId + "&streamId=" - + streamId + "&dt=" + dt + "&cp=snappy", idGenerator.getNextId(), this.getMsgtype(), - true, groupId), msgUUID, timeout, timeUnit); + + streamId + "&dt=" + dt + "&cp=snappy" + proxySend, idGenerator.getNextId(), + this.getMsgtype(), true, groupId), msgUUID, timeout, timeUnit); } else { - return sender.syncSendMessage(new EncodeObject(body, "groupId=" + groupId + "&streamId=" - + streamId + "&dt=" + dt, idGenerator.getNextId(), this.getMsgtype(), false, groupId), - msgUUID, timeout, timeUnit); + return sender.syncSendMessage(new EncodeObject(body, + "groupId=" + groupId + "&streamId=" + streamId + "&dt=" + dt + proxySend, + idGenerator.getNextId(), this.getMsgtype(), false, groupId), msgUUID, timeout, timeUnit); } } @@ -224,7 +251,26 @@ public class DefaultMessageSender implements MessageSender { } public SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID, - long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) { + long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) { + return sendMessage(body, groupId, streamId, dt, msgUUID, timeout, timeUnit, extraAttrMap, false); + } + + /** + * sync send single message + * + * @param body message data + * @param groupId groupId + * @param streamId streamId + * @param dt data report timestamp + * @param msgUUID msg uuid + * @param timeout + * @param timeUnit + * @param extraAttrMap extra attributes + * @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ + * @return SendResult.OK means success + */ + public SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID, + long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap, boolean isProxySend) { dt = ProxyUtils.covertZeroDt(dt); if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt) || !ProxyUtils.isAttrKeysValid(extraAttrMap)) { @@ -232,6 +278,9 @@ public class DefaultMessageSender implements MessageSender { } addIndexCnt(groupId, streamId, 1); + if (isProxySend) { + extraAttrMap.put(AttributeConstants.MESSAGE_PROXY_SEND, "true"); + } StringBuilder attrs = ProxyUtils.convertAttrToStr(extraAttrMap); boolean isCompressEnd = (isCompress && (body.length > cpsSize)); @@ -260,27 +309,54 @@ public class DefaultMessageSender implements MessageSender { } public SendResult sendMessage(List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID, - long timeout, TimeUnit timeUnit) { + long timeout, TimeUnit timeUnit) { + return sendMessage(bodyList, groupId, streamId, dt, msgUUID, timeout, timeUnit, false); + } + + /** + * sync send a batch of messages + * + * @param bodyList list of messages + * @param groupId groupId + * @param streamId streamId + * @param dt data report timestamp + * @param msgUUID msg uuid + * @param timeout + * @param timeUnit + * @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ + * @return SendResult.OK means success + */ + public SendResult sendMessage(List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID, + long timeout, TimeUnit timeUnit, boolean isProxySend) { dt = ProxyUtils.covertZeroDt(dt); if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)) { return SendResult.INVALID_ATTRIBUTES; } addIndexCnt(groupId, streamId, bodyList.size()); + String proxySend = ""; + if (isProxySend) { + proxySend = AttributeConstants.MESSAGE_SYNC_SEND + "=true"; + } + if (msgtype == 7 || msgtype == 8) { EncodeObject encodeObject = new EncodeObject(bodyList, msgtype, isCompress, isReport, isGroupIdTransfer, dt / 1000, - idGenerator.getNextInt(), groupId, streamId, ""); + idGenerator.getNextInt(), groupId, streamId, proxySend); encodeObject.setSupportLF(isSupportLF); return sender.syncSendMessage(encodeObject, msgUUID, timeout, timeUnit); } else if (msgtype == 3 || msgtype == 5) { + if (isProxySend) { + proxySend = "&" + proxySend; + } if (isCompress) { return sender.syncSendMessage(new EncodeObject(bodyList, "groupId=" + groupId + "&streamId=" + streamId - + "&dt=" + dt + "&cp=snappy" + "&cnt=" + bodyList.size(), idGenerator.getNextId(), - this.getMsgtype(), true, groupId), msgUUID, timeout, timeUnit); + + "&dt=" + dt + "&cp=snappy" + "&cnt=" + bodyList.size() + proxySend, + idGenerator.getNextId(), this.getMsgtype(), true, groupId), msgUUID, timeout, timeUnit); } else { return sender.syncSendMessage(new EncodeObject(bodyList, "groupId=" + groupId + "&streamId=" + streamId - + "&dt=" + dt + "&cnt=" + bodyList.size(), idGenerator.getNextId(), this.getMsgtype(), + + "&dt=" + dt + "&cnt=" + bodyList.size() + proxySend, idGenerator.getNextId(), + this.getMsgtype(), false, groupId), msgUUID, timeout, timeUnit); } } @@ -288,13 +364,35 @@ public class DefaultMessageSender implements MessageSender { } public SendResult sendMessage(List<byte[]> bodyList, String groupId, String streamId, long dt, - String msgUUID, long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) { + String msgUUID, long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) { + return sendMessage(bodyList, groupId, streamId, dt, msgUUID, timeout, timeUnit, extraAttrMap, false); + } + + /** + * sync send a batch of messages + * + * @param bodyList list of messages + * @param groupId groupId + * @param streamId streamId + * @param dt data report timestamp + * @param msgUUID msg uuid + * @param timeout + * @param timeUnit + * @param extraAttrMap extra attributes + * @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ + * @return SendResult.OK means success + */ + public SendResult sendMessage(List<byte[]> bodyList, String groupId, String streamId, long dt, + String msgUUID, long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap, boolean isProxySend) { dt = ProxyUtils.covertZeroDt(dt); - if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt) - || !ProxyUtils.isAttrKeysValid(extraAttrMap)) { + if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt) || !ProxyUtils.isAttrKeysValid( + extraAttrMap)) { return SendResult.INVALID_ATTRIBUTES; } addIndexCnt(groupId, streamId, bodyList.size()); + if (isProxySend) { + extraAttrMap.put(AttributeConstants.MESSAGE_PROXY_SEND, "true"); + } StringBuilder attrs = ProxyUtils.convertAttrToStr(extraAttrMap); if (msgtype == 7 || msgtype == 8) { @@ -322,37 +420,62 @@ public class DefaultMessageSender implements MessageSender { @Deprecated public void asyncSendMessage(SendMessageCallback callback, byte[] body, String attributes, - String msgUUID, long timeout, TimeUnit timeUnit) throws ProxysdkException { + String msgUUID, long timeout, TimeUnit timeUnit) throws ProxysdkException { sender.asyncSendMessage(new EncodeObject(body, attributes, idGenerator.getNextId()), callback, msgUUID, timeout, timeUnit); } - public void asyncSendMessage(SendMessageCallback callback, byte[] body, - String groupId, String streamId, long dt, String msgUUID, - long timeout, TimeUnit timeUnit) throws ProxysdkException { + public void asyncSendMessage(SendMessageCallback callback, byte[] body, String groupId, String streamId, long dt, + String msgUUID, long timeout, TimeUnit timeUnit) throws ProxysdkException { + asyncSendMessage(callback, body, groupId, streamId, dt, msgUUID, timeout, timeUnit, false); + } + + /** + * async send single message + * + * @param callback callback can be null + * @param body message data + * @param groupId groupId + * @param streamId streamId + * @param dt data report timestamp + * @param msgUUID msg uuid + * @param timeout + * @param timeUnit + * @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ + * @throws ProxysdkException + */ + public void asyncSendMessage(SendMessageCallback callback, byte[] body, String groupId, String streamId, long dt, + String msgUUID, long timeout, TimeUnit timeUnit, boolean isProxySend) throws ProxysdkException { dt = ProxyUtils.covertZeroDt(dt); if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) { throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString()); } addIndexCnt(groupId, streamId, 1); + String proxySend = ""; + if (isProxySend) { + proxySend = AttributeConstants.MESSAGE_PROXY_SEND + "=true"; + } boolean isCompressEnd = (isCompress && (body.length > cpsSize)); if (msgtype == 7 || msgtype == 8) { EncodeObject encodeObject = new EncodeObject(body, this.getMsgtype(), isCompressEnd, isReport, isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(), - groupId, streamId, ""); + groupId, streamId, proxySend); encodeObject.setSupportLF(isSupportLF); sender.asyncSendMessage(encodeObject, callback, msgUUID, timeout, timeUnit); } else if (msgtype == 3 || msgtype == 5) { if (isCompressEnd) { + if (isProxySend) { + proxySend = "&" + proxySend; + } sender.asyncSendMessage(new EncodeObject(body, "groupId=" - + groupId + "&streamId=" + streamId + "&dt=" + dt + "&cp=snappy", + + groupId + "&streamId=" + streamId + "&dt=" + dt + "&cp=snappy" + proxySend, idGenerator.getNextId(), this.getMsgtype(), true, groupId), callback, msgUUID, timeout, timeUnit); } else { sender.asyncSendMessage( new EncodeObject(body, "groupId=" + groupId + "&streamId=" - + streamId + "&dt=" + dt, idGenerator.getNextId(), + + streamId + "&dt=" + dt + proxySend, idGenerator.getNextId(), this.getMsgtype(), false, groupId), callback, msgUUID, timeout, timeUnit); } @@ -360,14 +483,38 @@ public class DefaultMessageSender implements MessageSender { } - public void asyncSendMessage(SendMessageCallback callback, byte[] body, String groupId, String streamId, - long dt, String msgUUID, long timeout, TimeUnit timeUnit, - Map<String, String> extraAttrMap) throws ProxysdkException { + public void asyncSendMessage(SendMessageCallback callback, byte[] body, String groupId, String streamId, long dt, + String msgUUID, long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) + throws ProxysdkException { + asyncSendMessage(callback, body, groupId, streamId, dt, msgUUID, timeout, timeUnit, extraAttrMap, false); + } + + /** + * async send single message + * + * @param callback callback can be null + * @param body message data + * @param groupId groupId + * @param streamId streamId + * @param dt data report timestamp + * @param msgUUID msg uuid + * @param timeout + * @param timeUnit + * @param extraAttrMap extra attributes + * @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ + * @throws ProxysdkException + */ + public void asyncSendMessage(SendMessageCallback callback, byte[] body, String groupId, String streamId, long dt, + String msgUUID, long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap, boolean isProxySend) + throws ProxysdkException { dt = ProxyUtils.covertZeroDt(dt); if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt) || !ProxyUtils.isAttrKeysValid(extraAttrMap)) { throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString()); } addIndexCnt(groupId, streamId, 1); + if (isProxySend) { + extraAttrMap.put(AttributeConstants.MESSAGE_PROXY_SEND, "true"); + } StringBuilder attrs = ProxyUtils.convertAttrToStr(extraAttrMap); boolean isCompressEnd = (isCompress && (body.length > cpsSize)); @@ -392,45 +539,97 @@ public class DefaultMessageSender implements MessageSender { } } + public void asyncSendMessage(SendMessageCallback callback, List<byte[]> bodyList, String groupId, String streamId, + long dt, String msgUUID, long timeout, TimeUnit timeUnit) throws ProxysdkException { + asyncSendMessage(callback, bodyList, groupId, streamId, dt, msgUUID, timeout, timeUnit, false); + } + + /** + * async send a batch of messages + * + * @param callback callback can be null + * @param bodyList list of messages + * @param groupId groupId + * @param streamId streamId + * @param dt data report time + * @param msgUUID msg uuid + * @param timeout + * @param timeUnit + * @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ + * @throws ProxysdkException + */ public void asyncSendMessage(SendMessageCallback callback, List<byte[]> bodyList, - String groupId, String streamId, long dt, String msgUUID, - long timeout, TimeUnit timeUnit) throws ProxysdkException { + String groupId, String streamId, long dt, String msgUUID, + long timeout, TimeUnit timeUnit, boolean isProxySend) throws ProxysdkException { dt = ProxyUtils.covertZeroDt(dt); if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)) { throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString()); } addIndexCnt(groupId, streamId, bodyList.size()); + String proxySend = ""; + if (isProxySend) { + proxySend = AttributeConstants.MESSAGE_PROXY_SEND + "=true"; + } if (msgtype == 7 || msgtype == 8) { EncodeObject encodeObject = new EncodeObject(bodyList, this.getMsgtype(), isCompress, isReport, isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(), - groupId, streamId, ""); + groupId, streamId, proxySend); encodeObject.setSupportLF(isSupportLF); sender.asyncSendMessage(encodeObject, callback, msgUUID, timeout, timeUnit); } else if (msgtype == 3 || msgtype == 5) { + if (isProxySend) { + proxySend = "&" + proxySend; + } if (isCompress) { sender.asyncSendMessage( new EncodeObject(bodyList, "groupId=" + groupId + "&streamId=" + streamId - + "&dt=" + dt + "&cp=snappy" + "&cnt=" + bodyList.size(), idGenerator.getNextId(), + + "&dt=" + dt + "&cp=snappy" + "&cnt=" + bodyList.size() + proxySend, + idGenerator.getNextId(), this.getMsgtype(), true, groupId), callback, msgUUID, timeout, timeUnit); } else { sender.asyncSendMessage( - new EncodeObject(bodyList, "groupId=" + groupId + "&streamId=" + streamId + "&dt=" + dt - + "&cnt=" + bodyList.size(), idGenerator.getNextId(), this.getMsgtype(), - false, groupId), callback, msgUUID, timeout, timeUnit); + new EncodeObject(bodyList, + "groupId=" + groupId + "&streamId=" + streamId + "&dt=" + dt + "&cnt=" + bodyList.size() + + proxySend, idGenerator.getNextId(), this.getMsgtype(), false, groupId), + callback, msgUUID, timeout, timeUnit); } } } public void asyncSendMessage(SendMessageCallback callback, - List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID, - long timeout, TimeUnit timeUnit, - Map<String, String> extraAttrMap) throws ProxysdkException { + List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID, + long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) throws ProxysdkException { + asyncSendMessage(callback, bodyList, groupId, streamId, dt, msgUUID, timeout, timeUnit, extraAttrMap, false); + } + + /** + * async send a batch of messages + * + * @param callback callback can be null + * @param bodyList list of messages + * @param groupId groupId + * @param streamId streamId + * @param dt data report time + * @param msgUUID msg uuid + * @param timeout + * @param timeUnit + * @param extraAttrMap extra attributes + * @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ + * @throws ProxysdkException + */ + public void asyncSendMessage(SendMessageCallback callback, + List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID, + long timeout, TimeUnit timeUnit, + Map<String, String> extraAttrMap, boolean isProxySend) throws ProxysdkException { dt = ProxyUtils.covertZeroDt(dt); - if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt) - || !ProxyUtils.isAttrKeysValid(extraAttrMap)) { + if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt) || !ProxyUtils.isAttrKeysValid( + extraAttrMap)) { throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString()); } addIndexCnt(groupId, streamId, bodyList.size()); + if (isProxySend) { + extraAttrMap.put(AttributeConstants.MESSAGE_PROXY_SEND, "true"); + } StringBuilder attrs = ProxyUtils.convertAttrToStr(extraAttrMap); if (msgtype == 7 || msgtype == 8) { @@ -465,28 +664,60 @@ public class DefaultMessageSender implements MessageSender { * @throws ProxysdkException */ @Override - public void asyncSendMessage(String inlongGroupId, String inlongStreamId, byte[] body, - SendMessageCallback callback) throws ProxysdkException { + public void asyncSendMessage(String inlongGroupId, String inlongStreamId, byte[] body, SendMessageCallback callback) + throws ProxysdkException { this.asyncSendMessage(callback, body, inlongGroupId, inlongStreamId, System.currentTimeMillis(), idGenerator.getNextId(), DEFAULT_SEND_TIMEOUT, DEFAULT_SEND_TIMEUNIT); } /** - * asyncSendMessage + * async send single message * - * @param inlongGroupId - * @param inlongStreamId - * @param bodyList - * @param callback + * @param inlongGroupId groupId + * @param inlongStreamId streamId + * @param body a single message + * @param callback callback can be null + * @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ + * @throws ProxysdkException + */ + public void asyncSendMessage(String inlongGroupId, String inlongStreamId, byte[] body, SendMessageCallback callback, + boolean isProxySend) throws ProxysdkException { + this.asyncSendMessage(callback, body, inlongGroupId, inlongStreamId, System.currentTimeMillis(), + idGenerator.getNextId(), DEFAULT_SEND_TIMEOUT, DEFAULT_SEND_TIMEUNIT, isProxySend); + } + + /** + * async send a batch of messages + * + * @param inlongGroupId groupId + * @param inlongStreamId streamId + * @param bodyList list of messages + * @param callback callback can be null * @throws ProxysdkException */ @Override public void asyncSendMessage(String inlongGroupId, String inlongStreamId, List<byte[]> bodyList, - SendMessageCallback callback) throws ProxysdkException { + SendMessageCallback callback) throws ProxysdkException { this.asyncSendMessage(callback, bodyList, inlongGroupId, inlongStreamId, System.currentTimeMillis(), idGenerator.getNextId(), DEFAULT_SEND_TIMEOUT, DEFAULT_SEND_TIMEUNIT); } + /** + * async send a batch of messages + * + * @param inlongGroupId groupId + * @param inlongStreamId streamId + * @param bodyList list of messages + * @param callback callback can be null + * @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ + * @throws ProxysdkException + */ + public void asyncSendMessage(String inlongGroupId, String inlongStreamId, List<byte[]> bodyList, + SendMessageCallback callback, boolean isProxySend) throws ProxysdkException { + this.asyncSendMessage(callback, bodyList, inlongGroupId, inlongStreamId, System.currentTimeMillis(), + idGenerator.getNextId(), DEFAULT_SEND_TIMEOUT, DEFAULT_SEND_TIMEUNIT, isProxySend); + } + private void addIndexCnt(String groupId, String streamId, long cnt) { try { String key = groupId + "|" + streamId; @@ -501,10 +732,10 @@ public class DefaultMessageSender implements MessageSender { } } - public void asyncsendMessageData(FileCallback callback, List<byte[]> bodyList, String groupId, - String streamId, long dt, int sid, boolean isSupportLF, String msgUUID, - long timeout, TimeUnit timeUnit, - Map<String, String> extraAttrMap) throws ProxysdkException { + @Deprecated + public void asyncsendMessageData(FileCallback callback, List<byte[]> bodyList, String groupId, String streamId, + long dt, int sid, boolean isSupportLF, String msgUUID, long timeout, TimeUnit timeUnit, + Map<String, String> extraAttrMap) throws ProxysdkException { dt = ProxyUtils.covertZeroDt(dt); if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt) || !ProxyUtils.isAttrKeysValid(extraAttrMap)) { @@ -523,9 +754,9 @@ public class DefaultMessageSender implements MessageSender { } } - private void asyncSendMetric(FileCallback callback, byte[] body, String groupId, - String streamId, long dt, int sid, String ip, String msgUUID, - long timeout, TimeUnit timeUnit, String messageKey) throws ProxysdkException { + @Deprecated + private void asyncSendMetric(FileCallback callback, byte[] body, String groupId, String streamId, long dt, int sid, + String ip, String msgUUID, long timeout, TimeUnit timeUnit, String messageKey) throws ProxysdkException { dt = ProxyUtils.covertZeroDt(dt); if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) { throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString()); @@ -538,23 +769,23 @@ public class DefaultMessageSender implements MessageSender { } } - public void asyncsendMessageProxy(FileCallback callback, byte[] body, String groupId, String streamId, - long dt, int sid, String ip, String msgUUID, - long timeout, TimeUnit timeUnit) throws ProxysdkException { + @Deprecated + public void asyncsendMessageProxy(FileCallback callback, byte[] body, String groupId, String streamId, long dt, + int sid, String ip, String msgUUID, long timeout, TimeUnit timeUnit) throws ProxysdkException { asyncSendMetric(callback, body, groupId, streamId, dt, sid, ip, msgUUID, timeout, timeUnit, "minute"); } - public void asyncsendMessageFile(FileCallback callback, byte[] body, String groupId, - String streamId, long dt, int sid, String msgUUID, - long timeout, TimeUnit timeUnit) throws ProxysdkException { + @Deprecated + public void asyncsendMessageFile(FileCallback callback, byte[] body, String groupId, String streamId, long dt, + int sid, String msgUUID, long timeout, TimeUnit timeUnit) throws ProxysdkException { asyncSendMetric(callback, body, groupId, streamId, dt, sid, "", msgUUID, timeout, timeUnit, "file"); } - public String sendMessageData(List<byte[]> bodyList, String groupId, - String streamId, long dt, int sid, boolean isSupportLF, String msgUUID, - long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) { + @Deprecated + public String sendMessageData(List<byte[]> bodyList, String groupId, String streamId, long dt, int sid, + boolean isSupportLF, String msgUUID, long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) { dt = ProxyUtils.covertZeroDt(dt); if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt) || !ProxyUtils.isAttrKeysValid(extraAttrMap)) { @@ -574,9 +805,9 @@ public class DefaultMessageSender implements MessageSender { return null; } - private String sendMetric(byte[] body, String groupId, String streamId, long dt, int sid, String ip, - String msgUUID, - long timeout, TimeUnit timeUnit, String messageKey) { + @Deprecated + private String sendMetric(byte[] body, String groupId, String streamId, long dt, int sid, String ip, String msgUUID, + long timeout, TimeUnit timeUnit, String messageKey) { dt = ProxyUtils.covertZeroDt(dt); if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) { return SendResult.INVALID_ATTRIBUTES.toString(); @@ -589,14 +820,15 @@ public class DefaultMessageSender implements MessageSender { return null; } - public String sendMessageProxy(byte[] body, String groupId, String streamId, - long dt, int sid, String ip, String msgUUID, - long timeout, TimeUnit timeUnit) { + @Deprecated + public String sendMessageProxy(byte[] body, String groupId, String streamId, long dt, int sid, String ip, + String msgUUID, long timeout, TimeUnit timeUnit) { return sendMetric(body, groupId, streamId, dt, sid, ip, msgUUID, timeout, timeUnit, "minute"); } + @Deprecated public String sendMessageFile(byte[] body, String groupId, String streamId, long dt, int sid, String msgUUID, - long timeout, TimeUnit timeUnit) { + long timeout, TimeUnit timeUnit) { return sendMetric(body, groupId, streamId, dt, sid, "", msgUUID, timeout, timeUnit, "file"); } diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java index df1c4939a..fd6b0f275 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java @@ -39,7 +39,8 @@ public class ProxyUtils { static { Collections.addAll(invalidAttr, "groupId", "streamId", "dt", "msgUUID", "cp", "cnt", "mt", "m", "sid", "t", "NodeIP", "messageId", "_file_status_check", "_secretId", - "_signature", "_timeStamp", "_nonce", "_userName", "_clientIP", "_encyVersion", "_encyAesKey"); + "_signature", "_timeStamp", "_nonce", "_userName", "_clientIP", "_encyVersion", "_encyAesKey", + "proxySend", "errMsg", "errCode"); } public static boolean isAttrKeysValid(Map<String, String> attrsMap) {