This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.7
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 4b99eac73d859906e5c4ab6d80628a9c57ab1bf2
Author: doleyzi <43397300+dole...@users.noreply.github.com>
AuthorDate: Mon May 15 19:16:35 2023 +0800

    [INLONG-8026][Agent] Improve the Agent performance (#8027)
    
    * Improve the Agent performance
    
    * Add InLong Changelog
    
    * Add InLong Changelog
    
    ---------
    
    Co-authored-by: doleyzi <dole...@tencent.com>
---
 CHANGES.md                                         |   4 +-
 .../inlong/agent/message/BatchProxyMessage.java    |   1 -
 .../inlong/agent/message/PackProxyMessage.java     |  54 ++++---
 .../org/apache/inlong/agent/core/AgentManager.java |  28 ++--
 .../apache/inlong/agent/core/conf/ConfigJetty.java |  17 +--
 .../apache/inlong/agent/core/job/JobManager.java   |  72 +++++----
 .../inlong/agent/core/trigger/TriggerManager.java  |  31 ++--
 .../agent/plugin/fetcher/ManagerFetcher.java       | 168 ++++++++-------------
 .../inlong/agent/plugin/sinks/ProxySink.java       |  90 +++++++----
 .../inlong/agent/plugin/sinks/PulsarSink.java      |   3 +-
 .../agent/plugin/sources/TextFileSource.java       |  16 --
 .../sources/reader/file/MonitorTextFile.java       |  15 +-
 .../org/apache/inlong/agent/plugin/MiniAgent.java  |   7 +-
 .../agent/plugin/trigger/TestTriggerManager.java   |   2 +-
 14 files changed, 255 insertions(+), 253 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index bac536234..48c92680f 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -27,7 +27,9 @@
 
|:-----------------------------------------------------------:|:---------------------------------------------------------------------------------------------------------------------------------------------------|
 | [INLONG-7847](https://github.com/apache/inlong/issues/7847) | [Bug][Agent] 
Failed to create MySQL reader                                                   
                                                      |
 | [INLONG-7783](https://github.com/apache/inlong/issues/7783) | 
[Feature][Agent] Support sink data tor Kafka                                    
                                                                   |
-| [INLONG-7752](https://github.com/apache/inlong/issues/7752) | [Bug][Agent] 
PulsarSink threadPool throw reject exception                                    
                                                      |
+| [INLONG-7752](https://github.com/apache/inlong/issues/7752) | [Bug][Agent] 
PulsarSink threadPool throw reject exception                                    
                                                      |                         
+| [INLONG-7976](https://github.com/apache/inlong/issues/7976) | [Bug][Agent] 
The data collected by the agent is incomplete                                   
                                                      |
+| [INLONG-8026](https://github.com/apache/inlong/issues/8026) | 
[Improve][Agent] Improve the Agent performance                                  
                                                                   |
 
 ### DataProxy
 |                            ISSUE                            | Summary        
                                                                                
          |
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/BatchProxyMessage.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/BatchProxyMessage.java
index 39be15437..c34d8eaf5 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/BatchProxyMessage.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/BatchProxyMessage.java
@@ -41,7 +41,6 @@ public class BatchProxyMessage {
     private List<byte[]> dataList;
     private long dataTime;
     private Map<String, String> extraMap;
-    private boolean isSyncSend;
 
     public InLongMsg getInLongMsg() {
         InLongMsg message = InLongMsg.newInLongMsg(true);
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/PackProxyMessage.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/PackProxyMessage.java
index 6eb54cd1c..61b233abe 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/PackProxyMessage.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/PackProxyMessage.java
@@ -17,30 +17,28 @@
 
 package org.apache.inlong.agent.message;
 
-import org.apache.inlong.agent.conf.JobProfile;
-import org.apache.inlong.agent.utils.AgentUtils;
-import org.apache.inlong.common.msg.AttributeConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicLong;
-
 import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER;
 import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PACKAGE_MAX_SIZE;
 import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PACKAGE_MAX_TIMEOUT_MS;
 import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER;
 import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE;
 import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_TIMEOUT_MS;
-import static org.apache.inlong.agent.constant.CommonConstants.PROXY_SEND_SYNC;
 import static org.apache.inlong.common.msg.AttributeConstants.DATA_TIME;
 import static org.apache.inlong.common.msg.AttributeConstants.MESSAGE_TOPIC;
 import static org.apache.inlong.common.msg.AttributeConstants.STREAM_ID;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.common.msg.AttributeConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * Handle List of BusMessage, which belong to the same stream id.
  */
@@ -58,7 +56,6 @@ public class PackProxyMessage {
     // streamId -> list of proxyMessage
     private final LinkedBlockingQueue<ProxyMessage> messageQueue;
     private final AtomicLong queueSize = new AtomicLong(0);
-    private boolean syncSend;
     private int currentSize;
     /**
      * extra map used when sending to dataproxy
@@ -79,9 +76,7 @@ public class PackProxyMessage {
         this.messageQueue = new LinkedBlockingQueue<>(maxQueueNumber);
         this.groupId = groupId;
         this.streamId = streamId;
-        // handle syncSend flag
-        this.syncSend = jobConf.getBoolean(PROXY_SEND_SYNC, false);
-        extraMap.put(AttributeConstants.MESSAGE_SYNC_SEND, 
String.valueOf(syncSend));
+        extraMap.put(AttributeConstants.MESSAGE_SYNC_SEND, "false");
     }
 
     public void generateExtraMap(String dataKey) {
@@ -106,18 +101,21 @@ public class PackProxyMessage {
     /**
      * Add proxy message to cache, proxy message should belong to the same 
stream id.
      */
-    public void addProxyMessage(ProxyMessage message) {
+    public boolean addProxyMessage(ProxyMessage message) {
         assert streamId.equals(message.getInlongStreamId());
         try {
             if (queueIsFull()) {
                 LOGGER.warn("message queue is greater than {}, stop adding 
message, "
                         + "maybe proxy get stuck", maxQueueNumber);
+                return false;
             }
             messageQueue.put(message);
             queueSize.addAndGet(message.getBody().length);
+            return true;
         } catch (Exception ex) {
             LOGGER.error("exception caught", ex);
         }
+        return false;
     }
 
     /**
@@ -144,8 +142,19 @@ public class PackProxyMessage {
             while (!messageQueue.isEmpty()) {
                 // pre check message size
                 ProxyMessage peekMessage = messageQueue.peek();
-                if (peekMessage == null
-                        || resultBatchSize + peekMessage.getBody().length > 
maxPackSize) {
+                if (peekMessage == null) {
+                    break;
+                }
+
+                // if the message size is greater than the max pack size, 
drop it.
+                int peekMessageLength = peekMessage.getBody().length;
+                if (peekMessageLength > maxPackSize) {
+                    LOGGER.warn("message size is {}, greater than max pack 
size {}, drop it!",
+                            peekMessage.getBody().length, maxPackSize);
+                    messageQueue.remove();
+                    break;
+                }
+                if (resultBatchSize + peekMessageLength > maxPackSize) {
                     break;
                 }
                 ProxyMessage message = messageQueue.remove();
@@ -159,8 +168,7 @@ public class PackProxyMessage {
             }
             // make sure result is not empty.
             if (!result.isEmpty()) {
-                return new BatchProxyMessage(jobId, groupId, streamId, result, 
AgentUtils.getCurrentTime(), extraMap,
-                        syncSend);
+                return new BatchProxyMessage(jobId, groupId, streamId, result, 
AgentUtils.getCurrentTime(), extraMap);
             }
         }
         return null;
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
index 42e058823..a571ee9d9 100755
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
@@ -17,6 +17,16 @@
 
 package org.apache.inlong.agent.core;
 
+import static 
org.apache.inlong.agent.constant.AgentConstants.AGENT_CONF_PARENT;
+import static 
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AGENT_CONF_PARENT;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_TRIGGER;
+
+import java.io.File;
+import java.lang.reflect.Constructor;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
 import org.apache.inlong.agent.common.AbstractDaemon;
 import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.agent.conf.JobProfile;
@@ -36,16 +46,6 @@ import org.apache.inlong.agent.db.TriggerProfileDb;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.lang.reflect.Constructor;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import static 
org.apache.inlong.agent.constant.AgentConstants.AGENT_CONF_PARENT;
-import static 
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AGENT_CONF_PARENT;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_TRIGGER;
-
 /**
  * Agent Manager, the bridge for job manager, task manager, db e.t.c it 
manages agent level operations and communicates
  * with outside system.
@@ -196,11 +196,17 @@ public class AgentManager extends AbstractDaemon {
     public void start() throws Exception {
         LOGGER.info("starting agent manager");
         agentConfMonitor.submit(startHotConfReplace());
+        LOGGER.info("starting job manager");
         jobManager.start();
+        LOGGER.info("starting trigger manager");
         triggerManager.start();
+        LOGGER.info("starting task manager");
         taskManager.start();
+        LOGGER.info("starting heartbeat manager");
         heartbeatManager.start();
+        LOGGER.info("starting task position manager");
         taskPositionManager.start();
+        LOGGER.info("starting read job from local");
         // read job profiles from local
         List<JobProfile> profileList = localProfile.readFromLocal();
         for (JobProfile profile : profileList) {
@@ -215,9 +221,11 @@ public class AgentManager extends AbstractDaemon {
                 jobManager.submitFileJobProfile(profile);
             }
         }
+        LOGGER.info("starting fetcher");
         if (fetcher != null) {
             fetcher.start();
         }
+        LOGGER.info("starting agent manager end");
     }
 
     /**
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/conf/ConfigJetty.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/conf/ConfigJetty.java
index d29cecdb7..cf9675e05 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/conf/ConfigJetty.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/conf/ConfigJetty.java
@@ -17,6 +17,10 @@
 
 package org.apache.inlong.agent.core.conf;
 
+import static org.apache.inlong.agent.constant.JobConstants.JOB_SOURCE_TYPE;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_TRIGGER;
+
+import java.io.Closeable;
 import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.conf.TriggerProfile;
@@ -32,11 +36,6 @@ import org.eclipse.jetty.servlet.ServletHolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Closeable;
-
-import static org.apache.inlong.agent.constant.JobConstants.JOB_SOURCE_TYPE;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_TRIGGER;
-
 /**
  * start http server and get job/agent config via http
  */
@@ -88,7 +87,7 @@ public class ConfigJetty implements Closeable {
             // trigger job is a special kind of job
             if (jobProfile.hasKey(JOB_TRIGGER)) {
                 triggerManager.submitTrigger(
-                        TriggerProfile.parseJsonStr(jobProfile.toJsonStr()));
+                        TriggerProfile.parseJsonStr(jobProfile.toJsonStr()), 
true);
             } else {
                 TaskTypeEnum taskType = TaskTypeEnum
                         .getTaskType(jobProfile.getInt(JOB_SOURCE_TYPE));
@@ -99,7 +98,7 @@ public class ConfigJetty implements Closeable {
                     case KAFKA:
                     case BINLOG:
                     case SQL:
-                        jobManager.submitJobProfile(jobProfile, true);
+                        jobManager.submitJobProfile(jobProfile, true, true);
                         break;
                     default:
                         LOGGER.error("source type not supported {}", taskType);
@@ -123,9 +122,9 @@ public class ConfigJetty implements Closeable {
     public void deleteJobConf(JobProfile jobProfile) {
         if (jobProfile != null) {
             if (jobProfile.hasKey(JOB_TRIGGER)) {
-                
triggerManager.deleteTrigger(TriggerProfile.parseJobProfile(jobProfile).getTriggerId());
+                
triggerManager.deleteTrigger(TriggerProfile.parseJobProfile(jobProfile).getTriggerId(),
 false);
             } else {
-                jobManager.deleteJob(jobProfile.getInstanceId());
+                jobManager.deleteJob(jobProfile.getInstanceId(), false);
             }
         }
     }
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java
index c4171d95f..f91285b2e 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java
@@ -17,6 +17,27 @@
 
 package org.apache.inlong.agent.core.job;
 
+import static 
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_DB_CACHE_CHECK_INTERVAL;
+import static 
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_DB_CACHE_TIME;
+import static 
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_NUMBER_LIMIT;
+import static 
org.apache.inlong.agent.constant.AgentConstants.JOB_DB_CACHE_CHECK_INTERVAL;
+import static 
org.apache.inlong.agent.constant.AgentConstants.JOB_DB_CACHE_TIME;
+import static org.apache.inlong.agent.constant.AgentConstants.JOB_NUMBER_LIMIT;
+import static org.apache.inlong.agent.constant.AgentConstants.JOB_VERSION;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_ID;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_ID_PREFIX;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_INSTANCE_ID;
+import static org.apache.inlong.agent.constant.JobConstants.SQL_JOB_ID;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_COMPONENT_NAME;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.inlong.agent.common.AbstractDaemon;
 import org.apache.inlong.agent.common.AgentThreadFactory;
 import org.apache.inlong.agent.conf.AgentConfiguration;
@@ -34,27 +55,6 @@ import org.apache.inlong.common.metric.MetricRegister;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static 
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_DB_CACHE_CHECK_INTERVAL;
-import static 
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_DB_CACHE_TIME;
-import static 
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_NUMBER_LIMIT;
-import static 
org.apache.inlong.agent.constant.AgentConstants.JOB_DB_CACHE_CHECK_INTERVAL;
-import static 
org.apache.inlong.agent.constant.AgentConstants.JOB_DB_CACHE_TIME;
-import static org.apache.inlong.agent.constant.AgentConstants.JOB_NUMBER_LIMIT;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_ID;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_ID_PREFIX;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_INSTANCE_ID;
-import static org.apache.inlong.agent.constant.JobConstants.SQL_JOB_ID;
-import static 
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_COMPONENT_NAME;
-
 /**
  * JobManager maintains lots of jobs, and communicate between server and task 
manager.
  */
@@ -143,7 +143,7 @@ public class JobManager extends AbstractDaemon {
      * @param profile job profile.
      */
     public boolean submitFileJobProfile(JobProfile profile) {
-        return submitJobProfile(profile, false);
+        return submitJobProfile(profile, false, true);
     }
 
     /**
@@ -151,7 +151,7 @@ public class JobManager extends AbstractDaemon {
      *
      * @param profile job profile.
      */
-    public boolean submitJobProfile(JobProfile profile, boolean singleJob) {
+    public boolean submitJobProfile(JobProfile profile, boolean singleJob, 
boolean isNewJob) {
         if (!isJobValid(profile)) {
             return false;
         }
@@ -161,8 +161,19 @@ public class JobManager extends AbstractDaemon {
         } else {
             profile.set(JOB_INSTANCE_ID, AgentUtils.getUniqId(JOB_ID_PREFIX, 
jobId, index.incrementAndGet()));
         }
-        LOGGER.info("submit job profile {}", profile.toJsonStr());
-        getJobConfDb().storeJobFirstTime(profile);
+        LOGGER.info("submit job profile {} isNewJob {}", profile.toJsonStr(), 
isNewJob);
+        if (isNewJob) {
+            jobProfileDb.storeJobFirstTime(profile);
+        } else {
+            JobProfile jobFromDb = 
jobProfileDb.getJobById(profile.getInstanceId());
+            if (jobFromDb != null) {
+                jobFromDb.set(JOB_VERSION, profile.get(JOB_VERSION));
+                profile = jobFromDb;
+            } else {
+                LOGGER.info("submit job final profile null");
+            }
+        }
+        LOGGER.info("submit job final profile {}", profile.toJsonStr());
         addJob(new Job(profile));
         return true;
     }
@@ -192,13 +203,15 @@ public class JobManager extends AbstractDaemon {
      *
      * @param jobInstancId
      */
-    public boolean deleteJob(String jobInstancId) {
+    public boolean deleteJob(String jobInstancId, boolean isFrozen) {
         LOGGER.info("start to delete job, job id set {}", jobs.keySet());
         JobWrapper jobWrapper = jobs.remove(jobInstancId);
         if (jobWrapper != null) {
-            LOGGER.info("delete job instance with job id {}", jobInstancId);
+            LOGGER.info("delete job instance with job id {} isFrozen {}", 
jobInstancId, isFrozen);
             jobWrapper.cleanup();
-            getJobConfDb().deleteJob(jobInstancId);
+            if (!isFrozen) {
+                jobProfileDb.deleteJob(jobInstancId);
+            }
             return true;
         }
         return true;
@@ -208,7 +221,7 @@ public class JobManager extends AbstractDaemon {
      * start all accepted jobs.
      */
     private void startJobs() {
-        List<JobProfile> profileList = getJobConfDb().getRestartJobs();
+        List<JobProfile> profileList = jobProfileDb.getRestartJobs();
         for (JobProfile profile : profileList) {
             LOGGER.info("init starting job from db {}", profile.toJsonStr());
             addJob(new Job(profile));
@@ -283,6 +296,7 @@ public class JobManager extends AbstractDaemon {
      * @param jobId job id
      */
     public void markJobAsFailed(String jobId) {
+        LOGGER.info("markJobAsFailed {}", jobId);
         JobWrapper wrapper = jobs.remove(jobId);
         if (wrapper != null) {
             LOGGER.info("job instance {} is failed", jobId);
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java
index e73f6b8d6..0f8c4a077 100755
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java
@@ -17,7 +17,15 @@
 
 package org.apache.inlong.agent.core.trigger;
 
+import static 
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_TRIGGER_MAX_RUNNING_NUM;
+import static 
org.apache.inlong.agent.constant.AgentConstants.TRIGGER_MAX_RUNNING_NUM;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_ID;
+
 import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.agent.common.AbstractDaemon;
 import org.apache.inlong.agent.conf.AgentConfiguration;
@@ -33,15 +41,6 @@ import org.apache.inlong.agent.utils.ThreadUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-
-import static 
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_TRIGGER_MAX_RUNNING_NUM;
-import static 
org.apache.inlong.agent.constant.AgentConstants.TRIGGER_MAX_RUNNING_NUM;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_ID;
-
 /**
  * manager for triggers.
  */
@@ -76,7 +75,7 @@ public class TriggerManager extends AbstractDaemon {
             Trigger trigger = (Trigger) triggerClass.newInstance();
             String triggerId = triggerProfile.get(JOB_ID);
             if (triggerMap.containsKey(triggerId)) {
-                deleteTrigger(triggerId);
+                deleteTrigger(triggerId, false);
                 LOGGER.warn("trigger {} is running, stop it", triggerId);
             }
             triggerMap.put(triggerId, trigger);
@@ -101,7 +100,7 @@ public class TriggerManager extends AbstractDaemon {
      *
      * @param triggerProfile trigger profile
      */
-    public void submitTrigger(TriggerProfile triggerProfile) {
+    public void submitTrigger(TriggerProfile triggerProfile, boolean isNewJob) 
{
         // make sure all required key exists.
         if (!triggerProfile.allRequiredKeyExist() || this.triggerMap.size() > 
maxRunningNum) {
             throw new IllegalArgumentException(
@@ -117,7 +116,7 @@ public class TriggerManager extends AbstractDaemon {
 
         LOGGER.info("submit trigger {}", triggerProfile.toJsonStr());
         // This action must be done before saving in db, because the 
job.instance.id is needed for the next recovery
-        manager.getJobManager().submitJobProfile(triggerProfile, true);
+        manager.getJobManager().submitJobProfile(triggerProfile, true, 
isNewJob);
         triggerProfileDB.storeTrigger(triggerProfile);
         restoreTrigger(triggerProfile);
     }
@@ -129,7 +128,7 @@ public class TriggerManager extends AbstractDaemon {
      *
      * @param triggerId trigger profile.
      */
-    public void deleteTrigger(String triggerId) {
+    public void deleteTrigger(String triggerId, boolean isFrozen) {
         // repeat check
         if (!triggerProfileDB.getTriggers().stream()
                 .anyMatch(profile -> 
profile.getTriggerId().equals(triggerId))) {
@@ -139,7 +138,7 @@ public class TriggerManager extends AbstractDaemon {
         LOGGER.info("delete trigger {}", triggerId);
         Trigger trigger = triggerMap.remove(triggerId);
         if (trigger != null) {
-            
manager.getJobManager().deleteJob(trigger.getTriggerProfile().getInstanceId());
+            
manager.getJobManager().deleteJob(trigger.getTriggerProfile().getInstanceId(), 
isFrozen);
             trigger.destroy();
         }
         triggerProfileDB.deleteTrigger(triggerId);
@@ -157,6 +156,10 @@ public class TriggerManager extends AbstractDaemon {
                         if (profile != null) {
                             Map<String, JobWrapper> jobWrapperMap = 
manager.getJobManager().getJobs();
                             JobWrapper job = 
jobWrapperMap.get(trigger.getTriggerProfile().getInstanceId());
+                            if (job == null) {
+                                LOGGER.error("job {} should not be null", 
trigger.getTriggerProfile().getInstanceId());
+                                return;
+                            }
                             String subTaskFile = 
profile.getOrDefault(JobConstants.JOB_DIR_FILTER_PATTERNS, "");
                             
Preconditions.checkArgument(StringUtils.isNotBlank(subTaskFile),
                                     String.format("Trigger %s fetched task 
file should not be null.", s));
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
index c8f2168c7..44ec85ad5 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
@@ -17,55 +17,9 @@
 
 package org.apache.inlong.agent.plugin.fetcher;
 
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonArray;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import org.apache.inlong.agent.cache.LocalFileCache;
-import org.apache.inlong.agent.common.AbstractDaemon;
-import org.apache.inlong.agent.conf.AgentConfiguration;
-import org.apache.inlong.agent.conf.JobProfile;
-import org.apache.inlong.agent.conf.ProfileFetcher;
-import org.apache.inlong.agent.conf.TriggerProfile;
-import org.apache.inlong.agent.core.AgentManager;
-import org.apache.inlong.agent.db.CommandDb;
-import org.apache.inlong.agent.plugin.Trigger;
-import org.apache.inlong.agent.plugin.utils.PluginUtils;
-import org.apache.inlong.agent.pojo.ConfirmAgentIpRequest;
-import org.apache.inlong.agent.pojo.DbCollectorTaskRequestDto;
-import org.apache.inlong.agent.pojo.DbCollectorTaskResult;
-import org.apache.inlong.agent.utils.AgentUtils;
-import org.apache.inlong.agent.utils.HttpManager;
-import org.apache.inlong.agent.utils.ThreadUtils;
-import org.apache.inlong.common.db.CommandEntity;
-import org.apache.inlong.common.enums.ManagerOpEnum;
-import org.apache.inlong.common.enums.PullJobTypeEnum;
-import org.apache.inlong.common.pojo.agent.CmdConfig;
-import org.apache.inlong.common.pojo.agent.TaskRequest;
-import org.apache.inlong.common.pojo.agent.TaskResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
 import static java.util.Objects.requireNonNull;
 import static 
org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_NAME;
-import static org.apache.inlong.agent.constant.AgentConstants.AGENT_HOME;
-import static 
org.apache.inlong.agent.constant.AgentConstants.AGENT_LOCAL_CACHE;
-import static 
org.apache.inlong.agent.constant.AgentConstants.AGENT_LOCAL_CACHE_TIMEOUT;
 import static org.apache.inlong.agent.constant.AgentConstants.AGENT_UNIQ_ID;
-import static 
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AGENT_HOME;
-import static 
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AGENT_LOCAL_CACHE;
-import static 
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AGENT_LOCAL_CACHE_TIMEOUT;
 import static 
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AGENT_UNIQ_ID;
 import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_FETCHER_INTERVAL;
 import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_DBCOLLECT_GETTASK_HTTP_PATH;
@@ -92,6 +46,43 @@ import static 
org.apache.inlong.agent.plugin.utils.PluginUtils.copyJobProfile;
 import static org.apache.inlong.agent.utils.AgentUtils.fetchLocalIp;
 import static org.apache.inlong.agent.utils.AgentUtils.fetchLocalUuid;
 
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import org.apache.inlong.agent.common.AbstractDaemon;
+import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.conf.ProfileFetcher;
+import org.apache.inlong.agent.conf.TriggerProfile;
+import org.apache.inlong.agent.core.AgentManager;
+import org.apache.inlong.agent.db.CommandDb;
+import org.apache.inlong.agent.plugin.Trigger;
+import org.apache.inlong.agent.plugin.utils.PluginUtils;
+import org.apache.inlong.agent.pojo.ConfirmAgentIpRequest;
+import org.apache.inlong.agent.pojo.DbCollectorTaskRequestDto;
+import org.apache.inlong.agent.pojo.DbCollectorTaskResult;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.HttpManager;
+import org.apache.inlong.agent.utils.ThreadUtils;
+import org.apache.inlong.common.db.CommandEntity;
+import org.apache.inlong.common.enums.ManagerOpEnum;
+import org.apache.inlong.common.enums.PullJobTypeEnum;
+import org.apache.inlong.common.pojo.agent.CmdConfig;
+import org.apache.inlong.common.pojo.agent.TaskRequest;
+import org.apache.inlong.common.pojo.agent.TaskResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * Fetch command from Inlong-Manager
  */
@@ -108,11 +99,9 @@ public class ManagerFetcher extends AbstractDaemon 
implements ProfileFetcher {
     private final String managerIpsCheckUrl;
     private final String managerDbCollectorTaskUrl;
     private final AgentConfiguration conf;
-    private final LocalFileCache localFileCache;
     private final String uniqId;
     private final AgentManager agentManager;
     private final HttpManager httpManager;
-    private List<String> managerList;
     private String localIp;
     private String uuid;
     private String clusterName;
@@ -129,7 +118,6 @@ public class ManagerFetcher extends AbstractDaemon 
implements ProfileFetcher {
             managerTaskUrl = buildFileCollectTaskUrl(baseManagerUrl);
             managerIpsCheckUrl = buildIpCheckUrl(baseManagerUrl);
             managerDbCollectorTaskUrl = 
buildDbCollectorGetTaskUrl(baseManagerUrl);
-            localFileCache = getLocalFileCache();
             uniqId = conf.get(AGENT_UNIQ_ID, DEFAULT_AGENT_UNIQ_ID);
             clusterName = conf.get(AGENT_CLUSTER_NAME);
             this.commandDb = agentManager.getCommandDb();
@@ -144,7 +132,7 @@ public class ManagerFetcher extends AbstractDaemon 
implements ProfileFetcher {
 
     /**
      * build base url for manager according to config
-     *
+     * <p>
      * example - http://127.0.0.1:8080/inlong/manager/openapi
      */
     private String buildBaseUrl() {
@@ -155,7 +143,7 @@ public class ManagerFetcher extends AbstractDaemon 
implements ProfileFetcher {
 
     /**
      * build vip url for manager according to config
-     *
+     * <p>
      * example - 
http://127.0.0.1:8080/inlong/manager/openapi/agent/getManagerIpList
      */
     private String buildVipUrl(String baseUrl) {
@@ -164,7 +152,7 @@ public class ManagerFetcher extends AbstractDaemon 
implements ProfileFetcher {
 
     /**
      * build file collect task url for manager according to config
-     *
+     * <p>
      * example - 
http://127.0.0.1:8080/inlong/manager/openapi/fileAgent/getTaskConf
      */
     private String buildFileCollectTaskUrl(String baseUrl) {
@@ -173,7 +161,7 @@ public class ManagerFetcher extends AbstractDaemon 
implements ProfileFetcher {
 
     /**
      * build ip check url for manager according to config
-     *
+     * <p>
      * example - 
http://127.0.0.1:8080/inlong/manager/openapi/fileAgent/confirmAgentIp
      */
     private String buildIpCheckUrl(String baseUrl) {
@@ -182,7 +170,7 @@ public class ManagerFetcher extends AbstractDaemon 
implements ProfileFetcher {
 
     /**
      * build db collector get task url for manager according to config
-     *
+     * <p>
      * example - 
http://127.0.0.1:8080/inlong/manager/openapi/dbcollector/getTask
      */
     private String buildDbCollectorGetTaskUrl(String baseUrl) {
@@ -190,17 +178,6 @@ public class ManagerFetcher extends AbstractDaemon 
implements ProfileFetcher {
                 .get(AGENT_MANAGER_DBCOLLECT_GETTASK_HTTP_PATH, 
DEFAULT_AGENT_MANAGER_DBCOLLECTOR_GETTASK_HTTP_PATH);
     }
 
-    /**
-     * get localFileCache according to config
-     */
-    private LocalFileCache getLocalFileCache() {
-        Path localStorage = Paths.get(conf.get(AGENT_HOME, DEFAULT_AGENT_HOME),
-                conf.get(AGENT_LOCAL_CACHE, DEFAULT_AGENT_LOCAL_CACHE), 
"managerList.txt");
-        long timeout = 
TimeUnit.MINUTES.toMillis(conf.getInt(AGENT_LOCAL_CACHE_TIMEOUT,
-                DEFAULT_AGENT_LOCAL_CACHE_TIMEOUT));
-        return new LocalFileCache(localStorage.toFile(), timeout);
-    }
-
     /**
      * for manager to get job profiles
      *
@@ -215,7 +192,7 @@ public class ManagerFetcher extends AbstractDaemon 
implements ProfileFetcher {
     /**
      * request manager to get manager vipUrl list, and store it to local file
      */
-    public void requestTdmList() {
+    public List<String> requestTdmList() {
         JsonObject result = 
getResultData(httpManager.doSendPost(managerVipUrl));
         JsonArray data = 
result.get(AGENT_MANAGER_RETURN_PARAM_DATA).getAsJsonArray();
         List<String> managerIpList = new ArrayList<>();
@@ -223,24 +200,26 @@ public class ManagerFetcher extends AbstractDaemon 
implements ProfileFetcher {
             JsonObject asJsonObject = datum.getAsJsonObject();
             
managerIpList.add(asJsonObject.get(AGENT_MANAGER_RETURN_PARAM_IP).getAsString());
         }
-        if (managerIpList.isEmpty()) {
-            return;
-        }
-        localFileCache.writeToCache(String.join(",", managerIpList));
+        return managerIpList;
     }
 
     /**
      * request manager to get commands, make sure it is not throwing exceptions
      */
     public void fetchCommand() {
+        LOGGER.info("fetchCommand start");
         List<CommandEntity> unackedCommands = commandDb.getUnackedCommands();
         String resultStr = httpManager.doSentPost(managerTaskUrl, 
getFetchRequest(unackedCommands));
         JsonObject resultData = getResultData(resultStr);
         JsonElement element = resultData.get(AGENT_MANAGER_RETURN_PARAM_DATA);
         if (element != null) {
+            LOGGER.info("fetchCommand not null {}", resultData);
             dealWithFetchResult(GSON.fromJson(element.getAsJsonObject(), 
TaskResult.class));
+        } else {
+            LOGGER.info("fetchCommand nothing to do");
         }
         ackCommands(unackedCommands);
+        LOGGER.info("fetchCommand end");
     }
 
     private void ackCommands(List<CommandEntity> unackedCommands) {
@@ -274,7 +253,7 @@ public class ManagerFetcher extends AbstractDaemon 
implements ProfileFetcher {
         if (profile == null) {
             return;
         }
-        agentManager.getJobManager().submitJobProfile(profile, true);
+        agentManager.getJobManager().submitJobProfile(profile, true, true);
     }
 
     /**
@@ -404,14 +383,19 @@ public class ManagerFetcher extends AbstractDaemon 
implements ProfileFetcher {
         try {
             switch (requireNonNull(opType)) {
                 case ACTIVE:
+                    
agentManager.getTriggerManager().submitTrigger(triggerProfile, false);
+                    break;
                 case ADD:
-                    
agentManager.getTriggerManager().submitTrigger(triggerProfile);
+                    
agentManager.getTriggerManager().submitTrigger(triggerProfile, true);
                     break;
                 case DEL:
+                    
agentManager.getTriggerManager().deleteTrigger(triggerProfile.getTriggerId(), 
false);
+                    break;
                 case FROZEN:
-                    
agentManager.getTriggerManager().deleteTrigger(triggerProfile.getTriggerId());
+                    
agentManager.getTriggerManager().deleteTrigger(triggerProfile.getTriggerId(), 
true);
                     break;
                 default:
+                    LOGGER.error("can not handle option type {}", opType);
             }
         } catch (Exception e) {
             LOGGER.error("Deal with trigger profile err.", e);
@@ -429,12 +413,16 @@ public class ManagerFetcher extends AbstractDaemon 
implements ProfileFetcher {
         try {
             switch (requireNonNull(opType)) {
                 case ACTIVE:
+                    success = 
agentManager.getJobManager().submitJobProfile(triggerProfile, true, false);
+                    break;
                 case ADD:
-                    success = 
agentManager.getJobManager().submitJobProfile(triggerProfile, true);
+                    success = 
agentManager.getJobManager().submitJobProfile(triggerProfile, true, true);
                     break;
                 case DEL:
+                    success = 
agentManager.getJobManager().deleteJob(triggerProfile.getTriggerId(), false);
+                    break;
                 case FROZEN:
-                    success = 
agentManager.getJobManager().deleteJob(triggerProfile.getTriggerId());
+                    success = 
agentManager.getJobManager().deleteJob(triggerProfile.getTriggerId(), true);
                     break;
                 default:
             }
@@ -458,31 +446,6 @@ public class ManagerFetcher extends AbstractDaemon 
implements ProfileFetcher {
         return resultData.get(AGENT_MANAGER_RETURN_PARAM_IP).getAsString();
     }
 
-    /**
-     * fetch manager list, make sure it's not throwing exceptions
-     *
-     * @param isInitial is initial
-     * @param retryTime retry time
-     */
-    private void fetchTdmList(boolean isInitial, int retryTime) {
-        if (retryTime > MAX_RETRY) {
-            return;
-        }
-        try {
-            // check local cache time, make sure cache not timeout
-            if (!isInitial && !localFileCache.cacheIsExpired()) {
-                String result = localFileCache.getCacheInfo();
-                managerList = Arrays.stream(result.split(","))
-                        .map(String::trim)
-                        .collect(Collectors.toList());
-            } else {
-                requestTdmList();
-            }
-        } catch (Exception ex) {
-            fetchTdmList(false, retryTime + 1);
-        }
-    }
-
     /**
      * thread for profile fetcher.
      *
@@ -498,10 +461,6 @@ public class ManagerFetcher extends AbstractDaemon 
implements ProfileFetcher {
                     
TimeUnit.SECONDS.sleep(AgentUtils.getRandomBySeed(configSleepTime));
                     // fetch commands from manager
                     fetchCommand();
-
-                    // fetch manager list from vip
-                    fetchTdmList(false, 0);
-
                     // fetch db collector task from manager
                     fetchDbCollectTask();
                 } catch (Throwable ex) {
@@ -527,7 +486,6 @@ public class ManagerFetcher extends AbstractDaemon 
implements ProfileFetcher {
         // when agent start, check local ip and fetch manager ip list;
         localIp = fetchLocalIp();
         uuid = fetchLocalUuid();
-        fetchTdmList(true, 0);
         submitWorker(profileFetchThread());
     }
 
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
index 090ab2c33..04f51f96f 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
@@ -17,6 +17,16 @@
 
 package org.apache.inlong.agent.plugin.sinks;
 
+import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_FIELD_SPLITTER;
+
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.inlong.agent.common.AgentThreadFactory;
 import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.constant.CommonConstants;
@@ -31,15 +41,6 @@ import org.apache.inlong.agent.utils.ThreadUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.nio.charset.StandardCharsets;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_FIELD_SPLITTER;
-
 /**
  * sink message data to inlong-dataproxy
  */
@@ -60,36 +61,58 @@ public class ProxySink extends AbstractSink {
 
     @Override
     public void write(Message message) {
+        boolean suc = false;
+        while (!suc) {
+            suc = putInCache(message);
+            if (!suc) {
+                AgentUtils.silenceSleepInMs(batchFlushInterval);
+            }
+        }
+    }
+
+    private boolean putInCache(Message message) {
         try {
-            if (message != null) {
+            if (message == null) {
+                return true;
+            }
+            message.getHeader().put(CommonConstants.PROXY_KEY_GROUP_ID, 
inlongGroupId);
+            message.getHeader().put(CommonConstants.PROXY_KEY_STREAM_ID, 
inlongStreamId);
+            extractStreamFromMessage(message, fieldSplitter);
+            if (message instanceof EndMessage) {
+                // increment the count of failed sinks
+                sinkMetric.sinkFailCount.incrementAndGet();
+                return true;
+            }
+            AtomicBoolean suc = new AtomicBoolean(false);
+            ProxyMessage proxyMessage = new ProxyMessage(message);
+            // add proxy message to cache.
+            cache.compute(proxyMessage.getBatchKey(),
+                    (s, packProxyMessage) -> {
+                        if (packProxyMessage == null) {
+                            packProxyMessage = new 
PackProxyMessage(jobInstanceId, jobConf, inlongGroupId,
+                                    proxyMessage.getInlongStreamId());
+                            
packProxyMessage.generateExtraMap(proxyMessage.getDataKey());
+                        }
+                        // add message to package proxy
+                        
suc.set(packProxyMessage.addProxyMessage(proxyMessage));
+                        return packProxyMessage;
+                    });
+            if (suc.get()) {
+                // semaphore should be acquired only when the message was put 
in cache successfully
                 senderManager.acquireSemaphore(1);
-                message.getHeader().put(CommonConstants.PROXY_KEY_GROUP_ID, 
inlongGroupId);
-                message.getHeader().put(CommonConstants.PROXY_KEY_STREAM_ID, 
inlongStreamId);
-                extractStreamFromMessage(message, fieldSplitter);
-                if (!(message instanceof EndMessage)) {
-                    ProxyMessage proxyMessage = new ProxyMessage(message);
-                    // add proxy message to cache.
-                    cache.compute(proxyMessage.getBatchKey(),
-                            (s, packProxyMessage) -> {
-                                if (packProxyMessage == null) {
-                                    packProxyMessage = new 
PackProxyMessage(jobInstanceId, jobConf, inlongGroupId,
-                                            proxyMessage.getInlongStreamId());
-                                    
packProxyMessage.generateExtraMap(proxyMessage.getDataKey());
-                                }
-                                // add message to package proxy
-                                packProxyMessage.addProxyMessage(proxyMessage);
-                                return packProxyMessage;
-                            });
-                    // increment the count of successful sinks
-                    sinkMetric.sinkSuccessCount.incrementAndGet();
-                }
+                // increment the count of successful sinks
+                sinkMetric.sinkSuccessCount.incrementAndGet();
+            } else {
+                // increment the count of failed sinks
+                sinkMetric.sinkFailCount.incrementAndGet();
             }
+            return suc.get();
         } catch (Exception e) {
-            sinkMetric.sinkFailCount.incrementAndGet();
             LOGGER.error("write message to Proxy sink error", e);
         } catch (Throwable t) {
             ThreadUtils.threadThrowableHandler(Thread.currentThread(), t);
         }
+        return false;
     }
 
     /**
@@ -125,11 +148,12 @@ public class ProxySink extends AbstractSink {
                         }
 
                     });
-                    AgentUtils.silenceSleepInMs(batchFlushInterval);
                 } catch (Exception ex) {
                     LOGGER.error("error caught", ex);
                 } catch (Throwable t) {
                     ThreadUtils.threadThrowableHandler(Thread.currentThread(), 
t);
+                } finally {
+                    AgentUtils.silenceSleepInMs(batchFlushInterval);
                 }
             }
         };
@@ -145,6 +169,7 @@ public class ProxySink extends AbstractSink {
         senderManager = new SenderManager(jobConf, inlongGroupId, sourceName);
         try {
             senderManager.addMessageSender();
+            senderManager.Start();
         } catch (Throwable ex) {
             LOGGER.error("error while init sender for group id {}", 
inlongGroupId);
             ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex);
@@ -161,6 +186,7 @@ public class ProxySink extends AbstractSink {
         }
         shutdown = true;
         executorService.shutdown();
+        senderManager.Stop();
     }
 
     /**
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java
index bf4aa6f50..599a7c46b 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java
@@ -234,11 +234,12 @@ public class PulsarSink extends AbstractSink {
                             }
                         }
                     });
-                    AgentUtils.silenceSleepInMs(batchFlushInterval);
                 } catch (Exception ex) {
                     LOGGER.error("error caught", ex);
                 } catch (Throwable t) {
                     ThreadUtils.threadThrowableHandler(Thread.currentThread(), 
t);
+                } finally {
+                    AgentUtils.silenceSleepInMs(batchFlushInterval);
                 }
             }
         };
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
index 032deebb3..cf2725035 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
@@ -18,8 +18,6 @@
 package org.apache.inlong.agent.plugin.sources;
 
 import org.apache.inlong.agent.conf.JobProfile;
-import org.apache.inlong.agent.constant.DataCollectType;
-import org.apache.inlong.agent.constant.JobConstants;
 import org.apache.inlong.agent.plugin.Reader;
 import org.apache.inlong.agent.plugin.sources.reader.file.FileReaderOperator;
 import org.apache.inlong.agent.plugin.sources.reader.file.TriggerFileReader;
@@ -29,9 +27,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.LineNumberReader;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -84,17 +79,6 @@ public class TextFileSource extends AbstractSource {
 
     private int getStartPosition(JobProfile jobConf, File file) {
         int seekPosition;
-        if (jobConf.hasKey(JobConstants.JOB_FILE_CONTENT_COLLECT_TYPE) && 
DataCollectType.INCREMENT
-                
.equalsIgnoreCase(jobConf.get(JobConstants.JOB_FILE_CONTENT_COLLECT_TYPE))) {
-            try (LineNumberReader lineNumberReader = new LineNumberReader(new 
FileReader(file.getPath()))) {
-                lineNumberReader.skip(Long.MAX_VALUE);
-                seekPosition = lineNumberReader.getLineNumber();
-                return seekPosition;
-            } catch (IOException ex) {
-                LOGGER.error("get position error, file absolute path: {}", 
file.getAbsolutePath());
-                throw new RuntimeException(ex);
-            }
-        }
         seekPosition = jobConf.getInt(file.getAbsolutePath() + 
POSITION_SUFFIX, 0);
         return seekPosition;
     }
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java
index 6fcbf481c..526c17d27 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java
@@ -42,7 +42,9 @@ import static 
org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MONITOR_INT
 public final class MonitorTextFile {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(MonitorTextFile.class);
-    // monitor thread pool
+    /**
+     * monitor thread pool
+      */
     private static final ThreadPoolExecutor EXECUTOR_SERVICE = new 
ThreadPoolExecutor(
             0, Integer.MAX_VALUE,
             60L, TimeUnit.SECONDS,
@@ -165,7 +167,7 @@ public final class MonitorTextFile {
         }
 
         /**
-         * reset the position and bytePositionreset the position and 
bytePosition
+         * Reset the position and bytePosition
          */
         private void resetPosition() {
             LOGGER.info("reset position {}", fileReaderOperator.file.toPath());
@@ -182,18 +184,15 @@ public final class MonitorTextFile {
         /**
          * Determine whether the inode has changed
          *
-         * @param currentFileKey
-         * @return
+         * @param currentFileKey current file key
+         * @return true if the inode changed, otherwise false
          */
         private boolean isInodeChanged(String currentFileKey) {
             if (fileReaderOperator.fileKey == null || currentFileKey == null) {
                 return false;
             }
 
-            if (fileReaderOperator.fileKey.equals(currentFileKey)) {
-                return false;
-            }
-            return true;
+            return !fileReaderOperator.fileKey.equals(currentFileKey);
         }
     }
 }
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/MiniAgent.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/MiniAgent.java
index 77c2efadc..4556a1474 100755
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/MiniAgent.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/MiniAgent.java
@@ -93,18 +93,19 @@ public class MiniAgent {
     }
 
     public void submitTrigger(TriggerProfile triggerProfile) {
-        manager.getTriggerManager().submitTrigger(triggerProfile);
+        manager.getTriggerManager().submitTrigger(triggerProfile, true);
         triggerProfileCache.add(triggerProfile);
     }
 
     public void cleanupJobs() {
-        jobProfileCache.forEach(jobProfile -> 
manager.getJobManager().deleteJob(jobProfile.getInstanceId()));
+        jobProfileCache.forEach(jobProfile -> 
manager.getJobManager().deleteJob(jobProfile.getInstanceId(), false));
         jobProfileCache.clear();
     }
 
     public void cleanupTriggers() {
         triggerProfileCache
-                .forEach(triggerProfile -> 
manager.getTriggerManager().deleteTrigger(triggerProfile.getTriggerId()));
+                .forEach(triggerProfile -> 
manager.getTriggerManager().deleteTrigger(triggerProfile.getTriggerId(),
+                        false));
         triggerProfileCache.clear();
     }
 }
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/trigger/TestTriggerManager.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/trigger/TestTriggerManager.java
index fc5cd75e2..3d41ef0a4 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/trigger/TestTriggerManager.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/trigger/TestTriggerManager.java
@@ -163,7 +163,7 @@ public class TestTriggerManager {
         });
 
         // shutdown trigger
-        
agent.getManager().getTriggerManager().deleteTrigger(triggerProfile1.getTriggerId());
+        
agent.getManager().getTriggerManager().deleteTrigger(triggerProfile1.getTriggerId(),
 false);
         await().atMost(10, TimeUnit.SECONDS).until(() -> 
trigger.getWatchers().size() == 0);
         TestUtils.deleteFile(WATCH_FOLDER.getRoot().getAbsolutePath() + 
"/1.log");
     }

Reply via email to