This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 004c2be5e4 [INLONG-11670][SDK] Rename the ProxysdkException class name to ProxySdkException (#11671) 004c2be5e4 is described below commit 004c2be5e45806618cb4f2dfe57a3365423d41c5 Author: Goson Zhang <4675...@qq.com> AuthorDate: Wed Jan 15 11:10:36 2025 +0800 [INLONG-11670][SDK] Rename the ProxysdkException class name to ProxySdkException (#11671) * [INLONG-11670][SDK] Rename the ProxysdkException class name to ProxySdkException * [INLONG-11670][SDK] Rename the ProxysdkException class name to ProxySdkException --------- Co-authored-by: gosonzhang <gosonzh...@tencent.com> --- .../plugin/sinks/filecollect/SenderManager.java | 4 +- .../inlong/common/msg/AttributeConstants.java | 10 +++- inlong-sdk/dataproxy-sdk/pom.xml | 22 ++++++++ .../inlong/sdk/dataproxy/DefaultMessageSender.java | 58 +++++++++++----------- .../apache/inlong/sdk/dataproxy/MessageSender.java | 18 +++---- .../inlong/sdk/dataproxy/ProxyClientConfig.java | 32 ++++++------ .../sdk/dataproxy/example/HttpClientExample.java | 4 +- .../ProxySdkException.java} | 24 ++++++--- .../inlong/sdk/dataproxy/network/IpUtils.java | 4 +- .../inlong/sdk/dataproxy/network/Sender.java | 17 ++++--- .../sdk/dataproxy/pb/PbProtocolMessageSender.java | 26 +++++----- 11 files changed, 131 insertions(+), 88 deletions(-) diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java index 9ac9083ad8..9ef20bdf4b 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java @@ -35,7 +35,7 @@ import org.apache.inlong.sdk.dataproxy.DefaultMessageSender; import org.apache.inlong.sdk.dataproxy.ProxyClientConfig; import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback; import org.apache.inlong.sdk.dataproxy.common.SendResult; -import org.apache.inlong.sdk.dataproxy.network.ProxysdkException; +import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException; import io.netty.util.concurrent.DefaultThreadFactory; import org.slf4j.Logger; @@ -271,7 +271,7 @@ public class SenderManager { private void asyncSendByMessageSender(SendMessageCallback cb, List<byte[]> bodyList, String groupId, String streamId, long dataTime, String msgUUID, - Map<String, String> extraAttrMap, boolean isProxySend) throws ProxysdkException { + Map<String, String> extraAttrMap, boolean isProxySend) throws ProxySdkException { sender.asyncSendMessage(cb, bodyList, groupId, streamId, dataTime, msgUUID, extraAttrMap, isProxySend); } diff --git a/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java b/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java index 3402fc6455..974579d16e 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java +++ b/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java @@ -62,6 +62,9 @@ public interface AttributeConstants { /* from where */ String FROM = "f"; + /* msg uuid */ + String MSG_UUID = "msgUUID"; + // whether to return a response, false: not need, true or not exist: need String MESSAGE_IS_ACK = "isAck"; @@ -101,11 +104,14 @@ public interface AttributeConstants { // Message reporting time, in milliseconds // Provided by the initial sender of the data, and passed to - // the downstream by the Bus without modification for the downstream to + // the downstream by the DataProxy without modification for the downstream to // calculate the end-to-end message delay; if this field does not exist in the request, - // it will be added by the Bus with the current time + // it will be added by the DataProxy with the current time String MSG_RPT_TIME = "rtms"; + // inlong sdk version + String PROXY_SDK_VERSION = "sdkVersion"; + // Audit version is used for audit to reconciliation String AUDIT_VERSION = "auditVersion"; } diff --git a/inlong-sdk/dataproxy-sdk/pom.xml b/inlong-sdk/dataproxy-sdk/pom.xml index 0041797502..a5943dd52f 100644 --- a/inlong-sdk/dataproxy-sdk/pom.xml +++ b/inlong-sdk/dataproxy-sdk/pom.xml @@ -174,6 +174,28 @@ </execution> </executions> </plugin> + <plugin> + <groupId>io.github.git-commit-id</groupId> + <artifactId>git-commit-id-maven-plugin</artifactId> + <version>4.9.9</version> + <configuration> + <generateGitPropertiesFile>true</generateGitPropertiesFile> + <generateGitPropertiesFilename>${project.build.outputDirectory}/git.properties</generateGitPropertiesFilename> + <includeOnlyProperties> + <includeOnlyProperty>^git.build.(version)$</includeOnlyProperty> + </includeOnlyProperties> + <commitIdGenerationMode>full</commitIdGenerationMode> + </configuration> + <executions> + <execution> + <id>get-the-git-infos</id> + <goals> + <goal>revision</goal> + </goals> + <phase>initialize</phase> + </execution> + </executions> + </plugin> </plugins> </build> diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java index 02157fe95a..92dec7b125 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java @@ -25,7 +25,7 @@ import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback; import org.apache.inlong.sdk.dataproxy.common.SendResult; import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry; import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager; -import org.apache.inlong.sdk.dataproxy.network.ProxysdkException; +import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException; import org.apache.inlong.sdk.dataproxy.network.Sender; import org.apache.inlong.sdk.dataproxy.network.SequentialID; import org.apache.inlong.sdk.dataproxy.threads.IndexCollectThread; @@ -443,25 +443,25 @@ public class DefaultMessageSender implements MessageSender { @Override public void asyncSendMessage(SendMessageCallback callback, byte[] body, String groupId, String streamId, long dt, - String msgUUID, Map<String, String> extraAttrMap) throws ProxysdkException { + String msgUUID, Map<String, String> extraAttrMap) throws ProxySdkException { asyncSendMessage(callback, body, groupId, streamId, dt, msgUUID, extraAttrMap, false); } @Override public void asyncSendMessage(SendMessageCallback callback, byte[] body, String groupId, String streamId, long dt, - String msgUUID) throws ProxysdkException { + String msgUUID) throws ProxySdkException { asyncSendMessage(callback, body, groupId, streamId, dt, msgUUID, false); } @Override public void asyncSendMessage(SendMessageCallback callback, List<byte[]> bodyList, String groupId, String streamId, - long dt, String msgUUID) throws ProxysdkException { + long dt, String msgUUID) throws ProxySdkException { asyncSendMessage(callback, bodyList, groupId, streamId, dt, msgUUID, false); } @Override public void asyncSendMessage(SendMessageCallback callback, List<byte[]> bodyList, String groupId, String streamId, - long dt, String msgUUID, Map<String, String> extraAttrMap) throws ProxysdkException { + long dt, String msgUUID, Map<String, String> extraAttrMap) throws ProxySdkException { asyncSendMessage(callback, bodyList, groupId, streamId, dt, msgUUID, extraAttrMap, false); } @@ -529,16 +529,16 @@ public class DefaultMessageSender implements MessageSender { * @param dt data report timestamp * @param msgUUID msg uuid * @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ - * @throws ProxysdkException + * @throws ProxySdkException */ public void asyncSendMessage(SendMessageCallback callback, byte[] body, String groupId, - String streamId, long dt, String msgUUID, boolean isProxySend) throws ProxysdkException { + String streamId, long dt, String msgUUID, boolean isProxySend) throws ProxySdkException { dt = ProxyUtils.covertZeroDt(dt); if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) { - throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString()); + throw new ProxySdkException(SendResult.INVALID_ATTRIBUTES.toString()); } if (!ProxyUtils.isBodyLengthValid(body, maxPacketLength)) { - throw new ProxysdkException(SendResult.BODY_EXCEED_MAX_LEN.toString()); + throw new ProxySdkException(SendResult.BODY_EXCEED_MAX_LEN.toString()); } addIndexCnt(groupId, streamId, 1); @@ -584,16 +584,16 @@ public class DefaultMessageSender implements MessageSender { * @param msgUUID msg uuid * @param extraAttrMap extra attributes * @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ - * @throws ProxysdkException + * @throws ProxySdkException */ public void asyncSendMessage(SendMessageCallback callback, byte[] body, String groupId, String streamId, long dt, - String msgUUID, Map<String, String> extraAttrMap, boolean isProxySend) throws ProxysdkException { + String msgUUID, Map<String, String> extraAttrMap, boolean isProxySend) throws ProxySdkException { dt = ProxyUtils.covertZeroDt(dt); if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt) || !ProxyUtils.isAttrKeysValid(extraAttrMap)) { - throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString()); + throw new ProxySdkException(SendResult.INVALID_ATTRIBUTES.toString()); } if (!ProxyUtils.isBodyLengthValid(body, maxPacketLength)) { - throw new ProxysdkException(SendResult.BODY_EXCEED_MAX_LEN.toString()); + throw new ProxySdkException(SendResult.BODY_EXCEED_MAX_LEN.toString()); } addIndexCnt(groupId, streamId, 1); if (isProxySend) { @@ -635,16 +635,16 @@ public class DefaultMessageSender implements MessageSender { * @param dt data report time * @param msgUUID msg uuid * @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ - * @throws ProxysdkException + * @throws ProxySdkException */ public void asyncSendMessage(SendMessageCallback callback, List<byte[]> bodyList, - String groupId, String streamId, long dt, String msgUUID, boolean isProxySend) throws ProxysdkException { + String groupId, String streamId, long dt, String msgUUID, boolean isProxySend) throws ProxySdkException { dt = ProxyUtils.covertZeroDt(dt); if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)) { - throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString()); + throw new ProxySdkException(SendResult.INVALID_ATTRIBUTES.toString()); } if (!ProxyUtils.isBodyLengthValid(bodyList, maxPacketLength)) { - throw new ProxysdkException(SendResult.BODY_EXCEED_MAX_LEN.toString()); + throw new ProxySdkException(SendResult.BODY_EXCEED_MAX_LEN.toString()); } addIndexCnt(groupId, streamId, bodyList.size()); String proxySend = ""; @@ -690,18 +690,18 @@ public class DefaultMessageSender implements MessageSender { * @param msgUUID msg uuid * @param extraAttrMap extra attributes * @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ - * @throws ProxysdkException + * @throws ProxySdkException */ public void asyncSendMessage(SendMessageCallback callback, List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID, - Map<String, String> extraAttrMap, boolean isProxySend) throws ProxysdkException { + Map<String, String> extraAttrMap, boolean isProxySend) throws ProxySdkException { dt = ProxyUtils.covertZeroDt(dt); if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt) || !ProxyUtils.isAttrKeysValid( extraAttrMap)) { - throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString()); + throw new ProxySdkException(SendResult.INVALID_ATTRIBUTES.toString()); } if (!ProxyUtils.isBodyLengthValid(bodyList, maxPacketLength)) { - throw new ProxysdkException(SendResult.BODY_EXCEED_MAX_LEN.toString()); + throw new ProxySdkException(SendResult.BODY_EXCEED_MAX_LEN.toString()); } addIndexCnt(groupId, streamId, bodyList.size()); if (isProxySend) { @@ -738,11 +738,11 @@ public class DefaultMessageSender implements MessageSender { * @param inlongStreamId * @param body * @param callback - * @throws ProxysdkException + * @throws ProxySdkException */ @Override public void asyncSendMessage(String inlongGroupId, String inlongStreamId, byte[] body, SendMessageCallback callback) - throws ProxysdkException { + throws ProxySdkException { this.asyncSendMessage(callback, body, inlongGroupId, inlongStreamId, System.currentTimeMillis(), idGenerator.getNextId()); } @@ -755,10 +755,10 @@ public class DefaultMessageSender implements MessageSender { * @param body a single message * @param callback callback can be null * @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ - * @throws ProxysdkException + * @throws ProxySdkException */ public void asyncSendMessage(String inlongGroupId, String inlongStreamId, byte[] body, SendMessageCallback callback, - boolean isProxySend) throws ProxysdkException { + boolean isProxySend) throws ProxySdkException { this.asyncSendMessage(callback, body, inlongGroupId, inlongStreamId, System.currentTimeMillis(), idGenerator.getNextId(), isProxySend); } @@ -770,11 +770,11 @@ public class DefaultMessageSender implements MessageSender { * @param inlongStreamId streamId * @param bodyList list of messages * @param callback callback can be null - * @throws ProxysdkException + * @throws ProxySdkException */ @Override public void asyncSendMessage(String inlongGroupId, String inlongStreamId, List<byte[]> bodyList, - SendMessageCallback callback) throws ProxysdkException { + SendMessageCallback callback) throws ProxySdkException { this.asyncSendMessage(callback, bodyList, inlongGroupId, inlongStreamId, System.currentTimeMillis(), idGenerator.getNextId()); } @@ -787,10 +787,10 @@ public class DefaultMessageSender implements MessageSender { * @param bodyList list of messages * @param callback callback can be null * @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ - * @throws ProxysdkException + * @throws ProxySdkException */ public void asyncSendMessage(String inlongGroupId, String inlongStreamId, List<byte[]> bodyList, - SendMessageCallback callback, boolean isProxySend) throws ProxysdkException { + SendMessageCallback callback, boolean isProxySend) throws ProxySdkException { this.asyncSendMessage(callback, bodyList, inlongGroupId, inlongStreamId, System.currentTimeMillis(), idGenerator.getNextId(), isProxySend); } diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java index 2a4ae6313a..862586ab77 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java @@ -19,7 +19,7 @@ package org.apache.inlong.sdk.dataproxy; import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback; import org.apache.inlong.sdk.dataproxy.common.SendResult; -import org.apache.inlong.sdk.dataproxy.network.ProxysdkException; +import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException; import java.util.List; import java.util.Map; @@ -79,7 +79,7 @@ public interface MessageSender { */ void asyncSendMessage(SendMessageCallback callback, byte[] body, String groupId, String streamId, long dt, String msgUUID, - Map<String, String> extraAttrMap) throws ProxysdkException; + Map<String, String> extraAttrMap) throws ProxySdkException; /** * This method provides an asynchronized function which you want to send data without packing @@ -89,7 +89,7 @@ public interface MessageSender { * @param body The data will be sent */ void asyncSendMessage(SendMessageCallback callback, - byte[] body, String groupId, String streamId, long dt, String msgUUID) throws ProxysdkException; + byte[] body, String groupId, String streamId, long dt, String msgUUID) throws ProxySdkException; /** * This method provides an asynchronized function which you want to send data with packing @@ -98,7 +98,7 @@ public interface MessageSender { * @param bodyList The data will be sent,which is a collection consisting of byte arrays */ void asyncSendMessage(SendMessageCallback callback, - List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID) throws ProxysdkException; + List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID) throws ProxySdkException; /** * This method provides an asynchronized function which you want to send data with packing @@ -111,7 +111,7 @@ public interface MessageSender { */ void asyncSendMessage(SendMessageCallback callback, List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID, - Map<String, String> extraAttrMap) throws ProxysdkException; + Map<String, String> extraAttrMap) throws ProxySdkException; /** * This method provides an asynchronized function which you want to send data.<br> @@ -121,10 +121,10 @@ public interface MessageSender { * @param inlongStreamId * @param body * @param callback callback can be null - * @throws ProxysdkException + * @throws ProxySdkException */ void asyncSendMessage(String inlongGroupId, String inlongStreamId, byte[] body, SendMessageCallback callback) - throws ProxysdkException; + throws ProxySdkException; /** * This method provides an asynchronized function which you want to send datas.<br> @@ -134,9 +134,9 @@ public interface MessageSender { * @param inlongStreamId * @param bodyList * @param callback callback can be null - * @throws ProxysdkException + * @throws ProxySdkException */ void asyncSendMessage(String inlongGroupId, String inlongStreamId, List<byte[]> bodyList, - SendMessageCallback callback) throws ProxysdkException; + SendMessageCallback callback) throws ProxySdkException; } diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java index 1d03ee5b3a..fedf925aa1 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java @@ -17,9 +17,9 @@ package org.apache.inlong.sdk.dataproxy; +import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException; import org.apache.inlong.sdk.dataproxy.metric.MetricConfig; import org.apache.inlong.sdk.dataproxy.network.IpUtils; -import org.apache.inlong.sdk.dataproxy.network.ProxysdkException; import lombok.Data; import org.apache.commons.lang3.StringUtils; @@ -99,18 +99,18 @@ public class ProxyClientConfig { /* pay attention to the last url parameter ip */ public ProxyClientConfig(String localHost, boolean visitManagerByHttp, String managerIp, - int managerPort, String inlongGroupId, String authSecretId, String authSecretKey) throws ProxysdkException { + int managerPort, String inlongGroupId, String authSecretId, String authSecretKey) throws ProxySdkException { if (StringUtils.isBlank(localHost)) { - throw new ProxysdkException("localHost is blank!"); + throw new ProxySdkException("localHost is blank!"); } if (StringUtils.isBlank(managerIp)) { - throw new ProxysdkException("managerIp is Blank!"); + throw new ProxySdkException("managerIp is Blank!"); } if (managerPort <= 0) { - throw new ProxysdkException("managerPort <= 0!"); + throw new ProxySdkException("managerPort <= 0!"); } if (StringUtils.isBlank(inlongGroupId)) { - throw new ProxysdkException("groupId is blank!"); + throw new ProxySdkException("groupId is blank!"); } this.inlongGroupId = inlongGroupId.trim(); this.visitManagerByHttp = visitManagerByHttp; @@ -126,10 +126,10 @@ public class ProxyClientConfig { /* pay attention to the last url parameter ip */ public ProxyClientConfig(String managerAddress, - String inlongGroupId, String authSecretId, String authSecretKey) throws ProxysdkException { + String inlongGroupId, String authSecretId, String authSecretKey) throws ProxySdkException { checkAndParseAddress(managerAddress); if (StringUtils.isBlank(inlongGroupId)) { - throw new ProxysdkException("groupId is blank!"); + throw new ProxySdkException("groupId is blank!"); } this.inlongGroupId = inlongGroupId.trim(); this.syncThreadPoolSize = ConfigConstants.SYNC_THREAD_POOL_SIZE; @@ -519,11 +519,11 @@ public class ProxyClientConfig { this.senderMaxAttempt = senderMaxAttempt; } - private void checkAndParseAddress(String managerAddress) throws ProxysdkException { + private void checkAndParseAddress(String managerAddress) throws ProxySdkException { if (StringUtils.isBlank(managerAddress) || (!managerAddress.startsWith(ConfigConstants.HTTP) && !managerAddress.startsWith(ConfigConstants.HTTPS))) { - throw new ProxysdkException("managerAddress is blank or missing http/https protocol"); + throw new ProxySdkException("managerAddress is blank or missing http/https protocol"); } String hostPortInfo; if (managerAddress.startsWith(ConfigConstants.HTTPS)) { @@ -533,25 +533,25 @@ public class ProxyClientConfig { hostPortInfo = managerAddress.substring(ConfigConstants.HTTP.length()); } if (StringUtils.isBlank(hostPortInfo)) { - throw new ProxysdkException("managerAddress must include host:port info!"); + throw new ProxySdkException("managerAddress must include host:port info!"); } String[] fields = hostPortInfo.split(":"); if (fields.length == 1) { - throw new ProxysdkException("managerAddress must include port info!"); + throw new ProxySdkException("managerAddress must include port info!"); } else if (fields.length > 2) { - throw new ProxysdkException("managerAddress must only include host:port info!"); + throw new ProxySdkException("managerAddress must only include host:port info!"); } if (StringUtils.isBlank(fields[0])) { - throw new ProxysdkException("managerAddress's host is blank!"); + throw new ProxySdkException("managerAddress's host is blank!"); } this.managerIP = fields[0].trim(); if (StringUtils.isBlank(fields[1])) { - throw new ProxysdkException("managerAddress's port is blank!"); + throw new ProxySdkException("managerAddress's port is blank!"); } try { this.managerPort = Integer.parseInt(fields[1]); } catch (Throwable ex) { - throw new ProxysdkException("managerAddress's port must be number!"); + throw new ProxySdkException("managerAddress's port must be number!"); } } } diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java index 2ba1938409..c22c3aed98 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/HttpClientExample.java @@ -19,8 +19,8 @@ package org.apache.inlong.sdk.dataproxy.example; import org.apache.inlong.common.constant.ProtocolType; import org.apache.inlong.sdk.dataproxy.ProxyClientConfig; +import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException; import org.apache.inlong.sdk.dataproxy.network.HttpProxySender; -import org.apache.inlong.sdk.dataproxy.network.ProxysdkException; import java.util.ArrayList; import java.util.List; @@ -59,7 +59,7 @@ public class HttpClientExample { proxyConfig.setDiscardOldMessage(true); proxyConfig.setProtocolType(ProtocolType.HTTP); sender = new HttpProxySender(proxyConfig); - } catch (ProxysdkException e) { + } catch (ProxySdkException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ProxysdkException.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/exception/ProxySdkException.java similarity index 50% rename from inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ProxysdkException.java rename to inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/exception/ProxySdkException.java index 29de17dcf2..c8b806c453 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/ProxysdkException.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/exception/ProxySdkException.java @@ -15,22 +15,34 @@ * limitations under the License. */ -package org.apache.inlong.sdk.dataproxy.network; +package org.apache.inlong.sdk.dataproxy.exception; -public class ProxysdkException extends Exception { +/** + * Proxy Sdk Exception + * + * Used for unacceptable situations when reporting messages, such as empty input parameters, + * illegal parameters, abnormal execution status, and exceptions encountered during execution that + * were not considered during design, etc. + * + * If this exception is thrown during the debugging phase, the caller needs to check and + * adjust the corresponding implementation according to the exception content; if the exception + * is encountered during operation; the caller can try a limited number of times, + * and discard this report if it fails after trying again. + */ +public class ProxySdkException extends Exception { - public ProxysdkException() { + public ProxySdkException() { } - public ProxysdkException(String message) { + public ProxySdkException(String message) { super(message); } - public ProxysdkException(String message, Throwable cause) { + public ProxySdkException(String message, Throwable cause) { super(message, cause); } - public ProxysdkException(Throwable cause) { + public ProxySdkException(Throwable cause) { super(cause); } } diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/IpUtils.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/IpUtils.java index f3fbab04aa..90a0716772 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/IpUtils.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/IpUtils.java @@ -17,6 +17,8 @@ package org.apache.inlong.sdk.dataproxy.network; +import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException; + import org.apache.commons.codec.binary.Base64; import org.apache.commons.codec.digest.HmacUtils; import org.slf4j.Logger; @@ -54,7 +56,7 @@ public class IpUtils { return ip; } - public static boolean validLocalIp(String currLocalHost) throws ProxysdkException { + public static boolean validLocalIp(String currLocalHost) throws ProxySdkException { String ip = "127.0.0.1"; try (DatagramSocket socket = new DatagramSocket()) { socket.connect(InetAddress.getByName("8.8.8.8"), 10002); diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java index 5ee4c93e4b..088ecb3d60 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java @@ -22,6 +22,7 @@ import org.apache.inlong.sdk.dataproxy.codec.EncodeObject; import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback; import org.apache.inlong.sdk.dataproxy.common.SendResult; import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry; +import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException; import org.apache.inlong.sdk.dataproxy.threads.MetricWorkerThread; import org.apache.inlong.sdk.dataproxy.threads.TimeoutScanThread; import org.apache.inlong.sdk.dataproxy.utils.LogCounter; @@ -308,13 +309,13 @@ public class Sender { * Following methods used by asynchronously message sending. */ public void asyncSendMessage(EncodeObject encodeObject, - SendMessageCallback callback, String msgUUID) throws ProxysdkException { + SendMessageCallback callback, String msgUUID) throws ProxySdkException { if (!started.get()) { if (callback != null) { callback.onMessageAck(SendResult.SENDER_CLOSED); return; } else { - throw new ProxysdkException(SendResult.SENDER_CLOSED.toString()); + throw new ProxySdkException(SendResult.SENDER_CLOSED.toString()); } } if (configure.isEnableMetric()) { @@ -331,7 +332,7 @@ public class Sender { callback.onMessageAck(SendResult.MAX_FLIGHT_ON_ALL_CONNECTION); return; } else { - throw new ProxysdkException(SendResult.MAX_FLIGHT_ON_ALL_CONNECTION.toString()); + throw new ProxySdkException(SendResult.MAX_FLIGHT_ON_ALL_CONNECTION.toString()); } } if (clientResult.getF0() != SendResult.OK) { @@ -339,7 +340,7 @@ public class Sender { callback.onMessageAck(clientResult.getF0()); return; } else { - throw new ProxysdkException(clientResult.getF0().toString()); + throw new ProxySdkException(clientResult.getF0().toString()); } } if (!clientResult.getF1().getChannel().isWritable()) { @@ -352,7 +353,7 @@ public class Sender { callback.onMessageAck(SendResult.WRITE_OVER_WATERMARK); return; } else { - throw new ProxysdkException(SendResult.WRITE_OVER_WATERMARK.toString()); + throw new ProxySdkException(SendResult.WRITE_OVER_WATERMARK.toString()); } } if (currentBufferSize.get() >= asyncCallbackMaxSize) { @@ -361,7 +362,7 @@ public class Sender { callback.onMessageAck(SendResult.ASYNC_CALLBACK_BUFFER_FULL); return; } else { - throw new ProxysdkException(SendResult.ASYNC_CALLBACK_BUFFER_FULL.toString()); + throw new ProxySdkException(SendResult.ASYNC_CALLBACK_BUFFER_FULL.toString()); } } if (isNotValidateAttr(encodeObject.getCommonattr(), encodeObject.getAttributes())) { @@ -374,7 +375,7 @@ public class Sender { callback.onMessageAck(SendResult.INVALID_ATTRIBUTES); return; } else { - throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString()); + throw new ProxySdkException(SendResult.INVALID_ATTRIBUTES.toString()); } } int size = 1; @@ -385,7 +386,7 @@ public class Sender { callback.onMessageAck(SendResult.ASYNC_CALLBACK_BUFFER_FULL); return; } else { - throw new ProxysdkException(SendResult.ASYNC_CALLBACK_BUFFER_FULL.toString()); + throw new ProxySdkException(SendResult.ASYNC_CALLBACK_BUFFER_FULL.toString()); } } ConcurrentHashMap<String, QueueObject> msgQueueMap = diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/PbProtocolMessageSender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/PbProtocolMessageSender.java index fc584f5a5a..aaf33941f3 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/PbProtocolMessageSender.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/PbProtocolMessageSender.java @@ -22,7 +22,7 @@ import org.apache.inlong.sdk.commons.protocol.SdkEvent; import org.apache.inlong.sdk.dataproxy.MessageSender; import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback; import org.apache.inlong.sdk.dataproxy.common.SendResult; -import org.apache.inlong.sdk.dataproxy.network.ProxysdkException; +import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException; import org.apache.inlong.sdk.dataproxy.pb.channel.BufferQueueChannel; import org.apache.inlong.sdk.dataproxy.pb.context.CallbackProfile; @@ -336,12 +336,12 @@ public class PbProtocolMessageSender implements MessageSender, Configurable { * @param dt * @param msgUUID * @param extraAttrMap - * @throws ProxysdkException + * @throws ProxySdkException */ @Override public void asyncSendMessage(SendMessageCallback callback, byte[] body, String groupId, String streamId, long dt, String msgUUID, Map<String, String> extraAttrMap) - throws ProxysdkException { + throws ProxySdkException { SdkEvent sdkEvent = new SdkEvent(); sdkEvent.setInlongGroupId(groupId); sdkEvent.setInlongStreamId(streamId); @@ -364,11 +364,11 @@ public class PbProtocolMessageSender implements MessageSender, Configurable { * @param streamId * @param dt * @param msgUUID - * @throws ProxysdkException + * @throws ProxySdkException */ @Override public void asyncSendMessage(SendMessageCallback callback, byte[] body, - String groupId, String streamId, long dt, String msgUUID) throws ProxysdkException { + String groupId, String streamId, long dt, String msgUUID) throws ProxySdkException { this.asyncSendMessage(callback, body, groupId, streamId, dt, msgUUID, null); } @@ -381,11 +381,11 @@ public class PbProtocolMessageSender implements MessageSender, Configurable { * @param streamId * @param dt * @param msgUUID - * @throws ProxysdkException + * @throws ProxySdkException */ @Override public void asyncSendMessage(SendMessageCallback callback, List<byte[]> bodyList, - String groupId, String streamId, long dt, String msgUUID) throws ProxysdkException { + String groupId, String streamId, long dt, String msgUUID) throws ProxySdkException { this.asyncSendMessage(callback, bodyList, groupId, streamId, dt, msgUUID, null); } @@ -399,12 +399,12 @@ public class PbProtocolMessageSender implements MessageSender, Configurable { * @param dt * @param msgUUID * @param extraAttrMap - * @throws ProxysdkException + * @throws ProxySdkException */ @Override public void asyncSendMessage(SendMessageCallback callback, List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID, - Map<String, String> extraAttrMap) throws ProxysdkException { + Map<String, String> extraAttrMap) throws ProxySdkException { List<CallbackProfile> events = new ArrayList<>(bodyList.size()); for (byte[] body : bodyList) { SdkEvent sdkEvent = new SdkEvent(); @@ -429,11 +429,11 @@ public class PbProtocolMessageSender implements MessageSender, Configurable { * @param inlongStreamId * @param body * @param callback - * @throws ProxysdkException + * @throws ProxySdkException */ @Override public void asyncSendMessage(String inlongGroupId, String inlongStreamId, byte[] body, SendMessageCallback callback) - throws ProxysdkException { + throws ProxySdkException { this.asyncSendMessage(callback, body, inlongGroupId, inlongStreamId, System.currentTimeMillis(), null, null); } @@ -444,11 +444,11 @@ public class PbProtocolMessageSender implements MessageSender, Configurable { * @param inlongStreamId * @param bodyList * @param callback - * @throws ProxysdkException + * @throws ProxySdkException */ @Override public void asyncSendMessage(String inlongGroupId, String inlongStreamId, List<byte[]> bodyList, - SendMessageCallback callback) throws ProxysdkException { + SendMessageCallback callback) throws ProxySdkException { this.asyncSendMessage(callback, bodyList, inlongGroupId, inlongStreamId, System.currentTimeMillis(), null, null); }