This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new f01af15cc8 [INLONG-9415][Agent] Increase audit reports related to 
instance maintenance (#9416)
f01af15cc8 is described below

commit f01af15cc84a0deaf33b8cb045c4bcb72a673f43
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Tue Dec 5 14:10:41 2023 +0800

    [INLONG-9415][Agent] Increase audit reports related to instance maintenance 
(#9416)
---
 .../apache/inlong/agent/conf/InstanceProfile.java  | 12 +++++++++
 .../inlong/agent/metrics/audit/AuditUtils.java     |  5 ++++
 .../agent/core/instance/InstanceManager.java       | 30 +++++++++++++++++++---
 .../plugin/sinks/filecollect/AbstractSink.java     |  8 ++----
 .../agent/plugin/sources/file/AbstractSource.java  |  8 ++----
 .../plugin/sources/reader/file/AbstractReader.java |  8 ++----
 6 files changed, 49 insertions(+), 22 deletions(-)

diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
index acc6444aba..06d783c953 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java
@@ -32,6 +32,10 @@ import org.slf4j.LoggerFactory;
 
 import java.util.List;
 
+import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID;
+import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
+import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
+import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
 import static org.apache.inlong.agent.constant.TaskConstants.INSTANCE_STATE;
 import static org.apache.inlong.agent.constant.TaskConstants.JOB_MQ_ClUSTERS;
 import static org.apache.inlong.agent.constant.TaskConstants.JOB_MQ_TOPIC;
@@ -105,6 +109,14 @@ public class InstanceProfile extends AbstractConfiguration 
implements Comparable
         return get(TaskConstants.PREDEFINE_FIELDS, "");
     }
 
+    public String getInlongGroupId() {
+        return get(PROXY_INLONG_GROUP_ID, DEFAULT_PROXY_INLONG_GROUP_ID);
+    }
+
+    public String getInlongStreamId() {
+        return get(PROXY_INLONG_STREAM_ID, DEFAULT_PROXY_INLONG_STREAM_ID);
+    }
+
     @Override
     public boolean allRequiredKeyExist() {
         return hasKey(TaskConstants.FILE_UPDATE_TIME);
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
index eda8754b1d..faaf3400c6 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
@@ -46,6 +46,11 @@ public class AuditUtils {
     public static final int AUDIT_ID_AGENT_SEND_SUCCESS_REAL_TIME = 48;
     public static final int AUDIT_ID_AGENT_SEND_FAILED = 10004;
     public static final int AUDIT_ID_AGENT_SEND_FAILED_REAL_TIME = 10026;
+    public static final int AUDIT_ID_AGENT_ADD_INSTANCE_MEM = 49;
+    public static final int AUDIT_ID_AGENT_DEL_INSTANCE_MEM = 50;
+    public static final int AUDIT_ID_AGENT_ADD_INSTANCE_DB = 51;
+    public static final int AUDIT_ID_AGENT_DEL_INSTANCE_DB = 52;
+    public static final int AUDIT_ID_AGENT_DEL_INSTANCE_MEM_UNUSUAL = 10028;
 
     private static boolean IS_AUDIT = true;
 
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
index e600c5b671..1ed8e86cf8 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
@@ -26,6 +26,7 @@ import org.apache.inlong.agent.constant.CycleUnitType;
 import org.apache.inlong.agent.db.Db;
 import org.apache.inlong.agent.db.InstanceDb;
 import org.apache.inlong.agent.db.TaskProfileDb;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.Instance;
 import org.apache.inlong.agent.utils.AgentUtils;
 import org.apache.inlong.agent.utils.DateTransUtils;
@@ -218,7 +219,7 @@ public class InstanceManager extends AbstractDaemon {
                 LOGGER.info("instance has expired, delete from db dataTime {} 
taskId {} instanceId {}",
                         instanceFromDb.getSourceDataTime(), 
instanceFromDb.getTaskId(),
                         instanceFromDb.getInstanceId());
-                instanceDb.deleteInstance(instanceFromDb.getTaskId(), 
instanceFromDb.getInstanceId());
+                deleteFromDb(instanceFromDb.getInstanceId());
                 iterator.remove();
             }
         }
@@ -347,14 +348,14 @@ public class InstanceManager extends AbstractDaemon {
                     profile.getInstanceId());
             return;
         }
-        addToDb(profile);
+        addToDb(profile, true);
         addToMemory(profile);
     }
 
     private void finishInstance(InstanceProfile profile) {
         profile.setState(InstanceStateEnum.FINISHED);
         profile.setModifyTime(AgentUtils.getCurrentTime());
-        addToDb(profile);
+        addToDb(profile, false);
         deleteFromMemory(profile.getInstanceId());
         LOGGER.info("finished instance state {} taskId {} instanceId {}", 
profile.getState(),
                 profile.getTaskId(), profile.getInstanceId());
@@ -366,9 +367,14 @@ public class InstanceManager extends AbstractDaemon {
     }
 
     private void deleteFromDb(String instanceId) {
+        InstanceProfile profile = instanceDb.getInstance(taskId, instanceId);
+        String inlongGroupId = profile.getInlongGroupId();
+        String inlongStreamId = profile.getInlongStreamId();
         instanceDb.deleteInstance(taskId, instanceId);
         LOGGER.info("delete instance from db: taskId {} instanceId {} result 
{}", taskId,
                 instanceId, instanceDb.getInstance(taskId, instanceId));
+        AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_DEL_INSTANCE_DB, 
inlongGroupId, inlongStreamId,
+                profile.getSinkDataTime(), 1, 1);
     }
 
     private void deleteFromMemory(String instanceId) {
@@ -378,26 +384,40 @@ public class InstanceManager extends AbstractDaemon {
                     instanceId);
             return;
         }
+        String inlongGroupId = instance.getProfile().getInlongGroupId();
+        String inlongStreamId = instance.getProfile().getInlongStreamId();
         instance.destroy();
         instanceMap.remove(instanceId);
         LOGGER.info("delete instance from memory: taskId {} instanceId {}", 
taskId, instance.getInstanceId());
+        AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_DEL_INSTANCE_MEM, 
inlongGroupId, inlongStreamId,
+                instance.getProfile().getSinkDataTime(), 1, 1);
     }
 
-    private void addToDb(InstanceProfile profile) {
+    private void addToDb(InstanceProfile profile, boolean addNew) {
         LOGGER.info("add instance to db state {} instanceId {}", 
profile.getState(), profile.getInstanceId());
         instanceDb.storeInstance(profile);
+        if (addNew) {
+            String inlongGroupId = profile.getInlongGroupId();
+            String inlongStreamId = profile.getInlongStreamId();
+            AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_ADD_INSTANCE_DB, 
inlongGroupId, inlongStreamId,
+                    profile.getSinkDataTime(), 1, 1);
+        }
     }
 
     /**
      * add instance to memory, if there is a record refer to the instance id 
exist we need to destroy it first.
      */
     private void addToMemory(InstanceProfile instanceProfile) {
+        String inlongGroupId = instanceProfile.getInlongGroupId();
+        String inlongStreamId = instanceProfile.getInlongStreamId();
         Instance oldInstance = 
instanceMap.get(instanceProfile.getInstanceId());
         if (oldInstance != null) {
             oldInstance.destroy();
             instanceMap.remove(instanceProfile.getInstanceId());
             LOGGER.error("old instance {} should not exist, try stop it first",
                     instanceProfile.getInstanceId());
+            AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_DEL_INSTANCE_MEM_UNUSUAL, 
inlongGroupId, inlongStreamId,
+                    instanceProfile.getSinkDataTime(), 1, 1);
         }
         LOGGER.info("instanceProfile {}", instanceProfile.toJsonStr());
         try {
@@ -410,6 +430,8 @@ public class InstanceManager extends AbstractDaemon {
                     "add instance to memory instanceId {} instanceMap size {}, 
runningPool instance total {}, runningPool instance active {}",
                     instance.getInstanceId(), instanceMap.size(), 
EXECUTOR_SERVICE.getTaskCount(),
                     EXECUTOR_SERVICE.getActiveCount());
+            AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_ADD_INSTANCE_MEM, 
inlongGroupId, inlongStreamId,
+                    instanceProfile.getSinkDataTime(), 1, 1);
         } catch (Throwable t) {
             LOGGER.error("add instance error {}", t.getMessage());
         }
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/AbstractSink.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/AbstractSink.java
index a92dd770e3..369f2a66d7 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/AbstractSink.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/AbstractSink.java
@@ -32,11 +32,7 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_BATCH_FLUSH_INTERVAL;
-import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID;
-import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
 import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_BATCH_FLUSH_INTERVAL;
-import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
-import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
 import static 
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID;
 import static 
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_STREAM_ID;
 import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_PLUGIN_ID;
@@ -72,8 +68,8 @@ public abstract class AbstractSink implements Sink {
     public void init(InstanceProfile profile) {
         this.profile = profile;
         jobInstanceId = profile.getInstanceId();
-        inlongGroupId = profile.get(PROXY_INLONG_GROUP_ID, 
DEFAULT_PROXY_INLONG_GROUP_ID);
-        inlongStreamId = profile.get(PROXY_INLONG_STREAM_ID, 
DEFAULT_PROXY_INLONG_STREAM_ID);
+        inlongGroupId = profile.getInlongGroupId();
+        inlongStreamId = profile.getInlongStreamId();
         cache = new ProxyMessageCache(this.profile, inlongGroupId, 
inlongStreamId);
         batchFlushInterval = profile.getInt(PROXY_BATCH_FLUSH_INTERVAL, 
DEFAULT_PROXY_BATCH_FLUSH_INTERVAL);
 
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
index 1d085424d5..5dc79377de 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
@@ -27,10 +27,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
-import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID;
-import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
-import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
-import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
 import static 
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID;
 import static 
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_STREAM_ID;
 import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_PLUGIN_ID;
@@ -48,8 +44,8 @@ public abstract class AbstractSource implements Source {
 
     @Override
     public void init(InstanceProfile profile) {
-        inlongGroupId = profile.get(PROXY_INLONG_GROUP_ID, 
DEFAULT_PROXY_INLONG_GROUP_ID);
-        inlongStreamId = profile.get(PROXY_INLONG_STREAM_ID, 
DEFAULT_PROXY_INLONG_STREAM_ID);
+        inlongGroupId = profile.getInlongGroupId();
+        inlongStreamId = profile.getInlongStreamId();
         // register metric
         this.dimensions = new HashMap<>();
         dimensions.put(KEY_PLUGIN_ID, this.getClass().getSimpleName());
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/AbstractReader.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/AbstractReader.java
index a8987b8eb3..611c29945b 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/AbstractReader.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/AbstractReader.java
@@ -34,10 +34,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
-import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID;
-import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
-import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
-import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
 import static 
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID;
 import static 
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_STREAM_ID;
 import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_PLUGIN_ID;
@@ -59,8 +55,8 @@ public abstract class AbstractReader implements Reader {
 
     @Override
     public void init(InstanceProfile profile) {
-        inlongGroupId = profile.get(PROXY_INLONG_GROUP_ID, 
DEFAULT_PROXY_INLONG_GROUP_ID);
-        inlongStreamId = profile.get(PROXY_INLONG_STREAM_ID, 
DEFAULT_PROXY_INLONG_STREAM_ID);
+        inlongGroupId = profile.getInlongGroupId();
+        inlongStreamId = profile.getInlongStreamId();
 
         this.dimensions = new HashMap<>();
         dimensions.put(KEY_PLUGIN_ID, this.getClass().getSimpleName());

Reply via email to