This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 12efb53dae [INLONG-10302][Agent] Add an interface for limiting the number of instances obtained (#10303) 12efb53dae is described below commit 12efb53dae42dc38075746a5c062ea69dd6bb7fd Author: justinwwhuang <hww_jus...@163.com> AuthorDate: Wed May 29 19:59:01 2024 +0800 [INLONG-10302][Agent] Add an interface for limiting the number of instances obtained (#10303) --- .../main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java | 5 +++-- .../main/java/org/apache/inlong/agent/plugin/task/KafkaTask.java | 6 ++++++ .../main/java/org/apache/inlong/agent/plugin/task/MongoDBTask.java | 6 ++++++ .../main/java/org/apache/inlong/agent/plugin/task/PulsarTask.java | 6 ++++++ .../java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java | 5 +++++ 5 files changed, 26 insertions(+), 2 deletions(-) diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java index a6d8d03482..acf8287817 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java @@ -19,7 +19,6 @@ package org.apache.inlong.agent.plugin.task; import org.apache.inlong.agent.conf.InstanceProfile; import org.apache.inlong.agent.conf.TaskProfile; -import org.apache.inlong.agent.constant.TaskConstants; import org.apache.inlong.agent.core.instance.ActionType; import org.apache.inlong.agent.core.instance.InstanceAction; import org.apache.inlong.agent.core.instance.InstanceManager; @@ -58,7 +57,7 @@ public abstract class AbstractTask extends Task { this.taskProfile = taskProfile; this.basicDb = basicDb; auditVersion = Long.parseLong(taskProfile.get(TASK_AUDIT_VERSION)); - instanceManager = new InstanceManager(taskProfile.getTaskId(), taskProfile.getInt(TaskConstants.FILE_MAX_NUM), + instanceManager = new InstanceManager(taskProfile.getTaskId(), getInstanceLimit(), basicDb, taskManager.getTaskDb()); try { instanceManager.start(); @@ -69,6 +68,8 @@ public abstract class AbstractTask extends Task { initOK = true; } + protected abstract int getInstanceLimit(); + protected abstract void initTask(); protected void releaseTask() { diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/KafkaTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/KafkaTask.java index f83104e8a7..e7058c9929 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/KafkaTask.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/KafkaTask.java @@ -36,10 +36,16 @@ public class KafkaTask extends AbstractTask { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaTask.class); public static final String DEFAULT_KAFKA_INSTANCE = "org.apache.inlong.agent.plugin.instance.KafkaInstance"; + public static final int DEFAULT_INSTANCE_LIMIT = 1; private boolean isAdded = false; private String topic; private final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyyMMddHH"); + @Override + protected int getInstanceLimit() { + return DEFAULT_INSTANCE_LIMIT; + } + @Override protected void initTask() { LOGGER.info("kafka commonInit: {}", taskProfile.toJsonStr()); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/MongoDBTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/MongoDBTask.java index 2a022ddb78..01f97029b2 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/MongoDBTask.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/MongoDBTask.java @@ -35,10 +35,16 @@ public class MongoDBTask extends AbstractTask { private static final Logger LOGGER = LoggerFactory.getLogger(MongoDBTask.class); public static final String DEFAULT_MONGODB_INSTANCE = "org.apache.inlong.agent.plugin.instance.MongoDBInstance"; + public static final int DEFAULT_INSTANCE_LIMIT = 1; private boolean isAdded = false; private String collection; private final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyyMMddHH"); + @Override + protected int getInstanceLimit() { + return DEFAULT_INSTANCE_LIMIT; + } + @Override protected void initTask() { LOGGER.info("mongoDB commonInit: {}", taskProfile.toJsonStr()); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PulsarTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PulsarTask.java index a105c45774..62b32dcbca 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PulsarTask.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PulsarTask.java @@ -38,6 +38,7 @@ public class PulsarTask extends AbstractTask { private static final Logger LOGGER = LoggerFactory.getLogger(PulsarTask.class); public static final String DEFAULT_PULSAR_INSTANCE = "org.apache.inlong.agent.plugin.instance.PulsarInstance"; + public static final int DEFAULT_INSTANCE_LIMIT = 1; private boolean isAdded = false; private String tenant; private String namespace; @@ -45,6 +46,11 @@ public class PulsarTask extends AbstractTask { private String instanceId; private final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyyMMddHH"); + @Override + protected int getInstanceLimit() { + return DEFAULT_INSTANCE_LIMIT; + } + @Override protected void initTask() { LOGGER.info("pulsar commonInit: {}", taskProfile.toJsonStr()); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java index 4e6493c02a..44495b6059 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java @@ -87,6 +87,11 @@ public class LogFileTask extends AbstractTask { private volatile long coreThreadUpdateTime = 0; private BlockingQueue<InstanceProfile> instanceQueue; + @Override + protected int getInstanceLimit() { + return taskProfile.getInt(TaskConstants.FILE_MAX_NUM); + } + @Override protected void initTask() { instanceQueue = new LinkedBlockingQueue<>(INSTANCE_QUEUE_CAPACITY);