This is an automated email from the ASF dual-hosted git repository. zirui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new ef594ccf4a [INLONG-9467][Agent] Improve code exception detection to ensure task and instance state transitions (#9468) ef594ccf4a is described below commit ef594ccf4a5e2f48fbea039e5db68e1ce225516e Author: justinwwhuang <hww_jus...@163.com> AuthorDate: Thu Dec 14 10:28:46 2023 +0800 [INLONG-9467][Agent] Improve code exception detection to ensure task and instance state transitions (#9468) --- .../inlong/agent/metrics/audit/AuditUtils.java | 1 + .../org/apache/inlong/agent/plugin/Instance.java | 2 +- .../agent/core/instance/InstanceManager.java | 25 +++-- .../inlong/agent/core/instance/MockInstance.java | 3 +- .../inlong/agent/plugin/instance/FileInstance.java | 22 +++-- .../inlong/agent/plugin/sources/LogFileSource.java | 110 +++++++++++---------- .../task/filecollect/LogFileCollectTask.java | 10 +- 7 files changed, 104 insertions(+), 69 deletions(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java index 290d3b71bb..c2d946b923 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java @@ -55,6 +55,7 @@ public class AuditUtils { public static final int AUDIT_ID_AGENT_INSTANCE_HEARTBEAT = 30010; public static final int AUDIT_ID_AGENT_SEND_FAILED_REAL_TIME = 30011; public static final int AUDIT_ID_AGENT_DEL_INSTANCE_MEM_UNUSUAL = 30014; + public static final int AUDIT_ID_AGENT_ADD_INSTANCE_MEM_FAILED = 30015; private static boolean IS_AUDIT = true; diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Instance.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Instance.java index 90bac4c94f..990d7e60b2 100755 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Instance.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/Instance.java @@ -32,7 +32,7 @@ public abstract class Instance extends AbstractStateWrapper { * * @throws IOException */ - public abstract void init(Object instanceManager, InstanceProfile profile); + public abstract boolean init(Object instanceManager, InstanceProfile profile); /** * destroy instance. diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java index 3a86f32fc2..3b74cf4e48 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java @@ -429,15 +429,22 @@ public class InstanceManager extends AbstractDaemon { try { Class<?> taskClass = Class.forName(instanceProfile.getInstanceClass()); Instance instance = (Instance) taskClass.newInstance(); - instance.init(this, instanceProfile); - instanceMap.put(instanceProfile.getInstanceId(), instance); - EXECUTOR_SERVICE.submit(instance); - LOGGER.info( - "add instance to memory instanceId {} instanceMap size {}, runningPool instance total {}, runningPool instance active {}", - instance.getInstanceId(), instanceMap.size(), EXECUTOR_SERVICE.getTaskCount(), - EXECUTOR_SERVICE.getActiveCount()); - AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_ADD_INSTANCE_MEM, inlongGroupId, inlongStreamId, - instanceProfile.getSinkDataTime(), 1, 1); + boolean initSuc = instance.init(this, instanceProfile); + if (initSuc) { + instanceMap.put(instanceProfile.getInstanceId(), instance); + EXECUTOR_SERVICE.submit(instance); + LOGGER.info( + "add instance to memory instanceId {} instanceMap size {}, runningPool instance total {}, runningPool instance active {}", + instance.getInstanceId(), instanceMap.size(), EXECUTOR_SERVICE.getTaskCount(), + EXECUTOR_SERVICE.getActiveCount()); + AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_ADD_INSTANCE_MEM, inlongGroupId, inlongStreamId, + instanceProfile.getSinkDataTime(), 1, 1); + } else { + LOGGER.error( + "add instance to memory init failed instanceId {}", instance.getInstanceId()); + AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_ADD_INSTANCE_MEM_FAILED, inlongGroupId, inlongStreamId, + instanceProfile.getSinkDataTime(), 1, 1); + } } catch (Throwable t) { LOGGER.error("add instance error {}", t.getMessage()); } diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/MockInstance.java b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/MockInstance.java index 5e9bbbab03..dc4e16bebc 100644 --- a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/MockInstance.java +++ b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/MockInstance.java @@ -37,11 +37,12 @@ public class MockInstance extends Instance { private InstanceManager instanceManager; @Override - public void init(Object instanceManager, InstanceProfile profile) { + public boolean init(Object instanceManager, InstanceProfile profile) { this.instanceManager = (InstanceManager) instanceManager; this.profile = profile; LOGGER.info("init called " + index); initTime = index.getAndAdd(1); + return true; } @Override diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java index 23566acd9f..1f2a2cfc40 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java @@ -56,7 +56,7 @@ public class FileInstance extends Instance { private volatile int checkFinishCount = 0; @Override - public void init(Object srcManager, InstanceProfile srcProfile) { + public boolean init(Object srcManager, InstanceProfile srcProfile) { try { instanceManager = (InstanceManager) srcManager; profile = srcProfile; @@ -68,11 +68,13 @@ public class FileInstance extends Instance { sink = (Sink) Class.forName(profile.getSinkClass()).newInstance(); sink.init(profile); inited = true; - } catch (Throwable ex) { + return true; + } catch (Throwable e) { + handleSourceDeleted(); doChangeState(State.FATAL); - LOGGER.error("init instance {} for task {} failed", profile.getInstanceId(), profile.getInstanceId(), - ex); - ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex); + LOGGER.error("init instance {} for task {} failed", profile.getInstanceId(), profile.getInstanceId(), e); + ThreadUtils.threadThrowableHandler(Thread.currentThread(), e); + return false; } } @@ -93,6 +95,15 @@ public class FileInstance extends Instance { public void run() { Thread.currentThread().setName("file-instance-core-" + getTaskId() + "-" + getInstanceId()); running = true; + try { + doRun(); + } catch (Throwable e) { + LOGGER.error("do run error: ", e); + } + running = false; + } + + private void doRun() { while (!isFinished()) { if (!source.sourceExist()) { handleSourceDeleted(); @@ -118,7 +129,6 @@ public class FileInstance extends Instance { sink.write(msg); } } - running = false; } private void handleReadEnd() { diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java index e4de812a8a..808edb7bde 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java @@ -181,7 +181,7 @@ public class LogFileSource extends AbstractSource { } catch (Exception ex) { LOGGER.error("init metadata error", ex); } - EXECUTOR_SERVICE.execute(coreThread()); + EXECUTOR_SERVICE.execute(run()); } catch (Exception ex) { stopRunning(); throw new FileException("error init stream for " + file.getPath(), ex); @@ -435,63 +435,71 @@ public class LogFileSource extends AbstractSource { return false; } - public Runnable coreThread() { + private Runnable run() { return () -> { AgentThreadFactory.nameThread("log-file-source-" + taskId + "-" + file); running = true; - long lastPrintTime = 0; - while (isRunnable() && fileExist) { - if (isInodeChanged()) { - fileExist = false; - LOGGER.info("inode changed, instance will restart and offset will be clean, file {}", - fileName); - break; - } - if (file.length() < bytePosition) { - fileExist = false; - LOGGER.info("file rotate, instance will restart and offset will be clean, file {}", - fileName); - break; + try { + doRun(); + } catch (Throwable e) { + LOGGER.error("do run error maybe file deleted: ", e); + } + running = false; + }; + } + + private void doRun() { + long lastPrintTime = 0; + while (isRunnable() && fileExist) { + if (isInodeChanged()) { + fileExist = false; + LOGGER.info("inode changed, instance will restart and offset will be clean, file {}", + fileName); + break; + } + if (file.length() < bytePosition) { + fileExist = false; + LOGGER.info("file rotate, instance will restart and offset will be clean, file {}", + fileName); + break; + } + boolean suc = waitForPermit(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_READ_LINE_TOTAL_LEN); + if (!suc) { + break; + } + List<SourceData> lines = null; + try { + lines = readFromPos(bytePosition); + } catch (FileNotFoundException e) { + fileExist = false; + LOGGER.error("readFromPos file deleted error: ", e); + } catch (IOException e) { + LOGGER.error("readFromPos error: ", e); + } + MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_READ_LINE_TOTAL_LEN); + if (lines.isEmpty()) { + if (queue.isEmpty()) { + emptyCount++; + } else { + emptyCount = 0; } - boolean suc = waitForPermit(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_READ_LINE_TOTAL_LEN); - if (!suc) { + AgentUtils.silenceSleepInSeconds(1); + continue; + } + emptyCount = 0; + for (int i = 0; i < lines.size(); i++) { + boolean suc4Queue = waitForPermit(AGENT_GLOBAL_READER_QUEUE_PERMIT, lines.get(i).data.length()); + if (!suc4Queue) { break; } - List<SourceData> lines = null; - try { - lines = readFromPos(bytePosition); - } catch (FileNotFoundException e) { - fileExist = false; - LOGGER.error("readFromPos file deleted {}", e.getMessage()); - } catch (IOException e) { - LOGGER.error("readFromPos error {}", e.getMessage()); - } - MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_READ_LINE_TOTAL_LEN); - if (lines.isEmpty()) { - if (queue.isEmpty()) { - emptyCount++; - } else { - emptyCount = 0; - } - AgentUtils.silenceSleepInSeconds(1); - continue; - } - emptyCount = 0; - for (int i = 0; i < lines.size(); i++) { - boolean suc4Queue = waitForPermit(AGENT_GLOBAL_READER_QUEUE_PERMIT, lines.get(i).data.length()); - if (!suc4Queue) { - break; - } - putIntoQueue(lines.get(i)); - } - if (AgentUtils.getCurrentTime() - lastPrintTime > CORE_THREAD_PRINT_INTERVAL_MS) { - lastPrintTime = AgentUtils.getCurrentTime(); - LOGGER.info("path is {}, linePosition {}, bytePosition is {} file len {}, reads lines size {}", - file.getName(), linePosition, bytePosition, file.length(), lines.size()); - } + putIntoQueue(lines.get(i)); } - running = false; - }; + if (AgentUtils.getCurrentTime() - lastPrintTime > CORE_THREAD_PRINT_INTERVAL_MS) { + lastPrintTime = AgentUtils.getCurrentTime(); + LOGGER.info("path is {}, linePosition {}, bytePosition is {} file len {}, reads lines size {}", + file.getName(), linePosition, bytePosition, file.length(), lines.size()); + } + } } private void putIntoQueue(SourceData sourceData) { diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java index c506d698d0..52eba9ea80 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java @@ -248,6 +248,15 @@ public class LogFileCollectTask extends Task { public void run() { Thread.currentThread().setName("directory-task-core-" + getTaskId()); running = true; + try { + doRun(); + } catch (Throwable e) { + LOGGER.error("do run error: ", e); + } + running = false; + } + + private void doRun() { while (!isFinished()) { if (AgentUtils.getCurrentTime() - lastPrintTime > CORE_THREAD_PRINT_TIME) { LOGGER.info("log file task running! taskId {}", getTaskId()); @@ -268,7 +277,6 @@ public class LogFileCollectTask extends Task { AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TASK_HEARTBEAT, inlongGroupId, inlongStreamId, AgentUtils.getCurrentTime(), 1, 1); } - running = false; } private void runForRetry() {