This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 12efb53dae [INLONG-10302][Agent] Add an interface for limiting the number of instances obtained (#10303)
12efb53dae is described below

commit 12efb53dae42dc38075746a5c062ea69dd6bb7fd
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Wed May 29 19:59:01 2024 +0800

    [INLONG-10302][Agent] Add an interface for limiting the number of instances obtained (#10303)
---
 .../main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java | 5 +++--
 .../main/java/org/apache/inlong/agent/plugin/task/KafkaTask.java    | 6 ++++++
 .../main/java/org/apache/inlong/agent/plugin/task/MongoDBTask.java  | 6 ++++++
 .../main/java/org/apache/inlong/agent/plugin/task/PulsarTask.java   | 6 ++++++
 .../java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java  | 5 +++++
 5 files changed, 26 insertions(+), 2 deletions(-)

diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java
index a6d8d03482..acf8287817 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java
@@ -19,7 +19,6 @@ package org.apache.inlong.agent.plugin.task;
 
 import org.apache.inlong.agent.conf.InstanceProfile;
 import org.apache.inlong.agent.conf.TaskProfile;
-import org.apache.inlong.agent.constant.TaskConstants;
 import org.apache.inlong.agent.core.instance.ActionType;
 import org.apache.inlong.agent.core.instance.InstanceAction;
 import org.apache.inlong.agent.core.instance.InstanceManager;
@@ -58,7 +57,7 @@ public abstract class AbstractTask extends Task {
         this.taskProfile = taskProfile;
         this.basicDb = basicDb;
         auditVersion = Long.parseLong(taskProfile.get(TASK_AUDIT_VERSION));
-        instanceManager = new InstanceManager(taskProfile.getTaskId(), 
taskProfile.getInt(TaskConstants.FILE_MAX_NUM),
+        instanceManager = new InstanceManager(taskProfile.getTaskId(), 
getInstanceLimit(),
                 basicDb, taskManager.getTaskDb());
         try {
             instanceManager.start();
@@ -69,6 +68,8 @@ public abstract class AbstractTask extends Task {
         initOK = true;
     }
 
+    protected abstract int getInstanceLimit();
+
     protected abstract void initTask();
 
     protected void releaseTask() {
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/KafkaTask.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/KafkaTask.java
index f83104e8a7..e7058c9929 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/KafkaTask.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/KafkaTask.java
@@ -36,10 +36,16 @@ public class KafkaTask extends AbstractTask {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(KafkaTask.class);
     public static final String DEFAULT_KAFKA_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.KafkaInstance";
+    public static final int DEFAULT_INSTANCE_LIMIT = 1;
     private boolean isAdded = false;
     private String topic;
     private final DateTimeFormatter dateTimeFormatter = 
DateTimeFormatter.ofPattern("yyyyMMddHH");
 
+    @Override
+    protected int getInstanceLimit() {
+        return DEFAULT_INSTANCE_LIMIT;
+    }
+
     @Override
     protected void initTask() {
         LOGGER.info("kafka commonInit: {}", taskProfile.toJsonStr());
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/MongoDBTask.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/MongoDBTask.java
index 2a022ddb78..01f97029b2 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/MongoDBTask.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/MongoDBTask.java
@@ -35,10 +35,16 @@ public class MongoDBTask extends AbstractTask {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(MongoDBTask.class);
     public static final String DEFAULT_MONGODB_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.MongoDBInstance";
+    public static final int DEFAULT_INSTANCE_LIMIT = 1;
     private boolean isAdded = false;
     private String collection;
     private final DateTimeFormatter dateTimeFormatter = 
DateTimeFormatter.ofPattern("yyyyMMddHH");
 
+    @Override
+    protected int getInstanceLimit() {
+        return DEFAULT_INSTANCE_LIMIT;
+    }
+
     @Override
     protected void initTask() {
         LOGGER.info("mongoDB commonInit: {}", taskProfile.toJsonStr());
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PulsarTask.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PulsarTask.java
index a105c45774..62b32dcbca 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PulsarTask.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PulsarTask.java
@@ -38,6 +38,7 @@ public class PulsarTask extends AbstractTask {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(PulsarTask.class);
     public static final String DEFAULT_PULSAR_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.PulsarInstance";
+    public static final int DEFAULT_INSTANCE_LIMIT = 1;
     private boolean isAdded = false;
     private String tenant;
     private String namespace;
@@ -45,6 +46,11 @@ public class PulsarTask extends AbstractTask {
     private String instanceId;
     private final DateTimeFormatter dateTimeFormatter = 
DateTimeFormatter.ofPattern("yyyyMMddHH");
 
+    @Override
+    protected int getInstanceLimit() {
+        return DEFAULT_INSTANCE_LIMIT;
+    }
+
     @Override
     protected void initTask() {
         LOGGER.info("pulsar commonInit: {}", taskProfile.toJsonStr());
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
index 4e6493c02a..44495b6059 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java
@@ -87,6 +87,11 @@ public class LogFileTask extends AbstractTask {
     private volatile long coreThreadUpdateTime = 0;
     private BlockingQueue<InstanceProfile> instanceQueue;
 
+    @Override
+    protected int getInstanceLimit() {
+        return taskProfile.getInt(TaskConstants.FILE_MAX_NUM);
+    }
+
     @Override
     protected void initTask() {
         instanceQueue = new LinkedBlockingQueue<>(INSTANCE_QUEUE_CAPACITY);

Reply via email to