This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new eb5f890a23 [INLONG-9149][Agent] Add sender manager for file collect 
(#9150)
eb5f890a23 is described below

commit eb5f890a23e5b556d1c4e6c841b298c72388af0c
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Mon Oct 30 14:10:28 2023 +0800

    [INLONG-9149][Agent] Add sender manager for file collect (#9150)
---
 .../plugin/sinks/filecollect/SenderManager.java    | 444 +++++++++++++++++++++
 .../agent/plugin/task/filecollect/AgentErrMsg.java |  54 +--
 .../agent/plugin/task/filecollect/FileScanner.java |   6 +-
 .../agent/plugin/task/filecollect/WatchEntity.java |   2 +-
 .../agent/plugin/utils/file/NewDateUtils.java      |   2 +-
 .../inlong/agent/plugin/AgentBaseTestsHelper.java  |  22 +-
 .../sinks/filecollect/TestSenderManager.java       | 134 +++++++
 7 files changed, 619 insertions(+), 45 deletions(-)

diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
new file mode 100755
index 0000000000..a0e4d46c7f
--- /dev/null
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
@@ -0,0 +1,444 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sinks.filecollect;
+
+import org.apache.inlong.agent.common.AgentThreadFactory;
+import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.conf.OffsetProfile;
+import org.apache.inlong.agent.constant.CommonConstants;
+import org.apache.inlong.agent.core.task.OffsetManager;
+import org.apache.inlong.agent.core.task.file.MemoryManager;
+import org.apache.inlong.agent.message.filecollect.PackageAckInfo;
+import org.apache.inlong.agent.message.filecollect.SenderMessage;
+import org.apache.inlong.agent.metrics.AgentMetricItem;
+import org.apache.inlong.agent.metrics.AgentMetricItemSet;
+import org.apache.inlong.agent.plugin.message.SequentialID;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.ThreadUtils;
+import org.apache.inlong.common.constant.ProtocolType;
+import org.apache.inlong.common.metric.MetricRegister;
+import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
+import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
+import org.apache.inlong.sdk.dataproxy.SendMessageCallback;
+import org.apache.inlong.sdk.dataproxy.SendResult;
+import org.apache.inlong.sdk.dataproxy.network.ProxysdkException;
+
+import io.netty.util.concurrent.DefaultThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_BATCH_FLUSH_INTERVAL;
+import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_BATCH_FLUSH_INTERVAL;
+import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_WRITER_PERMIT;
+import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_ID;
+import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_KEY;
+import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST;
+import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PORT;
+import static 
org.apache.inlong.agent.constant.TaskConstants.DEFAULT_JOB_PROXY_SEND;
+import static org.apache.inlong.agent.constant.TaskConstants.INODE_INFO;
+import static org.apache.inlong.agent.constant.TaskConstants.JOB_PROXY_SEND;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_STREAM_ID;
+import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_PLUGIN_ID;
+
+/**
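+ * Sender manager for file collect: sends batches to the data proxy through the proxy SDK client,
+ * re-sends batches that were not acknowledged, and periodically saves the acknowledged offset.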
+ */
+public class SenderManager {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SenderManager.class);
+    private static final SequentialID SEQUENTIAL_ID = 
SequentialID.getInstance(); // supplies the message UUID passed to every async send
+    public final int SAVE_OFFSET_INTERVAL_MS = 1000; // sleep between two doFlushOffset() passes, in ms
+    private final AtomicInteger SENDER_INDEX = new AtomicInteger(0); // NOTE(review): never read in this class
+    // data proxy client used for every send; assigned by createMessageSender() during Start().
+    private DefaultMessageSender sender;
+    private LinkedBlockingQueue<AgentSenderCallback> resendQueue; // callbacks whose ack was not OK, waiting to be re-sent
+    private static final ThreadPoolExecutor EXECUTOR_SERVICE = new 
ThreadPoolExecutor(
+            0, Integer.MAX_VALUE,
+            1L, TimeUnit.SECONDS,
+            new SynchronousQueue<>(),
+            new AgentThreadFactory("sender-manager"));
+    // thread factory handed to the DefaultMessageSender;
+    // a new instance is assigned on every createMessageSender() call.
+    private ThreadFactory SHARED_FACTORY;
+    private static final AtomicLong METRIC_INDEX = new AtomicLong(0); // numeric suffix of the metric item set name
+    private final String managerHost;
+    private final int managerPort;
+    private final String netTag;
+    private final String localhost;
+    private final boolean isLocalVisit;
+    private final int totalAsyncBufSize;
+    private final int aliveConnectionNum;
+    private final boolean isCompress;
+    private final int msgType;
+    private final boolean isFile;
+    private final long maxSenderTimeout;
+    private final int maxSenderRetry;
+    private final long retrySleepTime;
+    private final String inlongGroupId;
+    private final int maxSenderPerGroup; // NOTE(review): read from the profile but not used in this class
+    private final String sourcePath;
+    private final boolean proxySend;
+    private volatile boolean shutdown = false; // set by Stop(); polled by both background loops
+    // metric item set registered in the constructor; getMetricItem() builds its own dimension map
+    private AgentMetricItemSet metricItemSet;
+    private Map<String, String> dimensions;
+    private OffsetManager offsetManager;
+    private int ioThreadNum;
+    private boolean enableBusyWait;
+    private String authSecretId;
+    private String authSecretKey;
+    protected int batchFlushInterval;
+    private List<PackageAckInfo> packageAckInfoList = new ArrayList<>(); // ack records in the order sendBatch() received them
+    private final ReentrantReadWriteLock packageAckInfoLock = new 
ReentrantReadWriteLock(true); // guards packageAckInfoList
+    protected InstanceProfile profile;
+    private Random testRandom = new Random(); // NOTE(review): never used in this class
+    private volatile boolean offsetRunning = false; // true while the flushOffset() task is in its loop
+    private volatile boolean resendRunning = false; // true while the flushResendQueue() task is in its loop
+    private volatile boolean started = false; // set at the end of Start(); Stop() skips the cleanup when false
+
+    public SenderManager(InstanceProfile profile, String inlongGroupId, String 
sourcePath) {
+        AgentConfiguration conf = AgentConfiguration.getAgentConf();
+        this.profile = profile;
+        managerHost = conf.get(AGENT_MANAGER_VIP_HTTP_HOST);
+        managerPort = conf.getInt(AGENT_MANAGER_VIP_HTTP_PORT);
+        proxySend = profile.getBoolean(JOB_PROXY_SEND, DEFAULT_JOB_PROXY_SEND);
+        localhost = profile.get(CommonConstants.PROXY_LOCAL_HOST, 
CommonConstants.DEFAULT_PROXY_LOCALHOST);
+        netTag = profile.get(CommonConstants.PROXY_NET_TAG, 
CommonConstants.DEFAULT_PROXY_NET_TAG);
+        isLocalVisit = profile.getBoolean(
+                CommonConstants.PROXY_IS_LOCAL_VISIT, 
CommonConstants.DEFAULT_PROXY_IS_LOCAL_VISIT);
+        totalAsyncBufSize = profile
+                .getInt(
+                        CommonConstants.PROXY_TOTAL_ASYNC_PROXY_SIZE,
+                        CommonConstants.DEFAULT_PROXY_TOTAL_ASYNC_PROXY_SIZE);
+        aliveConnectionNum = profile
+                .getInt(
+                        CommonConstants.PROXY_ALIVE_CONNECTION_NUM, 
CommonConstants.DEFAULT_PROXY_ALIVE_CONNECTION_NUM);
+        isCompress = profile.getBoolean(
+                CommonConstants.PROXY_IS_COMPRESS, 
CommonConstants.DEFAULT_PROXY_IS_COMPRESS);
+        maxSenderPerGroup = profile.getInt(
+                CommonConstants.PROXY_MAX_SENDER_PER_GROUP, 
CommonConstants.DEFAULT_PROXY_MAX_SENDER_PER_GROUP);
+        msgType = profile.getInt(CommonConstants.PROXY_MSG_TYPE, 
CommonConstants.DEFAULT_PROXY_MSG_TYPE);
+        maxSenderTimeout = profile.getInt(
+                CommonConstants.PROXY_SENDER_MAX_TIMEOUT, 
CommonConstants.DEFAULT_PROXY_SENDER_MAX_TIMEOUT);
+        maxSenderRetry = profile.getInt(
+                CommonConstants.PROXY_SENDER_MAX_RETRY, 
CommonConstants.DEFAULT_PROXY_SENDER_MAX_RETRY);
+        retrySleepTime = profile.getLong(
+                CommonConstants.PROXY_RETRY_SLEEP, 
CommonConstants.DEFAULT_PROXY_RETRY_SLEEP);
+        isFile = profile.getBoolean(CommonConstants.PROXY_IS_FILE, 
CommonConstants.DEFAULT_IS_FILE);
+        offsetManager = OffsetManager.init();
+        ioThreadNum = 
profile.getInt(CommonConstants.PROXY_CLIENT_IO_THREAD_NUM,
+                CommonConstants.DEFAULT_PROXY_CLIENT_IO_THREAD_NUM);
+        enableBusyWait = 
profile.getBoolean(CommonConstants.PROXY_CLIENT_ENABLE_BUSY_WAIT,
+                CommonConstants.DEFAULT_PROXY_CLIENT_ENABLE_BUSY_WAIT);
+        batchFlushInterval = profile.getInt(PROXY_BATCH_FLUSH_INTERVAL, 
DEFAULT_PROXY_BATCH_FLUSH_INTERVAL);
+        authSecretId = conf.get(AGENT_MANAGER_AUTH_SECRET_ID);
+        authSecretKey = conf.get(AGENT_MANAGER_AUTH_SECRET_KEY);
+
+        this.sourcePath = sourcePath;
+        this.inlongGroupId = inlongGroupId;
+
+        this.dimensions = new HashMap<>();
+        dimensions.put(KEY_PLUGIN_ID, this.getClass().getSimpleName());
+        String metricName = String.join("-", this.getClass().getSimpleName(),
+                String.valueOf(METRIC_INDEX.incrementAndGet()));
+        this.metricItemSet = new AgentMetricItemSet(metricName);
+        MetricRegister.register(metricItemSet);
+        resendQueue = new LinkedBlockingQueue<>();
+
+    }
+
+    /**
+     * Creates the message sender for this group and submits the two background tasks
+     * (offset flushing and resend-queue draining) to the shared executor.
+     *
+     * @throws Exception if the message sender cannot be created
+     */
+    public void Start() throws Exception {
+        createMessageSender(inlongGroupId);
+        final Runnable offsetFlusher = flushOffset();
+        EXECUTOR_SERVICE.execute(offsetFlusher);
+        final Runnable resendFlusher = flushResendQueue();
+        EXECUTOR_SERVICE.execute(resendFlusher);
+        started = true;
+    }
+
+    /**
+     * Raises the shutdown flag. When {@link #Start()} had completed, it also waits for both
+     * background tasks to leave their loops, closes the sender and releases the permits of
+     * every ack record that is still tracked.
+     */
+    public void Stop() {
+        LOGGER.info("stop send manager");
+        shutdown = true;
+        if (started) {
+            // poll every millisecond until neither background task reports itself as running
+            for (;;) {
+                final boolean stillRunning = offsetRunning || resendRunning;
+                if (!stillRunning) {
+                    break;
+                }
+                AgentUtils.silenceSleepInMs(1);
+            }
+            closeMessageSender();
+            clearOffset();
+            LOGGER.info("stop send manager end");
+        }
+    }
+
+    /**
+     * Closes the message sender, if one has been created.
+     */
+    private void closeMessageSender() {
+        if (sender == null) {
+            return;
+        }
+        sender.close();
+    }
+
+    private AgentMetricItem getMetricItem(Map<String, String> otherDimensions) 
{
+        Map<String, String> dimensions = new HashMap<>();
+        dimensions.put(KEY_PLUGIN_ID, this.getClass().getSimpleName());
+        dimensions.putAll(otherDimensions);
+        return this.metricItemSet.findMetricItem(dimensions);
+    }
+
+    private AgentMetricItem getMetricItem(String groupId, String streamId) {
+        Map<String, String> dims = new HashMap<>();
+        dims.put(KEY_INLONG_GROUP_ID, groupId);
+        dims.put(KEY_INLONG_STREAM_ID, streamId);
+        return getMetricItem(dims);
+    }
+
+    /**
+     * createMessageSender
+     *
+     * @param tagName we use group id as tag name
+     */
+    private void createMessageSender(String tagName) throws Exception {
+
+        ProxyClientConfig proxyClientConfig = new ProxyClientConfig(
+                localhost, isLocalVisit, managerHost, managerPort, tagName, 
netTag, authSecretId, authSecretKey);
+        proxyClientConfig.setTotalAsyncCallbackSize(totalAsyncBufSize);
+        proxyClientConfig.setFile(isFile);
+        proxyClientConfig.setAliveConnections(aliveConnectionNum);
+
+        proxyClientConfig.setIoThreadNum(ioThreadNum);
+        proxyClientConfig.setEnableBusyWait(enableBusyWait);
+        proxyClientConfig.setProtocolType(ProtocolType.TCP);
+
+        SHARED_FACTORY = new DefaultThreadFactory("agent-sender-manager-" + 
sourcePath,
+                Thread.currentThread().isDaemon());
+
+        DefaultMessageSender sender = new 
DefaultMessageSender(proxyClientConfig, SHARED_FACTORY);
+        sender.setMsgtype(msgType);
+        sender.setCompress(isCompress);
+        this.sender = sender;
+    }
+
+    public void sendBatch(SenderMessage message) {
+        while (!resendQueue.isEmpty()) {
+            AgentUtils.silenceSleepInMs(retrySleepTime);
+        }
+        addAckInfo(message.getAckInfo());
+        sendBatchWithRetryCount(message, 0);
+    }
+
+    /**
+     * Appends an ack record to the tracked list under the write lock.
+     *
+     * @param info ack record of a batch that is about to be sent
+     */
+    private void addAckInfo(PackageAckInfo info) {
+        packageAckInfoLock.writeLock().lock();
+        try {
+            packageAckInfoList.add(info);
+        } finally {
+            // always release, otherwise a failure here would block every later lock user
+            packageAckInfoLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Tells whether any ack record is still being tracked.
+     *
+     * @return {@code true} when the ack record list is empty, {@code false} otherwise
+     */
+    public boolean sendFinished() {
+        // read-only check: the shared read lock is sufficient and still excludes writers
+        packageAckInfoLock.readLock().lock();
+        try {
+            return packageAckInfoList.isEmpty();
+        } finally {
+            packageAckInfoLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Removes the leading run of acknowledged records from the ack list, releasing the
+     * global writer permit for each of them, and saves the offset of the last removed
+     * record. Stops at the first record that is not yet acknowledged, so the saved offset
+     * never moves past an unacknowledged batch.
+     */
+    private void doFlushOffset() {
+        packageAckInfoLock.writeLock().lock();
+        try {
+            PackageAckInfo lastAcked = null;
+            while (!packageAckInfoList.isEmpty() && packageAckInfoList.get(0).getHasAck()) {
+                lastAcked = packageAckInfoList.remove(0);
+                MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT, lastAcked.getLen());
+            }
+            if (lastAcked != null) {
+                LOGGER.info("save offset {} taskId {} instanceId {}", lastAcked.getOffset(), profile.getTaskId(),
+                        profile.getInstanceId());
+                OffsetProfile offsetProfile = new OffsetProfile(profile.getTaskId(), profile.getInstanceId(),
+                        lastAcked.getOffset(), profile.get(INODE_INFO));
+                offsetManager.setOffset(offsetProfile);
+            }
+        } finally {
+            // release even when releasing permits or saving the offset throws;
+            // a leaked write lock would block addAckInfo() and with it every sendBatch() call
+            packageAckInfoLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Drops every tracked ack record, acknowledged or not, and releases the global writer
+     * permit held for each of them. Called from {@link #Stop()}.
+     */
+    private void clearOffset() {
+        packageAckInfoLock.writeLock().lock();
+        try {
+            // single pass plus clear() instead of repeatedly removing the head element
+            for (PackageAckInfo info : packageAckInfoList) {
+                MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT, info.getLen());
+            }
+            packageAckInfoList.clear();
+        } finally {
+            packageAckInfoLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Send message to proxy by batch, use message cache.
+     */
+    private void sendBatchWithRetryCount(SenderMessage message, int retry) {
+        boolean suc = false;
+        while (!suc) {
+            try {
+                AgentSenderCallback cb = new AgentSenderCallback(message, 
retry);
+                asyncSendByMessageSender(cb, message.getDataList(), 
message.getGroupId(),
+                        message.getStreamId(), message.getDataTime(), 
SEQUENTIAL_ID.getNextUuid(),
+                        maxSenderTimeout, TimeUnit.SECONDS, 
message.getExtraMap(), proxySend);
+                getMetricItem(message.getGroupId(), 
message.getStreamId()).pluginSendCount.addAndGet(
+                        message.getMsgCnt());
+                suc = true;
+            } catch (Exception exception) {
+                suc = false;
+                if (retry > maxSenderRetry) {
+                    if (retry % 10 == 0) {
+                        LOGGER.error("max retry reached, sample log Exception 
caught", exception);
+                    }
+                } else {
+                    LOGGER.error("Exception caught", exception);
+                }
+                retry++;
+                AgentUtils.silenceSleepInMs(retrySleepTime);
+            }
+        }
+    }
+
+    /**
+     * Forwards all arguments unchanged to {@code sender.asyncSendMessage}.
+     *
+     * @param cb callback notified with the send result
+     * @param bodyList message bodies of the batch
+     * @param groupId inlong group id
+     * @param streamId inlong stream id
+     * @param dt data time of the batch
+     * @param msgUUID unique id of this send
+     * @param timeout send timeout, in {@code timeUnit}
+     * @param timeUnit unit of {@code timeout}
+     * @param extraAttrMap extra attributes attached to the batch
+     * @param isProxySend proxy-send flag passed through to the sender
+     * @throws ProxysdkException if the sender rejects the request
+     */
+    private void asyncSendByMessageSender(SendMessageCallback cb,
+            List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID,
+            long timeout, TimeUnit timeUnit,
+            Map<String, String> extraAttrMap, boolean isProxySend) throws ProxysdkException {
+        sender.asyncSendMessage(
+                cb, bodyList, groupId, streamId, dt, msgUUID, timeout, timeUnit, extraAttrMap, isProxySend);
+    }
+
+    /**
+     * flushResendQueue
+     *
+     * @return thread runner
+     */
+    private Runnable flushResendQueue() {
+        return () -> {
+            AgentThreadFactory.nameThread(
+                    "flushResendQueue-" + profile.getTaskId() + "-" + 
profile.getInstanceId());
+            LOGGER.info("start flush resend queue {}:{}", inlongGroupId, 
sourcePath);
+            resendRunning = true;
+            while (!shutdown) {
+                try {
+                    AgentSenderCallback callback = resendQueue.poll(1, 
TimeUnit.SECONDS);
+                    if (callback != null) {
+                        sendBatchWithRetryCount(callback.message, 
callback.retry + 1);
+                    }
+                } catch (Exception ex) {
+                    LOGGER.error("error caught", ex);
+                } catch (Throwable t) {
+                    ThreadUtils.threadThrowableHandler(Thread.currentThread(), 
t);
+                } finally {
+                    AgentUtils.silenceSleepInMs(batchFlushInterval);
+                }
+            }
+            LOGGER.info("stop flush resend queue {}:{}", inlongGroupId, 
sourcePath);
+            resendRunning = false;
+        };
+    }
+
+    /**
+     * flushOffset
+     *
+     * @return thread runner
+     */
+    private Runnable flushOffset() {
+        return () -> {
+            AgentThreadFactory.nameThread(
+                    "flushOffset-" + profile.getTaskId() + "-" + 
profile.getInstanceId());
+            LOGGER.info("start flush offset {}:{}", inlongGroupId, sourcePath);
+            offsetRunning = true;
+            while (!shutdown) {
+                doFlushOffset();
+                AgentUtils.silenceSleepInMs(SAVE_OFFSET_INTERVAL_MS);
+            }
+            LOGGER.info("stop flush offset {}:{}", inlongGroupId, sourcePath);
+            offsetRunning = false;
+        };
+    }
+
+    /**
+     * put the data into resend queue and will be resent later.
+     *
+     * @param batchMessageCallBack
+     */
+    private void putInResendQueue(AgentSenderCallback batchMessageCallBack) {
+        try {
+            resendQueue.put(batchMessageCallBack);
+        } catch (Throwable throwable) {
+            LOGGER.error("putInResendQueue e = {}", throwable);
+        }
+    }
+
+    /**
+     * Callback passed to the message sender for one send attempt of one batch.
+     * An OK result marks the batch's ack info as acknowledged; any other result
+     * puts the batch into the resend queue.
+     */
+    private class AgentSenderCallback implements SendMessageCallback {
+
+        private final int retry;
+        private final SenderMessage message;
+        private final int msgCnt;
+
+        /**
+         * @param message the batch this callback belongs to
+         * @param retry retry count of the send attempt
+         */
+        AgentSenderCallback(SenderMessage message, int retry) {
+            this.retry = retry;
+            this.message = message;
+            // number of bodies in the batch, used for the success/fail metrics
+            this.msgCnt = message.getDataList().size();
+        }
+
+        @Override
+        public void onMessageAck(SendResult result) {
+            final String groupId = message.getGroupId();
+            final String streamId = message.getStreamId();
+            final String taskId = message.getTaskId();
+            final String instanceId = message.getInstanceId();
+            final long dataTime = message.getDataTime();
+
+            // null-safe: a null result is treated as a failure
+            final boolean acked = SendResult.OK.equals(result);
+            if (acked) {
+                message.getAckInfo().setHasAck(true);
+                getMetricItem(groupId, streamId).pluginSendSuccessCount.addAndGet(msgCnt);
+                return;
+            }
+
+            LOGGER.warn("send groupId {}, streamId {}, taskId {}, instanceId {}, dataTime {} fail with times {}, "
+                    + "error {}", groupId, streamId, taskId, instanceId, dataTime, retry, result);
+            getMetricItem(groupId, streamId).pluginSendFailCount.addAndGet(msgCnt);
+            // queue a fresh callback with the same retry count; the resend task adds one
+            putInResendQueue(new AgentSenderCallback(message, retry));
+        }
+
+        @Override
+        public void onException(Throwable e) {
+            // only counted and logged; the batch is not put into the resend queue here
+            final AgentMetricItem metricItem = getMetricItem(message.getGroupId(), message.getStreamId());
+            metricItem.pluginSendFailCount.addAndGet(msgCnt);
+            LOGGER.error("exception caught", e);
+        }
+    }
+}
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/AgentErrMsg.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/AgentErrMsg.java
index eff927736d..4768aa71c3 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/AgentErrMsg.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/AgentErrMsg.java
@@ -21,47 +21,47 @@ public class AgentErrMsg {
 
     public static final String CONFIG_SUCCESS = "SUCCESS";
 
-    // 数据源配置异常 */
-    public static final String DATA_SOURCE_CONFIG_ERROR = 
"ERROR-0-TDAgent|10001|ERROR"
+    // data source config error */
+    public static final String DATA_SOURCE_CONFIG_ERROR = 
"ERROR-0-INLONG_AGENT|10001|ERROR"
             + "|ERROR_DATA_SOURCE_CONFIG|";
 
-    // 监控文件夹不存在 */
-    public static final String DIRECTORY_NOT_FOUND_ERROR = 
"ERROR-0-TDAgent|11001|WARN"
+    // directory not found error */
+    public static final String DIRECTORY_NOT_FOUND_ERROR = 
"ERROR-0-INLONG_AGENT|11001|WARN"
             + "|WARN_DIRECTORY_NOT_EXIST|";
 
-    // 监控文件夹时出错 */
-    public static final String WATCH_DIR_ERROR = "ERROR-0-TDAgent|11002|ERROR"
+    // watch directory error */
+    public static final String WATCH_DIR_ERROR = 
"ERROR-0-INLONG_AGENT|11002|ERROR"
             + "|ERROR_WATCH_DIR_ERROR|";
 
-    // 要读取的文件异常(不存在,rotate)
-    public static final String FILE_ERROR = 
"ERROR-0-TDAgent|10002|ERROR|ERROR_SOURCE_FILE|";
+    // file error(not found,rotate)
+    public static final String FILE_ERROR = 
"ERROR-0-INLONG_AGENT|10002|ERROR|ERROR_SOURCE_FILE|";
 
-    // 读取文件异常
-    public static final String FILE_OP_ERROR = 
"ERROR-1-TDAgent|30002|ERROR|ERROR_OPERATE_FILE|";
+    // read file error
+    public static final String FILE_OP_ERROR = 
"ERROR-1-INLONG_AGENT|30002|ERROR|ERROR_OPERATE_FILE|";
 
-    // 磁盘满
-    public static final String DISK_FULL = 
"ERROR-1-TDAgent|30001|FATAL|FATAL_DISK_FULL|";
+    // disk full
+    public static final String DISK_FULL = 
"ERROR-1-INLONG_AGENT|30001|FATAL|FATAL_DISK_FULL|";
 
-    // 内存溢出
-    public static final String OOM_ERROR = 
"ERROR-1-TDAgent|30001|FATAL|FATAL_OOM_ERROR|";
+    // out of memory
+    public static final String OOM_ERROR = 
"ERROR-1-INLONG_AGENT|30001|FATAL|FATAL_OOM_ERROR|";
 
-    // watcher异常
-    public static final String WATCHER_INVALID = 
"ERROR-1-TDAgent|40001|WARN|WARN_INVALID_WATCHER|";
+    // watcher error
+    public static final String WATCHER_INVALID = 
"ERROR-1-INLONG_AGENT|40001|WARN|WARN_INVALID_WATCHER|";
 
-    // 连不上tdmanager
-    public static final String CONNECT_TDM_ERROR = 
"ERROR-1-TDAgent|30002|ERROR"
-            + "|ERROR_CANNOT_CONNECT_TO_TDM|";
+    // could not connect to manager
+    public static final String CONNECT_MANAGER_ERROR = 
"ERROR-1-INLONG_AGENT|30002|ERROR"
+            + "|ERROR_CANNOT_CONNECT_TO_MANAGER|";
 
-    // 发送数据到tdbus失败
-    public static final String SEND_TO_BUS_ERROR = 
"ERROR-1-TDAgent|30003|ERROR|ERROR_SEND_TO_BUS|";
+    // send data to dataProxy failed
+    public static final String SEND_TO_BUS_ERROR = 
"ERROR-1-INLONG_AGENT|30003|ERROR|ERROR_SEND_TO_BUS|";
 
-    // 操作bdb异常
-    public static final String BDB_ERROR = 
"ERROR-1-TDAgent|30003|ERROR|BDB_OPERATION_ERROR|";
+    // operate bdb error
+    public static final String BDB_ERROR = 
"ERROR-1-INLONG_AGENT|30003|ERROR|BDB_OPERATION_ERROR|";
 
-    // 内部缓存满
-    public static final String MSG_BUFFER_FULL = 
"ERROR-1-TDAgent|40002|WARN|WARN_MSG_BUFFER_FULL|";
+    // buffer full
+    public static final String MSG_BUFFER_FULL = 
"ERROR-1-INLONG_AGENT|40002|WARN|WARN_MSG_BUFFER_FULL|";
 
-    // 监控到的事件不合法(任务已删除)
-    public static final String FOUND_EVENT_INVALID = 
"ERROR-1-TDAgent|30003|ERROR"
+    // found event invalid (task has been deleted)
+    public static final String FOUND_EVENT_INVALID = 
"ERROR-1-INLONG_AGENT|30003|ERROR"
             + "|FOUND_EVENT_INVALID|";
 }
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java
index 46896d6cdc..eb25d60f82 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java
@@ -37,7 +37,7 @@ import java.util.regex.Pattern;
 
 /*
  * This class is mainly used for scanning log file that we want to read. We 
use this class at
- * tdagent recover process, the do and redo tasks and the current log file 
access when we deploy a
+ * inlong_agent recover process, the do and redo tasks and the current log 
file access when we deploy a
  * new data source.
  */
 public class FileScanner {
@@ -114,10 +114,6 @@ public class FileScanner {
 
     private static ArrayList<String> getUpdatedOrNewFiles(String firstDir, 
String secondDir,
             String fileName, long depth, int maxFileNum) {
-
-        // logger.info("getUpdatedOrNewFiles: firstdir: {}, seconddir: {} 
filename: {}",
-        // new Object[]{firstDir, secondDir, fileName});
-
         ArrayList<String> ret = new ArrayList<String>();
         ArrayList<File> readyFiles = new ArrayList<File>();
 
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/WatchEntity.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/WatchEntity.java
index ffd74c0100..e0c6f464c6 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/WatchEntity.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/WatchEntity.java
@@ -198,7 +198,7 @@ public class WatchEntity {
                         logger.info("Register a new directory: " + dirName);
                     } catch (IOException e) {
                         /**
-                         * 捕获异常,不能注册的子目录就忽略。
+                         * Catch the error and ignore any child directory that cannot be registered.
                          */
                         logger.error("Register directory {} error, skip it. ", 
dirName, e);
                         continue;
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java
index 363b029783..935a1e4314 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java
@@ -52,7 +52,7 @@ public class NewDateUtils {
     public static long DAY_TIMEOUT_INTERVAL = 2 * 24 * 3600 * 1000;
     public static long HOUR_TIMEOUT_INTERVAL = 2 * 3600 * 1000;
     // data source config error */
-    public static final String DATA_SOURCE_CONFIG_ERROR = 
"ERROR-0-TDAgent|10001|ERROR"
+    public static final String DATA_SOURCE_CONFIG_ERROR = 
"ERROR-0-INLONG_AGENT|10001|ERROR"
             + "|ERROR_DATA_SOURCE_CONFIG|";
 
     /* Return the time in milliseconds for a data time. */
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
index 7850678acd..eccfe50b8c 100755
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java
@@ -88,18 +88,18 @@ public class AgentBaseTestsHelper {
     private DataConfig getDataConfig(int taskId, String pattern, boolean 
retry, Long startTime, Long endTime,
             TaskStateEnum state) {
         DataConfig dataConfig = new DataConfig();
-        dataConfig.setInlongGroupId("testGroupId"); // 老字段 groupId
-        dataConfig.setInlongStreamId("testStreamId"); // 老字段 streamId
-        dataConfig.setDataReportType(1); // 老字段 reportType
-        dataConfig.setTaskType(3); // 老字段 任务类型,3 代表文件采集
-        dataConfig.setTaskId(taskId); // 老字段 任务 id
-        dataConfig.setState(state.ordinal()); // 新增! 任务状态 1 正常 2 暂停
+        dataConfig.setInlongGroupId("testGroupId");
+        dataConfig.setInlongStreamId("testStreamId");
+        dataConfig.setDataReportType(1);
+        dataConfig.setTaskType(3);
+        dataConfig.setTaskId(taskId);
+        dataConfig.setState(state.ordinal());
         FileTaskConfig fileTaskConfig = new FileTaskConfig();
-        fileTaskConfig.setPattern(pattern);// 正则
-        fileTaskConfig.setTimeOffset("0d"); // 老字段 时间偏移 "-1d" 采一天前的 "-2h" 采 2 
小时前的
-        fileTaskConfig.setMaxFileCount(100); // 最大文件数
-        fileTaskConfig.setCycleUnit("D"); // 新增! 任务周期 "D" 天 "h" 小时
-        fileTaskConfig.setRetry(retry); // 新增! 是否补录,如果是补录任务则为 true
+        fileTaskConfig.setPattern(pattern);
+        fileTaskConfig.setTimeOffset("0d");
+        fileTaskConfig.setMaxFileCount(100);
+        fileTaskConfig.setCycleUnit("D");
+        fileTaskConfig.setRetry(retry);
         fileTaskConfig.setStartTime(startTime);
         fileTaskConfig.setEndTime(endTime);
         dataConfig.setExtParams(GSON.toJson(fileTaskConfig));
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
new file mode 100644
index 0000000000..084af58851
--- /dev/null
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sinks.filecollect;
+
+import org.apache.inlong.agent.common.AgentThreadFactory;
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.message.filecollect.PackageAckInfo;
+import org.apache.inlong.agent.message.filecollect.SenderMessage;
+import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
+import org.apache.inlong.agent.plugin.utils.file.FileDataUtils;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.common.enums.TaskStateEnum;
+import org.apache.inlong.sdk.dataproxy.SendMessageCallback;
+import org.apache.inlong.sdk.dataproxy.SendResult;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import static org.awaitility.Awaitility.await;
+
+/** Tests ack handling in {@link SenderManager}, with its send path stubbed through a PowerMock spy. */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(SenderManager.class)
+@PowerMockIgnore({"javax.management.*"})
+public class TestSenderManager {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(TestSenderManager.class);
+    private static final ClassLoader LOADER = TestSenderManager.class.getClassLoader();
+    private static AgentBaseTestsHelper helper;
+    private static InstanceProfile profile;
+    private static final ThreadPoolExecutor EXECUTOR_SERVICE = new ThreadPoolExecutor(
+            0, Integer.MAX_VALUE,
+            1L, TimeUnit.SECONDS,
+            new SynchronousQueue<>(),
+            new AgentThreadFactory("TestLogfileCollectTask"));
+
+    /** Sets up the agent home and builds the shared instance profile from the bundled test resource file. */
+    @BeforeClass
+    public static void setup() {
+        String fileName = LOADER.getResource("test/20230928_1.txt").getPath();
+        helper = new AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome();
+        String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
+        TaskProfile taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, TaskStateEnum.RUNNING);
+        profile = taskProfile.createInstanceProfile("", fileName, "20230927");
+    }
+
+    /** Tears down the agent home that {@link #setup()} created. */
+    @AfterClass
+    public static void teardown() throws Exception {
+        helper.teardownAgentHome();
+    }
+
+    /** Sends ten packages and acks them in two groups; sendFinished() must turn true only after the last ack. */
+    @Test
+    public void testNormalAck() throws Exception {
+        List<SendMessageCallback> cbList = new ArrayList<>();
+        profile.set(TaskConstants.INODE_INFO, FileDataUtils.getInodeInfo(profile.getInstanceId()));
+        SenderManager senderManager = PowerMockito.spy(new SenderManager(profile, "inlongGroupId", "sourceName"));
+        // Make createMessageSender a no-op; presumably it would otherwise build a real DataProxy sender.
+        PowerMockito.doNothing().when(senderManager, "createMessageSender", Mockito.anyString());
+        // Capture every send callback instead of sending, so that this test decides when each ack arrives.
+        PowerMockito.doAnswer(invocation -> {
+            SendMessageCallback cb = invocation.getArgument(0);
+            cbList.add(cb);
+            return null;
+        }).when(senderManager, "asyncSendByMessageSender", Mockito.any(),
+                Mockito.any(), Mockito.any(), Mockito.any(), Mockito.anyLong(), Mockito.any(),
+                Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean());
+
+        senderManager.Start();
+        Long packageIndex = 0L;
+        Long packageOffset = 100L;
+        List<byte[]> bodyList = new ArrayList<>();
+        bodyList.add("123456789".getBytes(StandardCharsets.UTF_8));
+        Integer resultBatchSize = 0;
+        for (byte[] body : bodyList) {
+            resultBatchSize += body.length;
+        }
+        for (int i = 0; i < 10; i++) {
+            PackageAckInfo ackInfo = new PackageAckInfo(packageIndex++, packageOffset, resultBatchSize, false);
+            SenderMessage senderMessage = new SenderMessage("taskId", "instanceId", "groupId", "streamId", bodyList,
+                    AgentUtils.getCurrentTime(), null, ackInfo);
+            senderManager.sendBatch(senderMessage);
+            packageOffset += 100;
+        }
+        // assertEquals reports the actual callback count on failure, unlike assertTrue(size == 10).
+        Assert.assertEquals(10, cbList.size());
+        // Ack packages 4..0 in reverse order; with 5..9 still pending the manager must not be finished.
+        for (int i = 0; i < 5; i++) {
+            cbList.get(4 - i).onMessageAck(SendResult.OK);
+        }
+        await().atMost(2, TimeUnit.SECONDS).until(() -> !senderManager.sendFinished());
+        // Ack the remaining packages; only now may the manager report that sending has finished.
+        for (int i = 5; i < 10; i++) {
+            cbList.get(i).onMessageAck(SendResult.OK);
+            AgentUtils.silenceSleepInMs(10);
+        }
+        await().atMost(2, TimeUnit.SECONDS).until(() -> senderManager.sendFinished());
+    }
+}
\ No newline at end of file


Reply via email to