This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 9baa8b3d9d [INLONG-11749][SDK] Clean up unused configuration items and
functions (#11750)
9baa8b3d9d is described below
commit 9baa8b3d9da280f7b82a721dab15fbf52556851e
Author: Goson Zhang <[email protected]>
AuthorDate: Tue Feb 11 19:45:39 2025 +0800
[INLONG-11749][SDK] Clean up unused configuration items and functions
(#11750)
* [INLONG-11749][SDK] Clean up unused configuration items and functions
* [INLONG-11749][SDK] Clean up unused configuration items and functions
---------
Co-authored-by: gosonzhang <[email protected]>
---
.../sdk/dataproxy/MsgSenderMultiFactory.java | 5 +-
.../sdk/dataproxy/MsgSenderSingleFactory.java | 4 +-
.../inlong/sdk/dataproxy/codec/EncodeObject.java | 397 ---------------------
.../sdk/dataproxy/common/ProxyClientConfig.java | 4 +-
.../inlong/sdk/dataproxy/common/SdkConsts.java | 26 --
.../inlong/sdk/dataproxy/common/SendResult.java | 43 ---
.../sdk/dataproxy/config/ProxyConfigManager.java | 8 +-
.../inlong/sdk/dataproxy/example/ExampleUtils.java | 5 +-
.../sdk/dataproxy/example/UdpClientExample.java | 283 ---------------
.../inlong/sdk/dataproxy/network/SequentialID.java | 7 -
.../dataproxy/sender/tcp/TcpMsgSenderConfig.java | 32 +-
.../inlong/sdk/dataproxy/utils/ProxyUtils.java | 119 ------
12 files changed, 18 insertions(+), 915 deletions(-)
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderMultiFactory.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderMultiFactory.java
index bcf32defd3..559b7d7bac 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderMultiFactory.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderMultiFactory.java
@@ -23,6 +23,7 @@ import
org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.sender.http.InLongHttpMsgSender;
import org.apache.inlong.sdk.dataproxy.sender.tcp.InLongTcpMsgSender;
import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
+import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -40,8 +41,8 @@ public class MsgSenderMultiFactory implements
MsgSenderFactory {
private final BaseMsgSenderFactory baseMsgSenderFactory;
public MsgSenderMultiFactory() {
- this.baseMsgSenderFactory = new BaseMsgSenderFactory(
- this, "iMultiFact-" + refCounter.incrementAndGet());
+ this.baseMsgSenderFactory = new BaseMsgSenderFactory(this,
+ "iMultiFact-" + ProxyUtils.getProcessPid() + "-" +
refCounter.incrementAndGet());
this.initialized.set(true);
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderSingleFactory.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderSingleFactory.java
index 3ca9821c43..4e99c6550b 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderSingleFactory.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderSingleFactory.java
@@ -44,8 +44,8 @@ public class MsgSenderSingleFactory implements
MsgSenderFactory {
public MsgSenderSingleFactory() {
synchronized (singletonRefCounter) {
if (singletonRefCounter.incrementAndGet() == 1) {
- baseMsgSenderFactory = new BaseMsgSenderFactory(
- this, "iSingleFct-" + refCounter.incrementAndGet());
+ baseMsgSenderFactory = new BaseMsgSenderFactory(this,
+ "iSingleFct-" + ProxyUtils.getProcessPid() + "-" +
refCounter.incrementAndGet());
initialized.set(true);
}
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/EncodeObject.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/EncodeObject.java
deleted file mode 100644
index 6e00e98158..0000000000
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/EncodeObject.java
+++ /dev/null
@@ -1,397 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dataproxy.codec;
-
-import org.apache.inlong.common.enums.DataProxyErrCode;
-import org.apache.inlong.common.msg.AttributeConstants;
-import org.apache.inlong.common.msg.MsgType;
-import org.apache.inlong.sdk.dataproxy.common.SendResult;
-import org.apache.inlong.sdk.dataproxy.config.EncryptConfigEntry;
-
-import com.google.common.base.Splitter;
-import org.apache.commons.lang3.StringUtils;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class EncodeObject {
-
- private static final Splitter.MapSplitter MAP_SPLITTER =
Splitter.on(AttributeConstants.SEPARATOR).trimResults()
- .withKeyValueSeparator(AttributeConstants.KEY_VALUE_SEPARATOR);
-
- private String attributes;
- private String messageId;
- private int msgtype;
- private List<byte[]> bodylist;
- private String commonattr = "";
- private String messageKey = "data";
- private String proxyIp = "";
- // private long seqId
- private long dt;
- // package time
- private long packageTime = System.currentTimeMillis();
- private int cnt = -1;
- private boolean isReport = false;
- private boolean isGroupIdTransfer = false;
- private boolean isSupportLF = false;
- private boolean isAuth = false;
- private boolean isEncrypt = false;
- private boolean isCompress = true;
- private int groupIdNum = 0;
- private int streamIdNum = 0;
- private String groupId;
- private String streamId;
- private short load;
- private String userName = "";
- private String secretKey = "";
- private String msgUUID = null;
- private EncryptConfigEntry encryptEntry = null;
-
- private SendResult sendResult = SendResult.OK;
- private String errMsg;
- private String dpIp;
-
- /* Used by de_serialization. msgtype=7/8 */
- public EncodeObject(String attributes) {
- handleAttr(attributes);
- }
-
- /* Used by de_serialization. */
- public EncodeObject(List<byte[]> bodyList, String attributes) {
- this.bodylist = bodyList;
- this.attributes = attributes;
- handleAttr(attributes);
- }
-
- // used for bodylist initializtion,msgtype=3/5
- public EncodeObject(List<byte[]> bodyList, String attributes, String
messageId,
- int msgtype, boolean isCompress, final String groupId) {
- this.bodylist = bodyList;
- this.messageId = messageId;
- this.attributes = attributes + "&messageId=" + messageId;
- this.msgtype = msgtype;
- this.groupId = groupId;
- this.isCompress = isCompress;
- addRTMS(msgtype);
- }
-
- // used for bodylist initializtion,msgtype=7/8
- public EncodeObject(List<byte[]> bodyList, int msgtype, boolean isCompress,
- boolean isReport, boolean isGroupIdTransfer, long dt,
- long seqId, String groupId, String streamId, String commonattr) {
- this.bodylist = bodyList;
- this.msgtype = msgtype;
- this.isCompress = isCompress;
- this.isReport = isReport;
- this.dt = dt;
- this.isGroupIdTransfer = isGroupIdTransfer;
- this.commonattr = commonattr;
- this.messageId = String.valueOf(seqId);
- this.groupId = groupId;
- this.streamId = streamId;
- addRTMS(msgtype);
- }
-
- // file agent, used for bodylist initializtion,msgtype=7/8
- public EncodeObject(List<byte[]> bodyList, int msgtype, boolean isCompress,
- boolean isReport, boolean isGroupIdTransfer, long dt,
- long seqId, String groupId, String streamId, String commonattr,
- String messageKey, String proxyIp) {
- this.bodylist = bodyList;
- this.msgtype = msgtype;
- this.isCompress = isCompress;
- this.isReport = isReport;
- this.dt = dt;
- this.isGroupIdTransfer = isGroupIdTransfer;
- this.commonattr = commonattr;
- this.messageId = String.valueOf(seqId);
- this.groupId = groupId;
- this.streamId = streamId;
- this.messageKey = messageKey;
- this.proxyIp = proxyIp;
- addRTMS(msgtype);
- }
-
- private void handleAttr(String attributes) {
- if (StringUtils.isBlank(attributes)) {
- return;
- }
- Map<String, String> backAttrs = new
HashMap<>(MAP_SPLITTER.split(attributes));
- if (backAttrs.containsKey(AttributeConstants.MESSAGE_ID)) {
- this.messageId = backAttrs.get(AttributeConstants.MESSAGE_ID);
- }
- dpIp = backAttrs.get(AttributeConstants.MESSAGE_DP_IP);
-
- String errCode =
backAttrs.get(AttributeConstants.MESSAGE_PROCESS_ERRCODE);
- // errCode is empty or equals 0 -> success
- if (StringUtils.isBlank(errCode) || Integer.parseInt(errCode) == 0) {
- this.sendResult = SendResult.OK;
- } else {
- // get errMsg
- this.errMsg =
backAttrs.get(AttributeConstants.MESSAGE_PROCESS_ERRMSG);
- if (StringUtils.isBlank(errMsg)) {
- this.errMsg =
DataProxyErrCode.valueOf(Integer.parseInt(errCode)).getErrMsg();
- }
- // sendResult
- this.sendResult = convertToSendResult(Integer.parseInt(errCode));
- }
- }
-
- private void addRTMS(int msgtype) {
- if (msgtype == MsgType.MSG_BIN_MULTI_BODY.getValue() || msgtype ==
MsgType.MSG_BIN_HEARTBEAT.getValue()) {
- if (StringUtils.isBlank(commonattr)) {
- this.commonattr = AttributeConstants.MSG_RPT_TIME + "=" +
System.currentTimeMillis();
- } else {
- this.commonattr += "&" + AttributeConstants.MSG_RPT_TIME + "="
+ System.currentTimeMillis();
- }
- } else {
- if (StringUtils.isBlank(attributes)) {
- this.attributes = AttributeConstants.MSG_RPT_TIME + "=" +
System.currentTimeMillis();
- } else {
- this.attributes += "&" + AttributeConstants.MSG_RPT_TIME + "="
+ System.currentTimeMillis();
- }
- }
- }
-
- private SendResult convertToSendResult(int errCode) {
- DataProxyErrCode dpErrCode = DataProxyErrCode.valueOf(errCode);
- switch (dpErrCode) {
- case SINK_SERVICE_UNREADY:
- return SendResult.SINK_SERVICE_UNREADY;
- case MISS_REQUIRED_GROUPID_ARGUMENT:
- case MISS_REQUIRED_STREAMID_ARGUMENT:
- case MISS_REQUIRED_DT_ARGUMENT:
- case UNSUPPORTED_EXTEND_FIELD_VALUE:
- return SendResult.INVALID_ATTRIBUTES;
- case MISS_REQUIRED_BODY_ARGUMENT:
- case EMPTY_MSG:
- return SendResult.INVALID_DATA;
- case BODY_EXCEED_MAX_LEN:
- return SendResult.BODY_EXCEED_MAX_LEN;
- case UNCONFIGURED_GROUPID_OR_STREAMID:
- return SendResult.UNCONFIGURED_GROUPID_OR_STREAMID;
- case PUT_EVENT_TO_CHANNEL_FAILURE:
- case NO_AVAILABLE_PRODUCER:
- case PRODUCER_IS_NULL:
- case SEND_REQUEST_TO_MQ_FAILURE:
- case MQ_RETURN_ERROR:
- case DUPLICATED_MESSAGE:
- return SendResult.DATAPROXY_FAIL_TO_RECEIVE;
- default:
- return SendResult.UNKOWN_ERROR;
- }
- }
-
- public String getMsgUUID() {
- return msgUUID;
- }
-
- public void setMsgUUID(String msgUUID) {
- this.msgUUID = msgUUID;
- }
-
- public boolean isGroupIdTransfer() {
- return isGroupIdTransfer;
- }
-
- public int getGroupIdNum() {
- return groupIdNum;
- }
-
- public int getStreamIdNum() {
- return streamIdNum;
- }
-
- public void setGroupIdAndStreamIdNum(int groupIdNum, int streamIdNum) {
- this.groupIdNum = groupIdNum;
- this.streamIdNum = streamIdNum;
- if (groupIdNum != 0 && streamIdNum != 0) {
- this.isGroupIdTransfer = true;
- }
- }
-
- public short getLoad() {
- return load;
- }
-
- public void setLoad(short load) {
- this.load = load;
- }
-
- public String getGroupId() {
- return groupId;
- }
-
- public void setGroupId(String groupId) {
- this.groupId = groupId;
- }
-
- public String getStreamId() {
- return streamId;
- }
-
- public void setStreamId(String streamId) {
- this.streamId = streamId;
- }
-
- public boolean isReport() {
- return isReport;
- }
-
- public void setReport(boolean isReport) {
- this.isReport = isReport;
- }
-
- public boolean isAuth() {
- return isAuth;
- }
-
- public void setAuth(boolean auth, final String userName, final String
secretKey) {
- this.isAuth = auth;
- this.userName = userName;
- this.secretKey = secretKey;
- }
-
- public String getUserName() {
- return userName;
- }
-
- public String getSecretKey() {
- return secretKey;
- }
-
- public boolean isEncrypt() {
- return isEncrypt;
- }
-
- public EncryptConfigEntry getEncryptEntry() {
- return encryptEntry;
- }
-
- public void setEncryptEntry(boolean isEncrypt, String userName,
EncryptConfigEntry encryptEntry) {
- this.isEncrypt = isEncrypt;
- if (userName != null) {
- this.userName = userName;
- }
- this.encryptEntry = encryptEntry;
- }
-
- public long getDt() {
- return dt;
- }
-
- public void setDt(long dt) {
- this.dt = dt;
- }
-
- public long getPackageTime() {
- return packageTime;
- }
-
- public void setPackageTime(long packageTime) {
- this.packageTime = packageTime;
- }
-
- public String getCommonattr() {
- return commonattr;
- }
-
- public void setCommonattr(String commonattr) {
- this.commonattr = commonattr;
- }
-
- public boolean isCompress() {
- return isCompress;
- }
-
- public List<byte[]> getBodylist() {
- return bodylist;
- }
-
- public int getMsgtype() {
- return msgtype;
- }
-
- public void setMsgtype(int msgtype) {
- this.msgtype = msgtype;
- }
-
- public String getAttributes() {
- return attributes;
- }
-
- public String getMessageId() {
- return messageId;
- }
-
- public void setMessageId(String messageId) {
- this.messageId = messageId;
- }
-
- public String getMessageKey() {
- return messageKey;
- }
-
- public void setMessageKey(String messageKey) {
- this.messageKey = messageKey;
- }
-
- public String getProxyIp() {
- return proxyIp;
- }
-
- public void setProxyIp(String proxyIp) {
- this.proxyIp = proxyIp;
- }
-
- public boolean isSupportLF() {
- return isSupportLF;
- }
-
- public void setSupportLF(boolean supportLF) {
- isSupportLF = supportLF;
- }
-
- public int getCnt() {
- return cnt;
- }
-
- public void setCnt(int cnt) {
- this.cnt = cnt;
- }
-
- public int getRealCnt() {
- if (bodylist != null) {
- return bodylist.size();
- }
- return 1;
- }
-
- public String getDpIp() {
- return dpIp;
- }
-
- public String getErrMsg() {
- return errMsg;
- }
-
- public SendResult getSendResult() {
- return sendResult;
- }
-}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ProxyClientConfig.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ProxyClientConfig.java
index acb6db81ba..4079eaf81e 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ProxyClientConfig.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ProxyClientConfig.java
@@ -64,8 +64,6 @@ public class ProxyClientConfig implements Cloneable {
private int mgrConnTimeoutMs = SdkConsts.VAL_DEF_TCP_CONNECT_TIMEOUT_MS;
// whether to start using local proxy configure
private boolean onlyUseLocalProxyConfig = false;
- // max retry count if meta query
- private int metaQryMaxRetryIfFail =
SdkConsts.VAL_DEF_RETRY_IF_CONFIG_SYNC_FAIL;
// meta sync interval ms
private long mgrMetaSyncInrMs =
SdkConsts.VAL_DEF_CONFIG_SYNC_INTERVAL_MIN *
SdkConsts.VAL_UNIT_MIN_TO_MS;
@@ -88,7 +86,7 @@ public class ProxyClientConfig implements Cloneable {
// node forced selection interval ms
private long forceReChooseInrMs = SdkConsts.VAL_DEF_FORCE_CHOOSE_INR_MS;
// metric setting
- private MetricConfig metricConfig = new MetricConfig();
+ private final MetricConfig metricConfig = new MetricConfig();
// report setting
private boolean enableReportAuthz = false;
private boolean enableReportEncrypt = false;
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SdkConsts.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SdkConsts.java
index 948088bc8a..7979851d3e 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SdkConsts.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SdkConsts.java
@@ -122,31 +122,7 @@ public class SdkConsts {
public static final int VAL_MAX_HTTP_ASYNC_WORKER_IDLE_WAIT_MS = 3000;
public static final int VAL_MIN_HTTP_ASYNC_WORKER_IDLE_WAIT_MS = 10;
- public static final int MAX_TIMEOUT_CNT = 10;
public static final int LOAD_THRESHOLD = 0;
- public static final int CYCLE = 30;
-
- public static final int MSG_TYPE = 7;
- public static final int COMPRESS_SIZE = 120;
-
- /* Configure the thread pool size for sync message sending. */
- public static final int SYNC_THREAD_POOL_SIZE = 5;
- public static final int MAX_SYNC_THREAD_POOL_SIZE = 10;
-
- /* Configure the in-memory callback size for asynchronously message
sending. */
- public static final int ASYNC_CALLBACK_SIZE = 50000;
- public static final int MAX_ASYNC_CALLBACK_SIZE = 2000000;
-
- /* Configure the proxy IP list refresh parameters. */
- public static final int PROXY_UPDATE_INTERVAL_MINUTES = 5;
-
- /* one hour interval */
- public static final int PROXY_HTTP_UPDATE_INTERVAL_MINUTES = 60;
-
- public static final int MAX_LINE_CNT = 30;
-
- public static final String RECEIVE_BUFFER_SIZE = "receiveBufferSize";
- public static final String SEND_BUFFER_SIZE = "sendBufferSize";
public static final int FLAG_ALLOW_AUTH = 1 << 7;
public static final int FLAG_ALLOW_ENCRYPT = 1 << 6;
@@ -155,8 +131,6 @@ public class SdkConsts {
public static final int EXT_FIELD_FLAG_DISABLE_ID2NUM = 1 << 2;
public static final int EXT_FIELD_FLAG_SEP_BY_LF = 1 << 5;
- public static int DEFAULT_SENDER_MAX_ATTEMPT = 1;
-
/* Reserved attribute data size(bytes). */
public static int RESERVED_ATTRIBUTE_LENGTH = 256;
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendResult.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendResult.java
deleted file mode 100644
index 97716b4723..0000000000
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendResult.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dataproxy.common;
-
-public enum SendResult {
- OK,
- INVALID_ATTRIBUTES, // including DataProxyErrCode(100,101,102,112)
- TIMEOUT,
- CONNECTION_BREAK,
- THREAD_INTERRUPT,
- ASYNC_CALLBACK_BUFFER_FULL,
- NO_CONNECTION,
- INVALID_DATA, // including DataProxyErrCode(103, 111)
- BODY_EXCEED_MAX_LEN, // DataProxyErrCode(104)
- SINK_SERVICE_UNREADY, // DataProxyErrCode(1)
- UNCONFIGURED_GROUPID_OR_STREAMID, // DataProxyErrCode(113)
- TOPIC_IS_BLANK, // DataProxyErrCode(115)
- DATAPROXY_FAIL_TO_RECEIVE, // DataProxyErrCode(114,116,117,118,119,120)
- MESSAGE_TOO_LARGE,
- WRITE_OVER_WATERMARK, /* error when water overflow */
- MAX_FLIGHT_ON_ALL_CONNECTION,
- NO_REMOTE_NODE_META_INFOS,
- EMPTY_ACTIVE_NODE_SET,
- NO_VALID_REMOTE_NODE,
- SENDER_CLOSED,
-
- UNKOWN_ERROR
-}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
index 52aa52e85e..805d158849 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
@@ -863,9 +863,11 @@ public class ProxyConfigManager extends Thread {
}
private void addAuthorizationInfo(HttpPost httpPost) {
- httpPost.addHeader(BasicAuth.BASIC_AUTH_HEADER,
-
BasicAuth.genBasicAuthCredential(mgrConfig.getMgrAuthSecretId(),
- mgrConfig.getMgrAuthSecretKey()));
+ if (mgrConfig.isEnableMgrAuthz()) {
+ httpPost.addHeader(BasicAuth.BASIC_AUTH_HEADER,
+
BasicAuth.genBasicAuthCredential(mgrConfig.getMgrAuthSecretId(),
+ mgrConfig.getMgrAuthSecretKey()));
+ }
}
private List<BasicNameValuePair> buildProxyNodeQueryParams() {
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/ExampleUtils.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/ExampleUtils.java
index b271aeccf1..653ea86147 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/ExampleUtils.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/ExampleUtils.java
@@ -23,6 +23,7 @@ import
org.apache.inlong.sdk.dataproxy.sender.http.HttpEventInfo;
import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSender;
import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpEventInfo;
import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSender;
+import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
import org.apache.commons.codec.binary.StringUtils;
@@ -53,13 +54,15 @@ public class ExampleUtils {
if (isMultiItem) {
// send single message
while (curCount++ < reqCnt) {
+ Map<String, String> filteredAttrs;
try {
if (curCount > 1) {
localAttrs.clear();
localAttrs.put("index", String.valueOf(curCount));
}
+ filteredAttrs = ProxyUtils.getValidAttrs(localAttrs);
eventInfo = new TcpEventInfo(groupId, streamId,
- System.currentTimeMillis(), localAttrs,
multiBodys);
+ System.currentTimeMillis(), filteredAttrs,
multiBodys);
} catch (Throwable ex) {
System.out.println("Build tcp event failure, ex=" +
ex);
continue;
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/UdpClientExample.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/UdpClientExample.java
deleted file mode 100644
index abc661348c..0000000000
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/UdpClientExample.java
+++ /dev/null
@@ -1,283 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dataproxy.example;
-
-import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
-import org.apache.inlong.sdk.dataproxy.config.EncryptConfigEntry;
-import org.apache.inlong.sdk.dataproxy.config.EncryptInfo;
-import org.apache.inlong.sdk.dataproxy.network.SequentialID;
-import org.apache.inlong.sdk.dataproxy.utils.EncryptUtil;
-
-import io.netty.bootstrap.Bootstrap;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.DatagramPacket;
-import io.netty.channel.socket.nio.NioDatagramChannel;
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.xerial.snappy.Snappy;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.security.SecureRandom;
-import java.util.Collections;
-import java.util.concurrent.TimeUnit;
-
-import static
org.apache.inlong.sdk.dataproxy.common.SdkConsts.FLAG_ALLOW_COMPRESS;
-import static
org.apache.inlong.sdk.dataproxy.common.SdkConsts.FLAG_ALLOW_ENCRYPT;
-
-public class UdpClientExample {
-
- private static final Logger logger =
LoggerFactory.getLogger(UdpClientExample.class);
-
- private static SequentialID idGenerator = new SequentialID();
-
- private static SecureRandom random = new SecureRandom();
-
- public static void main(String[] args) {
- long sentCount = 10;
- String groupId = "test_group_id";
- String streamId = "test_stream_id";
- String busIp = "127.0.0.1";
- int busPort = 46802;
- String attr = "";
- UdpClientExample demo = new UdpClientExample();
- Channel channel = demo.initUdpChannel();
- /*
- * It is recommended to use msg type 7. For others, please refer to
the official related documents Therefore,
- * use type 7 to assemble the message. For other types, please refer
to the sdk source code
- */
- try {
- int count = 0;
- while (count < sentCount) {
- if (count % 1000 == 0) {
- long seqId = idGenerator.getNextInt();
- long dt = System.currentTimeMillis() / 1000;
- EncodeObject encodeObject =
- demo.getEncodeObject(7, false,
- false, false, dt, seqId, groupId,
- streamId, attr);
- ByteBuf buffer = demo.getSendBuf(encodeObject);
- demo.sendUdpMessage(channel, busIp, busPort, buffer);
- TimeUnit.SECONDS.sleep(1);
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- public static String getRandomString(int length) {
- StringBuilder sb = new StringBuilder();
- String string = "i am bus test client!";
- for (int i = 0; i < length; i++) {
- int number = random.nextInt(string.length());
- sb.append(string.charAt(number));
- }
- return sb.toString();
- }
-
- public boolean sendUdpMessage(Channel channel, String ip, int port,
ByteBuf msg) {
- try {
- channel.writeAndFlush(new DatagramPacket(msg, new
InetSocketAddress(ip, port))).sync();
- logger.info("send = [{}/{}]", ip, port);
- } catch (InterruptedException e) {
- logger.info("send has exception e = {}", e);
- }
- return true;
- }
-
- private EncodeObject getEncodeObject(int msgType, boolean isCompress,
boolean isReport,
- boolean isGroupIdTransfer, long dt, long seqId, String groupId,
String streamId,
- String attr) throws UnsupportedEncodingException {
- EncodeObject encodeObject =
- new
EncodeObject(Collections.singletonList(getRandomString(5).getBytes("UTF-8")),
- msgType, isCompress, isReport, isGroupIdTransfer, dt,
seqId, groupId, streamId, attr);
- return encodeObject;
- }
-
- public ByteBuf getSendBuf(EncodeObject message) {
- ByteBuf buf = null;
- try {
- if (message.getMsgtype() == 7) {
- buf = writeToBuf7(message);
- }
- } catch (Exception e) {
- logger.error("{}", e.getMessage());
- e.printStackTrace();
- }
- return buf;
- }
-
- private ByteBuf writeToBuf7(EncodeObject object) {
- ByteBuf buf = null;
- try {
- int totalLength = 1 + 2 + 2 + 2 + 4 + 2 + 4 + 4 + 2 + 2;
- byte[] body = null;
- int cnt = 1;
-
- if (object.getBodylist() != null &&
!object.getBodylist().isEmpty()) {
- if (object.getCnt() > 0) {
- cnt = object.getCnt();
- } else {
- cnt = object.getBodylist().size();
- }
- ByteArrayOutputStream out = new ByteArrayOutputStream();
-
- if (object.isSupportLF()) {
- int totalCnt = 0;
- ByteArrayOutputStream data = new ByteArrayOutputStream();
- for (byte[] entry : object.getBodylist()) {
- if (totalCnt++ > 0) {
- data.write("\n".getBytes("utf8"));
- }
- data.write(entry);
- }
- ByteBuffer dataBuffer = ByteBuffer.allocate(4);
- dataBuffer.putInt(data.toByteArray().length);
- out.write(dataBuffer.array());
- out.write(data.toByteArray());
- } else {
- for (byte[] entry : object.getBodylist()) {
- ByteBuffer dataBuffer = ByteBuffer.allocate(4);
- dataBuffer.putInt(entry.length);
- out.write(dataBuffer.array());
- out.write(entry);
- }
- }
- body = out.toByteArray();
- }
- if (body != null) {
- if (object.isCompress()) {
- body = processCompress(body);
- }
- String endAttr = object.getCommonattr();
- if (object.isEncrypt()) {
- EncryptConfigEntry encryptEntry = object.getEncryptEntry();
- if (encryptEntry != null) {
- if (StringUtils.isNotBlank(endAttr)) {
- endAttr = endAttr + "&";
- }
- EncryptInfo encryptInfo =
encryptEntry.getRsaEncryptInfo();
- endAttr = endAttr + "_userName=" +
object.getUserName() + "&_encyVersion="
- + encryptInfo.getVersion() + "&_encyAesKey="
- + encryptInfo.getRsaEncryptedKey();
- body = EncryptUtil.aesEncrypt(body,
encryptInfo.getAesKey());
- }
- }
- if (!object.isGroupIdTransfer()) {
- if (StringUtils.isNotBlank(endAttr)) {
- endAttr = endAttr + "&";
- }
- endAttr = (endAttr + "groupId=" + object.getGroupId() +
"&streamId="
- + object.getStreamId());
- }
- if (StringUtils.isNotBlank(object.getMsgUUID())) {
- if (StringUtils.isNotBlank(endAttr)) {
- endAttr = endAttr + "&";
- }
- endAttr = endAttr + "msgUUID=" + object.getMsgUUID();
- }
- int msgType = 7;
- if (object.isEncrypt()) {
- msgType |= FLAG_ALLOW_ENCRYPT;
- }
- if (object.isCompress()) {
- msgType |= FLAG_ALLOW_COMPRESS;
- }
- totalLength = totalLength + body.length
- + endAttr.getBytes("utf8").length;
- buf = ByteBufAllocator.DEFAULT.buffer(4 + totalLength);
- buf.writeInt(totalLength);
- buf.writeByte(msgType);
- buf.writeShort(object.getGroupIdNum());
- buf.writeShort(object.getStreamIdNum());
- String bitStr = object.isSupportLF() ? "1" : "0";
- bitStr += (object.getMessageKey().equals("minute")) ? "1" :
"0";
- bitStr += (object.getMessageKey().equals("file")) ? "1" : "0";
- bitStr += !object.isGroupIdTransfer() ? "1" : "0";
- bitStr += object.isReport() ? "1" : "0";
- bitStr += "0";
- buf.writeShort(Integer.parseInt(bitStr, 2));
- buf.writeInt((int) object.getDt());
- buf.writeShort(cnt);
- buf.writeInt(Integer.valueOf(object.getMessageId()));
-
- buf.writeInt(body.length);
- buf.writeBytes(body);
-
- buf.writeShort(endAttr.getBytes("utf8").length);
- buf.writeBytes(endAttr.getBytes("utf8"));
- buf.writeShort(0xee01);
- }
- } catch (Exception e) {
- logger.error("{}", e.getMessage());
- e.printStackTrace();
- }
- return buf;
- }
-
- private byte[] processCompress(byte[] body) {
- try {
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- out.write(body);
- int guessLen = Snappy.maxCompressedLength(out.size());
- byte[] tmpData = new byte[guessLen];
- int len = Snappy.compress(out.toByteArray(), 0,
- out.size(), tmpData, 0);
- body = new byte[len];
- System.arraycopy(tmpData, 0, body, 0, len);
- } catch (IOException e) {
- logger.error("{}", e.getMessage());
- e.printStackTrace();
- }
- return body;
- }
-
- public Channel initUdpChannel() {
- Channel channel = null;
- Bootstrap bootstrap = new Bootstrap();
- bootstrap.group(new NioEventLoopGroup())
- .channel(NioDatagramChannel.class)
- .option(ChannelOption.SO_BROADCAST, true)
- .handler(new SimpleChannelInboundHandler<DatagramPacket>() {
-
- protected void channelRead0(ChannelHandlerContext var1,
- DatagramPacket dmsg) throws Exception {
- String msg =
dmsg.content().toString(StandardCharsets.UTF_8);
- System.out.println("from server:" + msg);
- }
- });
- try {
- channel = bootstrap.bind(0).sync().channel();
- } catch (Exception e) {
- logger.error("Connection has exception e = {}", e);
- }
- return channel;
- }
-}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/SequentialID.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/SequentialID.java
index f4a396b940..443732a80d 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/SequentialID.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/SequentialID.java
@@ -17,8 +17,6 @@
package org.apache.inlong.sdk.dataproxy.network;
-import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
-
import java.security.SecureRandom;
import java.util.concurrent.atomic.AtomicInteger;
@@ -26,16 +24,11 @@ public class SequentialID {
private static final SecureRandom sRandom = new SecureRandom(
Long.toString(System.nanoTime()).getBytes());
- private final String ip = ProxyUtils.getLocalIp();
private final AtomicInteger id = new AtomicInteger(sRandom.nextInt());
public SequentialID() {
}
- public String getNextId() {
- return ip + "#" + id.incrementAndGet() + "#" +
System.currentTimeMillis();
- }
-
public int getNextInt() {
return id.incrementAndGet();
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpMsgSenderConfig.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpMsgSenderConfig.java
index c30207347e..2fbeb0f8c5 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpMsgSenderConfig.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpMsgSenderConfig.java
@@ -68,11 +68,6 @@ public class TcpMsgSenderConfig extends ProxyClientConfig
implements Cloneable {
private int maxAllowedSyncMsgTimeoutCnt =
SdkConsts.VAL_DEF_SYNC_MSG_TIMEOUT_CNT;
// the synchronization message timeout check duration ms
private long syncMsgTimeoutChkDurMs =
SdkConsts.VAL_DEF_SYNC_TIMEOUT_CHK_DUR_MS;
- // max sync send attempt
- private int maxSyncSendAttempt = SdkConsts.DEFAULT_SENDER_MAX_ATTEMPT;
-
- // async callback size
- private int asyncCallbackSize = SdkConsts.ASYNC_CALLBACK_SIZE;
public TcpMsgSenderConfig(boolean visitMgrByHttps,
String managerIP, int managerPort, String groupId) throws
ProxySdkException {
@@ -252,22 +247,6 @@ public class TcpMsgSenderConfig extends ProxyClientConfig
implements Cloneable {
SdkConsts.VAL_MIN_SYNC_TIMEOUT_CHK_DUR_MS,
syncMsgTimeoutChkDurMs);
}
- public int getMaxSyncSendAttempt() {
- return maxSyncSendAttempt;
- }
-
- public void setMaxSyncSendAttempt(int maxSyncSendAttempt) {
- this.maxSyncSendAttempt = maxSyncSendAttempt;
- }
-
- public int getTotalAsyncCallbackSize() {
- return asyncCallbackSize;
- }
-
- public void setTotalAsyncCallbackSize(int asyncCallbackSize) {
- this.asyncCallbackSize = asyncCallbackSize;
- }
-
@Override
public boolean equals(Object o) {
if (this == o)
@@ -293,9 +272,7 @@ public class TcpMsgSenderConfig extends ProxyClientConfig
implements Cloneable {
&& reconFailWaitMs == config.reconFailWaitMs
&& maxAllowedSyncMsgTimeoutCnt ==
config.maxAllowedSyncMsgTimeoutCnt
&& syncMsgTimeoutChkDurMs == config.syncMsgTimeoutChkDurMs
- && sdkMsgType == config.sdkMsgType
- && maxSyncSendAttempt == config.maxSyncSendAttempt
- && asyncCallbackSize == config.asyncCallbackSize;
+ && sdkMsgType == config.sdkMsgType;
}
@Override
@@ -305,8 +282,7 @@ public class TcpMsgSenderConfig extends ProxyClientConfig
implements Cloneable {
nettyWorkerThreadNum, rcvBufferSize, sendBufferSize,
connectTimeoutMs,
requestTimeoutMs, conCloseWaitPeriodMs, maxMsgInFlightPerConn,
frozenReconnectWaitMs, busyReconnectWaitMs, reconFailWaitMs,
- maxAllowedSyncMsgTimeoutCnt, maxSyncSendAttempt,
syncMsgTimeoutChkDurMs,
- asyncCallbackSize);
+ maxAllowedSyncMsgTimeoutCnt, syncMsgTimeoutChkDurMs);
}
@Override
@@ -342,9 +318,7 @@ public class TcpMsgSenderConfig extends ProxyClientConfig
implements Cloneable {
.append(",
busyReconnectWaitMs=").append(busyReconnectWaitMs)
.append(", reconFailWaitMs=").append(reconFailWaitMs)
.append(",
maxAllowedSyncMsgTimeoutCnt=").append(maxAllowedSyncMsgTimeoutCnt)
- .append(",
maxSyncSendAttempt=").append(maxSyncSendAttempt)
- .append(",
syncMsgTimeoutChkDurMs=").append(syncMsgTimeoutChkDurMs)
- .append(",
asyncCallbackSize=").append(asyncCallbackSize);
+ .append(",
syncMsgTimeoutChkDurMs=").append(syncMsgTimeoutChkDurMs);
return super.getSetting(strBuff);
}
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
index cdcc5bc6c8..7aafca8237 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
@@ -19,8 +19,6 @@ package org.apache.inlong.sdk.dataproxy.utils;
import org.apache.inlong.common.msg.AttributeConstants;
import org.apache.inlong.common.msg.MsgType;
-import org.apache.inlong.sdk.dataproxy.common.SdkConsts;
-import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
@@ -33,7 +31,6 @@ import java.net.InetAddress;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
@@ -182,120 +179,4 @@ public class ProxyUtils {
}
return validAttrsMap;
}
-
- public static boolean isAttrKeysValid(Map<String, String> attrsMap) {
- if (attrsMap == null || attrsMap.size() == 0) {
- return false;
- }
- for (String key : attrsMap.keySet()) {
- if (SdkReservedWords.contains(key)) {
- logger.error("the attributes is invalid ,please check ! {}",
key);
- return false;
- }
- }
- return true;
- }
-
- public static boolean isDtValid(long dt) {
- if (String.valueOf(dt).length() != TIME_LENGTH) {
- logger.error("dt {} is error", dt);
- return false;
- }
- return true;
- }
-
- /**
- * check body valid
- *
- * @param body
- * @return
- */
- public static boolean isBodyValid(byte[] body) {
- if (body == null || body.length == 0) {
- logger.error("body is error {}", body);
- return false;
- }
- return true;
- }
-
- /**
- * check body valid
- *
- * @param bodyList
- * @return
- */
- public static boolean isBodyValid(List<byte[]> bodyList) {
- if (bodyList == null || bodyList.size() == 0) {
- logger.error("body is error");
- return false;
- }
- return true;
- }
-
- /**
- * Check if the body length exceeds the maximum limit, if the maximum
limit is less than 0, it is not checked
- * @param body
- * @param maxLen
- * @return
- */
- public static boolean isBodyLengthValid(byte[] body, int maxLen) {
- // Not valid if the maximum limit is less than or equal to 0
- if (maxLen < 0) {
- return true;
- }
- // Reserve space for attribute
- if (body.length > maxLen - SdkConsts.RESERVED_ATTRIBUTE_LENGTH) {
- logger.debug("body length({}) > max length({}) - fixed attribute
length({})",
- body.length, maxLen, SdkConsts.RESERVED_ATTRIBUTE_LENGTH);
- return false;
- }
- return true;
- }
-
- /**
- * Check if the total body length exceeds the maximum limit, if the
maximum limit is less than 0, it is not checked
- * @param bodyList
- * @param maxLen
- * @return
- */
- public static boolean isBodyLengthValid(List<byte[]> bodyList, int maxLen)
{
- // Not valid if the maximum limit is less than or equal to 0
- if (maxLen < 0) {
- return true;
- }
- int size = 0;
- for (byte[] body : bodyList) {
- size += body.length;
- }
- // Reserve space for attribute
- if (size > maxLen - SdkConsts.RESERVED_ATTRIBUTE_LENGTH) {
- logger.debug("bodyList length({}) > max length({}) - fixed
attribute length({})",
- size, maxLen, SdkConsts.RESERVED_ATTRIBUTE_LENGTH);
- return false;
- }
- return true;
- }
-
- public static long covertZeroDt(long dt) {
- if (dt == 0) {
- return System.currentTimeMillis();
- }
- return dt;
- }
-
- /**
- * valid client config
- *
- * @param tcpConfig
- */
- public static void validClientConfig(TcpMsgSenderConfig tcpConfig) {
- if (tcpConfig.isEnableMgrAuthz()) {
- if (StringUtils.isBlank(tcpConfig.getMgrAuthSecretId())) {
- throw new IllegalArgumentException("Authentication require
secretId not Blank!");
- }
- if (StringUtils.isBlank(tcpConfig.getMgrAuthSecretKey())) {
- throw new IllegalArgumentException("Authentication require
secretKey not Blank!");
- }
- }
- }
}