This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch branch-1.7 in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 4b99eac73d859906e5c4ab6d80628a9c57ab1bf2 Author: doleyzi <43397300+dole...@users.noreply.github.com> AuthorDate: Mon May 15 19:16:35 2023 +0800 [INLONG-8026][Agent] Improve the Agent performance (#8027) * Improve the Agent performance * Add InLong Changelog * Add InLong Changelog --------- Co-authored-by: doleyzi <dole...@tencent.com> --- CHANGES.md | 4 +- .../inlong/agent/message/BatchProxyMessage.java | 1 - .../inlong/agent/message/PackProxyMessage.java | 54 ++++--- .../org/apache/inlong/agent/core/AgentManager.java | 28 ++-- .../apache/inlong/agent/core/conf/ConfigJetty.java | 17 +-- .../apache/inlong/agent/core/job/JobManager.java | 72 +++++---- .../inlong/agent/core/trigger/TriggerManager.java | 31 ++-- .../agent/plugin/fetcher/ManagerFetcher.java | 168 ++++++++------------- .../inlong/agent/plugin/sinks/ProxySink.java | 90 +++++++---- .../inlong/agent/plugin/sinks/PulsarSink.java | 3 +- .../agent/plugin/sources/TextFileSource.java | 16 -- .../sources/reader/file/MonitorTextFile.java | 15 +- .../org/apache/inlong/agent/plugin/MiniAgent.java | 7 +- .../agent/plugin/trigger/TestTriggerManager.java | 2 +- 14 files changed, 255 insertions(+), 253 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index bac536234..48c92680f 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -27,7 +27,9 @@ |:-----------------------------------------------------------:|:---------------------------------------------------------------------------------------------------------------------------------------------------| | [INLONG-7847](https://github.com/apache/inlong/issues/7847) | [Bug][Agent] Failed to create MySQL reader | | [INLONG-7783](https://github.com/apache/inlong/issues/7783) | [Feature][Agent] Support sink data tor Kafka | -| [INLONG-7752](https://github.com/apache/inlong/issues/7752) | [Bug][Agent] PulsarSink threadPool throw reject exception | +| [INLONG-7752](https://github.com/apache/inlong/issues/7752) | [Bug][Agent] PulsarSink threadPool throw reject exception | +| [INLONG-7976](https://github.com/apache/inlong/issues/7976) | [Bug][Agent] The data collected by the agent is incomplete | +| [INLONG-8026](https://github.com/apache/inlong/issues/8026) | [Improve][Agent] Improve the Agent performance | ### DataProxy | ISSUE | Summary | diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/BatchProxyMessage.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/BatchProxyMessage.java index 39be15437..c34d8eaf5 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/BatchProxyMessage.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/BatchProxyMessage.java @@ -41,7 +41,6 @@ public class BatchProxyMessage { private List<byte[]> dataList; private long dataTime; private Map<String, String> extraMap; - private boolean isSyncSend; public InLongMsg getInLongMsg() { InLongMsg message = InLongMsg.newInLongMsg(true); diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/PackProxyMessage.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/PackProxyMessage.java index 6eb54cd1c..61b233abe 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/PackProxyMessage.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/PackProxyMessage.java @@ -17,30 +17,28 @@ package org.apache.inlong.agent.message; -import org.apache.inlong.agent.conf.JobProfile; -import org.apache.inlong.agent.utils.AgentUtils; -import org.apache.inlong.common.msg.AttributeConstants; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicLong; - import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER; import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PACKAGE_MAX_SIZE; import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PACKAGE_MAX_TIMEOUT_MS; import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER; import static org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE; import static org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_TIMEOUT_MS; -import static org.apache.inlong.agent.constant.CommonConstants.PROXY_SEND_SYNC; import static org.apache.inlong.common.msg.AttributeConstants.DATA_TIME; import static org.apache.inlong.common.msg.AttributeConstants.MESSAGE_TOPIC; import static org.apache.inlong.common.msg.AttributeConstants.STREAM_ID; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.inlong.agent.conf.JobProfile; +import org.apache.inlong.agent.utils.AgentUtils; +import org.apache.inlong.common.msg.AttributeConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * Handle List of BusMessage, which belong to the same stream id. */ @@ -58,7 +56,6 @@ public class PackProxyMessage { // streamId -> list of proxyMessage private final LinkedBlockingQueue<ProxyMessage> messageQueue; private final AtomicLong queueSize = new AtomicLong(0); - private boolean syncSend; private int currentSize; /** * extra map used when sending to dataproxy @@ -79,9 +76,7 @@ public class PackProxyMessage { this.messageQueue = new LinkedBlockingQueue<>(maxQueueNumber); this.groupId = groupId; this.streamId = streamId; - // handle syncSend flag - this.syncSend = jobConf.getBoolean(PROXY_SEND_SYNC, false); - extraMap.put(AttributeConstants.MESSAGE_SYNC_SEND, String.valueOf(syncSend)); + extraMap.put(AttributeConstants.MESSAGE_SYNC_SEND, "false"); } public void generateExtraMap(String dataKey) { @@ -106,18 +101,21 @@ public class PackProxyMessage { /** * Add proxy message to cache, proxy message should belong to the same stream id. */ - public void addProxyMessage(ProxyMessage message) { + public boolean addProxyMessage(ProxyMessage message) { assert streamId.equals(message.getInlongStreamId()); try { if (queueIsFull()) { LOGGER.warn("message queue is greater than {}, stop adding message, " + "maybe proxy get stuck", maxQueueNumber); + return false; } messageQueue.put(message); queueSize.addAndGet(message.getBody().length); + return true; } catch (Exception ex) { LOGGER.error("exception caught", ex); } + return false; } /** @@ -144,8 +142,19 @@ public class PackProxyMessage { while (!messageQueue.isEmpty()) { // pre check message size ProxyMessage peekMessage = messageQueue.peek(); - if (peekMessage == null - || resultBatchSize + peekMessage.getBody().length > maxPackSize) { + if (peekMessage == null) { + break; + } + + // if the message size is greater than max pack size,should drop it. + int peekMessageLength = peekMessage.getBody().length; + if (peekMessageLength > maxPackSize) { + LOGGER.warn("message size is {}, greater than max pack size {}, drop it!", + peekMessage.getBody().length, maxPackSize); + messageQueue.remove(); + break; + } + if (resultBatchSize + peekMessageLength > maxPackSize) { break; } ProxyMessage message = messageQueue.remove(); @@ -159,8 +168,7 @@ public class PackProxyMessage { } // make sure result is not empty. if (!result.isEmpty()) { - return new BatchProxyMessage(jobId, groupId, streamId, result, AgentUtils.getCurrentTime(), extraMap, - syncSend); + return new BatchProxyMessage(jobId, groupId, streamId, result, AgentUtils.getCurrentTime(), extraMap); } } return null; diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java index 42e058823..a571ee9d9 100755 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java @@ -17,6 +17,16 @@ package org.apache.inlong.agent.core; +import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CONF_PARENT; +import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AGENT_CONF_PARENT; +import static org.apache.inlong.agent.constant.JobConstants.JOB_TRIGGER; + +import java.io.File; +import java.lang.reflect.Constructor; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + import org.apache.inlong.agent.common.AbstractDaemon; import org.apache.inlong.agent.conf.AgentConfiguration; import org.apache.inlong.agent.conf.JobProfile; @@ -36,16 +46,6 @@ import org.apache.inlong.agent.db.TriggerProfileDb; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.lang.reflect.Constructor; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CONF_PARENT; -import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AGENT_CONF_PARENT; -import static org.apache.inlong.agent.constant.JobConstants.JOB_TRIGGER; - /** * Agent Manager, the bridge for job manager, task manager, db e.t.c it manages agent level operations and communicates * with outside system. @@ -196,11 +196,17 @@ public class AgentManager extends AbstractDaemon { public void start() throws Exception { LOGGER.info("starting agent manager"); agentConfMonitor.submit(startHotConfReplace()); + LOGGER.info("starting job manager"); jobManager.start(); + LOGGER.info("starting trigger manager"); triggerManager.start(); + LOGGER.info("starting task manager"); taskManager.start(); + LOGGER.info("starting heartbeat manager"); heartbeatManager.start(); + LOGGER.info("starting task position manager"); taskPositionManager.start(); + LOGGER.info("starting read job from local"); // read job profiles from local List<JobProfile> profileList = localProfile.readFromLocal(); for (JobProfile profile : profileList) { @@ -215,9 +221,11 @@ public class AgentManager extends AbstractDaemon { jobManager.submitFileJobProfile(profile); } } + LOGGER.info("starting fetcher"); if (fetcher != null) { fetcher.start(); } + LOGGER.info("starting agent manager end"); } /** diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/conf/ConfigJetty.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/conf/ConfigJetty.java index d29cecdb7..cf9675e05 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/conf/ConfigJetty.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/conf/ConfigJetty.java @@ -17,6 +17,10 @@ package org.apache.inlong.agent.core.conf; +import static org.apache.inlong.agent.constant.JobConstants.JOB_SOURCE_TYPE; +import static org.apache.inlong.agent.constant.JobConstants.JOB_TRIGGER; + +import java.io.Closeable; import org.apache.inlong.agent.conf.AgentConfiguration; import org.apache.inlong.agent.conf.JobProfile; import org.apache.inlong.agent.conf.TriggerProfile; @@ -32,11 +36,6 @@ import org.eclipse.jetty.servlet.ServletHolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Closeable; - -import static org.apache.inlong.agent.constant.JobConstants.JOB_SOURCE_TYPE; -import static org.apache.inlong.agent.constant.JobConstants.JOB_TRIGGER; - /** * start http server and get job/agent config via http */ @@ -88,7 +87,7 @@ public class ConfigJetty implements Closeable { // trigger job is a special kind of job if (jobProfile.hasKey(JOB_TRIGGER)) { triggerManager.submitTrigger( - TriggerProfile.parseJsonStr(jobProfile.toJsonStr())); + TriggerProfile.parseJsonStr(jobProfile.toJsonStr()), true); } else { TaskTypeEnum taskType = TaskTypeEnum .getTaskType(jobProfile.getInt(JOB_SOURCE_TYPE)); @@ -99,7 +98,7 @@ public class ConfigJetty implements Closeable { case KAFKA: case BINLOG: case SQL: - jobManager.submitJobProfile(jobProfile, true); + jobManager.submitJobProfile(jobProfile, true, true); break; default: LOGGER.error("source type not supported {}", taskType); @@ -123,9 +122,9 @@ public class ConfigJetty implements Closeable { public void deleteJobConf(JobProfile jobProfile) { if (jobProfile != null) { if (jobProfile.hasKey(JOB_TRIGGER)) { - triggerManager.deleteTrigger(TriggerProfile.parseJobProfile(jobProfile).getTriggerId()); + triggerManager.deleteTrigger(TriggerProfile.parseJobProfile(jobProfile).getTriggerId(), false); } else { - jobManager.deleteJob(jobProfile.getInstanceId()); + jobManager.deleteJob(jobProfile.getInstanceId(), false); } } } diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java index c4171d95f..f91285b2e 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java @@ -17,6 +17,27 @@ package org.apache.inlong.agent.core.job; +import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_DB_CACHE_CHECK_INTERVAL; +import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_DB_CACHE_TIME; +import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_NUMBER_LIMIT; +import static org.apache.inlong.agent.constant.AgentConstants.JOB_DB_CACHE_CHECK_INTERVAL; +import static org.apache.inlong.agent.constant.AgentConstants.JOB_DB_CACHE_TIME; +import static org.apache.inlong.agent.constant.AgentConstants.JOB_NUMBER_LIMIT; +import static org.apache.inlong.agent.constant.AgentConstants.JOB_VERSION; +import static org.apache.inlong.agent.constant.JobConstants.JOB_ID; +import static org.apache.inlong.agent.constant.JobConstants.JOB_ID_PREFIX; +import static org.apache.inlong.agent.constant.JobConstants.JOB_INSTANCE_ID; +import static org.apache.inlong.agent.constant.JobConstants.SQL_JOB_ID; +import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_COMPONENT_NAME; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import org.apache.inlong.agent.common.AbstractDaemon; import org.apache.inlong.agent.common.AgentThreadFactory; import org.apache.inlong.agent.conf.AgentConfiguration; @@ -34,27 +55,6 @@ import org.apache.inlong.common.metric.MetricRegister; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - -import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_DB_CACHE_CHECK_INTERVAL; -import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_DB_CACHE_TIME; -import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_NUMBER_LIMIT; -import static org.apache.inlong.agent.constant.AgentConstants.JOB_DB_CACHE_CHECK_INTERVAL; -import static org.apache.inlong.agent.constant.AgentConstants.JOB_DB_CACHE_TIME; -import static org.apache.inlong.agent.constant.AgentConstants.JOB_NUMBER_LIMIT; -import static org.apache.inlong.agent.constant.JobConstants.JOB_ID; -import static org.apache.inlong.agent.constant.JobConstants.JOB_ID_PREFIX; -import static org.apache.inlong.agent.constant.JobConstants.JOB_INSTANCE_ID; -import static org.apache.inlong.agent.constant.JobConstants.SQL_JOB_ID; -import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_COMPONENT_NAME; - /** * JobManager maintains lots of jobs, and communicate between server and task manager. */ @@ -143,7 +143,7 @@ public class JobManager extends AbstractDaemon { * @param profile job profile. */ public boolean submitFileJobProfile(JobProfile profile) { - return submitJobProfile(profile, false); + return submitJobProfile(profile, false, true); } /** @@ -151,7 +151,7 @@ public class JobManager extends AbstractDaemon { * * @param profile job profile. */ - public boolean submitJobProfile(JobProfile profile, boolean singleJob) { + public boolean submitJobProfile(JobProfile profile, boolean singleJob, boolean isNewJob) { if (!isJobValid(profile)) { return false; } @@ -161,8 +161,19 @@ public class JobManager extends AbstractDaemon { } else { profile.set(JOB_INSTANCE_ID, AgentUtils.getUniqId(JOB_ID_PREFIX, jobId, index.incrementAndGet())); } - LOGGER.info("submit job profile {}", profile.toJsonStr()); - getJobConfDb().storeJobFirstTime(profile); + LOGGER.info("submit job profile {} isNewJob {}", profile.toJsonStr(), isNewJob); + if (isNewJob) { + jobProfileDb.storeJobFirstTime(profile); + } else { + JobProfile jobFromDb = jobProfileDb.getJobById(profile.getInstanceId()); + if (jobFromDb != null) { + jobFromDb.set(JOB_VERSION, profile.get(JOB_VERSION)); + profile = jobFromDb; + } else { + LOGGER.info("submit job final profile null"); + } + } + LOGGER.info("submit job final profile {}", profile.toJsonStr()); addJob(new Job(profile)); return true; } @@ -192,13 +203,15 @@ public class JobManager extends AbstractDaemon { * * @param jobInstancId */ - public boolean deleteJob(String jobInstancId) { + public boolean deleteJob(String jobInstancId, boolean isFrozen) { LOGGER.info("start to delete job, job id set {}", jobs.keySet()); JobWrapper jobWrapper = jobs.remove(jobInstancId); if (jobWrapper != null) { - LOGGER.info("delete job instance with job id {}", jobInstancId); + LOGGER.info("delete job instance with job id {} isFrozen {}", jobInstancId, isFrozen); jobWrapper.cleanup(); - getJobConfDb().deleteJob(jobInstancId); + if (!isFrozen) { + jobProfileDb.deleteJob(jobInstancId); + } return true; } return true; @@ -208,7 +221,7 @@ public class JobManager extends AbstractDaemon { * start all accepted jobs. */ private void startJobs() { - List<JobProfile> profileList = getJobConfDb().getRestartJobs(); + List<JobProfile> profileList = jobProfileDb.getRestartJobs(); for (JobProfile profile : profileList) { LOGGER.info("init starting job from db {}", profile.toJsonStr()); addJob(new Job(profile)); @@ -283,6 +296,7 @@ public class JobManager extends AbstractDaemon { * @param jobId job id */ public void markJobAsFailed(String jobId) { + LOGGER.info("markJobAsFailed {}", jobId); JobWrapper wrapper = jobs.remove(jobId); if (wrapper != null) { LOGGER.info("job instance {} is failed", jobId); diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java index e73f6b8d6..0f8c4a077 100755 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java @@ -17,7 +17,15 @@ package org.apache.inlong.agent.core.trigger; +import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_TRIGGER_MAX_RUNNING_NUM; +import static org.apache.inlong.agent.constant.AgentConstants.TRIGGER_MAX_RUNNING_NUM; +import static org.apache.inlong.agent.constant.JobConstants.JOB_ID; + import com.google.common.base.Preconditions; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; import org.apache.inlong.agent.common.AbstractDaemon; import org.apache.inlong.agent.conf.AgentConfiguration; @@ -33,15 +41,6 @@ import org.apache.inlong.agent.utils.ThreadUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; - -import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_TRIGGER_MAX_RUNNING_NUM; -import static org.apache.inlong.agent.constant.AgentConstants.TRIGGER_MAX_RUNNING_NUM; -import static org.apache.inlong.agent.constant.JobConstants.JOB_ID; - /** * manager for triggers. */ @@ -76,7 +75,7 @@ public class TriggerManager extends AbstractDaemon { Trigger trigger = (Trigger) triggerClass.newInstance(); String triggerId = triggerProfile.get(JOB_ID); if (triggerMap.containsKey(triggerId)) { - deleteTrigger(triggerId); + deleteTrigger(triggerId, false); LOGGER.warn("trigger {} is running, stop it", triggerId); } triggerMap.put(triggerId, trigger); @@ -101,7 +100,7 @@ public class TriggerManager extends AbstractDaemon { * * @param triggerProfile trigger profile */ - public void submitTrigger(TriggerProfile triggerProfile) { + public void submitTrigger(TriggerProfile triggerProfile, boolean isNewJob) { // make sure all required key exists. if (!triggerProfile.allRequiredKeyExist() || this.triggerMap.size() > maxRunningNum) { throw new IllegalArgumentException( @@ -117,7 +116,7 @@ public class TriggerManager extends AbstractDaemon { LOGGER.info("submit trigger {}", triggerProfile.toJsonStr()); // This action must be done before saving in db, because the job.instance.id is needed for the next recovery - manager.getJobManager().submitJobProfile(triggerProfile, true); + manager.getJobManager().submitJobProfile(triggerProfile, true, isNewJob); triggerProfileDB.storeTrigger(triggerProfile); restoreTrigger(triggerProfile); } @@ -129,7 +128,7 @@ public class TriggerManager extends AbstractDaemon { * * @param triggerId trigger profile. */ - public void deleteTrigger(String triggerId) { + public void deleteTrigger(String triggerId, boolean isFrozen) { // repeat check if (!triggerProfileDB.getTriggers().stream() .anyMatch(profile -> profile.getTriggerId().equals(triggerId))) { @@ -139,7 +138,7 @@ public class TriggerManager extends AbstractDaemon { LOGGER.info("delete trigger {}", triggerId); Trigger trigger = triggerMap.remove(triggerId); if (trigger != null) { - manager.getJobManager().deleteJob(trigger.getTriggerProfile().getInstanceId()); + manager.getJobManager().deleteJob(trigger.getTriggerProfile().getInstanceId(), isFrozen); trigger.destroy(); } triggerProfileDB.deleteTrigger(triggerId); @@ -157,6 +156,10 @@ public class TriggerManager extends AbstractDaemon { if (profile != null) { Map<String, JobWrapper> jobWrapperMap = manager.getJobManager().getJobs(); JobWrapper job = jobWrapperMap.get(trigger.getTriggerProfile().getInstanceId()); + if (job == null) { + LOGGER.error("job {} should not be null", trigger.getTriggerProfile().getInstanceId()); + return; + } String subTaskFile = profile.getOrDefault(JobConstants.JOB_DIR_FILTER_PATTERNS, ""); Preconditions.checkArgument(StringUtils.isNotBlank(subTaskFile), String.format("Trigger %s fetched task file should not be null.", s)); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java index c8f2168c7..44ec85ad5 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java @@ -17,55 +17,9 @@ package org.apache.inlong.agent.plugin.fetcher; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import com.google.gson.JsonArray; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import org.apache.inlong.agent.cache.LocalFileCache; -import org.apache.inlong.agent.common.AbstractDaemon; -import org.apache.inlong.agent.conf.AgentConfiguration; -import org.apache.inlong.agent.conf.JobProfile; -import org.apache.inlong.agent.conf.ProfileFetcher; -import org.apache.inlong.agent.conf.TriggerProfile; -import org.apache.inlong.agent.core.AgentManager; -import org.apache.inlong.agent.db.CommandDb; -import org.apache.inlong.agent.plugin.Trigger; -import org.apache.inlong.agent.plugin.utils.PluginUtils; -import org.apache.inlong.agent.pojo.ConfirmAgentIpRequest; -import org.apache.inlong.agent.pojo.DbCollectorTaskRequestDto; -import org.apache.inlong.agent.pojo.DbCollectorTaskResult; -import org.apache.inlong.agent.utils.AgentUtils; -import org.apache.inlong.agent.utils.HttpManager; -import org.apache.inlong.agent.utils.ThreadUtils; -import org.apache.inlong.common.db.CommandEntity; -import org.apache.inlong.common.enums.ManagerOpEnum; -import org.apache.inlong.common.enums.PullJobTypeEnum; -import org.apache.inlong.common.pojo.agent.CmdConfig; -import org.apache.inlong.common.pojo.agent.TaskRequest; -import org.apache.inlong.common.pojo.agent.TaskResult; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - import static java.util.Objects.requireNonNull; import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_NAME; -import static org.apache.inlong.agent.constant.AgentConstants.AGENT_HOME; -import static org.apache.inlong.agent.constant.AgentConstants.AGENT_LOCAL_CACHE; -import static org.apache.inlong.agent.constant.AgentConstants.AGENT_LOCAL_CACHE_TIMEOUT; import static org.apache.inlong.agent.constant.AgentConstants.AGENT_UNIQ_ID; -import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AGENT_HOME; -import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AGENT_LOCAL_CACHE; -import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AGENT_LOCAL_CACHE_TIMEOUT; import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AGENT_UNIQ_ID; import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_FETCHER_INTERVAL; import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_DBCOLLECT_GETTASK_HTTP_PATH; @@ -92,6 +46,43 @@ import static org.apache.inlong.agent.plugin.utils.PluginUtils.copyJobProfile; import static org.apache.inlong.agent.utils.AgentUtils.fetchLocalIp; import static org.apache.inlong.agent.utils.AgentUtils.fetchLocalUuid; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import org.apache.inlong.agent.common.AbstractDaemon; +import org.apache.inlong.agent.conf.AgentConfiguration; +import org.apache.inlong.agent.conf.JobProfile; +import org.apache.inlong.agent.conf.ProfileFetcher; +import org.apache.inlong.agent.conf.TriggerProfile; +import org.apache.inlong.agent.core.AgentManager; +import org.apache.inlong.agent.db.CommandDb; +import org.apache.inlong.agent.plugin.Trigger; +import org.apache.inlong.agent.plugin.utils.PluginUtils; +import org.apache.inlong.agent.pojo.ConfirmAgentIpRequest; +import org.apache.inlong.agent.pojo.DbCollectorTaskRequestDto; +import org.apache.inlong.agent.pojo.DbCollectorTaskResult; +import org.apache.inlong.agent.utils.AgentUtils; +import org.apache.inlong.agent.utils.HttpManager; +import org.apache.inlong.agent.utils.ThreadUtils; +import org.apache.inlong.common.db.CommandEntity; +import org.apache.inlong.common.enums.ManagerOpEnum; +import org.apache.inlong.common.enums.PullJobTypeEnum; +import org.apache.inlong.common.pojo.agent.CmdConfig; +import org.apache.inlong.common.pojo.agent.TaskRequest; +import org.apache.inlong.common.pojo.agent.TaskResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * Fetch command from Inlong-Manager */ @@ -108,11 +99,9 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher { private final String managerIpsCheckUrl; private final String managerDbCollectorTaskUrl; private final AgentConfiguration conf; - private final LocalFileCache localFileCache; private final String uniqId; private final AgentManager agentManager; private final HttpManager httpManager; - private List<String> managerList; private String localIp; private String uuid; private String clusterName; @@ -129,7 +118,6 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher { managerTaskUrl = buildFileCollectTaskUrl(baseManagerUrl); managerIpsCheckUrl = buildIpCheckUrl(baseManagerUrl); managerDbCollectorTaskUrl = buildDbCollectorGetTaskUrl(baseManagerUrl); - localFileCache = getLocalFileCache(); uniqId = conf.get(AGENT_UNIQ_ID, DEFAULT_AGENT_UNIQ_ID); clusterName = conf.get(AGENT_CLUSTER_NAME); this.commandDb = agentManager.getCommandDb(); @@ -144,7 +132,7 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher { /** * build base url for manager according to config - * + * <p> * example - http://127.0.0.1:8080/inlong/manager/openapi */ private String buildBaseUrl() { @@ -155,7 +143,7 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher { /** * build vip url for manager according to config - * + * <p> * example - http://127.0.0.1:8080/inlong/manager/openapi/agent/getManagerIpList */ private String buildVipUrl(String baseUrl) { @@ -164,7 +152,7 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher { /** * build file collect task url for manager according to config - * + * <p> * example - http://127.0.0.1:8080/inlong/manager/openapi/fileAgent/getTaskConf */ private String buildFileCollectTaskUrl(String baseUrl) { @@ -173,7 +161,7 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher { /** * build ip check url for manager according to config - * + * <p> * example - http://127.0.0.1:8080/inlong/manager/openapi/fileAgent/confirmAgentIp */ private String buildIpCheckUrl(String baseUrl) { @@ -182,7 +170,7 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher { /** * build db collector get task url for manager according to config - * + * <p> * example - http://127.0.0.1:8080/inlong/manager/openapi/dbcollector/getTask */ private String buildDbCollectorGetTaskUrl(String baseUrl) { @@ -190,17 +178,6 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher { .get(AGENT_MANAGER_DBCOLLECT_GETTASK_HTTP_PATH, DEFAULT_AGENT_MANAGER_DBCOLLECTOR_GETTASK_HTTP_PATH); } - /** - * get localFileCache according to config - */ - private LocalFileCache getLocalFileCache() { - Path localStorage = Paths.get(conf.get(AGENT_HOME, DEFAULT_AGENT_HOME), - conf.get(AGENT_LOCAL_CACHE, DEFAULT_AGENT_LOCAL_CACHE), "managerList.txt"); - long timeout = TimeUnit.MINUTES.toMillis(conf.getInt(AGENT_LOCAL_CACHE_TIMEOUT, - DEFAULT_AGENT_LOCAL_CACHE_TIMEOUT)); - return new LocalFileCache(localStorage.toFile(), timeout); - } - /** * for manager to get job profiles * @@ -215,7 +192,7 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher { /** * request manager to get manager vipUrl list, and store it to local file */ - public void requestTdmList() { + public List<String> requestTdmList() { JsonObject result = getResultData(httpManager.doSendPost(managerVipUrl)); JsonArray data = result.get(AGENT_MANAGER_RETURN_PARAM_DATA).getAsJsonArray(); List<String> managerIpList = new ArrayList<>(); @@ -223,24 +200,26 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher { JsonObject asJsonObject = datum.getAsJsonObject(); managerIpList.add(asJsonObject.get(AGENT_MANAGER_RETURN_PARAM_IP).getAsString()); } - if (managerIpList.isEmpty()) { - return; - } - localFileCache.writeToCache(String.join(",", managerIpList)); + return managerIpList; } /** * request manager to get commands, make sure it is not throwing exceptions */ public void fetchCommand() { + LOGGER.info("fetchCommand start"); List<CommandEntity> unackedCommands = commandDb.getUnackedCommands(); String resultStr = httpManager.doSentPost(managerTaskUrl, getFetchRequest(unackedCommands)); JsonObject resultData = getResultData(resultStr); JsonElement element = resultData.get(AGENT_MANAGER_RETURN_PARAM_DATA); if (element != null) { + LOGGER.info("fetchCommand not null {}", resultData); dealWithFetchResult(GSON.fromJson(element.getAsJsonObject(), TaskResult.class)); + } else { + LOGGER.info("fetchCommand nothing to do"); } ackCommands(unackedCommands); + LOGGER.info("fetchCommand end"); } private void ackCommands(List<CommandEntity> unackedCommands) { @@ -274,7 +253,7 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher { if (profile == null) { return; } - agentManager.getJobManager().submitJobProfile(profile, true); + agentManager.getJobManager().submitJobProfile(profile, true, true); } /** @@ -404,14 +383,19 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher { try { switch (requireNonNull(opType)) { case ACTIVE: + agentManager.getTriggerManager().submitTrigger(triggerProfile, false); + break; case ADD: - agentManager.getTriggerManager().submitTrigger(triggerProfile); + agentManager.getTriggerManager().submitTrigger(triggerProfile, true); break; case DEL: + agentManager.getTriggerManager().deleteTrigger(triggerProfile.getTriggerId(), false); + break; case FROZEN: - agentManager.getTriggerManager().deleteTrigger(triggerProfile.getTriggerId()); + agentManager.getTriggerManager().deleteTrigger(triggerProfile.getTriggerId(), true); break; default: + LOGGER.error("can not handle option type {}", opType); } } catch (Exception e) { LOGGER.error("Deal with trigger profile err.", e); @@ -429,12 +413,16 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher { try { switch (requireNonNull(opType)) { case ACTIVE: + success = agentManager.getJobManager().submitJobProfile(triggerProfile, true, false); + break; case ADD: - success = agentManager.getJobManager().submitJobProfile(triggerProfile, true); + success = agentManager.getJobManager().submitJobProfile(triggerProfile, true, true); break; case DEL: + success = agentManager.getJobManager().deleteJob(triggerProfile.getTriggerId(), false); + break; case FROZEN: - success = agentManager.getJobManager().deleteJob(triggerProfile.getTriggerId()); + success = agentManager.getJobManager().deleteJob(triggerProfile.getTriggerId(), true); break; default: } @@ -458,31 +446,6 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher { return resultData.get(AGENT_MANAGER_RETURN_PARAM_IP).getAsString(); } - /** - * fetch manager list, make sure it's not throwing exceptions - * - * @param isInitial is initial - * @param retryTime retry time - */ - private void fetchTdmList(boolean isInitial, int retryTime) { - if (retryTime > MAX_RETRY) { - return; - } - try { - // check local cache time, make sure cache not timeout - if (!isInitial && !localFileCache.cacheIsExpired()) { - String result = localFileCache.getCacheInfo(); - managerList = Arrays.stream(result.split(",")) - .map(String::trim) - .collect(Collectors.toList()); - } else { - requestTdmList(); - } - } catch (Exception ex) { - fetchTdmList(false, retryTime + 1); - } - } - /** * thread for profile fetcher. * @@ -498,10 +461,6 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher { TimeUnit.SECONDS.sleep(AgentUtils.getRandomBySeed(configSleepTime)); // fetch commands from manager fetchCommand(); - - // fetch manager list from vip - fetchTdmList(false, 0); - // fetch db collector task from manager fetchDbCollectTask(); } catch (Throwable ex) { @@ -527,7 +486,6 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher { // when agent start, check local ip and fetch manager ip list; localIp = fetchLocalIp(); uuid = fetchLocalUuid(); - fetchTdmList(true, 0); submitWorker(profileFetchThread()); } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java index 090ab2c33..04f51f96f 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java @@ -17,6 +17,16 @@ package org.apache.inlong.agent.plugin.sinks; +import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_FIELD_SPLITTER; + +import java.nio.charset.StandardCharsets; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + import org.apache.inlong.agent.common.AgentThreadFactory; import org.apache.inlong.agent.conf.JobProfile; import org.apache.inlong.agent.constant.CommonConstants; @@ -31,15 +41,6 @@ import org.apache.inlong.agent.utils.ThreadUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.nio.charset.StandardCharsets; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - -import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_FIELD_SPLITTER; - /** * sink message data to inlong-dataproxy */ @@ -60,36 +61,58 @@ public class ProxySink extends AbstractSink { @Override public void write(Message message) { + boolean suc = false; + while (!suc) { + suc = putInCache(message); + if (!suc) { + AgentUtils.silenceSleepInMs(batchFlushInterval); + } + } + } + + private boolean putInCache(Message message) { try { - if (message != null) { + if (message == null) { + return true; + } + message.getHeader().put(CommonConstants.PROXY_KEY_GROUP_ID, inlongGroupId); + message.getHeader().put(CommonConstants.PROXY_KEY_STREAM_ID, inlongStreamId); + extractStreamFromMessage(message, fieldSplitter); + if (message instanceof EndMessage) { + // increment the count of failed sinks + sinkMetric.sinkFailCount.incrementAndGet(); + return true; + } + AtomicBoolean suc = new AtomicBoolean(false); + ProxyMessage proxyMessage = new ProxyMessage(message); + // add proxy message to cache. + cache.compute(proxyMessage.getBatchKey(), + (s, packProxyMessage) -> { + if (packProxyMessage == null) { + packProxyMessage = new PackProxyMessage(jobInstanceId, jobConf, inlongGroupId, + proxyMessage.getInlongStreamId()); + packProxyMessage.generateExtraMap(proxyMessage.getDataKey()); + } + // add message to package proxy + suc.set(packProxyMessage.addProxyMessage(proxyMessage)); + return packProxyMessage; + }); + if (suc.get()) { + // semaphore should be acquired only when the message was put in cache successfully senderManager.acquireSemaphore(1); - message.getHeader().put(CommonConstants.PROXY_KEY_GROUP_ID, inlongGroupId); - message.getHeader().put(CommonConstants.PROXY_KEY_STREAM_ID, inlongStreamId); - extractStreamFromMessage(message, fieldSplitter); - if (!(message instanceof EndMessage)) { - ProxyMessage proxyMessage = new ProxyMessage(message); - // add proxy message to cache. - cache.compute(proxyMessage.getBatchKey(), - (s, packProxyMessage) -> { - if (packProxyMessage == null) { - packProxyMessage = new PackProxyMessage(jobInstanceId, jobConf, inlongGroupId, - proxyMessage.getInlongStreamId()); - packProxyMessage.generateExtraMap(proxyMessage.getDataKey()); - } - // add message to package proxy - packProxyMessage.addProxyMessage(proxyMessage); - return packProxyMessage; - }); - // increment the count of successful sinks - sinkMetric.sinkSuccessCount.incrementAndGet(); - } + // increment the count of successful sinks + sinkMetric.sinkSuccessCount.incrementAndGet(); + } else { + // increment the count of failed sinks + sinkMetric.sinkFailCount.incrementAndGet(); } + return suc.get(); } catch (Exception e) { - sinkMetric.sinkFailCount.incrementAndGet(); LOGGER.error("write message to Proxy sink error", e); } catch (Throwable t) { ThreadUtils.threadThrowableHandler(Thread.currentThread(), t); } + return false; } /** @@ -125,11 +148,12 @@ public class ProxySink extends AbstractSink { } }); - AgentUtils.silenceSleepInMs(batchFlushInterval); } catch (Exception ex) { LOGGER.error("error caught", ex); } catch (Throwable t) { ThreadUtils.threadThrowableHandler(Thread.currentThread(), t); + } finally { + AgentUtils.silenceSleepInMs(batchFlushInterval); } } }; @@ -145,6 +169,7 @@ public class ProxySink extends AbstractSink { senderManager = new SenderManager(jobConf, inlongGroupId, sourceName); try { senderManager.addMessageSender(); + senderManager.Start(); } catch (Throwable ex) { LOGGER.error("error while init sender for group id {}", inlongGroupId); ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex); @@ -161,6 +186,7 @@ public class ProxySink extends AbstractSink { } shutdown = true; executorService.shutdown(); + senderManager.Stop(); } /** diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java index bf4aa6f50..599a7c46b 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java @@ -234,11 +234,12 @@ public class PulsarSink extends AbstractSink { } } }); - AgentUtils.silenceSleepInMs(batchFlushInterval); } catch (Exception ex) { LOGGER.error("error caught", ex); } catch (Throwable t) { ThreadUtils.threadThrowableHandler(Thread.currentThread(), t); + } finally { + AgentUtils.silenceSleepInMs(batchFlushInterval); } } }; diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java index 032deebb3..cf2725035 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java @@ -18,8 +18,6 @@ package org.apache.inlong.agent.plugin.sources; import org.apache.inlong.agent.conf.JobProfile; -import org.apache.inlong.agent.constant.DataCollectType; -import org.apache.inlong.agent.constant.JobConstants; import org.apache.inlong.agent.plugin.Reader; import org.apache.inlong.agent.plugin.sources.reader.file.FileReaderOperator; import org.apache.inlong.agent.plugin.sources.reader.file.TriggerFileReader; @@ -29,9 +27,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; -import java.io.FileReader; -import java.io.IOException; -import java.io.LineNumberReader; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -84,17 +79,6 @@ public class TextFileSource extends AbstractSource { private int getStartPosition(JobProfile jobConf, File file) { int seekPosition; - if (jobConf.hasKey(JobConstants.JOB_FILE_CONTENT_COLLECT_TYPE) && DataCollectType.INCREMENT - .equalsIgnoreCase(jobConf.get(JobConstants.JOB_FILE_CONTENT_COLLECT_TYPE))) { - try (LineNumberReader lineNumberReader = new LineNumberReader(new FileReader(file.getPath()))) { - lineNumberReader.skip(Long.MAX_VALUE); - seekPosition = lineNumberReader.getLineNumber(); - return seekPosition; - } catch (IOException ex) { - LOGGER.error("get position error, file absolute path: {}", file.getAbsolutePath()); - throw new RuntimeException(ex); - } - } seekPosition = jobConf.getInt(file.getAbsolutePath() + POSITION_SUFFIX, 0); return seekPosition; } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java index 6fcbf481c..526c17d27 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java @@ -42,7 +42,9 @@ import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MONITOR_INT public final class MonitorTextFile { private static final Logger LOGGER = LoggerFactory.getLogger(MonitorTextFile.class); - // monitor thread pool + /** + * monitor thread pool + */ private static final ThreadPoolExecutor EXECUTOR_SERVICE = new ThreadPoolExecutor( 0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, @@ -165,7 +167,7 @@ public final class MonitorTextFile { } /** - * reset the position and bytePositionreset the position and bytePosition + * Reset the position and bytePosition */ private void resetPosition() { LOGGER.info("reset position {}", fileReaderOperator.file.toPath()); @@ -182,18 +184,15 @@ public final class MonitorTextFile { /** * Determine whether the inode has changed * - * @param currentFileKey - * @return + * @param currentFileKey current file key + * @return true if the inode changed, otherwise false */ private boolean isInodeChanged(String currentFileKey) { if (fileReaderOperator.fileKey == null || currentFileKey == null) { return false; } - if (fileReaderOperator.fileKey.equals(currentFileKey)) { - return false; - } - return true; + return !fileReaderOperator.fileKey.equals(currentFileKey); } } } diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/MiniAgent.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/MiniAgent.java index 77c2efadc..4556a1474 100755 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/MiniAgent.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/MiniAgent.java @@ -93,18 +93,19 @@ public class MiniAgent { } public void submitTrigger(TriggerProfile triggerProfile) { - manager.getTriggerManager().submitTrigger(triggerProfile); + manager.getTriggerManager().submitTrigger(triggerProfile, true); triggerProfileCache.add(triggerProfile); } public void cleanupJobs() { - jobProfileCache.forEach(jobProfile -> manager.getJobManager().deleteJob(jobProfile.getInstanceId())); + jobProfileCache.forEach(jobProfile -> manager.getJobManager().deleteJob(jobProfile.getInstanceId(), false)); jobProfileCache.clear(); } public void cleanupTriggers() { triggerProfileCache - .forEach(triggerProfile -> manager.getTriggerManager().deleteTrigger(triggerProfile.getTriggerId())); + .forEach(triggerProfile -> manager.getTriggerManager().deleteTrigger(triggerProfile.getTriggerId(), + false)); triggerProfileCache.clear(); } } diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/trigger/TestTriggerManager.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/trigger/TestTriggerManager.java index fc5cd75e2..3d41ef0a4 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/trigger/TestTriggerManager.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/trigger/TestTriggerManager.java @@ -163,7 +163,7 @@ public class TestTriggerManager { }); // shutdown trigger - agent.getManager().getTriggerManager().deleteTrigger(triggerProfile1.getTriggerId()); + agent.getManager().getTriggerManager().deleteTrigger(triggerProfile1.getTriggerId(), false); await().atMost(10, TimeUnit.SECONDS).until(() -> trigger.getWatchers().size() == 0); TestUtils.deleteFile(WATCH_FOLDER.getRoot().getAbsolutePath() + "/1.log"); }