This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new f01af15cc8 [INLONG-9415][Agent] Increase audit reports related to instance maintenance (#9416) f01af15cc8 is described below commit f01af15cc84a0deaf33b8cb045c4bcb72a673f43 Author: justinwwhuang <hww_jus...@163.com> AuthorDate: Tue Dec 5 14:10:41 2023 +0800 [INLONG-9415][Agent] Increase audit reports related to instance maintenance (#9416) --- .../apache/inlong/agent/conf/InstanceProfile.java | 12 +++++++++ .../inlong/agent/metrics/audit/AuditUtils.java | 5 ++++ .../agent/core/instance/InstanceManager.java | 30 +++++++++++++++++++--- .../plugin/sinks/filecollect/AbstractSink.java | 8 ++---- .../agent/plugin/sources/file/AbstractSource.java | 8 ++---- .../plugin/sources/reader/file/AbstractReader.java | 8 ++---- 6 files changed, 49 insertions(+), 22 deletions(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java index acc6444aba..06d783c953 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java @@ -32,6 +32,10 @@ import org.slf4j.LoggerFactory; import java.util.List; +import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID; +import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID; +import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID; +import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID; import static org.apache.inlong.agent.constant.TaskConstants.INSTANCE_STATE; import static org.apache.inlong.agent.constant.TaskConstants.JOB_MQ_ClUSTERS; import static org.apache.inlong.agent.constant.TaskConstants.JOB_MQ_TOPIC; @@ -105,6 +109,14 @@ public class InstanceProfile extends AbstractConfiguration implements Comparable return get(TaskConstants.PREDEFINE_FIELDS, ""); } + public String getInlongGroupId() { + return get(PROXY_INLONG_GROUP_ID, DEFAULT_PROXY_INLONG_GROUP_ID); + } + + public String getInlongStreamId() { + return get(PROXY_INLONG_STREAM_ID, DEFAULT_PROXY_INLONG_STREAM_ID); + } + @Override public boolean allRequiredKeyExist() { return hasKey(TaskConstants.FILE_UPDATE_TIME); diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java index eda8754b1d..faaf3400c6 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java @@ -46,6 +46,11 @@ public class AuditUtils { public static final int AUDIT_ID_AGENT_SEND_SUCCESS_REAL_TIME = 48; public static final int AUDIT_ID_AGENT_SEND_FAILED = 10004; public static final int AUDIT_ID_AGENT_SEND_FAILED_REAL_TIME = 10026; + public static final int AUDIT_ID_AGENT_ADD_INSTANCE_MEM = 49; + public static final int AUDIT_ID_AGENT_DEL_INSTANCE_MEM = 50; + public static final int AUDIT_ID_AGENT_ADD_INSTANCE_DB = 51; + public static final int AUDIT_ID_AGENT_DEL_INSTANCE_DB = 52; + public static final int AUDIT_ID_AGENT_DEL_INSTANCE_MEM_UNUSUAL = 10028; private static boolean IS_AUDIT = true; diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java index e600c5b671..1ed8e86cf8 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java @@ -26,6 +26,7 @@ import org.apache.inlong.agent.constant.CycleUnitType; import org.apache.inlong.agent.db.Db; import org.apache.inlong.agent.db.InstanceDb; import org.apache.inlong.agent.db.TaskProfileDb; +import org.apache.inlong.agent.metrics.audit.AuditUtils; import org.apache.inlong.agent.plugin.Instance; import org.apache.inlong.agent.utils.AgentUtils; import org.apache.inlong.agent.utils.DateTransUtils; @@ -218,7 +219,7 @@ public class InstanceManager extends AbstractDaemon { LOGGER.info("instance has expired, delete from db dataTime {} taskId {} instanceId {}", instanceFromDb.getSourceDataTime(), instanceFromDb.getTaskId(), instanceFromDb.getInstanceId()); - instanceDb.deleteInstance(instanceFromDb.getTaskId(), instanceFromDb.getInstanceId()); + deleteFromDb(instanceFromDb.getInstanceId()); iterator.remove(); } } @@ -347,14 +348,14 @@ public class InstanceManager extends AbstractDaemon { profile.getInstanceId()); return; } - addToDb(profile); + addToDb(profile, true); addToMemory(profile); } private void finishInstance(InstanceProfile profile) { profile.setState(InstanceStateEnum.FINISHED); profile.setModifyTime(AgentUtils.getCurrentTime()); - addToDb(profile); + addToDb(profile, false); deleteFromMemory(profile.getInstanceId()); LOGGER.info("finished instance state {} taskId {} instanceId {}", profile.getState(), profile.getTaskId(), profile.getInstanceId()); @@ -366,9 +367,14 @@ public class InstanceManager extends AbstractDaemon { } private void deleteFromDb(String instanceId) { + InstanceProfile profile = instanceDb.getInstance(taskId, instanceId); + String inlongGroupId = profile.getInlongGroupId(); + String inlongStreamId = profile.getInlongStreamId(); instanceDb.deleteInstance(taskId, instanceId); LOGGER.info("delete instance from db: taskId {} instanceId {} result {}", taskId, instanceId, instanceDb.getInstance(taskId, instanceId)); + AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_DEL_INSTANCE_DB, inlongGroupId, inlongStreamId, + profile.getSinkDataTime(), 1, 1); } private void deleteFromMemory(String instanceId) { @@ -378,26 +384,40 @@ public class InstanceManager extends AbstractDaemon { instanceId); return; } + String inlongGroupId = instance.getProfile().getInlongGroupId(); + String inlongStreamId = instance.getProfile().getInlongStreamId(); instance.destroy(); instanceMap.remove(instanceId); LOGGER.info("delete instance from memory: taskId {} instanceId {}", taskId, instance.getInstanceId()); + AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_DEL_INSTANCE_MEM, inlongGroupId, inlongStreamId, + instance.getProfile().getSinkDataTime(), 1, 1); } - private void addToDb(InstanceProfile profile) { + private void addToDb(InstanceProfile profile, boolean addNew) { LOGGER.info("add instance to db state {} instanceId {}", profile.getState(), profile.getInstanceId()); instanceDb.storeInstance(profile); + if (addNew) { + String inlongGroupId = profile.getInlongGroupId(); + String inlongStreamId = profile.getInlongStreamId(); + AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_ADD_INSTANCE_DB, inlongGroupId, inlongStreamId, + profile.getSinkDataTime(), 1, 1); + } } /** * add instance to memory, if there is a record refer to the instance id exist we need to destroy it first. */ private void addToMemory(InstanceProfile instanceProfile) { + String inlongGroupId = instanceProfile.getInlongGroupId(); + String inlongStreamId = instanceProfile.getInlongStreamId(); Instance oldInstance = instanceMap.get(instanceProfile.getInstanceId()); if (oldInstance != null) { oldInstance.destroy(); instanceMap.remove(instanceProfile.getInstanceId()); LOGGER.error("old instance {} should not exist, try stop it first", instanceProfile.getInstanceId()); + AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_DEL_INSTANCE_MEM_UNUSUAL, inlongGroupId, inlongStreamId, + instanceProfile.getSinkDataTime(), 1, 1); } LOGGER.info("instanceProfile {}", instanceProfile.toJsonStr()); try { @@ -410,6 +430,8 @@ public class InstanceManager extends AbstractDaemon { "add instance to memory instanceId {} instanceMap size {}, runningPool instance total {}, runningPool instance active {}", instance.getInstanceId(), instanceMap.size(), EXECUTOR_SERVICE.getTaskCount(), EXECUTOR_SERVICE.getActiveCount()); + AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_ADD_INSTANCE_MEM, inlongGroupId, inlongStreamId, + instanceProfile.getSinkDataTime(), 1, 1); } catch (Throwable t) { LOGGER.error("add instance error {}", t.getMessage()); } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/AbstractSink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/AbstractSink.java index a92dd770e3..369f2a66d7 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/AbstractSink.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/AbstractSink.java @@ -32,11 +32,7 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicLong; import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_BATCH_FLUSH_INTERVAL; -import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID; -import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID; import static org.apache.inlong.agent.constant.CommonConstants.PROXY_BATCH_FLUSH_INTERVAL; -import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID; -import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID; import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID; import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_STREAM_ID; import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_PLUGIN_ID; @@ -72,8 +68,8 @@ public abstract class AbstractSink implements Sink { public void init(InstanceProfile profile) { this.profile = profile; jobInstanceId = profile.getInstanceId(); - inlongGroupId = profile.get(PROXY_INLONG_GROUP_ID, DEFAULT_PROXY_INLONG_GROUP_ID); - inlongStreamId = profile.get(PROXY_INLONG_STREAM_ID, DEFAULT_PROXY_INLONG_STREAM_ID); + inlongGroupId = profile.getInlongGroupId(); + inlongStreamId = profile.getInlongStreamId(); cache = new ProxyMessageCache(this.profile, inlongGroupId, inlongStreamId); batchFlushInterval = profile.getInt(PROXY_BATCH_FLUSH_INTERVAL, DEFAULT_PROXY_BATCH_FLUSH_INTERVAL); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java index 1d085424d5..5dc79377de 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java @@ -27,10 +27,6 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; -import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID; -import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID; -import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID; -import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID; import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID; import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_STREAM_ID; import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_PLUGIN_ID; @@ -48,8 +44,8 @@ public abstract class AbstractSource implements Source { @Override public void init(InstanceProfile profile) { - inlongGroupId = profile.get(PROXY_INLONG_GROUP_ID, DEFAULT_PROXY_INLONG_GROUP_ID); - inlongStreamId = profile.get(PROXY_INLONG_STREAM_ID, DEFAULT_PROXY_INLONG_STREAM_ID); + inlongGroupId = profile.getInlongGroupId(); + inlongStreamId = profile.getInlongStreamId(); // register metric this.dimensions = new HashMap<>(); dimensions.put(KEY_PLUGIN_ID, this.getClass().getSimpleName()); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/AbstractReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/AbstractReader.java index a8987b8eb3..611c29945b 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/AbstractReader.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/AbstractReader.java @@ -34,10 +34,6 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; -import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID; -import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID; -import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID; -import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID; import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID; import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_STREAM_ID; import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_PLUGIN_ID; @@ -59,8 +55,8 @@ public abstract class AbstractReader implements Reader { @Override public void init(InstanceProfile profile) { - inlongGroupId = profile.get(PROXY_INLONG_GROUP_ID, DEFAULT_PROXY_INLONG_GROUP_ID); - inlongStreamId = profile.get(PROXY_INLONG_STREAM_ID, DEFAULT_PROXY_INLONG_STREAM_ID); + inlongGroupId = profile.getInlongGroupId(); + inlongStreamId = profile.getInlongStreamId(); this.dimensions = new HashMap<>(); dimensions.put(KEY_PLUGIN_ID, this.getClass().getSimpleName());