This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ef594ccf4a [INLONG-9467][Agent] Improve code exception detection to ensure task and instance state transitions (#9468)
ef594ccf4a is described below

commit ef594ccf4a5e2f48fbea039e5db68e1ce225516e
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Thu Dec 14 10:28:46 2023 +0800

    [INLONG-9467][Agent] Improve code exception detection to ensure task and instance state transitions (#9468)
---
 .../inlong/agent/metrics/audit/AuditUtils.java     |   1 +
 .../org/apache/inlong/agent/plugin/Instance.java   |   2 +-
 .../agent/core/instance/InstanceManager.java       |  25 +++--
 .../inlong/agent/core/instance/MockInstance.java   |   3 +-
 .../inlong/agent/plugin/instance/FileInstance.java |  22 +++--
 .../inlong/agent/plugin/sources/LogFileSource.java | 110 +++++++++++----------
 .../task/filecollect/LogFileCollectTask.java       |  10 +-
 7 files changed, 104 insertions(+), 69 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
index 290d3b71bb..c2d946b923 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
@@ -55,6 +55,7 @@ public class AuditUtils {
     public static final int AUDIT_ID_AGENT_INSTANCE_HEARTBEAT = 30010;
     public static final int AUDIT_ID_AGENT_SEND_FAILED_REAL_TIME = 30011;
     public static final int AUDIT_ID_AGENT_DEL_INSTANCE_MEM_UNUSUAL = 30014;
+    public static final int AUDIT_ID_AGENT_ADD_INSTANCE_MEM_FAILED = 30015;
 
     private static boolean IS_AUDIT = true;
 
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Instance.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Instance.java
index 90bac4c94f..990d7e60b2 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Instance.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Instance.java
@@ -32,7 +32,7 @@ public abstract class Instance extends AbstractStateWrapper {
      *
      * @throws IOException
      */
-    public abstract void init(Object instanceManager, InstanceProfile profile);
+    public abstract boolean init(Object instanceManager, InstanceProfile 
profile);
 
     /**
      * destroy instance.
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
index 3a86f32fc2..3b74cf4e48 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
@@ -429,15 +429,22 @@ public class InstanceManager extends AbstractDaemon {
         try {
             Class<?> taskClass = 
Class.forName(instanceProfile.getInstanceClass());
             Instance instance = (Instance) taskClass.newInstance();
-            instance.init(this, instanceProfile);
-            instanceMap.put(instanceProfile.getInstanceId(), instance);
-            EXECUTOR_SERVICE.submit(instance);
-            LOGGER.info(
-                    "add instance to memory instanceId {} instanceMap size {}, 
runningPool instance total {}, runningPool instance active {}",
-                    instance.getInstanceId(), instanceMap.size(), 
EXECUTOR_SERVICE.getTaskCount(),
-                    EXECUTOR_SERVICE.getActiveCount());
-            AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_ADD_INSTANCE_MEM, 
inlongGroupId, inlongStreamId,
-                    instanceProfile.getSinkDataTime(), 1, 1);
+            boolean initSuc = instance.init(this, instanceProfile);
+            if (initSuc) {
+                instanceMap.put(instanceProfile.getInstanceId(), instance);
+                EXECUTOR_SERVICE.submit(instance);
+                LOGGER.info(
+                        "add instance to memory instanceId {} instanceMap size 
{}, runningPool instance total {}, runningPool instance active {}",
+                        instance.getInstanceId(), instanceMap.size(), 
EXECUTOR_SERVICE.getTaskCount(),
+                        EXECUTOR_SERVICE.getActiveCount());
+                AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_ADD_INSTANCE_MEM, 
inlongGroupId, inlongStreamId,
+                        instanceProfile.getSinkDataTime(), 1, 1);
+            } else {
+                LOGGER.error(
+                        "add instance to memory init failed instanceId {}", 
instance.getInstanceId());
+                
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_ADD_INSTANCE_MEM_FAILED, 
inlongGroupId, inlongStreamId,
+                        instanceProfile.getSinkDataTime(), 1, 1);
+            }
         } catch (Throwable t) {
             LOGGER.error("add instance error {}", t.getMessage());
         }
diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/MockInstance.java b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/MockInstance.java
index 5e9bbbab03..dc4e16bebc 100644
--- a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/MockInstance.java
+++ b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/MockInstance.java
@@ -37,11 +37,12 @@ public class MockInstance extends Instance {
     private InstanceManager instanceManager;
 
     @Override
-    public void init(Object instanceManager, InstanceProfile profile) {
+    public boolean init(Object instanceManager, InstanceProfile profile) {
         this.instanceManager = (InstanceManager) instanceManager;
         this.profile = profile;
         LOGGER.info("init called " + index);
         initTime = index.getAndAdd(1);
+        return true;
     }
 
     @Override
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
index 23566acd9f..1f2a2cfc40 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
@@ -56,7 +56,7 @@ public class FileInstance extends Instance {
     private volatile int checkFinishCount = 0;
 
     @Override
-    public void init(Object srcManager, InstanceProfile srcProfile) {
+    public boolean init(Object srcManager, InstanceProfile srcProfile) {
         try {
             instanceManager = (InstanceManager) srcManager;
             profile = srcProfile;
@@ -68,11 +68,13 @@ public class FileInstance extends Instance {
             sink = (Sink) Class.forName(profile.getSinkClass()).newInstance();
             sink.init(profile);
             inited = true;
-        } catch (Throwable ex) {
+            return true;
+        } catch (Throwable e) {
+            handleSourceDeleted();
             doChangeState(State.FATAL);
-            LOGGER.error("init instance {} for task {} failed", 
profile.getInstanceId(), profile.getInstanceId(),
-                    ex);
-            ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex);
+            LOGGER.error("init instance {} for task {} failed", 
profile.getInstanceId(), profile.getInstanceId(), e);
+            ThreadUtils.threadThrowableHandler(Thread.currentThread(), e);
+            return false;
         }
     }
 
@@ -93,6 +95,15 @@ public class FileInstance extends Instance {
     public void run() {
         Thread.currentThread().setName("file-instance-core-" + getTaskId() + 
"-" + getInstanceId());
         running = true;
+        try {
+            doRun();
+        } catch (Throwable e) {
+            LOGGER.error("do run error: ", e);
+        }
+        running = false;
+    }
+
+    private void doRun() {
         while (!isFinished()) {
             if (!source.sourceExist()) {
                 handleSourceDeleted();
@@ -118,7 +129,6 @@ public class FileInstance extends Instance {
                 sink.write(msg);
             }
         }
-        running = false;
     }
 
     private void handleReadEnd() {
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index e4de812a8a..808edb7bde 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -181,7 +181,7 @@ public class LogFileSource extends AbstractSource {
             } catch (Exception ex) {
                 LOGGER.error("init metadata error", ex);
             }
-            EXECUTOR_SERVICE.execute(coreThread());
+            EXECUTOR_SERVICE.execute(run());
         } catch (Exception ex) {
             stopRunning();
             throw new FileException("error init stream for " + file.getPath(), 
ex);
@@ -435,63 +435,71 @@ public class LogFileSource extends AbstractSource {
         return false;
     }
 
-    public Runnable coreThread() {
+    private Runnable run() {
         return () -> {
             AgentThreadFactory.nameThread("log-file-source-" + taskId + "-" + 
file);
             running = true;
-            long lastPrintTime = 0;
-            while (isRunnable() && fileExist) {
-                if (isInodeChanged()) {
-                    fileExist = false;
-                    LOGGER.info("inode changed, instance will restart and 
offset will be clean, file {}",
-                            fileName);
-                    break;
-                }
-                if (file.length() < bytePosition) {
-                    fileExist = false;
-                    LOGGER.info("file rotate, instance will restart and offset 
will be clean, file {}",
-                            fileName);
-                    break;
+            try {
+                doRun();
+            } catch (Throwable e) {
+                LOGGER.error("do run error maybe file deleted: ", e);
+            }
+            running = false;
+        };
+    }
+
+    private void doRun() {
+        long lastPrintTime = 0;
+        while (isRunnable() && fileExist) {
+            if (isInodeChanged()) {
+                fileExist = false;
+                LOGGER.info("inode changed, instance will restart and offset 
will be clean, file {}",
+                        fileName);
+                break;
+            }
+            if (file.length() < bytePosition) {
+                fileExist = false;
+                LOGGER.info("file rotate, instance will restart and offset 
will be clean, file {}",
+                        fileName);
+                break;
+            }
+            boolean suc = waitForPermit(AGENT_GLOBAL_READER_SOURCE_PERMIT, 
BATCH_READ_LINE_TOTAL_LEN);
+            if (!suc) {
+                break;
+            }
+            List<SourceData> lines = null;
+            try {
+                lines = readFromPos(bytePosition);
+            } catch (FileNotFoundException e) {
+                fileExist = false;
+                LOGGER.error("readFromPos file deleted error: ", e);
+            } catch (IOException e) {
+                LOGGER.error("readFromPos error: ", e);
+            }
+            
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, 
BATCH_READ_LINE_TOTAL_LEN);
+            if (lines.isEmpty()) {
+                if (queue.isEmpty()) {
+                    emptyCount++;
+                } else {
+                    emptyCount = 0;
                 }
-                boolean suc = waitForPermit(AGENT_GLOBAL_READER_SOURCE_PERMIT, 
BATCH_READ_LINE_TOTAL_LEN);
-                if (!suc) {
+                AgentUtils.silenceSleepInSeconds(1);
+                continue;
+            }
+            emptyCount = 0;
+            for (int i = 0; i < lines.size(); i++) {
+                boolean suc4Queue = 
waitForPermit(AGENT_GLOBAL_READER_QUEUE_PERMIT, lines.get(i).data.length());
+                if (!suc4Queue) {
                     break;
                 }
-                List<SourceData> lines = null;
-                try {
-                    lines = readFromPos(bytePosition);
-                } catch (FileNotFoundException e) {
-                    fileExist = false;
-                    LOGGER.error("readFromPos file deleted {}", 
e.getMessage());
-                } catch (IOException e) {
-                    LOGGER.error("readFromPos error {}", e.getMessage());
-                }
-                
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, 
BATCH_READ_LINE_TOTAL_LEN);
-                if (lines.isEmpty()) {
-                    if (queue.isEmpty()) {
-                        emptyCount++;
-                    } else {
-                        emptyCount = 0;
-                    }
-                    AgentUtils.silenceSleepInSeconds(1);
-                    continue;
-                }
-                emptyCount = 0;
-                for (int i = 0; i < lines.size(); i++) {
-                    boolean suc4Queue = 
waitForPermit(AGENT_GLOBAL_READER_QUEUE_PERMIT, lines.get(i).data.length());
-                    if (!suc4Queue) {
-                        break;
-                    }
-                    putIntoQueue(lines.get(i));
-                }
-                if (AgentUtils.getCurrentTime() - lastPrintTime > 
CORE_THREAD_PRINT_INTERVAL_MS) {
-                    lastPrintTime = AgentUtils.getCurrentTime();
-                    LOGGER.info("path is {}, linePosition {}, bytePosition is 
{} file len {}, reads lines size {}",
-                            file.getName(), linePosition, bytePosition, 
file.length(), lines.size());
-                }
+                putIntoQueue(lines.get(i));
             }
-            running = false;
-        };
+            if (AgentUtils.getCurrentTime() - lastPrintTime > 
CORE_THREAD_PRINT_INTERVAL_MS) {
+                lastPrintTime = AgentUtils.getCurrentTime();
+                LOGGER.info("path is {}, linePosition {}, bytePosition is {} 
file len {}, reads lines size {}",
+                        file.getName(), linePosition, bytePosition, 
file.length(), lines.size());
+            }
+        }
     }
 
     private void putIntoQueue(SourceData sourceData) {
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
index c506d698d0..52eba9ea80 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
@@ -248,6 +248,15 @@ public class LogFileCollectTask extends Task {
     public void run() {
         Thread.currentThread().setName("directory-task-core-" + getTaskId());
         running = true;
+        try {
+            doRun();
+        } catch (Throwable e) {
+            LOGGER.error("do run error: ", e);
+        }
+        running = false;
+    }
+
+    private void doRun() {
         while (!isFinished()) {
             if (AgentUtils.getCurrentTime() - lastPrintTime > 
CORE_THREAD_PRINT_TIME) {
                 LOGGER.info("log file task running! taskId {}", getTaskId());
@@ -268,7 +277,6 @@ public class LogFileCollectTask extends Task {
             AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TASK_HEARTBEAT, 
inlongGroupId, inlongStreamId,
                     AgentUtils.getCurrentTime(), 1, 1);
         }
-        running = false;
     }
 
     private void runForRetry() {

Reply via email to