This is an automated email from the ASF dual-hosted git repository.

vernedeng pushed a commit to branch branch-1.10
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/branch-1.10 by this push: new f4148e675f [INLONG-9457][Agent] Add task and instance heartbeat audit (#9458) f4148e675f is described below commit f4148e675f56892d2cda411d71b2937a52c59386 Author: justinwwhuang <hww_jus...@163.com> AuthorDate: Mon Dec 11 20:09:39 2023 +0800 [INLONG-9457][Agent] Add task and instance heartbeat audit (#9458) * [INLONG-9457][Agent] Add task and instance heartbeat audit * [INLONG-9457][Agent] Add task and instance heartbeat audit (cherry picked from commit 0dac3fa34493d91fb11626dd1c295a52f55c933f) --- .../org/apache/inlong/agent/conf/TaskProfile.java | 12 ++++++++++++ .../inlong/agent/metrics/audit/AuditUtils.java | 20 ++++++++++++-------- .../inlong/agent/core/instance/InstanceManager.java | 8 +++++++- .../inlong/agent/core/task/file/TaskManager.java | 3 +++ .../inlong/agent/plugin/instance/FileInstance.java | 5 +++++ .../plugin/task/filecollect/LogFileCollectTask.java | 5 +++++ inlong-manager/manager-web/sql/changes-1.10.0.sql | 15 ++++++++++++++- 7 files changed, 58 insertions(+), 10 deletions(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java index de863a7aa0..0da4a8d349 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java @@ -32,6 +32,10 @@ import org.slf4j.LoggerFactory; import java.text.ParseException; import java.util.TimeZone; +import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID; +import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID; +import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID; +import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID; import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_RETRY; import static org.apache.inlong.agent.constant.TaskConstants.TASK_STATE; @@ -89,6 +93,14 @@ public class TaskProfile extends AbstractConfiguration { set(TaskConstants.TASK_CLASS, className); } + public String getInlongGroupId() { + return get(PROXY_INLONG_GROUP_ID, DEFAULT_PROXY_INLONG_GROUP_ID); + } + + public String getInlongStreamId() { + return get(PROXY_INLONG_STREAM_ID, DEFAULT_PROXY_INLONG_STREAM_ID); + } + /** * parse json string to configuration instance. * diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java index faaf3400c6..290d3b71bb 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java @@ -42,15 +42,19 @@ public class AuditUtils { public static final int AUDIT_DEFAULT_MAX_CACHE_ROWS = 2000000; public static final int AUDIT_ID_AGENT_READ_SUCCESS = 3; public static final int AUDIT_ID_AGENT_SEND_SUCCESS = 4; - public static final int AUDIT_ID_AGENT_READ_SUCCESS_REAL_TIME = 47; - public static final int AUDIT_ID_AGENT_SEND_SUCCESS_REAL_TIME = 48; public static final int AUDIT_ID_AGENT_SEND_FAILED = 10004; - public static final int AUDIT_ID_AGENT_SEND_FAILED_REAL_TIME = 10026; - public static final int AUDIT_ID_AGENT_ADD_INSTANCE_MEM = 49; - public static final int AUDIT_ID_AGENT_DEL_INSTANCE_MEM = 50; - public static final int AUDIT_ID_AGENT_ADD_INSTANCE_DB = 51; - public static final int AUDIT_ID_AGENT_DEL_INSTANCE_DB = 52; - public static final int AUDIT_ID_AGENT_DEL_INSTANCE_MEM_UNUSUAL = 10028; + public static final int AUDIT_ID_AGENT_READ_SUCCESS_REAL_TIME = 30001; + public static final int AUDIT_ID_AGENT_SEND_SUCCESS_REAL_TIME = 30002; + public static final int AUDIT_ID_AGENT_ADD_INSTANCE_MEM = 
30003; + public static final int AUDIT_ID_AGENT_DEL_INSTANCE_MEM = 30004; + public static final int AUDIT_ID_AGENT_ADD_INSTANCE_DB = 30005; + public static final int AUDIT_ID_AGENT_DEL_INSTANCE_DB = 30006; + public static final int AUDIT_ID_AGENT_TASK_MGR_HEARTBEAT = 30007; + public static final int AUDIT_ID_AGENT_TASK_HEARTBEAT = 30008; + public static final int AUDIT_ID_AGENT_INSTANCE_MGR_HEARTBEAT = 30009; + public static final int AUDIT_ID_AGENT_INSTANCE_HEARTBEAT = 30010; + public static final int AUDIT_ID_AGENT_SEND_FAILED_REAL_TIME = 30011; + public static final int AUDIT_ID_AGENT_DEL_INSTANCE_MEM_UNUSUAL = 30014; private static boolean IS_AUDIT = true; diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java index 1ed8e86cf8..3a86f32fc2 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java @@ -62,7 +62,8 @@ public class InstanceManager extends AbstractDaemon { public static final String DB_INSTANCE_EXPIRE_CYCLE_COUNT = "3"; // instance in db private final InstanceDb instanceDb; - TaskProfileDb taskProfileDb; + private TaskProfileDb taskProfileDb; + private TaskProfile taskFromDb; // task in memory private final ConcurrentHashMap<String, Instance> instanceMap; // instance profile queue. 
@@ -161,6 +162,10 @@ public class InstanceManager extends AbstractDaemon { cleanDbInstance(); dealWithActionQueue(actionQueue); keepPaceWithDb(); + String inlongGroupId = taskFromDb.getInlongGroupId(); + String inlongStreamId = taskFromDb.getInlongStreamId(); + AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_INSTANCE_MGR_HEARTBEAT, inlongGroupId, inlongStreamId, + AgentUtils.getCurrentTime(), 1, 1); } catch (Throwable ex) { LOGGER.error("coreThread {}", ex); ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex); @@ -323,6 +328,7 @@ public class InstanceManager extends AbstractDaemon { } private void restoreFromDb() { + taskFromDb = taskProfileDb.getTask(taskId); List<InstanceProfile> profileList = instanceDb.getInstances(taskId); profileList.forEach((profile) -> { InstanceStateEnum state = profile.getState(); diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java index e188e4f207..684600dbb2 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java @@ -27,6 +27,7 @@ import org.apache.inlong.agent.core.task.TaskAction; import org.apache.inlong.agent.db.Db; import org.apache.inlong.agent.db.RocksDbImp; import org.apache.inlong.agent.db.TaskProfileDb; +import org.apache.inlong.agent.metrics.audit.AuditUtils; import org.apache.inlong.agent.plugin.file.Task; import org.apache.inlong.agent.utils.AgentUtils; import org.apache.inlong.agent.utils.ThreadUtils; @@ -197,6 +198,8 @@ public class TaskManager extends AbstractDaemon { printTaskDetail(); dealWithConfigQueue(configQueue); dealWithActionQueue(actionQueue); + AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TASK_MGR_HEARTBEAT, "", "", + AgentUtils.getCurrentTime(), 1, 1); } catch (Throwable ex) { LOGGER.error("exception caught", 
ex); ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java index 1785b4245f..23566acd9f 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java @@ -23,6 +23,7 @@ import org.apache.inlong.agent.core.instance.ActionType; import org.apache.inlong.agent.core.instance.InstanceAction; import org.apache.inlong.agent.core.instance.InstanceManager; import org.apache.inlong.agent.core.task.OffsetManager; +import org.apache.inlong.agent.metrics.audit.AuditUtils; import org.apache.inlong.agent.plugin.Instance; import org.apache.inlong.agent.plugin.Message; import org.apache.inlong.agent.plugin.file.Sink; @@ -109,6 +110,10 @@ public class FileInstance extends Instance { checkFinishCount = 0; } AgentUtils.silenceSleepInSeconds(CORE_THREAD_SLEEP_TIME); + String inlongGroupId = profile.getInlongGroupId(); + String inlongStreamId = profile.getInlongStreamId(); + AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_INSTANCE_HEARTBEAT, inlongGroupId, inlongStreamId, + AgentUtils.getCurrentTime(), 1, 1); } else { sink.write(msg); } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java index 25f3ef456d..c506d698d0 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java @@ -27,6 +27,7 @@ import org.apache.inlong.agent.core.instance.InstanceManager; 
import org.apache.inlong.agent.core.task.TaskAction; import org.apache.inlong.agent.core.task.file.TaskManager; import org.apache.inlong.agent.db.Db; +import org.apache.inlong.agent.metrics.audit.AuditUtils; import org.apache.inlong.agent.plugin.file.Task; import org.apache.inlong.agent.plugin.task.filecollect.FileScanner.BasicFileInfo; import org.apache.inlong.agent.plugin.utils.file.FilePathUtil; @@ -262,6 +263,10 @@ public class LogFileCollectTask extends Task { } else { runForNormal(); } + String inlongGroupId = taskProfile.getInlongGroupId(); + String inlongStreamId = taskProfile.getInlongStreamId(); + AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TASK_HEARTBEAT, inlongGroupId, inlongStreamId, + AgentUtils.getCurrentTime(), 1, 1); } running = false; } diff --git a/inlong-manager/manager-web/sql/changes-1.10.0.sql b/inlong-manager/manager-web/sql/changes-1.10.0.sql index a09a83faf7..2ebdf6838a 100644 --- a/inlong-manager/manager-web/sql/changes-1.10.0.sql +++ b/inlong-manager/manager-web/sql/changes-1.10.0.sql @@ -34,7 +34,20 @@ VALUES ('audit_sort_mysql_binlog_input', 'MYSQL_BINLOG', 0, '29'), ('audit_sort_pulsar_input', 'PULSAR', 0, '31'), ('audit_sort_pulsar_output', 'PULSAR', 1, '32'), ('audit_sort_tube_input', 'TUBEMQ', 0, '33'), - ('audit_sort_tube_output', 'TUBEMQ', 1, '34'); + ('audit_sort_tube_output', 'TUBEMQ', 1, '34'), + ('audit_agent_sent_failed', 'AGENT', 2, '10004'), + ('audit_agent_read_realtime', 'AGENT', 3, '30001'), + ('audit_agent_send_realtime', 'AGENT', 4, '30002'), + ('audit_agent_add_instance_mem', 'AGENT', 5, '30003'), + ('audit_agent_del_instance_mem', 'AGENT', 6, '30004'), + ('audit_agent_add_instance_db', 'AGENT', 7, '30005'), + ('audit_agent_del_instance_db', 'AGENT', 8, '30006'), + ('audit_agent_task_mgr_heartbeat', 'AGENT', 9, '30007'), + ('audit_agent_task_heartbeat', 'AGENT', 10, '30008'), + ('audit_agent_instance_mgr_heartbeat', 'AGENT', 11, '30009'), + ('audit_agent_instance_heartbeat', 'AGENT', 12, '30010'), + 
('audit_agent_sent_failed_realtime', 'AGENT', 13, '30011'), + ('audit_agent_del_instance_mem_unusual', 'AGENT', 14, '30014'); ALTER TABLE `operation_log` ADD COLUMN `inlong_group_id` varchar(256) DEFAULT NULL COMMENT 'Inlong group id';