This is an automated email from the ASF dual-hosted git repository.

vernedeng pushed a commit to branch branch-1.10
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/branch-1.10 by this push:
     new f4148e675f [INLONG-9457][Agent] Add task and instance heartbeat audit (#9458)
f4148e675f is described below

commit f4148e675f56892d2cda411d71b2937a52c59386
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Mon Dec 11 20:09:39 2023 +0800

    [INLONG-9457][Agent] Add task and instance heartbeat audit (#9458)
    
    * [INLONG-9457][Agent] Add task and instance heartbeat audit
    
    * [INLONG-9457][Agent] Add task and instance heartbeat audit
    
    (cherry picked from commit 0dac3fa34493d91fb11626dd1c295a52f55c933f)
---
 .../org/apache/inlong/agent/conf/TaskProfile.java    | 12 ++++++++++++
 .../inlong/agent/metrics/audit/AuditUtils.java       | 20 ++++++++++++--------
 .../inlong/agent/core/instance/InstanceManager.java  |  8 +++++++-
 .../inlong/agent/core/task/file/TaskManager.java     |  3 +++
 .../inlong/agent/plugin/instance/FileInstance.java   |  5 +++++
 .../plugin/task/filecollect/LogFileCollectTask.java  |  5 +++++
 inlong-manager/manager-web/sql/changes-1.10.0.sql    | 15 ++++++++++++++-
 7 files changed, 58 insertions(+), 10 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
index de863a7aa0..0da4a8d349 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java
@@ -32,6 +32,10 @@ import org.slf4j.LoggerFactory;
 import java.text.ParseException;
 import java.util.TimeZone;
 
+import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID;
+import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
+import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
+import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
 import static org.apache.inlong.agent.constant.TaskConstants.TASK_RETRY;
 import static org.apache.inlong.agent.constant.TaskConstants.TASK_STATE;
 
@@ -89,6 +93,14 @@ public class TaskProfile extends AbstractConfiguration {
         set(TaskConstants.TASK_CLASS, className);
     }
 
+    public String getInlongGroupId() {
+        return get(PROXY_INLONG_GROUP_ID, DEFAULT_PROXY_INLONG_GROUP_ID);
+    }
+
+    public String getInlongStreamId() {
+        return get(PROXY_INLONG_STREAM_ID, DEFAULT_PROXY_INLONG_STREAM_ID);
+    }
+
     /**
      * parse json string to configuration instance.
      *
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
index faaf3400c6..290d3b71bb 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
@@ -42,15 +42,19 @@ public class AuditUtils {
     public static final int AUDIT_DEFAULT_MAX_CACHE_ROWS = 2000000;
     public static final int AUDIT_ID_AGENT_READ_SUCCESS = 3;
     public static final int AUDIT_ID_AGENT_SEND_SUCCESS = 4;
-    public static final int AUDIT_ID_AGENT_READ_SUCCESS_REAL_TIME = 47;
-    public static final int AUDIT_ID_AGENT_SEND_SUCCESS_REAL_TIME = 48;
     public static final int AUDIT_ID_AGENT_SEND_FAILED = 10004;
-    public static final int AUDIT_ID_AGENT_SEND_FAILED_REAL_TIME = 10026;
-    public static final int AUDIT_ID_AGENT_ADD_INSTANCE_MEM = 49;
-    public static final int AUDIT_ID_AGENT_DEL_INSTANCE_MEM = 50;
-    public static final int AUDIT_ID_AGENT_ADD_INSTANCE_DB = 51;
-    public static final int AUDIT_ID_AGENT_DEL_INSTANCE_DB = 52;
-    public static final int AUDIT_ID_AGENT_DEL_INSTANCE_MEM_UNUSUAL = 10028;
+    public static final int AUDIT_ID_AGENT_READ_SUCCESS_REAL_TIME = 30001;
+    public static final int AUDIT_ID_AGENT_SEND_SUCCESS_REAL_TIME = 30002;
+    public static final int AUDIT_ID_AGENT_ADD_INSTANCE_MEM = 30003;
+    public static final int AUDIT_ID_AGENT_DEL_INSTANCE_MEM = 30004;
+    public static final int AUDIT_ID_AGENT_ADD_INSTANCE_DB = 30005;
+    public static final int AUDIT_ID_AGENT_DEL_INSTANCE_DB = 30006;
+    public static final int AUDIT_ID_AGENT_TASK_MGR_HEARTBEAT = 30007;
+    public static final int AUDIT_ID_AGENT_TASK_HEARTBEAT = 30008;
+    public static final int AUDIT_ID_AGENT_INSTANCE_MGR_HEARTBEAT = 30009;
+    public static final int AUDIT_ID_AGENT_INSTANCE_HEARTBEAT = 30010;
+    public static final int AUDIT_ID_AGENT_SEND_FAILED_REAL_TIME = 30011;
+    public static final int AUDIT_ID_AGENT_DEL_INSTANCE_MEM_UNUSUAL = 30014;
 
     private static boolean IS_AUDIT = true;
 
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
index 1ed8e86cf8..3a86f32fc2 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
@@ -62,7 +62,8 @@ public class InstanceManager extends AbstractDaemon {
     public static final String DB_INSTANCE_EXPIRE_CYCLE_COUNT = "3";
     // instance in db
     private final InstanceDb instanceDb;
-    TaskProfileDb taskProfileDb;
+    private TaskProfileDb taskProfileDb;
+    private TaskProfile taskFromDb;
     // task in memory
     private final ConcurrentHashMap<String, Instance> instanceMap;
     // instance profile queue.
@@ -161,6 +162,10 @@ public class InstanceManager extends AbstractDaemon {
                     cleanDbInstance();
                     dealWithActionQueue(actionQueue);
                     keepPaceWithDb();
+                    String inlongGroupId = taskFromDb.getInlongGroupId();
+                    String inlongStreamId = taskFromDb.getInlongStreamId();
+                    
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_INSTANCE_MGR_HEARTBEAT, inlongGroupId, 
inlongStreamId,
+                            AgentUtils.getCurrentTime(), 1, 1);
                 } catch (Throwable ex) {
                     LOGGER.error("coreThread {}", ex);
                     ThreadUtils.threadThrowableHandler(Thread.currentThread(), 
ex);
@@ -323,6 +328,7 @@ public class InstanceManager extends AbstractDaemon {
     }
 
     private void restoreFromDb() {
+        taskFromDb = taskProfileDb.getTask(taskId);
         List<InstanceProfile> profileList = instanceDb.getInstances(taskId);
         profileList.forEach((profile) -> {
             InstanceStateEnum state = profile.getState();
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
index e188e4f207..684600dbb2 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
@@ -27,6 +27,7 @@ import org.apache.inlong.agent.core.task.TaskAction;
 import org.apache.inlong.agent.db.Db;
 import org.apache.inlong.agent.db.RocksDbImp;
 import org.apache.inlong.agent.db.TaskProfileDb;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.file.Task;
 import org.apache.inlong.agent.utils.AgentUtils;
 import org.apache.inlong.agent.utils.ThreadUtils;
@@ -197,6 +198,8 @@ public class TaskManager extends AbstractDaemon {
                     printTaskDetail();
                     dealWithConfigQueue(configQueue);
                     dealWithActionQueue(actionQueue);
+                    
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TASK_MGR_HEARTBEAT, "", "",
+                            AgentUtils.getCurrentTime(), 1, 1);
                 } catch (Throwable ex) {
                     LOGGER.error("exception caught", ex);
                     ThreadUtils.threadThrowableHandler(Thread.currentThread(), 
ex);
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
index 1785b4245f..23566acd9f 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
@@ -23,6 +23,7 @@ import org.apache.inlong.agent.core.instance.ActionType;
 import org.apache.inlong.agent.core.instance.InstanceAction;
 import org.apache.inlong.agent.core.instance.InstanceManager;
 import org.apache.inlong.agent.core.task.OffsetManager;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.Instance;
 import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.plugin.file.Sink;
@@ -109,6 +110,10 @@ public class FileInstance extends Instance {
                     checkFinishCount = 0;
                 }
                 AgentUtils.silenceSleepInSeconds(CORE_THREAD_SLEEP_TIME);
+                String inlongGroupId = profile.getInlongGroupId();
+                String inlongStreamId = profile.getInlongStreamId();
+                AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_INSTANCE_HEARTBEAT, 
inlongGroupId, inlongStreamId,
+                        AgentUtils.getCurrentTime(), 1, 1);
             } else {
                 sink.write(msg);
             }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
index 25f3ef456d..c506d698d0 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
@@ -27,6 +27,7 @@ import org.apache.inlong.agent.core.instance.InstanceManager;
 import org.apache.inlong.agent.core.task.TaskAction;
 import org.apache.inlong.agent.core.task.file.TaskManager;
 import org.apache.inlong.agent.db.Db;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.file.Task;
 import 
org.apache.inlong.agent.plugin.task.filecollect.FileScanner.BasicFileInfo;
 import org.apache.inlong.agent.plugin.utils.file.FilePathUtil;
@@ -262,6 +263,10 @@ public class LogFileCollectTask extends Task {
             } else {
                 runForNormal();
             }
+            String inlongGroupId = taskProfile.getInlongGroupId();
+            String inlongStreamId = taskProfile.getInlongStreamId();
+            AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TASK_HEARTBEAT, 
inlongGroupId, inlongStreamId,
+                    AgentUtils.getCurrentTime(), 1, 1);
         }
         running = false;
     }
diff --git a/inlong-manager/manager-web/sql/changes-1.10.0.sql b/inlong-manager/manager-web/sql/changes-1.10.0.sql
index a09a83faf7..2ebdf6838a 100644
--- a/inlong-manager/manager-web/sql/changes-1.10.0.sql
+++ b/inlong-manager/manager-web/sql/changes-1.10.0.sql
@@ -34,7 +34,20 @@ VALUES ('audit_sort_mysql_binlog_input', 'MYSQL_BINLOG', 0, '29'),
        ('audit_sort_pulsar_input', 'PULSAR', 0, '31'),
        ('audit_sort_pulsar_output', 'PULSAR', 1, '32'),
        ('audit_sort_tube_input', 'TUBEMQ', 0, '33'),
-       ('audit_sort_tube_output', 'TUBEMQ', 1, '34');
+       ('audit_sort_tube_output', 'TUBEMQ', 1, '34'),
+       ('audit_agent_sent_failed', 'AGENT', 2, '10004'),
+       ('audit_agent_read_realtime', 'AGENT', 3, '30001'),
+       ('audit_agent_send_realtime', 'AGENT', 4, '30002'),
+       ('audit_agent_add_instance_mem', 'AGENT', 5, '30003'),
+       ('audit_agent_del_instance_mem', 'AGENT', 6, '30004'),
+       ('audit_agent_add_instance_db', 'AGENT', 7, '30005'),
+       ('audit_agent_del_instance_db', 'AGENT', 8, '30006'),
+       ('audit_agent_task_mgr_heartbeat', 'AGENT', 9, '30007'),
+       ('audit_agent_task_heartbeat', 'AGENT', 10, '30008'),
+       ('audit_agent_instance_mgr_heartbeat', 'AGENT', 11, '30009'),
+       ('audit_agent_instance_heartbeat', 'AGENT', 12, '30010'),
+       ('audit_agent_sent_failed_realtime', 'AGENT', 13, '30011'),
+       ('audit_agent_del_instance_mem_unusual', 'AGENT', 14, '30014');
 
 ALTER TABLE `operation_log`
     ADD COLUMN  `inlong_group_id`  varchar(256) DEFAULT NULL COMMENT 'Inlong 
group id';

Reply via email to