This is an automated email from the ASF dual-hosted git repository. zirui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 15db28f8d7 [INLONG-9347][Agent] Check task profile before save into db (#9348) 15db28f8d7 is described below commit 15db28f8d7c4b030cb8d5a4f4065b14bc7e7ba8c Author: justinwwhuang <hww_jus...@163.com> AuthorDate: Tue Nov 28 20:09:26 2023 +0800 [INLONG-9347][Agent] Check task profile before save into db (#9348) --- .../java/org/apache/inlong/agent/plugin/file/Task.java | 5 +++++ .../apache/inlong/agent/core/task/file/TaskManager.java | 15 +++++++++++++++ .../java/org/apache/inlong/agent/core/task/MockTask.java | 5 +++++ .../org/apache/inlong/agent/plugin/task/CronTask.java | 5 +++++ .../agent/plugin/task/filecollect/LogFileCollectTask.java | 7 ++----- 5 files changed, 32 insertions(+), 5 deletions(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Task.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Task.java index ce580a0bb7..e10d872ca3 100755 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Task.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Task.java @@ -49,4 +49,9 @@ public abstract class Task extends AbstractStateWrapper { * get task id */ public abstract String getTaskId(); + + /** + * is profile valid + */ + public abstract boolean isProfileValid(TaskProfile profile); } diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java index f97a89f26c..4d8bc9fea6 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java @@ -369,6 +369,10 @@ public class TaskManager extends AbstractDaemon { LOGGER.error("taskMap size {} over limit {}", taskMap.size(), taskMaxLimit); return; } + if (!isProfileValid(taskProfile)) { + LOGGER.error("task profile invalid {}", taskProfile.toJsonStr()); + return; + } addToDb(taskProfile); TaskStateEnum state = TaskStateEnum.getTaskState(taskProfile.getInt(TASK_STATE)); if (state == TaskStateEnum.RUNNING) { @@ -417,6 +421,17 @@ public class TaskManager extends AbstractDaemon { taskMap.clear(); } + private boolean isProfileValid(TaskProfile profile) { + try { + Class<?> taskClass = Class.forName(profile.getTaskClass()); + Task task = (Task) taskClass.newInstance(); + return task.isProfileValid(profile); + } catch (Throwable t) { + LOGGER.error("isProfileValid error: ", t); + } + return false; + } + /** * add task to db, it was expected that there is no record refer the task id. * cause the task id will change if the task content changes, replace the record diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/MockTask.java b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/MockTask.java index 23c5ad5cc5..88860c831d 100644 --- a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/MockTask.java +++ b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/MockTask.java @@ -57,6 +57,11 @@ public class MockTask extends Task { return profile.getTaskId(); } + @Override + public boolean isProfileValid(TaskProfile profile) { + return true; + } + @Override public void addCallbacks() { diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/CronTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/CronTask.java index 0216eb96c6..fad0f98f2a 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/CronTask.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/CronTask.java @@ -51,6 +51,11 @@ public class CronTask extends Task { return null; } + @Override + public boolean isProfileValid(TaskProfile profile) { + return true; + } + @Override public void addCallbacks() { diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java index 9dc7d26c11..355bf3909f 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java @@ -97,10 +97,6 @@ public class LogFileCollectTask extends Task { @Override public void init(Object srcManager, TaskProfile taskProfile, Db basicDb) throws IOException { - if (!isProfileValid(taskProfile)) { - LOGGER.error("task profile invalid {}", taskProfile.toJsonStr()); - return; - } taskManager = (TaskManager) srcManager; commonInit(taskProfile, basicDb); if (retry) { @@ -129,7 +125,8 @@ public class LogFileCollectTask extends Task { } } - private boolean isProfileValid(TaskProfile profile) { + @Override + public boolean isProfileValid(TaskProfile profile) { if (!profile.allRequiredKeyExist()) { LOGGER.error("task profile needs all required key"); return false;