This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new a658a091b3 [INLONG-11463][SDK] Remove deprecated APIs in the DefaultMessageSender class (#11464)
a658a091b3 is described below

commit a658a091b369d0d76c698298d0cfc7fef12d2ed1
Author: Goson Zhang <4675...@qq.com>
AuthorDate: Wed Nov 6 20:39:30 2024 +0800

    [INLONG-11463][SDK] Remove deprecated APIs in the DefaultMessageSender class (#11464)
    
    Co-authored-by: gosonzhang <gosonzh...@tencent.com>
---
 .../inlong/agent/constant/CommonConstants.java     |   3 -
 .../plugin/sinks/filecollect/SenderManager.java    |   3 -
 .../inlong/sdk/dataproxy/DefaultMessageSender.java | 117 -----------
 .../inlong/sdk/dataproxy/ProxyClientConfig.java    |   9 -
 .../inlong/sdk/dataproxy/common/FileCallback.java  |  38 ----
 .../sdk/dataproxy/common/SendMessageCallback.java  |   4 +-
 .../sdk/dataproxy/example/MyFileCallBack.java      |  61 ------
 .../sdk/dataproxy/example/MyMessageCallBack.java   |  11 +-
 .../inlong/sdk/dataproxy/network/Sender.java       | 229 +--------------------
 .../sdk/dataproxy/threads/MetricWorkerThread.java  |  18 +-
 .../sdk/dataproxy/threads/TimeoutScanThread.java   |  10 +-
 11 files changed, 25 insertions(+), 478 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
index 53a5bd976c..757db41afd 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
@@ -68,9 +68,6 @@ public class CommonConstants {
     public static final String PROXY_SENDER_MAX_RETRY = 
"proxy.sender.maxRetry";
     public static final int DEFAULT_PROXY_SENDER_MAX_RETRY = 5;
 
-    public static final String PROXY_IS_FILE = "proxy.isFile";
-    public static final boolean DEFAULT_IS_FILE = false;
-
     public static final String PROXY_CLIENT_IO_THREAD_NUM = 
"client.iothread.num";
     public static final int DEFAULT_PROXY_CLIENT_IO_THREAD_NUM =
             Runtime.getRuntime().availableProcessors();
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
index 984baf6de6..a37a171a37 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
@@ -87,7 +87,6 @@ public class SenderManager {
     private final int aliveConnectionNum;
     private final boolean isCompress;
     private final int msgType;
-    private final boolean isFile;
     private final long maxSenderTimeout;
     private final int maxSenderRetry;
     private final long retrySleepTime;
@@ -133,7 +132,6 @@ public class SenderManager {
                 CommonConstants.PROXY_SENDER_MAX_RETRY, 
CommonConstants.DEFAULT_PROXY_SENDER_MAX_RETRY);
         retrySleepTime = agentConf.getLong(
                 CommonConstants.PROXY_RETRY_SLEEP, 
CommonConstants.DEFAULT_PROXY_RETRY_SLEEP);
-        isFile = profile.getBoolean(CommonConstants.PROXY_IS_FILE, 
CommonConstants.DEFAULT_IS_FILE);
         ioThreadNum = 
profile.getInt(CommonConstants.PROXY_CLIENT_IO_THREAD_NUM,
                 CommonConstants.DEFAULT_PROXY_CLIENT_IO_THREAD_NUM);
         enableBusyWait = 
profile.getBoolean(CommonConstants.PROXY_CLIENT_ENABLE_BUSY_WAIT,
@@ -200,7 +198,6 @@ public class SenderManager {
         ProxyClientConfig proxyClientConfig = new 
ProxyClientConfig(managerAddr, inlongGroupId, authSecretId,
                 authSecretKey);
         proxyClientConfig.setTotalAsyncCallbackSize(totalAsyncBufSize);
-        proxyClientConfig.setFile(isFile);
         proxyClientConfig.setAliveConnections(aliveConnectionNum);
 
         proxyClientConfig.setIoThreadNum(ioThreadNum);
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
index ec3eff3bad..623e84d8c0 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
@@ -21,7 +21,6 @@ import org.apache.inlong.common.constant.ProtocolType;
 import org.apache.inlong.common.msg.AttributeConstants;
 import org.apache.inlong.common.util.MessageUtils;
 import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
-import org.apache.inlong.sdk.dataproxy.common.FileCallback;
 import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
 import org.apache.inlong.sdk.dataproxy.common.SendResult;
 import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
@@ -833,122 +832,6 @@ public class DefaultMessageSender implements MessageSender {
         }
     }
 
-    @Deprecated
-    public void asyncsendMessageData(FileCallback callback, List<byte[]> 
bodyList, String groupId, String streamId,
-            long dt, int sid, boolean isSupportLF, String msgUUID, long 
timeout, TimeUnit timeUnit,
-            Map<String, String> extraAttrMap) throws ProxysdkException {
-        dt = ProxyUtils.covertZeroDt(dt);
-        if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)
-                || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
-            throw new 
ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
-        }
-        if (!ProxyUtils.isBodyLengthValid(bodyList, maxPacketLength)) {
-            throw new 
ProxysdkException(SendResult.BODY_EXCEED_MAX_LEN.toString());
-        }
-        addIndexCnt(groupId, streamId, bodyList.size());
-
-        StringBuilder attrs = MessageUtils.convertAttrToStr(extraAttrMap);
-
-        if (msgtype == 7 || msgtype == 8) {
-            EncodeObject encodeObject = new EncodeObject(bodyList, msgtype,
-                    isCompress, isReport, isGroupIdTransfer,
-                    dt / 1000, sid, groupId, streamId, attrs.toString(), 
"data", "");
-            encodeObject.setSupportLF(isSupportLF);
-            sender.asyncSendMessageIndex(encodeObject, callback, msgUUID, 
timeout, timeUnit);
-        }
-    }
-
-    @Deprecated
-    private void asyncSendMetric(FileCallback callback, byte[] body, String 
groupId, String streamId, long dt, int sid,
-            String ip, String msgUUID, long timeout, TimeUnit timeUnit, String 
messageKey) throws ProxysdkException {
-        dt = ProxyUtils.covertZeroDt(dt);
-        if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
-            throw new 
ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
-        }
-        if (!ProxyUtils.isBodyLengthValid(body, maxPacketLength)) {
-            throw new 
ProxysdkException(SendResult.BODY_EXCEED_MAX_LEN.toString());
-        }
-        boolean isCompressEnd = false;
-        if (msgtype == 7 || msgtype == 8) {
-            sender.asyncSendMessageIndex(new EncodeObject(body, msgtype, 
isCompressEnd,
-                    isReport, isGroupIdTransfer, dt / 1000,
-                    sid, groupId, streamId, "", messageKey, ip), callback, 
msgUUID, timeout, timeUnit);
-        }
-    }
-
-    @Deprecated
-    public void asyncsendMessageProxy(FileCallback callback, byte[] body, 
String groupId, String streamId, long dt,
-            int sid, String ip, String msgUUID, long timeout, TimeUnit 
timeUnit) throws ProxysdkException {
-        asyncSendMetric(callback, body, groupId, streamId, dt, sid, ip, 
msgUUID, timeout,
-                timeUnit, "minute");
-    }
-
-    @Deprecated
-    public void asyncsendMessageFile(FileCallback callback, byte[] body, 
String groupId, String streamId, long dt,
-            int sid, String msgUUID, long timeout, TimeUnit timeUnit) throws 
ProxysdkException {
-        asyncSendMetric(callback, body, groupId, streamId, dt, sid, "", 
msgUUID, timeout, timeUnit,
-                "file");
-    }
-
-    @Deprecated
-    public String sendMessageData(List<byte[]> bodyList, String groupId, 
String streamId, long dt, int sid,
-            boolean isSupportLF, String msgUUID, long timeout, TimeUnit 
timeUnit, Map<String, String> extraAttrMap) {
-        dt = ProxyUtils.covertZeroDt(dt);
-        if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)
-                || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
-            return SendResult.INVALID_ATTRIBUTES.toString();
-        }
-        if (!ProxyUtils.isBodyLengthValid(bodyList, maxPacketLength)) {
-            return SendResult.BODY_EXCEED_MAX_LEN.toString();
-        }
-        addIndexCnt(groupId, streamId, bodyList.size());
-
-        StringBuilder attrs = MessageUtils.convertAttrToStr(extraAttrMap);
-
-        if (msgtype == 7 || msgtype == 8) {
-            EncodeObject encodeObject = new EncodeObject(bodyList, msgtype, 
isCompress,
-                    isReport, isGroupIdTransfer, dt / 1000,
-                    sid, groupId, streamId, attrs.toString(), "data", "");
-            encodeObject.setSupportLF(isSupportLF);
-            Function<Sender, String> sendOperation = (sender) -> 
sender.syncSendMessageIndex(encodeObject, msgUUID,
-                    timeout, timeUnit);
-            return attemptSendMessageIndex(sendOperation);
-        }
-        return null;
-    }
-
-    @Deprecated
-    private String sendMetric(byte[] body, String groupId, String streamId, 
long dt, int sid, String ip, String msgUUID,
-            long timeout, TimeUnit timeUnit, String messageKey) {
-        dt = ProxyUtils.covertZeroDt(dt);
-        if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
-            return SendResult.INVALID_ATTRIBUTES.toString();
-        }
-        if (!ProxyUtils.isBodyLengthValid(body, maxPacketLength)) {
-            return SendResult.BODY_EXCEED_MAX_LEN.toString();
-        }
-        if (msgtype == 7 || msgtype == 8) {
-            EncodeObject encodeObject = new EncodeObject(body, msgtype, false, 
isReport,
-                    isGroupIdTransfer, dt / 1000, sid, groupId, streamId, "", 
messageKey, ip);
-            Function<Sender, String> sendOperation = (sender) -> 
sender.syncSendMessageIndex(encodeObject, msgUUID,
-                    timeout, timeUnit);
-            return attemptSendMessageIndex(sendOperation);
-        }
-        return null;
-    }
-
-    @Deprecated
-    public String sendMessageProxy(byte[] body, String groupId, String 
streamId, long dt, int sid, String ip,
-            String msgUUID, long timeout, TimeUnit timeUnit) {
-        return sendMetric(body, groupId, streamId, dt, sid, ip, msgUUID, 
timeout, timeUnit, "minute");
-    }
-
-    @Deprecated
-    public String sendMessageFile(byte[] body, String groupId, String 
streamId, long dt, int sid, String msgUUID,
-            long timeout, TimeUnit timeUnit) {
-        return sendMetric(body, groupId, streamId, dt, sid, "", msgUUID, 
timeout, timeUnit, "file");
-    }
-
     private void shutdownInternalThreads() {
         indexCol.shutDown();
         MANAGER_FETCHER_THREAD_STARTED.set(false);
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
index d74f876fab..c3253805cb 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
@@ -39,7 +39,6 @@ public class ProxyClientConfig {
     private int proxyUpdateIntervalMinutes;
     private int proxyUpdateMaxRetry;
     private String inlongGroupId;
-    private boolean isFile = false;
     private boolean requestByHttp = true;
     private boolean isNeedDataEncry = false;
     private boolean needAuthentication = false;
@@ -196,14 +195,6 @@ public class ProxyClientConfig {
         return requestByHttp;
     }
 
-    public boolean isFile() {
-        return isFile;
-    }
-
-    public void setFile(boolean file) {
-        isFile = file;
-    }
-
     public String getInlongGroupId() {
         return inlongGroupId;
     }
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/FileCallback.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/FileCallback.java
deleted file mode 100644
index 8fce78257e..0000000000
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/FileCallback.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dataproxy.common;
-
-public abstract class FileCallback implements SendMessageCallback {
-
-    /* Invoked when a message is confirmed by TDBus. */
-    public void onMessageAck(String result) {
-    }
-
-    ;
-
-    public void onMessageAck(SendResult result) {
-    }
-
-    ;
-
-    /* Invoked when a message transportation interrupted by an exception. */
-    public void onException(Throwable e) {
-    }
-
-    ;
-}
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendMessageCallback.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendMessageCallback.java
index fc80705031..9e83f4673c 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendMessageCallback.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SendMessageCallback.java
@@ -20,8 +20,8 @@ package org.apache.inlong.sdk.dataproxy.common;
 public interface SendMessageCallback {
 
     /* Invoked when a message is confirmed by TDBus. */
-    public void onMessageAck(SendResult result);
+    void onMessageAck(SendResult result);
 
     /* Invoked when a message transportation interrupted by an exception. */
-    public void onException(Throwable e);
+    void onException(Throwable e);
 }
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/MyFileCallBack.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/MyFileCallBack.java
deleted file mode 100644
index 3685d0ad53..0000000000
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/MyFileCallBack.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dataproxy.example;
-
-import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
-import org.apache.inlong.sdk.dataproxy.common.FileCallback;
-import org.apache.inlong.sdk.dataproxy.common.SendResult;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MyFileCallBack extends FileCallback {
-
-    private static final Logger logger = LoggerFactory
-            .getLogger(MyFileCallBack.class);
-    private DefaultMessageSender messageSender = null;
-    private Event event = null;
-
-    public MyFileCallBack() {
-
-    }
-
-    public MyFileCallBack(DefaultMessageSender messageSender, Event event) {
-        super();
-        this.messageSender = messageSender;
-        this.event = event;
-    }
-
-    public void onMessageAck(String result) {
-        logger.info("onMessageAck return result = {}", result);
-    }
-
-    public void onMessageAck(SendResult result) {
-        if (result == SendResult.OK) {
-            logger.info("onMessageAck return Ok");
-        } else {
-            logger.info("onMessageAck return failure = {}", result);
-        }
-    }
-
-    public void onException(Throwable e) {
-        logger.error("Send message failure, error {}", e.getMessage());
-        e.printStackTrace();
-    }
-
-}
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/MyMessageCallBack.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/MyMessageCallBack.java
index 7aef6e705c..d9b5c08132 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/MyMessageCallBack.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/MyMessageCallBack.java
@@ -18,16 +18,17 @@
 package org.apache.inlong.sdk.dataproxy.example;
 
 import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
-import org.apache.inlong.sdk.dataproxy.common.FileCallback;
+import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
 import org.apache.inlong.sdk.dataproxy.common.SendResult;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class MyMessageCallBack extends FileCallback {
+public class MyMessageCallBack implements SendMessageCallback {
 
     private static final Logger logger = LoggerFactory
             .getLogger(MyMessageCallBack.class);
+
     private DefaultMessageSender messageSender = null;
     private Event event = null;
 
@@ -41,10 +42,7 @@ public class MyMessageCallBack extends FileCallback {
         this.event = event;
     }
 
-    public void onMessageAck(String result) {
-        logger.info("onMessageAck return result = {}", result);
-    }
-
+    @Override
     public void onMessageAck(SendResult result) {
         if (result == SendResult.OK) {
             logger.info("onMessageAck return Ok");
@@ -53,6 +51,7 @@ public class MyMessageCallBack extends FileCallback {
         }
     }
 
+    @Override
     public void onException(Throwable e) {
         logger.error("Send message failure, error {}", e.getMessage());
         e.printStackTrace();
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
index 9581da1f80..50b3105b56 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
@@ -19,7 +19,6 @@ package org.apache.inlong.sdk.dataproxy.network;
 
 import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
 import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
-import org.apache.inlong.sdk.dataproxy.common.FileCallback;
 import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
 import org.apache.inlong.sdk.dataproxy.common.SendResult;
 import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
@@ -63,7 +62,6 @@ public class Sender {
     private final TimeoutScanThread scanThread;
     private final ClientMgr clientMgr;
     private final ProxyClientConfig configure;
-    private final boolean isFile;
     private MetricWorkerThread metricWorker = null;
     private int clusterId = -1;
 
@@ -98,7 +96,6 @@ public class Sender {
                 throw new Exception("In OutNetwork isNeedDataEncry must be 
true!");
             }
         }
-        this.isFile = configure.isFile();
         scanThread = new TimeoutScanThread(callbacks, currentBufferSize, 
configure, clientMgr);
         scanThread.start();
 
@@ -172,15 +169,8 @@ public class Sender {
         if (callback == null) {
             return;
         }
-        if (isFile) {
-            String proxyip = channel.remoteAddress().toString();
-            ((FileCallback) 
callback.getCallback()).onMessageAck(result.toString()
-                    + "=" + proxyip.substring(1, proxyip.indexOf(':')));
-            currentBufferSize.addAndGet(-callback.getSize());
-        } else {
-            callback.getCallback().onMessageAck(result);
-            currentBufferSize.decrementAndGet();
-        }
+        callback.getCallback().onMessageAck(result);
+        currentBufferSize.decrementAndGet();
     }
 
     private SendResult syncSendInternalMessage(NettyClient client, 
EncodeObject encodeObject, String msgUUID,
@@ -287,190 +277,6 @@ public class Sender {
         return message;
     }
 
-    private SendResult syncSendMessageIndexInternal(NettyClient client, 
EncodeObject encodeObject, String msgUUID,
-            long timeout, TimeUnit timeUnit) throws ExecutionException, 
InterruptedException, TimeoutException {
-        if (client == null || !client.isActive()) {
-            chooseProxy.remove(encodeObject.getMessageId());
-            client = clientMgr.getClientByRoundRobin();
-            if (client == null) {
-                return SendResult.NO_CONNECTION;
-            }
-            chooseProxy.put(encodeObject.getMessageId(), client);
-        }
-
-        if (encodeObject.getMsgtype() == 7) {
-            int groupIdnum = 0;
-            int streamIdnum = 0;
-            if (encodeObject.getGroupId().equals(clientMgr.getGroupId())) {
-                groupIdnum = clientMgr.getGroupIdNum();
-                streamIdnum = 
clientMgr.getStreamIdMap().get(encodeObject.getStreamId()) != null
-                        ? 
clientMgr.getStreamIdMap().get(encodeObject.getStreamId())
-                        : 0;
-            }
-            encodeObject.setGroupIdNum(groupIdnum);
-            encodeObject.setStreamIdNum(streamIdnum);
-            if (groupIdnum == 0 || streamIdnum == 0) {
-                encodeObject.setGroupIdTransfer(false);
-            }
-        }
-        if (this.configure.isNeedDataEncry()) {
-            encodeObject.setEncryptEntry(true, configure.getUserName(), 
clientMgr.getEncryptConfigEntry());
-        } else {
-            encodeObject.setEncryptEntry(false, null, null);
-        }
-        encodeObject.setMsgUUID(msgUUID);
-        SyncMessageCallable callable = new SyncMessageCallable(client, 
encodeObject, timeout, timeUnit);
-        syncCallables.put(encodeObject.getMessageId(), callable);
-
-        Future<SendResult> future = threadPool.submit(callable);
-        return future.get(timeout, timeUnit);
-    }
-
-    /**
-     * sync send
-     *
-     * @param encodeObject
-     * @param msgUUID
-     * @param timeout
-     * @param timeUnit
-     * @return
-     */
-    public String syncSendMessageIndex(EncodeObject encodeObject, String 
msgUUID, long timeout, TimeUnit timeUnit) {
-        try {
-            SendResult message = null;
-            NettyClient client = chooseProxy.get(encodeObject.getMessageId());
-            String proxyip = encodeObject.getProxyIp();
-            if (proxyip != null && proxyip.length() != 0) {
-                client = clientMgr.getContainProxy(proxyip);
-            }
-            if (isNotValidateAttr(encodeObject.getCommonattr(), 
encodeObject.getAttributes())) {
-                LOGGER.error("error attr format {} {}", 
encodeObject.getCommonattr(),
-                        encodeObject.getAttributes());
-                return SendResult.INVALID_ATTRIBUTES.toString();
-            }
-            try {
-                message = syncSendMessageIndexInternal(client, encodeObject,
-                        msgUUID, timeout, timeUnit);
-            } catch (InterruptedException e) {
-                // TODO Auto-generated catch block
-                LOGGER.error("send message error {}", getExceptionStack(e));
-                syncCallables.remove(encodeObject.getMessageId());
-                return SendResult.THREAD_INTERRUPT.toString();
-            } catch (ExecutionException e) {
-                // TODO Auto-generated catch block
-                LOGGER.error("ExecutionException {}", getExceptionStack(e));
-                syncCallables.remove(encodeObject.getMessageId());
-                return SendResult.UNKOWN_ERROR.toString();
-            } catch (TimeoutException e) {
-                // TODO Auto-generated catch block
-                LOGGER.error("TimeoutException {}", getExceptionStack(e));
-                // e.printStackTrace();
-                SyncMessageCallable syncMessageCallable = 
syncCallables.remove(encodeObject.getMessageId());
-                if (syncMessageCallable != null) {
-                    NettyClient tmpClient = syncMessageCallable.getClient();
-                    if (tmpClient != null) {
-                        Channel curChannel = tmpClient.getChannel();
-                        if (curChannel != null) {
-                            LOGGER.error("channel maybe busy {}", curChannel);
-                            scanThread.addTimeoutChannel(curChannel);
-                        }
-                    }
-                }
-                return SendResult.TIMEOUT.toString();
-            } catch (Throwable e) {
-                LOGGER.error("syncSendMessage exception {}", 
getExceptionStack(e));
-                syncCallables.remove(encodeObject.getMessageId());
-                return SendResult.UNKOWN_ERROR.toString();
-            }
-            scanThread.resetTimeoutChannel(client.getChannel());
-            return message.toString() + "=" + client.getServerIP();
-        } catch (Exception e) {
-            LOGGER.error("agent send error {}", getExceptionStack(e));
-            syncCallables.remove(encodeObject.getMessageId());
-            return SendResult.UNKOWN_ERROR.toString();
-        }
-    }
-
-    /**
-     * async send message index
-     *
-     * @param encodeObject
-     * @param callback
-     * @param msgUUID
-     * @param timeout
-     * @param timeUnit
-     * @throws ProxysdkException
-     */
-    public void asyncSendMessageIndex(EncodeObject encodeObject, FileCallback 
callback, String msgUUID, long timeout,
-            TimeUnit timeUnit) throws ProxysdkException {
-        NettyClient client = chooseProxy.get(encodeObject.getMessageId());
-        String proxyip = encodeObject.getProxyIp();
-        if (proxyip != null && proxyip.length() != 0) {
-            client = clientMgr.getContainProxy(proxyip);
-        }
-        if (client == null || !client.isActive()) {
-            chooseProxy.remove(encodeObject.getMessageId());
-            client = clientMgr.getClientByRoundRobin();
-            if (client == null) {
-                throw new 
ProxysdkException(SendResult.NO_CONNECTION.toString());
-            }
-            chooseProxy.put(encodeObject.getMessageId(), client);
-        }
-        if (currentBufferSize.get() >= asyncCallbackMaxSize) {
-            throw new ProxysdkException("ASYNC_CALLBACK_BUFFER_FULL");
-        }
-        int size = 1;
-        if (isFile) {
-            if (encodeObject.getBodyBytes() != null) {
-                size = encodeObject.getBodyBytes().length;
-            } else {
-                for (byte[] bytes : encodeObject.getBodylist()) {
-                    size = size + bytes.length;
-                }
-            }
-            if (currentBufferSize.addAndGet(size) >= asyncCallbackMaxSize) {
-                currentBufferSize.addAndGet(-size);
-                throw new ProxysdkException("ASYNC_CALLBACK_BUFFER_FULL");
-            }
-
-        } else {
-            if (currentBufferSize.incrementAndGet() >= asyncCallbackMaxSize) {
-                currentBufferSize.decrementAndGet();
-                throw new ProxysdkException("ASYNC_CALLBACK_BUFFER_FULL");
-            }
-        }
-        ConcurrentHashMap<String, QueueObject> tmpCallBackMap = new 
ConcurrentHashMap<>();
-        ConcurrentHashMap<String, QueueObject> msgQueueMap = 
callbacks.putIfAbsent(
-                client.getChannel(), tmpCallBackMap);
-        if (msgQueueMap == null) {
-            msgQueueMap = tmpCallBackMap;
-        }
-        msgQueueMap.put(encodeObject.getMessageId(), new 
QueueObject(System.currentTimeMillis(),
-                callback, size, timeout, timeUnit));
-        if (encodeObject.getMsgtype() == 7) {
-            int groupIdnum = 0;
-            int streamIdnum = 0;
-            if ((clientMgr.getGroupId().length() != 0) && 
(encodeObject.getGroupId().equals(clientMgr.getGroupId()))) {
-                groupIdnum = clientMgr.getGroupIdNum();
-                streamIdnum = 
(clientMgr.getStreamIdMap().get(encodeObject.getStreamId()) != null)
-                        ? 
clientMgr.getStreamIdMap().get(encodeObject.getStreamId())
-                        : 0;
-            }
-            encodeObject.setGroupIdNum(groupIdnum);
-            encodeObject.setStreamIdNum(streamIdnum);
-            if (groupIdnum == 0 || streamIdnum == 0) {
-                encodeObject.setGroupIdTransfer(false);
-            }
-        }
-        if (this.configure.isNeedDataEncry()) {
-            encodeObject.setEncryptEntry(true, configure.getUserName(), 
clientMgr.getEncryptConfigEntry());
-        } else {
-            encodeObject.setEncryptEntry(false, null, null);
-        }
-        encodeObject.setMsgUUID(msgUUID);
-        client.write(encodeObject);
-    }
-
     /**
      * whether is validate
      *
@@ -539,24 +345,9 @@ public class Sender {
             throw new 
ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
         }
         int size = 1;
-        if (isFile) {
-            if (encodeObject.getBodyBytes() != null) {
-                size = encodeObject.getBodyBytes().length;
-            } else {
-                for (byte[] bytes : encodeObject.getBodylist()) {
-                    size = size + bytes.length;
-                }
-            }
-            if (currentBufferSize.addAndGet(size) >= asyncCallbackMaxSize) {
-                currentBufferSize.addAndGet(-size);
-                throw new ProxysdkException("ASYNC_CALLBACK_BUFFER_FULL");
-            }
-
-        } else {
-            if (currentBufferSize.incrementAndGet() >= asyncCallbackMaxSize) {
-                currentBufferSize.decrementAndGet();
-                throw new ProxysdkException("ASYNC_CALLBACK_BUFFER_FULL");
-            }
+        if (currentBufferSize.incrementAndGet() >= asyncCallbackMaxSize) {
+            currentBufferSize.decrementAndGet();
+            throw new ProxysdkException("ASYNC_CALLBACK_BUFFER_FULL");
         }
         ConcurrentHashMap<String, QueueObject> msgQueueMap =
                 callbacks.computeIfAbsent(client.getChannel(), (k) -> new 
ConcurrentHashMap<>());
@@ -623,14 +414,8 @@ public class Sender {
                     if (queueObject == null) {
                         continue;
                     }
-                    if (isFile) {
-                        ((FileCallback) queueObject.getCallback())
-                                
.onMessageAck(SendResult.CONNECTION_BREAK.toString());
-                        currentBufferSize.addAndGet(-queueObject.getSize());
-                    } else {
-                        
queueObject.getCallback().onMessageAck(SendResult.CONNECTION_BREAK);
-                        currentBufferSize.decrementAndGet();
-                    }
+                    
queueObject.getCallback().onMessageAck(SendResult.CONNECTION_BREAK);
+                    currentBufferSize.decrementAndGet();
                 }
                 msgQueueMap.clear();
             }
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java
index 270531bf5b..e735850360 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/MetricWorkerThread.java
@@ -19,7 +19,7 @@ package org.apache.inlong.sdk.dataproxy.threads;
 
 import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
 import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
-import org.apache.inlong.sdk.dataproxy.common.FileCallback;
+import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
 import org.apache.inlong.sdk.dataproxy.common.SendResult;
 import org.apache.inlong.sdk.dataproxy.metric.MessageRecord;
 import org.apache.inlong.sdk.dataproxy.metric.MetricConfig;
@@ -192,7 +192,7 @@ public class MetricWorkerThread extends Thread implements 
Closeable {
         callBack.increaseRetry();
         try {
             if (callBack.getRetryCount() < 4) {
-                sender.asyncSendMessageIndex(encodeObject, callBack,
+                sender.asyncSendMessage(encodeObject, callBack,
                         String.valueOf(System.currentTimeMillis()), 20, 
TimeUnit.SECONDS);
             } else {
                 logger.error("Send metric failure: {} {}", 
encodeObject.getBodyBytes(), encodeObject.getBodylist());
@@ -267,7 +267,7 @@ public class MetricWorkerThread extends Thread implements 
Closeable {
         }
     }
 
-    private class MetricSendCallBack extends FileCallback {
+    private class MetricSendCallBack implements SendMessageCallback {
 
         private final EncodeObject encodeObject;
         private int retryCount = 0;
@@ -285,17 +285,17 @@ public class MetricWorkerThread extends Thread implements 
Closeable {
         }
 
         @Override
-        public void onMessageAck(String result) {
-            if (!SendResult.OK.toString().equals(result)) {
-                tryToSendMetricToManager(encodeObject, this);
-            } else {
+        public void onMessageAck(SendResult result) {
+            if (SendResult.OK.equals(result)) { // acked OK: nothing to resend
                 logger.debug("Send metric is ok!");
+            } else {
+                tryToSendMetricToManager(encodeObject, this);
             }
         }
 
         @Override
-        public void onMessageAck(SendResult result) {
-
+        public void onException(Throwable e) {
+            //
         }
     }
 }
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/TimeoutScanThread.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/TimeoutScanThread.java
index 8c77eae109..f9e4980264 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/TimeoutScanThread.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/threads/TimeoutScanThread.java
@@ -18,7 +18,6 @@
 package org.apache.inlong.sdk.dataproxy.threads;
 
 import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
-import org.apache.inlong.sdk.dataproxy.common.FileCallback;
 import org.apache.inlong.sdk.dataproxy.common.SendResult;
 import org.apache.inlong.sdk.dataproxy.network.ClientMgr;
 import org.apache.inlong.sdk.dataproxy.network.QueueObject;
@@ -143,13 +142,8 @@ public class TimeoutScanThread extends Thread {
                 // remove it before callback
                 QueueObject queueObject1 = 
messageIdCallbacks.remove(messageId);
                 if (queueObject1 != null) {
-                    if (config.isFile()) {
-                        ((FileCallback) 
queueObject1.getCallback()).onMessageAck(SendResult.TIMEOUT.toString());
-                        currentBufferSize.addAndGet(-queueObject1.getSize());
-                    } else {
-                        
queueObject1.getCallback().onMessageAck(SendResult.TIMEOUT);
-                        currentBufferSize.decrementAndGet();
-                    }
+                    
queueObject1.getCallback().onMessageAck(SendResult.TIMEOUT);
+                    currentBufferSize.decrementAndGet();
                 }
                 addTimeoutChannel(channel);
             }


Reply via email to