This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 9baa8b3d9d [INLONG-11749][SDK] Clean up unused configuration items and functions (#11750) 9baa8b3d9d is described below commit 9baa8b3d9da280f7b82a721dab15fbf52556851e Author: Goson Zhang <4675...@qq.com> AuthorDate: Tue Feb 11 19:45:39 2025 +0800 [INLONG-11749][SDK] Clean up unused configuration items and functions (#11750) * [INLONG-11749][SDK] Clean up unused configuration items and functions * [INLONG-11749][SDK] Clean up unused configuration items and functions --------- Co-authored-by: gosonzhang <gosonzh...@tencent.com> --- .../sdk/dataproxy/MsgSenderMultiFactory.java | 5 +- .../sdk/dataproxy/MsgSenderSingleFactory.java | 4 +- .../inlong/sdk/dataproxy/codec/EncodeObject.java | 397 --------------------- .../sdk/dataproxy/common/ProxyClientConfig.java | 4 +- .../inlong/sdk/dataproxy/common/SdkConsts.java | 26 -- .../inlong/sdk/dataproxy/common/SendResult.java | 43 --- .../sdk/dataproxy/config/ProxyConfigManager.java | 8 +- .../inlong/sdk/dataproxy/example/ExampleUtils.java | 5 +- .../sdk/dataproxy/example/UdpClientExample.java | 283 --------------- .../inlong/sdk/dataproxy/network/SequentialID.java | 7 - .../dataproxy/sender/tcp/TcpMsgSenderConfig.java | 32 +- .../inlong/sdk/dataproxy/utils/ProxyUtils.java | 119 ------ 12 files changed, 18 insertions(+), 915 deletions(-) diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderMultiFactory.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderMultiFactory.java index bcf32defd3..559b7d7bac 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderMultiFactory.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderMultiFactory.java @@ -23,6 +23,7 @@ import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig; import org.apache.inlong.sdk.dataproxy.sender.http.InLongHttpMsgSender; import org.apache.inlong.sdk.dataproxy.sender.tcp.InLongTcpMsgSender; import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig; +import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicBoolean; @@ -40,8 +41,8 @@ public class MsgSenderMultiFactory implements MsgSenderFactory { private final BaseMsgSenderFactory baseMsgSenderFactory; public MsgSenderMultiFactory() { - this.baseMsgSenderFactory = new BaseMsgSenderFactory( - this, "iMultiFact-" + refCounter.incrementAndGet()); + this.baseMsgSenderFactory = new BaseMsgSenderFactory(this, + "iMultiFact-" + ProxyUtils.getProcessPid() + "-" + refCounter.incrementAndGet()); this.initialized.set(true); } diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderSingleFactory.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderSingleFactory.java index 3ca9821c43..4e99c6550b 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderSingleFactory.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderSingleFactory.java @@ -44,8 +44,8 @@ public class MsgSenderSingleFactory implements MsgSenderFactory { public MsgSenderSingleFactory() { synchronized (singletonRefCounter) { if (singletonRefCounter.incrementAndGet() == 1) { - baseMsgSenderFactory = new BaseMsgSenderFactory( - this, "iSingleFct-" + refCounter.incrementAndGet()); + baseMsgSenderFactory = new BaseMsgSenderFactory(this, + "iSingleFct-" + ProxyUtils.getProcessPid() + "-" + refCounter.incrementAndGet()); initialized.set(true); } } diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/EncodeObject.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/EncodeObject.java deleted file mode 100644 index 6e00e98158..0000000000 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/EncodeObject.java +++ /dev/null @@ -1,397 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.sdk.dataproxy.codec; - -import org.apache.inlong.common.enums.DataProxyErrCode; -import org.apache.inlong.common.msg.AttributeConstants; -import org.apache.inlong.common.msg.MsgType; -import org.apache.inlong.sdk.dataproxy.common.SendResult; -import org.apache.inlong.sdk.dataproxy.config.EncryptConfigEntry; - -import com.google.common.base.Splitter; -import org.apache.commons.lang3.StringUtils; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -public class EncodeObject { - - private static final Splitter.MapSplitter MAP_SPLITTER = Splitter.on(AttributeConstants.SEPARATOR).trimResults() - .withKeyValueSeparator(AttributeConstants.KEY_VALUE_SEPARATOR); - - private String attributes; - private String messageId; - private int msgtype; - private List<byte[]> bodylist; - private String commonattr = ""; - private String messageKey = "data"; - private String proxyIp = ""; - // private long seqId - private long dt; - // package time - private long packageTime = System.currentTimeMillis(); - private int cnt = -1; - private boolean isReport = false; - private boolean isGroupIdTransfer = false; - private boolean isSupportLF = false; - private boolean isAuth = false; - private boolean isEncrypt = false; - private boolean isCompress = true; - private int groupIdNum = 0; - private int streamIdNum = 0; - private String groupId; - private String streamId; - private short load; - private String userName = ""; - private String secretKey = ""; - private String msgUUID = null; - private EncryptConfigEntry encryptEntry = null; - - private SendResult sendResult = SendResult.OK; - private String errMsg; - private String dpIp; - - /* Used by de_serialization. msgtype=7/8 */ - public EncodeObject(String attributes) { - handleAttr(attributes); - } - - /* Used by de_serialization. */ - public EncodeObject(List<byte[]> bodyList, String attributes) { - this.bodylist = bodyList; - this.attributes = attributes; - handleAttr(attributes); - } - - // used for bodylist initializtion,msgtype=3/5 - public EncodeObject(List<byte[]> bodyList, String attributes, String messageId, - int msgtype, boolean isCompress, final String groupId) { - this.bodylist = bodyList; - this.messageId = messageId; - this.attributes = attributes + "&messageId=" + messageId; - this.msgtype = msgtype; - this.groupId = groupId; - this.isCompress = isCompress; - addRTMS(msgtype); - } - - // used for bodylist initializtion,msgtype=7/8 - public EncodeObject(List<byte[]> bodyList, int msgtype, boolean isCompress, - boolean isReport, boolean isGroupIdTransfer, long dt, - long seqId, String groupId, String streamId, String commonattr) { - this.bodylist = bodyList; - this.msgtype = msgtype; - this.isCompress = isCompress; - this.isReport = isReport; - this.dt = dt; - this.isGroupIdTransfer = isGroupIdTransfer; - this.commonattr = commonattr; - this.messageId = String.valueOf(seqId); - this.groupId = groupId; - this.streamId = streamId; - addRTMS(msgtype); - } - - // file agent, used for bodylist initializtion,msgtype=7/8 - public EncodeObject(List<byte[]> bodyList, int msgtype, boolean isCompress, - boolean isReport, boolean isGroupIdTransfer, long dt, - long seqId, String groupId, String streamId, String commonattr, - String messageKey, String proxyIp) { - this.bodylist = bodyList; - this.msgtype = msgtype; - this.isCompress = isCompress; - this.isReport = isReport; - this.dt = dt; - this.isGroupIdTransfer = isGroupIdTransfer; - this.commonattr = commonattr; - this.messageId = String.valueOf(seqId); - this.groupId = groupId; - this.streamId = streamId; - this.messageKey = messageKey; - this.proxyIp = proxyIp; - addRTMS(msgtype); - } - - private void handleAttr(String attributes) { - if (StringUtils.isBlank(attributes)) { - return; - } - Map<String, String> backAttrs = new HashMap<>(MAP_SPLITTER.split(attributes)); - if (backAttrs.containsKey(AttributeConstants.MESSAGE_ID)) { - this.messageId = backAttrs.get(AttributeConstants.MESSAGE_ID); - } - dpIp = backAttrs.get(AttributeConstants.MESSAGE_DP_IP); - - String errCode = backAttrs.get(AttributeConstants.MESSAGE_PROCESS_ERRCODE); - // errCode is empty or equals 0 -> success - if (StringUtils.isBlank(errCode) || Integer.parseInt(errCode) == 0) { - this.sendResult = SendResult.OK; - } else { - // get errMsg - this.errMsg = backAttrs.get(AttributeConstants.MESSAGE_PROCESS_ERRMSG); - if (StringUtils.isBlank(errMsg)) { - this.errMsg = DataProxyErrCode.valueOf(Integer.parseInt(errCode)).getErrMsg(); - } - // sendResult - this.sendResult = convertToSendResult(Integer.parseInt(errCode)); - } - } - - private void addRTMS(int msgtype) { - if (msgtype == MsgType.MSG_BIN_MULTI_BODY.getValue() || msgtype == MsgType.MSG_BIN_HEARTBEAT.getValue()) { - if (StringUtils.isBlank(commonattr)) { - this.commonattr = AttributeConstants.MSG_RPT_TIME + "=" + System.currentTimeMillis(); - } else { - this.commonattr += "&" + AttributeConstants.MSG_RPT_TIME + "=" + System.currentTimeMillis(); - } - } else { - if (StringUtils.isBlank(attributes)) { - this.attributes = AttributeConstants.MSG_RPT_TIME + "=" + System.currentTimeMillis(); - } else { - this.attributes += "&" + AttributeConstants.MSG_RPT_TIME + "=" + System.currentTimeMillis(); - } - } - } - - private SendResult convertToSendResult(int errCode) { - DataProxyErrCode dpErrCode = DataProxyErrCode.valueOf(errCode); - switch (dpErrCode) { - case SINK_SERVICE_UNREADY: - return SendResult.SINK_SERVICE_UNREADY; - case MISS_REQUIRED_GROUPID_ARGUMENT: - case MISS_REQUIRED_STREAMID_ARGUMENT: - case MISS_REQUIRED_DT_ARGUMENT: - case UNSUPPORTED_EXTEND_FIELD_VALUE: - return SendResult.INVALID_ATTRIBUTES; - case MISS_REQUIRED_BODY_ARGUMENT: - case EMPTY_MSG: - return SendResult.INVALID_DATA; - case BODY_EXCEED_MAX_LEN: - return SendResult.BODY_EXCEED_MAX_LEN; - case UNCONFIGURED_GROUPID_OR_STREAMID: - return SendResult.UNCONFIGURED_GROUPID_OR_STREAMID; - case PUT_EVENT_TO_CHANNEL_FAILURE: - case NO_AVAILABLE_PRODUCER: - case PRODUCER_IS_NULL: - case SEND_REQUEST_TO_MQ_FAILURE: - case MQ_RETURN_ERROR: - case DUPLICATED_MESSAGE: - return SendResult.DATAPROXY_FAIL_TO_RECEIVE; - default: - return SendResult.UNKOWN_ERROR; - } - } - - public String getMsgUUID() { - return msgUUID; - } - - public void setMsgUUID(String msgUUID) { - this.msgUUID = msgUUID; - } - - public boolean isGroupIdTransfer() { - return isGroupIdTransfer; - } - - public int getGroupIdNum() { - return groupIdNum; - } - - public int getStreamIdNum() { - return streamIdNum; - } - - public void setGroupIdAndStreamIdNum(int groupIdNum, int streamIdNum) { - this.groupIdNum = groupIdNum; - this.streamIdNum = streamIdNum; - if (groupIdNum != 0 && streamIdNum != 0) { - this.isGroupIdTransfer = true; - } - } - - public short getLoad() { - return load; - } - - public void setLoad(short load) { - this.load = load; - } - - public String getGroupId() { - return groupId; - } - - public void setGroupId(String groupId) { - this.groupId = groupId; - } - - public String getStreamId() { - return streamId; - } - - public void setStreamId(String streamId) { - this.streamId = streamId; - } - - public boolean isReport() { - return isReport; - } - - public void setReport(boolean isReport) { - this.isReport = isReport; - } - - public boolean isAuth() { - return isAuth; - } - - public void setAuth(boolean auth, final String userName, final String secretKey) { - this.isAuth = auth; - this.userName = userName; - this.secretKey = secretKey; - } - - public String getUserName() { - return userName; - } - - public String getSecretKey() { - return secretKey; - } - - public boolean isEncrypt() { - return isEncrypt; - } - - public EncryptConfigEntry getEncryptEntry() { - return encryptEntry; - } - - public void setEncryptEntry(boolean isEncrypt, String userName, EncryptConfigEntry encryptEntry) { - this.isEncrypt = isEncrypt; - if (userName != null) { - this.userName = userName; - } - this.encryptEntry = encryptEntry; - } - - public long getDt() { - return dt; - } - - public void setDt(long dt) { - this.dt = dt; - } - - public long getPackageTime() { - return packageTime; - } - - public void setPackageTime(long packageTime) { - this.packageTime = packageTime; - } - - public String getCommonattr() { - return commonattr; - } - - public void setCommonattr(String commonattr) { - this.commonattr = commonattr; - } - - public boolean isCompress() { - return isCompress; - } - - public List<byte[]> getBodylist() { - return bodylist; - } - - public int getMsgtype() { - return msgtype; - } - - public void setMsgtype(int msgtype) { - this.msgtype = msgtype; - } - - public String getAttributes() { - return attributes; - } - - public String getMessageId() { - return messageId; - } - - public void setMessageId(String messageId) { - this.messageId = messageId; - } - - public String getMessageKey() { - return messageKey; - } - - public void setMessageKey(String messageKey) { - this.messageKey = messageKey; - } - - public String getProxyIp() { - return proxyIp; - } - - public void setProxyIp(String proxyIp) { - this.proxyIp = proxyIp; - } - - public boolean isSupportLF() { - return isSupportLF; - } - - public void setSupportLF(boolean supportLF) { - isSupportLF = supportLF; - } - - public int getCnt() { - return cnt; - } - - public void setCnt(int cnt) { - this.cnt = cnt; - } - - public int getRealCnt() { - if (bodylist != null) { - return bodylist.size(); - } - return 1; - } - - public String getDpIp() { - return dpIp; - } - - public String getErrMsg() { - return errMsg; - } - - public SendResult getSendResult() { - return sendResult; - } -} diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ProxyClientConfig.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ProxyClientConfig.java index acb6db81ba..4079eaf81e 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ProxyClientConfig.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ProxyClientConfig.java @@ -64,8 +64,6 @@ public class ProxyClientConfig implements Cloneable { private int mgrConnTimeoutMs = SdkConsts.VAL_DEF_TCP_CONNECT_TIMEOUT_MS; // whether to start using local proxy configure private boolean onlyUseLocalProxyConfig = false; - // max retry count if meta query - private int metaQryMaxRetryIfFail = SdkConsts.VAL_DEF_RETRY_IF_CONFIG_SYNC_FAIL; // meta sync interval ms private long mgrMetaSyncInrMs = SdkConsts.VAL_DEF_CONFIG_SYNC_INTERVAL_MIN * SdkConsts.VAL_UNIT_MIN_TO_MS; @@ -88,7 +86,7 @@ public class ProxyClientConfig implements Cloneable { // node forced selection interval ms private long forceReChooseInrMs = SdkConsts.VAL_DEF_FORCE_CHOOSE_INR_MS; // metric setting - private MetricConfig metricConfig = new MetricConfig(); + private final MetricConfig metricConfig = new MetricConfig(); // report setting private boolean enableReportAuthz = false; private boolean enableReportEncrypt = false; diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SdkConsts.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SdkConsts.java index 948088bc8a..7979851d3e 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SdkConsts.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SdkConsts.java @@ -122,31 +122,7 @@ public class SdkConsts { public static final int VAL_MAX_HTTP_ASYNC_WORKER_IDLE_WAIT_MS = 3000; public static final int VAL_MIN_HTTP_ASYNC_WORKER_IDLE_WAIT_MS = 10; - public static final int MAX_TIMEOUT_CNT = 10; public static final int LOAD_THRESHOLD = 0; - public static final int CYCLE = 30; - - public static final int MSG_TYPE = 7; - public static final int COMPRESS_SIZE = 120; - - /* Configure the thread pool size for sync message sending. */ - public static final int SYNC_THREAD_POOL_SIZE = 5; - public static final int MAX_SYNC_THREAD_POOL_SIZE = 10; - - /* Configure the in-memory callback size for asynchronously message sending. */ - public static final int ASYNC_CALLBACK_SIZE = 50000; - public static final int MAX_ASYNC_CALLBACK_SIZE = 2000000; - - /* Configure the proxy IP list refresh parameters. */ - public static final int PROXY_UPDATE_INTERVAL_MINUTES = 5; - - /* one hour interval */ - public static final int PROXY_HTTP_UPDATE_INTERVAL_MINUTES = 60; - - public static final int MAX_LINE_CNT = 30; - - public static final String RECEIVE_BUFFER_SIZE = "receiveBufferSize"; - public static final String SEND_BUFFER_SIZE = "sendBufferSize"; public static final int FLAG_ALLOW_AUTH = 1 << 7; public static final int FLAG_ALLOW_ENCRYPT = 1 << 6; @@ -155,8 +131,6 @@ public class SdkConsts { public static final int EXT_FIELD_FLAG_DISABLE_ID2NUM = 1 << 2; public static final int EXT_FIELD_FLAG_SEP_BY_LF = 1 << 5; - public static int DEFAULT_SENDER_MAX_ATTEMPT = 1; - /* Reserved attribute data size(bytes). */ public static int RESERVED_ATTRIBUTE_LENGTH = 256; } diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendResult.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendResult.java deleted file mode 100644 index 97716b4723..0000000000 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendResult.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.sdk.dataproxy.common; - -public enum SendResult { - OK, - INVALID_ATTRIBUTES, // including DataProxyErrCode(100,101,102,112) - TIMEOUT, - CONNECTION_BREAK, - THREAD_INTERRUPT, - ASYNC_CALLBACK_BUFFER_FULL, - NO_CONNECTION, - INVALID_DATA, // including DataProxyErrCode(103, 111) - BODY_EXCEED_MAX_LEN, // DataProxyErrCode(104) - SINK_SERVICE_UNREADY, // DataProxyErrCode(1) - UNCONFIGURED_GROUPID_OR_STREAMID, // DataProxyErrCode(113) - TOPIC_IS_BLANK, // DataProxyErrCode(115) - DATAPROXY_FAIL_TO_RECEIVE, // DataProxyErrCode(114,116,117,118,119,120) - MESSAGE_TOO_LARGE, - WRITE_OVER_WATERMARK, /* error when water overflow */ - MAX_FLIGHT_ON_ALL_CONNECTION, - NO_REMOTE_NODE_META_INFOS, - EMPTY_ACTIVE_NODE_SET, - NO_VALID_REMOTE_NODE, - SENDER_CLOSED, - - UNKOWN_ERROR -} diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java index 52aa52e85e..805d158849 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java @@ -863,9 +863,11 @@ public class ProxyConfigManager extends Thread { } private void addAuthorizationInfo(HttpPost httpPost) { - httpPost.addHeader(BasicAuth.BASIC_AUTH_HEADER, - BasicAuth.genBasicAuthCredential(mgrConfig.getMgrAuthSecretId(), - mgrConfig.getMgrAuthSecretKey())); + if (mgrConfig.isEnableMgrAuthz()) { + httpPost.addHeader(BasicAuth.BASIC_AUTH_HEADER, + BasicAuth.genBasicAuthCredential(mgrConfig.getMgrAuthSecretId(), + mgrConfig.getMgrAuthSecretKey())); + } } private List<BasicNameValuePair> buildProxyNodeQueryParams() { diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/ExampleUtils.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/ExampleUtils.java index b271aeccf1..653ea86147 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/ExampleUtils.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/ExampleUtils.java @@ -23,6 +23,7 @@ import org.apache.inlong.sdk.dataproxy.sender.http.HttpEventInfo; import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSender; import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpEventInfo; import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSender; +import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils; import org.apache.commons.codec.binary.StringUtils; @@ -53,13 +54,15 @@ public class ExampleUtils { if (isMultiItem) { // send single message while (curCount++ < reqCnt) { + Map<String, String> filteredAttrs; try { if (curCount > 1) { localAttrs.clear(); localAttrs.put("index", String.valueOf(curCount)); } + filteredAttrs = ProxyUtils.getValidAttrs(localAttrs); eventInfo = new TcpEventInfo(groupId, streamId, - System.currentTimeMillis(), localAttrs, multiBodys); + System.currentTimeMillis(), filteredAttrs, multiBodys); } catch (Throwable ex) { System.out.println("Build tcp event failure, ex=" + ex); continue; diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/UdpClientExample.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/UdpClientExample.java deleted file mode 100644 index abc661348c..0000000000 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/UdpClientExample.java +++ /dev/null @@ -1,283 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.sdk.dataproxy.example; - -import org.apache.inlong.sdk.dataproxy.codec.EncodeObject; -import org.apache.inlong.sdk.dataproxy.config.EncryptConfigEntry; -import org.apache.inlong.sdk.dataproxy.config.EncryptInfo; -import org.apache.inlong.sdk.dataproxy.network.SequentialID; -import org.apache.inlong.sdk.dataproxy.utils.EncryptUtil; - -import io.netty.bootstrap.Bootstrap; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelOption; -import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.DatagramPacket; -import io.netty.channel.socket.nio.NioDatagramChannel; -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.xerial.snappy.Snappy; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.security.SecureRandom; -import java.util.Collections; -import java.util.concurrent.TimeUnit; - -import static org.apache.inlong.sdk.dataproxy.common.SdkConsts.FLAG_ALLOW_COMPRESS; -import static org.apache.inlong.sdk.dataproxy.common.SdkConsts.FLAG_ALLOW_ENCRYPT; - -public class UdpClientExample { - - private static final Logger logger = LoggerFactory.getLogger(UdpClientExample.class); - - private static SequentialID idGenerator = new SequentialID(); - - private static SecureRandom random = new SecureRandom(); - - public static void main(String[] args) { - long sentCount = 10; - String groupId = "test_group_id"; - String streamId = "test_stream_id"; - String busIp = "127.0.0.1"; - int busPort = 46802; - String attr = ""; - UdpClientExample demo = new UdpClientExample(); - Channel channel = demo.initUdpChannel(); - /* - * It is recommended to use msg type 7. For others, please refer to the official related documents Therefore, - * use type 7 to assemble the message. For other types, please refer to the sdk source code - */ - try { - int count = 0; - while (count < sentCount) { - if (count % 1000 == 0) { - long seqId = idGenerator.getNextInt(); - long dt = System.currentTimeMillis() / 1000; - EncodeObject encodeObject = - demo.getEncodeObject(7, false, - false, false, dt, seqId, groupId, - streamId, attr); - ByteBuf buffer = demo.getSendBuf(encodeObject); - demo.sendUdpMessage(channel, busIp, busPort, buffer); - TimeUnit.SECONDS.sleep(1); - } - } - } catch (Exception e) { - e.printStackTrace(); - } - } - - public static String getRandomString(int length) { - StringBuilder sb = new StringBuilder(); - String string = "i am bus test client!"; - for (int i = 0; i < length; i++) { - int number = random.nextInt(string.length()); - sb.append(string.charAt(number)); - } - return sb.toString(); - } - - public boolean sendUdpMessage(Channel channel, String ip, int port, ByteBuf msg) { - try { - channel.writeAndFlush(new DatagramPacket(msg, new InetSocketAddress(ip, port))).sync(); - logger.info("send = [{}/{}]", ip, port); - } catch (InterruptedException e) { - logger.info("send has exception e = {}", e); - } - return true; - } - - private EncodeObject getEncodeObject(int msgType, boolean isCompress, boolean isReport, - boolean isGroupIdTransfer, long dt, long seqId, String groupId, String streamId, - String attr) throws UnsupportedEncodingException { - EncodeObject encodeObject = - new EncodeObject(Collections.singletonList(getRandomString(5).getBytes("UTF-8")), - msgType, isCompress, isReport, isGroupIdTransfer, dt, seqId, groupId, streamId, attr); - return encodeObject; - } - - public ByteBuf getSendBuf(EncodeObject message) { - ByteBuf buf = null; - try { - if (message.getMsgtype() == 7) { - buf = writeToBuf7(message); - } - } catch (Exception e) { - logger.error("{}", e.getMessage()); - e.printStackTrace(); - } - return buf; - } - - private ByteBuf writeToBuf7(EncodeObject object) { - ByteBuf buf = null; - try { - int totalLength = 1 + 2 + 2 + 2 + 4 + 2 + 4 + 4 + 2 + 2; - byte[] body = null; - int cnt = 1; - - if (object.getBodylist() != null && !object.getBodylist().isEmpty()) { - if (object.getCnt() > 0) { - cnt = object.getCnt(); - } else { - cnt = object.getBodylist().size(); - } - ByteArrayOutputStream out = new ByteArrayOutputStream(); - - if (object.isSupportLF()) { - int totalCnt = 0; - ByteArrayOutputStream data = new ByteArrayOutputStream(); - for (byte[] entry : object.getBodylist()) { - if (totalCnt++ > 0) { - data.write("\n".getBytes("utf8")); - } - data.write(entry); - } - ByteBuffer dataBuffer = ByteBuffer.allocate(4); - dataBuffer.putInt(data.toByteArray().length); - out.write(dataBuffer.array()); - out.write(data.toByteArray()); - } else { - for (byte[] entry : object.getBodylist()) { - ByteBuffer dataBuffer = ByteBuffer.allocate(4); - dataBuffer.putInt(entry.length); - out.write(dataBuffer.array()); - out.write(entry); - } - } - body = out.toByteArray(); - } - if (body != null) { - if (object.isCompress()) { - body = processCompress(body); - } - String endAttr = object.getCommonattr(); - if (object.isEncrypt()) { - EncryptConfigEntry encryptEntry = object.getEncryptEntry(); - if (encryptEntry != null) { - if (StringUtils.isNotBlank(endAttr)) { - endAttr = endAttr + "&"; - } - EncryptInfo encryptInfo = encryptEntry.getRsaEncryptInfo(); - endAttr = endAttr + "_userName=" + object.getUserName() + "&_encyVersion=" - + encryptInfo.getVersion() + "&_encyAesKey=" - + encryptInfo.getRsaEncryptedKey(); - body = EncryptUtil.aesEncrypt(body, encryptInfo.getAesKey()); - } - } - if (!object.isGroupIdTransfer()) { - if (StringUtils.isNotBlank(endAttr)) { - endAttr = endAttr + "&"; - } - endAttr = (endAttr + "groupId=" + object.getGroupId() + "&streamId=" - + object.getStreamId()); - } - if (StringUtils.isNotBlank(object.getMsgUUID())) { - if (StringUtils.isNotBlank(endAttr)) { - endAttr = endAttr + "&"; - } - endAttr = endAttr + "msgUUID=" + object.getMsgUUID(); - } - int msgType = 7; - if (object.isEncrypt()) { - msgType |= FLAG_ALLOW_ENCRYPT; - } - if (object.isCompress()) { - msgType |= FLAG_ALLOW_COMPRESS; - } - totalLength = totalLength + body.length - + endAttr.getBytes("utf8").length; - buf = ByteBufAllocator.DEFAULT.buffer(4 + totalLength); - buf.writeInt(totalLength); - buf.writeByte(msgType); - buf.writeShort(object.getGroupIdNum()); - buf.writeShort(object.getStreamIdNum()); - String bitStr = object.isSupportLF() ? "1" : "0"; - bitStr += (object.getMessageKey().equals("minute")) ? "1" : "0"; - bitStr += (object.getMessageKey().equals("file")) ? "1" : "0"; - bitStr += !object.isGroupIdTransfer() ? "1" : "0"; - bitStr += object.isReport() ? "1" : "0"; - bitStr += "0"; - buf.writeShort(Integer.parseInt(bitStr, 2)); - buf.writeInt((int) object.getDt()); - buf.writeShort(cnt); - buf.writeInt(Integer.valueOf(object.getMessageId())); - - buf.writeInt(body.length); - buf.writeBytes(body); - - buf.writeShort(endAttr.getBytes("utf8").length); - buf.writeBytes(endAttr.getBytes("utf8")); - buf.writeShort(0xee01); - } - } catch (Exception e) { - logger.error("{}", e.getMessage()); - e.printStackTrace(); - } - return buf; - } - - private byte[] processCompress(byte[] body) { - try { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - out.write(body); - int guessLen = Snappy.maxCompressedLength(out.size()); - byte[] tmpData = new byte[guessLen]; - int len = Snappy.compress(out.toByteArray(), 0, - out.size(), tmpData, 0); - body = new byte[len]; - System.arraycopy(tmpData, 0, body, 0, len); - } catch (IOException e) { - logger.error("{}", e.getMessage()); - e.printStackTrace(); - } - return body; - } - - public Channel initUdpChannel() { - Channel channel = null; - Bootstrap bootstrap = new Bootstrap(); - bootstrap.group(new NioEventLoopGroup()) - .channel(NioDatagramChannel.class) - .option(ChannelOption.SO_BROADCAST, true) - .handler(new SimpleChannelInboundHandler<DatagramPacket>() { - - protected void channelRead0(ChannelHandlerContext var1, - DatagramPacket dmsg) throws Exception { - String msg = dmsg.content().toString(StandardCharsets.UTF_8); - System.out.println("from server:" + msg); - } - }); - try { - channel = bootstrap.bind(0).sync().channel(); - } catch (Exception e) { - logger.error("Connection has exception e = {}", e); - } - return channel; - } -} diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/SequentialID.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/SequentialID.java index f4a396b940..443732a80d 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/SequentialID.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/SequentialID.java @@ -17,8 +17,6 @@ package org.apache.inlong.sdk.dataproxy.network; -import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils; - import java.security.SecureRandom; import java.util.concurrent.atomic.AtomicInteger; @@ -26,16 +24,11 @@ public class SequentialID { private static final SecureRandom sRandom = new SecureRandom( Long.toString(System.nanoTime()).getBytes()); - private final String ip = ProxyUtils.getLocalIp(); private final AtomicInteger id = new AtomicInteger(sRandom.nextInt()); public SequentialID() { } - public String getNextId() { - return ip + "#" + id.incrementAndGet() + "#" + System.currentTimeMillis(); - } - public int getNextInt() { return id.incrementAndGet(); } diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpMsgSenderConfig.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpMsgSenderConfig.java index c30207347e..2fbeb0f8c5 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpMsgSenderConfig.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpMsgSenderConfig.java @@ -68,11 +68,6 @@ public class TcpMsgSenderConfig extends ProxyClientConfig implements Cloneable { private int maxAllowedSyncMsgTimeoutCnt = SdkConsts.VAL_DEF_SYNC_MSG_TIMEOUT_CNT; // the synchronization message timeout check duration ms private long syncMsgTimeoutChkDurMs = SdkConsts.VAL_DEF_SYNC_TIMEOUT_CHK_DUR_MS; - // max sync send attempt - private int maxSyncSendAttempt = SdkConsts.DEFAULT_SENDER_MAX_ATTEMPT; - - // async callback size - private int asyncCallbackSize = SdkConsts.ASYNC_CALLBACK_SIZE; public TcpMsgSenderConfig(boolean visitMgrByHttps, String managerIP, int managerPort, String groupId) throws ProxySdkException { @@ -252,22 +247,6 @@ public class TcpMsgSenderConfig extends ProxyClientConfig implements Cloneable { SdkConsts.VAL_MIN_SYNC_TIMEOUT_CHK_DUR_MS, syncMsgTimeoutChkDurMs); } - public int getMaxSyncSendAttempt() { - return maxSyncSendAttempt; - } - - public void setMaxSyncSendAttempt(int maxSyncSendAttempt) { - this.maxSyncSendAttempt = maxSyncSendAttempt; - } - - public int getTotalAsyncCallbackSize() { - return asyncCallbackSize; - } - - public void setTotalAsyncCallbackSize(int asyncCallbackSize) { - this.asyncCallbackSize = asyncCallbackSize; - } - @Override public boolean equals(Object o) { if (this == o) @@ -293,9 +272,7 @@ public class TcpMsgSenderConfig extends ProxyClientConfig implements Cloneable { && reconFailWaitMs == config.reconFailWaitMs && maxAllowedSyncMsgTimeoutCnt == config.maxAllowedSyncMsgTimeoutCnt && syncMsgTimeoutChkDurMs == config.syncMsgTimeoutChkDurMs - && sdkMsgType == config.sdkMsgType - && maxSyncSendAttempt == config.maxSyncSendAttempt - && asyncCallbackSize == config.asyncCallbackSize; + && sdkMsgType == config.sdkMsgType; } @Override @@ -305,8 +282,7 @@ public class TcpMsgSenderConfig extends ProxyClientConfig implements Cloneable { nettyWorkerThreadNum, rcvBufferSize, sendBufferSize, connectTimeoutMs, requestTimeoutMs, conCloseWaitPeriodMs, maxMsgInFlightPerConn, frozenReconnectWaitMs, busyReconnectWaitMs, reconFailWaitMs, - maxAllowedSyncMsgTimeoutCnt, maxSyncSendAttempt, syncMsgTimeoutChkDurMs, - asyncCallbackSize); + maxAllowedSyncMsgTimeoutCnt, syncMsgTimeoutChkDurMs); } @Override @@ -342,9 +318,7 @@ public class TcpMsgSenderConfig extends ProxyClientConfig implements Cloneable { .append(", busyReconnectWaitMs=").append(busyReconnectWaitMs) .append(", reconFailWaitMs=").append(reconFailWaitMs) .append(", maxAllowedSyncMsgTimeoutCnt=").append(maxAllowedSyncMsgTimeoutCnt) - .append(", maxSyncSendAttempt=").append(maxSyncSendAttempt) - .append(", syncMsgTimeoutChkDurMs=").append(syncMsgTimeoutChkDurMs) - .append(", asyncCallbackSize=").append(asyncCallbackSize); + .append(", syncMsgTimeoutChkDurMs=").append(syncMsgTimeoutChkDurMs); return super.getSetting(strBuff); } } diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java index cdcc5bc6c8..7aafca8237 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java @@ -19,8 +19,6 @@ package org.apache.inlong.sdk.dataproxy.utils; import org.apache.inlong.common.msg.AttributeConstants; import org.apache.inlong.common.msg.MsgType; -import org.apache.inlong.sdk.dataproxy.common.SdkConsts; -import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -33,7 +31,6 @@ import java.net.InetAddress; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; @@ -182,120 +179,4 @@ public class ProxyUtils { } return validAttrsMap; } - - public static boolean isAttrKeysValid(Map<String, String> attrsMap) { - if (attrsMap == null || attrsMap.size() == 0) { - return false; - } - for (String key : attrsMap.keySet()) { - if (SdkReservedWords.contains(key)) { - logger.error("the attributes is invalid ,please check ! {}", key); - return false; - } - } - return true; - } - - public static boolean isDtValid(long dt) { - if (String.valueOf(dt).length() != TIME_LENGTH) { - logger.error("dt {} is error", dt); - return false; - } - return true; - } - - /** - * check body valid - * - * @param body - * @return - */ - public static boolean isBodyValid(byte[] body) { - if (body == null || body.length == 0) { - logger.error("body is error {}", body); - return false; - } - return true; - } - - /** - * check body valid - * - * @param bodyList - * @return - */ - public static boolean isBodyValid(List<byte[]> bodyList) { - if (bodyList == null || bodyList.size() == 0) { - logger.error("body is error"); - return false; - } - return true; - } - - /** - * Check if the body length exceeds the maximum limit, if the maximum limit is less than 0, it is not checked - * @param body - * @param maxLen - * @return - */ - public static boolean isBodyLengthValid(byte[] body, int maxLen) { - // Not valid if the maximum limit is less than or equal to 0 - if (maxLen < 0) { - return true; - } - // Reserve space for attribute - if (body.length > maxLen - SdkConsts.RESERVED_ATTRIBUTE_LENGTH) { - logger.debug("body length({}) > max length({}) - fixed attribute length({})", - body.length, maxLen, SdkConsts.RESERVED_ATTRIBUTE_LENGTH); - return false; - } - return true; - } - - /** - * Check if the total body length exceeds the maximum limit, if the maximum limit is less than 0, it is not checked - * @param bodyList - * @param maxLen - * @return - */ - public static boolean isBodyLengthValid(List<byte[]> bodyList, int maxLen) { - // Not valid if the maximum limit is less than or equal to 0 - if (maxLen < 0) { - return true; - } - int size = 0; - for (byte[] body : bodyList) { - size += body.length; - } - // Reserve space for attribute - if (size > maxLen - SdkConsts.RESERVED_ATTRIBUTE_LENGTH) { - logger.debug("bodyList length({}) > max length({}) - fixed attribute length({})", - size, maxLen, SdkConsts.RESERVED_ATTRIBUTE_LENGTH); - return false; - } - return true; - } - - public static long covertZeroDt(long dt) { - if (dt == 0) { - return System.currentTimeMillis(); - } - return dt; - } - - /** - * valid client config - * - * @param tcpConfig - */ - public static void validClientConfig(TcpMsgSenderConfig tcpConfig) { - if (tcpConfig.isEnableMgrAuthz()) { - if (StringUtils.isBlank(tcpConfig.getMgrAuthSecretId())) { - throw new IllegalArgumentException("Authentication require secretId not Blank!"); - } - if (StringUtils.isBlank(tcpConfig.getMgrAuthSecretKey())) { - throw new IllegalArgumentException("Authentication require secretKey not Blank!"); - } - } - } }