This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new eb5f890a23 [INLONG-9149][Agent] Add sender manager for file collect (#9150) eb5f890a23 is described below commit eb5f890a23e5b556d1c4e6c841b298c72388af0c Author: justinwwhuang <hww_jus...@163.com> AuthorDate: Mon Oct 30 14:10:28 2023 +0800 [INLONG-9149][Agent] Add sender manager for file collect (#9150) --- .../plugin/sinks/filecollect/SenderManager.java | 444 +++++++++++++++++++++ .../agent/plugin/task/filecollect/AgentErrMsg.java | 54 +-- .../agent/plugin/task/filecollect/FileScanner.java | 6 +- .../agent/plugin/task/filecollect/WatchEntity.java | 2 +- .../agent/plugin/utils/file/NewDateUtils.java | 2 +- .../inlong/agent/plugin/AgentBaseTestsHelper.java | 22 +- .../sinks/filecollect/TestSenderManager.java | 134 +++++++ 7 files changed, 619 insertions(+), 45 deletions(-) diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java new file mode 100755 index 0000000000..a0e4d46c7f --- /dev/null +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java @@ -0,0 +1,444 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.plugin.sinks.filecollect; + +import org.apache.inlong.agent.common.AgentThreadFactory; +import org.apache.inlong.agent.conf.AgentConfiguration; +import org.apache.inlong.agent.conf.InstanceProfile; +import org.apache.inlong.agent.conf.OffsetProfile; +import org.apache.inlong.agent.constant.CommonConstants; +import org.apache.inlong.agent.core.task.OffsetManager; +import org.apache.inlong.agent.core.task.file.MemoryManager; +import org.apache.inlong.agent.message.filecollect.PackageAckInfo; +import org.apache.inlong.agent.message.filecollect.SenderMessage; +import org.apache.inlong.agent.metrics.AgentMetricItem; +import org.apache.inlong.agent.metrics.AgentMetricItemSet; +import org.apache.inlong.agent.plugin.message.SequentialID; +import org.apache.inlong.agent.utils.AgentUtils; +import org.apache.inlong.agent.utils.ThreadUtils; +import org.apache.inlong.common.constant.ProtocolType; +import org.apache.inlong.common.metric.MetricRegister; +import org.apache.inlong.sdk.dataproxy.DefaultMessageSender; +import org.apache.inlong.sdk.dataproxy.ProxyClientConfig; +import org.apache.inlong.sdk.dataproxy.SendMessageCallback; +import org.apache.inlong.sdk.dataproxy.SendResult; +import org.apache.inlong.sdk.dataproxy.network.ProxysdkException; + +import io.netty.util.concurrent.DefaultThreadFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_BATCH_FLUSH_INTERVAL; +import static org.apache.inlong.agent.constant.CommonConstants.PROXY_BATCH_FLUSH_INTERVAL; +import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_WRITER_PERMIT; +import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_ID; +import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_KEY; +import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST; +import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PORT; +import static org.apache.inlong.agent.constant.TaskConstants.DEFAULT_JOB_PROXY_SEND; +import static org.apache.inlong.agent.constant.TaskConstants.INODE_INFO; +import static org.apache.inlong.agent.constant.TaskConstants.JOB_PROXY_SEND; +import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID; +import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_STREAM_ID; +import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_PLUGIN_ID; + +/** + * proxy client + */ +public class SenderManager { + + private static final Logger LOGGER = LoggerFactory.getLogger(SenderManager.class); + private static final SequentialID SEQUENTIAL_ID = SequentialID.getInstance(); + public final int SAVE_OFFSET_INTERVAL_MS = 1000; + private final AtomicInteger SENDER_INDEX = new AtomicInteger(0); + // cache for group and sender list, share the map cross agent lifecycle. + private DefaultMessageSender sender; + private LinkedBlockingQueue<AgentSenderCallback> resendQueue; + private static final ThreadPoolExecutor EXECUTOR_SERVICE = new ThreadPoolExecutor( + 0, Integer.MAX_VALUE, + 1L, TimeUnit.SECONDS, + new SynchronousQueue<>(), + new AgentThreadFactory("sender-manager")); + // sharing worker threads between sender client + // in case of thread abusing. + private ThreadFactory SHARED_FACTORY; + private static final AtomicLong METRIC_INDEX = new AtomicLong(0); + private final String managerHost; + private final int managerPort; + private final String netTag; + private final String localhost; + private final boolean isLocalVisit; + private final int totalAsyncBufSize; + private final int aliveConnectionNum; + private final boolean isCompress; + private final int msgType; + private final boolean isFile; + private final long maxSenderTimeout; + private final int maxSenderRetry; + private final long retrySleepTime; + private final String inlongGroupId; + private final int maxSenderPerGroup; + private final String sourcePath; + private final boolean proxySend; + private volatile boolean shutdown = false; + // metric + private AgentMetricItemSet metricItemSet; + private Map<String, String> dimensions; + private OffsetManager offsetManager; + private int ioThreadNum; + private boolean enableBusyWait; + private String authSecretId; + private String authSecretKey; + protected int batchFlushInterval; + private List<PackageAckInfo> packageAckInfoList = new ArrayList<>(); + private final ReentrantReadWriteLock packageAckInfoLock = new ReentrantReadWriteLock(true); + protected InstanceProfile profile; + private Random testRandom = new Random(); + private volatile boolean offsetRunning = false; + private volatile boolean resendRunning = false; + private volatile boolean started = false; + + public SenderManager(InstanceProfile profile, String inlongGroupId, String sourcePath) { + AgentConfiguration conf = AgentConfiguration.getAgentConf(); + this.profile = profile; + managerHost = conf.get(AGENT_MANAGER_VIP_HTTP_HOST); + managerPort = conf.getInt(AGENT_MANAGER_VIP_HTTP_PORT); + proxySend = profile.getBoolean(JOB_PROXY_SEND, DEFAULT_JOB_PROXY_SEND); + localhost = profile.get(CommonConstants.PROXY_LOCAL_HOST, CommonConstants.DEFAULT_PROXY_LOCALHOST); + netTag = profile.get(CommonConstants.PROXY_NET_TAG, CommonConstants.DEFAULT_PROXY_NET_TAG); + isLocalVisit = profile.getBoolean( + CommonConstants.PROXY_IS_LOCAL_VISIT, CommonConstants.DEFAULT_PROXY_IS_LOCAL_VISIT); + totalAsyncBufSize = profile + .getInt( + CommonConstants.PROXY_TOTAL_ASYNC_PROXY_SIZE, + CommonConstants.DEFAULT_PROXY_TOTAL_ASYNC_PROXY_SIZE); + aliveConnectionNum = profile + .getInt( + CommonConstants.PROXY_ALIVE_CONNECTION_NUM, CommonConstants.DEFAULT_PROXY_ALIVE_CONNECTION_NUM); + isCompress = profile.getBoolean( + CommonConstants.PROXY_IS_COMPRESS, CommonConstants.DEFAULT_PROXY_IS_COMPRESS); + maxSenderPerGroup = profile.getInt( + CommonConstants.PROXY_MAX_SENDER_PER_GROUP, CommonConstants.DEFAULT_PROXY_MAX_SENDER_PER_GROUP); + msgType = profile.getInt(CommonConstants.PROXY_MSG_TYPE, CommonConstants.DEFAULT_PROXY_MSG_TYPE); + maxSenderTimeout = profile.getInt( + CommonConstants.PROXY_SENDER_MAX_TIMEOUT, CommonConstants.DEFAULT_PROXY_SENDER_MAX_TIMEOUT); + maxSenderRetry = profile.getInt( + CommonConstants.PROXY_SENDER_MAX_RETRY, CommonConstants.DEFAULT_PROXY_SENDER_MAX_RETRY); + retrySleepTime = profile.getLong( + CommonConstants.PROXY_RETRY_SLEEP, CommonConstants.DEFAULT_PROXY_RETRY_SLEEP); + isFile = profile.getBoolean(CommonConstants.PROXY_IS_FILE, CommonConstants.DEFAULT_IS_FILE); + offsetManager = OffsetManager.init(); + ioThreadNum = profile.getInt(CommonConstants.PROXY_CLIENT_IO_THREAD_NUM, + CommonConstants.DEFAULT_PROXY_CLIENT_IO_THREAD_NUM); + enableBusyWait = profile.getBoolean(CommonConstants.PROXY_CLIENT_ENABLE_BUSY_WAIT, + CommonConstants.DEFAULT_PROXY_CLIENT_ENABLE_BUSY_WAIT); + batchFlushInterval = profile.getInt(PROXY_BATCH_FLUSH_INTERVAL, DEFAULT_PROXY_BATCH_FLUSH_INTERVAL); + authSecretId = conf.get(AGENT_MANAGER_AUTH_SECRET_ID); + authSecretKey = conf.get(AGENT_MANAGER_AUTH_SECRET_KEY); + + this.sourcePath = sourcePath; + this.inlongGroupId = inlongGroupId; + + this.dimensions = new HashMap<>(); + dimensions.put(KEY_PLUGIN_ID, this.getClass().getSimpleName()); + String metricName = String.join("-", this.getClass().getSimpleName(), + String.valueOf(METRIC_INDEX.incrementAndGet())); + this.metricItemSet = new AgentMetricItemSet(metricName); + MetricRegister.register(metricItemSet); + resendQueue = new LinkedBlockingQueue<>(); + + } + + public void Start() throws Exception { + createMessageSender(inlongGroupId); + EXECUTOR_SERVICE.execute(flushOffset()); + EXECUTOR_SERVICE.execute(flushResendQueue()); + started = true; + } + + public void Stop() { + LOGGER.info("stop send manager"); + shutdown = true; + if (!started) { + return; + } + while (offsetRunning || resendRunning) { + AgentUtils.silenceSleepInMs(1); + } + closeMessageSender(); + clearOffset(); + LOGGER.info("stop send manager end"); + } + + private void closeMessageSender() { + if (sender != null) { + sender.close(); + } + } + + private AgentMetricItem getMetricItem(Map<String, String> otherDimensions) { + Map<String, String> dimensions = new HashMap<>(); + dimensions.put(KEY_PLUGIN_ID, this.getClass().getSimpleName()); + dimensions.putAll(otherDimensions); + return this.metricItemSet.findMetricItem(dimensions); + } + + private AgentMetricItem getMetricItem(String groupId, String streamId) { + Map<String, String> dims = new HashMap<>(); + dims.put(KEY_INLONG_GROUP_ID, groupId); + dims.put(KEY_INLONG_STREAM_ID, streamId); + return getMetricItem(dims); + } + + /** + * createMessageSender + * + * @param tagName we use group id as tag name + */ + private void createMessageSender(String tagName) throws Exception { + + ProxyClientConfig proxyClientConfig = new ProxyClientConfig( + localhost, isLocalVisit, managerHost, managerPort, tagName, netTag, authSecretId, authSecretKey); + proxyClientConfig.setTotalAsyncCallbackSize(totalAsyncBufSize); + proxyClientConfig.setFile(isFile); + proxyClientConfig.setAliveConnections(aliveConnectionNum); + + proxyClientConfig.setIoThreadNum(ioThreadNum); + proxyClientConfig.setEnableBusyWait(enableBusyWait); + proxyClientConfig.setProtocolType(ProtocolType.TCP); + + SHARED_FACTORY = new DefaultThreadFactory("agent-sender-manager-" + sourcePath, + Thread.currentThread().isDaemon()); + + DefaultMessageSender sender = new DefaultMessageSender(proxyClientConfig, SHARED_FACTORY); + sender.setMsgtype(msgType); + sender.setCompress(isCompress); + this.sender = sender; + } + + public void sendBatch(SenderMessage message) { + while (!resendQueue.isEmpty()) { + AgentUtils.silenceSleepInMs(retrySleepTime); + } + addAckInfo(message.getAckInfo()); + sendBatchWithRetryCount(message, 0); + } + + private void addAckInfo(PackageAckInfo info) { + packageAckInfoLock.writeLock().lock(); + packageAckInfoList.add(info); + packageAckInfoLock.writeLock().unlock(); + } + + public boolean sendFinished() { + boolean finished = false; + packageAckInfoLock.writeLock().lock(); + if (packageAckInfoList.isEmpty()) { + finished = true; + } + packageAckInfoLock.writeLock().unlock(); + return finished; + } + + /** + * flushOffset + */ + private void doFlushOffset() { + packageAckInfoLock.writeLock().lock(); + PackageAckInfo info = null; + for (int i = 0; i < packageAckInfoList.size();) { + if (packageAckInfoList.get(i).getHasAck()) { + info = packageAckInfoList.remove(i); + MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT, info.getLen()); + } else { + break; + } + } + if (info != null) { + LOGGER.info("save offset {} taskId {} instanceId {}", info.getOffset(), profile.getTaskId(), + profile.getInstanceId()); + OffsetProfile offsetProfile = new OffsetProfile(profile.getTaskId(), profile.getInstanceId(), + info.getOffset(), profile.get(INODE_INFO)); + offsetManager.setOffset(offsetProfile); + } + packageAckInfoLock.writeLock().unlock(); + } + + private void clearOffset() { + packageAckInfoLock.writeLock().lock(); + for (int i = 0; i < packageAckInfoList.size();) { + MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT, packageAckInfoList.remove(i).getLen()); + } + packageAckInfoLock.writeLock().unlock(); + } + + /** + * Send message to proxy by batch, use message cache. + */ + private void sendBatchWithRetryCount(SenderMessage message, int retry) { + boolean suc = false; + while (!suc) { + try { + AgentSenderCallback cb = new AgentSenderCallback(message, retry); + asyncSendByMessageSender(cb, message.getDataList(), message.getGroupId(), + message.getStreamId(), message.getDataTime(), SEQUENTIAL_ID.getNextUuid(), + maxSenderTimeout, TimeUnit.SECONDS, message.getExtraMap(), proxySend); + getMetricItem(message.getGroupId(), message.getStreamId()).pluginSendCount.addAndGet( + message.getMsgCnt()); + suc = true; + } catch (Exception exception) { + suc = false; + if (retry > maxSenderRetry) { + if (retry % 10 == 0) { + LOGGER.error("max retry reached, sample log Exception caught", exception); + } + } else { + LOGGER.error("Exception caught", exception); + } + retry++; + AgentUtils.silenceSleepInMs(retrySleepTime); + } + } + } + + private void asyncSendByMessageSender(SendMessageCallback cb, + List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID, + long timeout, TimeUnit timeUnit, + Map<String, String> extraAttrMap, boolean isProxySend) throws ProxysdkException { + sender.asyncSendMessage(cb, bodyList, groupId, + streamId, dt, msgUUID, + timeout, timeUnit, extraAttrMap, isProxySend); + } + + /** + * flushResendQueue + * + * @return thread runner + */ + private Runnable flushResendQueue() { + return () -> { + AgentThreadFactory.nameThread( + "flushResendQueue-" + profile.getTaskId() + "-" + profile.getInstanceId()); + LOGGER.info("start flush resend queue {}:{}", inlongGroupId, sourcePath); + resendRunning = true; + while (!shutdown) { + try { + AgentSenderCallback callback = resendQueue.poll(1, TimeUnit.SECONDS); + if (callback != null) { + sendBatchWithRetryCount(callback.message, callback.retry + 1); + } + } catch (Exception ex) { + LOGGER.error("error caught", ex); + } catch (Throwable t) { + ThreadUtils.threadThrowableHandler(Thread.currentThread(), t); + } finally { + AgentUtils.silenceSleepInMs(batchFlushInterval); + } + } + LOGGER.info("stop flush resend queue {}:{}", inlongGroupId, sourcePath); + resendRunning = false; + }; + } + + /** + * flushOffset + * + * @return thread runner + */ + private Runnable flushOffset() { + return () -> { + AgentThreadFactory.nameThread( + "flushOffset-" + profile.getTaskId() + "-" + profile.getInstanceId()); + LOGGER.info("start flush offset {}:{}", inlongGroupId, sourcePath); + offsetRunning = true; + while (!shutdown) { + doFlushOffset(); + AgentUtils.silenceSleepInMs(SAVE_OFFSET_INTERVAL_MS); + } + LOGGER.info("stop flush offset {}:{}", inlongGroupId, sourcePath); + offsetRunning = false; + }; + } + + /** + * put the data into resend queue and will be resent later. + * + * @param batchMessageCallBack + */ + private void putInResendQueue(AgentSenderCallback batchMessageCallBack) { + try { + resendQueue.put(batchMessageCallBack); + } catch (Throwable throwable) { + LOGGER.error("putInResendQueue e = {}", throwable); + } + } + + /** + * sender callback + */ + private class AgentSenderCallback implements SendMessageCallback { + + private final int retry; + private final SenderMessage message; + private final int msgCnt; + + AgentSenderCallback(SenderMessage message, int retry) { + this.message = message; + this.retry = retry; + this.msgCnt = message.getDataList().size(); + } + + @Override + public void onMessageAck(SendResult result) { + String groupId = message.getGroupId(); + String streamId = message.getStreamId(); + String taskId = message.getTaskId(); + String instanceId = message.getInstanceId(); + long dataTime = message.getDataTime(); + if (result != null && result.equals(SendResult.OK)) { + message.getAckInfo().setHasAck(true); + getMetricItem(groupId, streamId).pluginSendSuccessCount.addAndGet(msgCnt); + } else { + LOGGER.warn("send groupId {}, streamId {}, taskId {}, instanceId {}, dataTime {} fail with times {}, " + + "error {}", groupId, streamId, taskId, instanceId, dataTime, retry, result); + getMetricItem(groupId, streamId).pluginSendFailCount.addAndGet(msgCnt); + putInResendQueue(new AgentSenderCallback(message, retry)); + } + } + + @Override + public void onException(Throwable e) { + getMetricItem(message.getGroupId(), message.getStreamId()).pluginSendFailCount.addAndGet(msgCnt); + LOGGER.error("exception caught", e); + } + } +} diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/AgentErrMsg.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/AgentErrMsg.java index eff927736d..4768aa71c3 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/AgentErrMsg.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/AgentErrMsg.java @@ -21,47 +21,47 @@ public class AgentErrMsg { public static final String CONFIG_SUCCESS = "SUCCESS"; - // 数据源配置异常 */ - public static final String DATA_SOURCE_CONFIG_ERROR = "ERROR-0-TDAgent|10001|ERROR" + // data source config error */ + public static final String DATA_SOURCE_CONFIG_ERROR = "ERROR-0-INLONG_AGENT|10001|ERROR" + "|ERROR_DATA_SOURCE_CONFIG|"; - // 监控文件夹不存在 */ - public static final String DIRECTORY_NOT_FOUND_ERROR = "ERROR-0-TDAgent|11001|WARN" + // directory not found error */ + public static final String DIRECTORY_NOT_FOUND_ERROR = "ERROR-0-INLONG_AGENT|11001|WARN" + "|WARN_DIRECTORY_NOT_EXIST|"; - // 监控文件夹时出错 */ - public static final String WATCH_DIR_ERROR = "ERROR-0-TDAgent|11002|ERROR" + // watch directory error */ + public static final String WATCH_DIR_ERROR = "ERROR-0-INLONG_AGENT|11002|ERROR" + "|ERROR_WATCH_DIR_ERROR|"; - // 要读取的文件异常(不存在,rotate) - public static final String FILE_ERROR = "ERROR-0-TDAgent|10002|ERROR|ERROR_SOURCE_FILE|"; + // file error(not found,rotate) + public static final String FILE_ERROR = "ERROR-0-INLONG_AGENT|10002|ERROR|ERROR_SOURCE_FILE|"; - // 读取文件异常 - public static final String FILE_OP_ERROR = "ERROR-1-TDAgent|30002|ERROR|ERROR_OPERATE_FILE|"; + // read file error + public static final String FILE_OP_ERROR = "ERROR-1-INLONG_AGENT|30002|ERROR|ERROR_OPERATE_FILE|"; - // 磁盘满 - public static final String DISK_FULL = "ERROR-1-TDAgent|30001|FATAL|FATAL_DISK_FULL|"; + // disk full + public static final String DISK_FULL = "ERROR-1-INLONG_AGENT|30001|FATAL|FATAL_DISK_FULL|"; - // 内存溢出 - public static final String OOM_ERROR = "ERROR-1-TDAgent|30001|FATAL|FATAL_OOM_ERROR|"; + // out of memory + public static final String OOM_ERROR = "ERROR-1-INLONG_AGENT|30001|FATAL|FATAL_OOM_ERROR|"; - // watcher异常 - public static final String WATCHER_INVALID = "ERROR-1-TDAgent|40001|WARN|WARN_INVALID_WATCHER|"; + // watcher error + public static final String WATCHER_INVALID = "ERROR-1-INLONG_AGENT|40001|WARN|WARN_INVALID_WATCHER|"; - // 连不上tdmanager - public static final String CONNECT_TDM_ERROR = "ERROR-1-TDAgent|30002|ERROR" - + "|ERROR_CANNOT_CONNECT_TO_TDM|"; + // could not connect to manager + public static final String CONNECT_MANAGER_ERROR = "ERROR-1-INLONG_AGENT|30002|ERROR" + + "|ERROR_CANNOT_CONNECT_TO_MANAGER|"; - // 发送数据到tdbus失败 - public static final String SEND_TO_BUS_ERROR = "ERROR-1-TDAgent|30003|ERROR|ERROR_SEND_TO_BUS|"; + // send data to dataProxy failed + public static final String SEND_TO_BUS_ERROR = "ERROR-1-INLONG_AGENT|30003|ERROR|ERROR_SEND_TO_BUS|"; - // 操作bdb异常 - public static final String BDB_ERROR = "ERROR-1-TDAgent|30003|ERROR|BDB_OPERATION_ERROR|"; + // operate bdb error + public static final String BDB_ERROR = "ERROR-1-INLONG_AGENT|30003|ERROR|BDB_OPERATION_ERROR|"; - // 内部缓存满 - public static final String MSG_BUFFER_FULL = "ERROR-1-TDAgent|40002|WARN|WARN_MSG_BUFFER_FULL|"; + // buffer full + public static final String MSG_BUFFER_FULL = "ERROR-1-INLONG_AGENT|40002|WARN|WARN_MSG_BUFFER_FULL|"; - // 监控到的事件不合法(任务已删除) - public static final String FOUND_EVENT_INVALID = "ERROR-1-TDAgent|30003|ERROR" + // found event invalid(task has been delete) + public static final String FOUND_EVENT_INVALID = "ERROR-1-INLONG_AGENT|30003|ERROR" + "|FOUND_EVENT_INVALID|"; } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java index 46896d6cdc..eb25d60f82 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java @@ -37,7 +37,7 @@ import java.util.regex.Pattern; /* * This class is mainly used for scanning log file that we want to read. We use this class at - * tdagent recover process, the do and redo tasks and the current log file access when we deploy a + * inlong_agent recover process, the do and redo tasks and the current log file access when we deploy a * new data source. */ public class FileScanner { @@ -114,10 +114,6 @@ public class FileScanner { private static ArrayList<String> getUpdatedOrNewFiles(String firstDir, String secondDir, String fileName, long depth, int maxFileNum) { - - // logger.info("getUpdatedOrNewFiles: firstdir: {}, seconddir: {} filename: {}", - // new Object[]{firstDir, secondDir, fileName}); - ArrayList<String> ret = new ArrayList<String>(); ArrayList<File> readyFiles = new ArrayList<File>(); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/WatchEntity.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/WatchEntity.java index ffd74c0100..e0c6f464c6 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/WatchEntity.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/WatchEntity.java @@ -198,7 +198,7 @@ public class WatchEntity { logger.info("Register a new directory: " + dirName); } catch (IOException e) { /** - * 捕获异常,不能注册的子目录就忽略。 + * catch error,ignore the child directory that can not register */ logger.error("Register directory {} error, skip it. ", dirName, e); continue; diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java index 363b029783..935a1e4314 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java @@ -52,7 +52,7 @@ public class NewDateUtils { public static long DAY_TIMEOUT_INTERVAL = 2 * 24 * 3600 * 1000; public static long HOUR_TIMEOUT_INTERVAL = 2 * 3600 * 1000; // data source config error */ - public static final String DATA_SOURCE_CONFIG_ERROR = "ERROR-0-TDAgent|10001|ERROR" + public static final String DATA_SOURCE_CONFIG_ERROR = "ERROR-0-INLONG_AGENT|10001|ERROR" + "|ERROR_DATA_SOURCE_CONFIG|"; /* Return the time in milliseconds for a data time. */ diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java index 7850678acd..eccfe50b8c 100755 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java @@ -88,18 +88,18 @@ public class AgentBaseTestsHelper { private DataConfig getDataConfig(int taskId, String pattern, boolean retry, Long startTime, Long endTime, TaskStateEnum state) { DataConfig dataConfig = new DataConfig(); - dataConfig.setInlongGroupId("testGroupId"); // 老字段 groupId - dataConfig.setInlongStreamId("testStreamId"); // 老字段 streamId - dataConfig.setDataReportType(1); // 老字段 reportType - dataConfig.setTaskType(3); // 老字段 任务类型,3 代表文件采集 - dataConfig.setTaskId(taskId); // 老字段 任务 id - dataConfig.setState(state.ordinal()); // 新增! 任务状态 1 正常 2 暂停 + dataConfig.setInlongGroupId("testGroupId"); + dataConfig.setInlongStreamId("testStreamId"); + dataConfig.setDataReportType(1); + dataConfig.setTaskType(3); + dataConfig.setTaskId(taskId); + dataConfig.setState(state.ordinal()); FileTaskConfig fileTaskConfig = new FileTaskConfig(); - fileTaskConfig.setPattern(pattern);// 正则 - fileTaskConfig.setTimeOffset("0d"); // 老字段 时间偏移 "-1d" 采一天前的 "-2h" 采 2 小时前的 - fileTaskConfig.setMaxFileCount(100); // 最大文件数 - fileTaskConfig.setCycleUnit("D"); // 新增! 任务周期 "D" 天 "h" 小时 - fileTaskConfig.setRetry(retry); // 新增! 是否补录,如果是补录任务则为 true + fileTaskConfig.setPattern(pattern); + fileTaskConfig.setTimeOffset("0d"); + fileTaskConfig.setMaxFileCount(100); + fileTaskConfig.setCycleUnit("D"); + fileTaskConfig.setRetry(retry); fileTaskConfig.setStartTime(startTime); fileTaskConfig.setEndTime(endTime); dataConfig.setExtParams(GSON.toJson(fileTaskConfig)); diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java new file mode 100644 index 0000000000..084af58851 --- /dev/null +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.plugin.sinks.filecollect; + +import org.apache.inlong.agent.common.AgentThreadFactory; +import org.apache.inlong.agent.conf.InstanceProfile; +import org.apache.inlong.agent.conf.TaskProfile; +import org.apache.inlong.agent.constant.TaskConstants; +import org.apache.inlong.agent.message.filecollect.PackageAckInfo; +import org.apache.inlong.agent.message.filecollect.SenderMessage; +import org.apache.inlong.agent.plugin.AgentBaseTestsHelper; +import org.apache.inlong.agent.plugin.utils.file.FileDataUtils; +import org.apache.inlong.agent.utils.AgentUtils; +import org.apache.inlong.common.enums.TaskStateEnum; +import org.apache.inlong.sdk.dataproxy.SendMessageCallback; +import org.apache.inlong.sdk.dataproxy.SendResult; + +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import static org.awaitility.Awaitility.await; + +@RunWith(PowerMockRunner.class) +@PrepareForTest(SenderManager.class) +@PowerMockIgnore({"javax.management.*"}) +public class TestSenderManager { + + private static final Logger LOGGER = LoggerFactory.getLogger(TestSenderManager.class); + private static final ClassLoader LOADER = TestSenderManager.class.getClassLoader(); + private static AgentBaseTestsHelper helper; + private static InstanceProfile profile; + private static final ThreadPoolExecutor EXECUTOR_SERVICE = new ThreadPoolExecutor( + 0, Integer.MAX_VALUE, + 1L, TimeUnit.SECONDS, + new SynchronousQueue<>(), + new AgentThreadFactory("TestLogfileCollectTask")); + + @BeforeClass + public static void setup() { + String fileName = LOADER.getResource("test/20230928_1.txt").getPath(); + helper = new AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome(); + String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+"; + TaskProfile taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, TaskStateEnum.RUNNING); + profile = taskProfile.createInstanceProfile("", fileName, + "20230927"); + } + + @AfterClass + public static void teardown() throws Exception { + helper.teardownAgentHome(); + } + + @Test + public void testNormalAck() { + List<SendMessageCallback> cbList = new ArrayList<>(); + try { + profile.set(TaskConstants.INODE_INFO, FileDataUtils.getInodeInfo(profile.getInstanceId())); + SenderManager senderManager = PowerMockito.spy(new SenderManager(profile, "inlongGroupId", "sourceName")); + PowerMockito.doNothing().when(senderManager, "createMessageSender", Mockito.anyString()); + + PowerMockito.doAnswer(invocation -> { + SendMessageCallback cb = invocation.getArgument(0); + cbList.add(cb); + return null; + }).when(senderManager, "asyncSendByMessageSender", Mockito.any(), + Mockito.any(), Mockito.any(), Mockito.any(), Mockito.anyLong(), Mockito.any(), + Mockito.anyLong(), Mockito.any(), + Mockito.any(), Mockito.anyBoolean()); + + senderManager.Start(); + Long packageIndex = 0L; + Long packageOffset = 100L; + List<byte[]> bodyList = new ArrayList<>(); + bodyList.add("123456789".getBytes(StandardCharsets.UTF_8)); + Integer resultBatchSize = 0; + for (int i = 0; i < bodyList.size(); i++) { + resultBatchSize += bodyList.get(i).length; + } + for (int i = 0; i < 10; i++) { + PackageAckInfo ackInfo = new PackageAckInfo(packageIndex++, packageOffset, resultBatchSize, false); + SenderMessage senderMessage = new SenderMessage("taskId", "instanceId", "groupId", "streamId", bodyList, + AgentUtils.getCurrentTime(), null, ackInfo); + senderManager.sendBatch(senderMessage); + packageOffset += 100; + } + Assert.assertTrue(cbList.size() == 10); + for (int i = 0; i < 5; i++) { + cbList.get(4 - i).onMessageAck(SendResult.OK); + } + + await().atMost(2, TimeUnit.SECONDS).until(() -> !senderManager.sendFinished()); + for (int i = 5; i < 10; i++) { + cbList.get(i).onMessageAck(SendResult.OK); + AgentUtils.silenceSleepInMs(10); + } + await().atMost(2, TimeUnit.SECONDS).until(() -> senderManager.sendFinished()); + } catch (Exception e) { + e.printStackTrace(); + Assert.assertTrue("testNormalAck failed", false); + } + } +} \ No newline at end of file