This is an automated email from the ASF dual-hosted git repository.

wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c48c78dd3 [INLONG-10399][Agent] Add global configurations updater (#10400)
6c48c78dd3 is described below

commit 6c48c78dd36d1119e1c40b1204ab9219c2ce1504
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Thu Jun 13 15:22:37 2024 +0800

    [INLONG-10399][Agent] Add global configurations updater (#10400)
    
    * [INLONG-10399][Agent] Add global configurations updater
    
    * [INLONG-10399][Agent] Add global configurations updater
---
 .../inlong/agent/constant/FetcherConstants.java    |  5 +-
 .../org/apache/inlong/agent/core/AgentManager.java | 19 +++++
 .../agent/plugin/fetcher/ManagerFetcher.java       | 83 +++++++++++++---------
 .../common/pojo/agent/AgentConfigRequest.java      |  1 +
 4 files changed, 72 insertions(+), 36 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java
index 727748ae05..00c77f3ca6 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java
@@ -36,8 +36,9 @@ public class FetcherConstants {
     public static final String DEFAULT_AGENT_MANAGER_VIP_HTTP_PREFIX_PATH = "/inlong/manager/openapi";
 
     public static final String AGENT_MANAGER_TASK_HTTP_PATH = "agent.manager.task.http.path";
-    public static final String DEFAULT_AGENT_MANAGER_TASK_HTTP_PATH = "/agent/reportAndGetTask";
-    public static final String DEFAULT_AGENT_MANAGER_CONFIG_HTTP_PATH = "/agent/getExistTaskConfig";
+    public static final String DEFAULT_AGENT_MANAGER_EXIST_TASK_HTTP_PATH = "/agent/getExistTaskConfig";
+    public static final String AGENT_MANAGER_CONFIG_HTTP_PATH = "agent.manager.config.http.path";
+    public static final String DEFAULT_AGENT_MANAGER_CONFIG_HTTP_PATH = "/agent/getConfig";
 
     public static final String INSTALLER_MANAGER_CONFIG_HTTP_PATH = "installer.manager.config.http.path";
     public static final String DEFAULT_INSTALLER_MANAGER_CONFIG_HTTP_PATH = "/installer/getConfig";
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
index ca5247fe59..6e8deba0ae 100755
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
@@ -22,14 +22,17 @@ import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.agent.conf.ProfileFetcher;
 import org.apache.inlong.agent.constant.AgentConstants;
 import org.apache.inlong.agent.core.task.TaskManager;
+import org.apache.inlong.common.pojo.agent.AgentConfigInfo;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.lang.reflect.Constructor;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
 
 /**
  * Agent Manager, the bridge for task manager, task store e.t.c it manages agent level operations and communicates
@@ -43,6 +46,8 @@ public class AgentManager extends AbstractDaemon {
     private final ProfileFetcher fetcher;
     private final AgentConfiguration conf;
     private final ExecutorService agentConfMonitor;
+    public static final int CONFIG_QUEUE_CAPACITY = 2;
+    private static BlockingQueue<AgentConfigInfo> configQueue = new LinkedBlockingQueue<>(CONFIG_QUEUE_CAPACITY);
 
     public AgentManager() {
         conf = AgentConfiguration.getAgentConf();
@@ -52,6 +57,20 @@ public class AgentManager extends AbstractDaemon {
         heartbeatManager = HeartbeatManager.getInstance(this);
     }
 
+    public static AgentConfigInfo getAgentConfigInfo() {
+        return configQueue.peek();
+    }
+
+    public void subNewAgentConfigInfo(AgentConfigInfo info) {
+        if (info == null) {
+            return;
+        }
+        if (configQueue.size() == CONFIG_QUEUE_CAPACITY) {
+            configQueue.poll();
+        }
+        configQueue.add(info);
+    }
+
     /**
      * init fetch by class name
      */
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
index f5fad95d23..812e4c2513 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
@@ -26,8 +26,9 @@ import org.apache.inlong.agent.pojo.FileTask.FileTaskConfig;
 import org.apache.inlong.agent.utils.AgentUtils;
 import org.apache.inlong.agent.utils.HttpManager;
 import org.apache.inlong.agent.utils.ThreadUtils;
-import org.apache.inlong.common.db.CommandEntity;
 import org.apache.inlong.common.enums.PullJobTypeEnum;
+import org.apache.inlong.common.pojo.agent.AgentConfigInfo;
+import org.apache.inlong.common.pojo.agent.AgentConfigRequest;
 import org.apache.inlong.common.pojo.agent.DataConfig;
 import org.apache.inlong.common.pojo.agent.TaskRequest;
 import org.apache.inlong.common.pojo.agent.TaskResult;
@@ -46,15 +47,17 @@ import java.util.Date;
 import java.util.List;
 
 import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_NAME;
+import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_TAG;
 import static org.apache.inlong.agent.constant.AgentConstants.AGENT_UNIQ_ID;
 import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AGENT_UNIQ_ID;
 import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_FETCHER_INTERVAL;
 import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_ADDR;
+import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_CONFIG_HTTP_PATH;
 import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_RETURN_PARAM_DATA;
 import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_TASK_HTTP_PATH;
 import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_FETCHER_INTERVAL;
 import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MANAGER_CONFIG_HTTP_PATH;
-import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MANAGER_TASK_HTTP_PATH;
+import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MANAGER_EXIST_TASK_HTTP_PATH;
 import static org.apache.inlong.agent.plugin.fetcher.ManagerResultFormatter.getResultData;
 import static org.apache.inlong.agent.utils.AgentUtils.fetchLocalIp;
 import static org.apache.inlong.agent.utils.AgentUtils.fetchLocalUuid;
@@ -69,14 +72,15 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher {
     private static final GsonBuilder gsonBuilder = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss");
     private static final Gson GSON = gsonBuilder.create();
     private final String baseManagerUrl;
-    private final String taskConfigUrl;
     private final String staticConfigUrl;
+    private final String agentConfigInfoUrl;
     private final AgentConfiguration conf;
     private final String uniqId;
     private final AgentManager agentManager;
     private final HttpManager httpManager;
     private String localIp;
     private String uuid;
+    private String clusterTag;
     private String clusterName;
 
     public ManagerFetcher(AgentManager agentManager) {
@@ -85,9 +89,10 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher {
         if (requiredKeys(conf)) {
             httpManager = new HttpManager(conf);
             baseManagerUrl = httpManager.getBaseUrl();
-            taskConfigUrl = buildTaskConfigUrl(baseManagerUrl);
             staticConfigUrl = buildStaticConfigUrl(baseManagerUrl);
+            agentConfigInfoUrl = buildAgentConfigInfoUrl(baseManagerUrl);
             uniqId = conf.get(AGENT_UNIQ_ID, DEFAULT_AGENT_UNIQ_ID);
+            clusterTag = conf.get(AGENT_CLUSTER_TAG);
             clusterName = conf.get(AGENT_CLUSTER_NAME);
         } else {
             throw new RuntimeException("init manager error, cannot find required key");
@@ -101,68 +106,74 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher {
     /**
      * Build task config url for manager according to config
      *
-     * example - http://127.0.0.1:8080/inlong/manager/openapi/fileAgent/getTaskConf
+     * example - http://127.0.0.1:8080/inlong/manager/openapi/agent/getTaskConf
      */
-    private String buildTaskConfigUrl(String baseUrl) {
-        return baseUrl + conf.get(AGENT_MANAGER_TASK_HTTP_PATH, DEFAULT_AGENT_MANAGER_TASK_HTTP_PATH);
+    private String buildStaticConfigUrl(String baseUrl) {
+        return baseUrl + conf.get(AGENT_MANAGER_TASK_HTTP_PATH, DEFAULT_AGENT_MANAGER_EXIST_TASK_HTTP_PATH);
     }
 
     /**
-     * Build task config url for manager according to config
+     * Build agent config info url for manager according to config
      *
-     * example - http://127.0.0.1:8080/inlong/manager/openapi/fileAgent/getTaskConf
+     * example - http://127.0.0.1:8080/inlong/manager/openapi/agent/getConfig
      */
-    private String buildStaticConfigUrl(String baseUrl) {
-        return baseUrl + conf.get(AGENT_MANAGER_TASK_HTTP_PATH, DEFAULT_AGENT_MANAGER_CONFIG_HTTP_PATH);
+    private String buildAgentConfigInfoUrl(String baseUrl) {
+        return baseUrl + conf.get(AGENT_MANAGER_CONFIG_HTTP_PATH, DEFAULT_AGENT_MANAGER_CONFIG_HTTP_PATH);
     }
 
     /**
-     * Request manager to get commands, make sure it is not throwing exceptions
+     * Request manager to get task config, make sure it is not throwing exceptions
      */
-    public TaskResult fetchTaskConfig() {
-        LOGGER.info("fetchTaskConfig start");
-        String resultStr = httpManager.doSentPost(taskConfigUrl, getFetchRequest(null));
+    public TaskResult getStaticConfig() {
+        LOGGER.info("Get static config start");
+        String resultStr = httpManager.doSentPost(staticConfigUrl, getTaskRequest());
+        LOGGER.info("Url to get static config staticConfigUrl {}", staticConfigUrl);
         JsonObject resultData = getResultData(resultStr);
         JsonElement element = resultData.get(AGENT_MANAGER_RETURN_PARAM_DATA);
-        LOGGER.info("fetchTaskConfig end");
+        LOGGER.info("Get static config  end");
         if (element != null) {
-            LOGGER.info("fetchTaskConfig not null {}", resultData);
+            LOGGER.info("Get static config  not null {}", resultData);
             return GSON.fromJson(element.getAsJsonObject(), TaskResult.class);
         } else {
-            LOGGER.info("fetchTaskConfig nothing to do");
+            LOGGER.info("Get static config  nothing to do");
             return null;
         }
     }
 
     /**
-     * Request manager to get commands, make sure it is not throwing exceptions
+     * Request manager to get config, make sure it is not throwing exceptions
      */
-    public TaskResult getStaticConfig() {
-        LOGGER.info("Get static config start");
-        String resultStr = httpManager.doSentPost(staticConfigUrl, getFetchRequest(null));
-        LOGGER.info("Url to get static config staticConfigUrl {}", staticConfigUrl);
+    public AgentConfigInfo getAgentConfigInfo() {
+        LOGGER.info("Get agent config info");
+        String resultStr = httpManager.doSentPost(agentConfigInfoUrl, getAgentConfigInfoRequest());
+        LOGGER.info("Url to get agent config agentConfigInfoUrl {}", agentConfigInfoUrl);
         JsonObject resultData = getResultData(resultStr);
         JsonElement element = resultData.get(AGENT_MANAGER_RETURN_PARAM_DATA);
-        LOGGER.info("Get static config  end");
+        LOGGER.info("Get agent config end");
         if (element != null) {
-            LOGGER.info("Get static config  not null {}", resultData);
-            return GSON.fromJson(element.getAsJsonObject(), TaskResult.class);
+            LOGGER.info("Get agent config not null {}", resultData);
+            return GSON.fromJson(element.getAsJsonObject(), AgentConfigInfo.class);
         } else {
-            LOGGER.info("Get static config  nothing to do");
+            LOGGER.info("Get agent config nothing to do");
             return null;
         }
     }
 
-    /**
-     * Form file command fetch request
-     */
-    public TaskRequest getFetchRequest(List<CommandEntity> unackedCommands) {
+    public TaskRequest getTaskRequest() {
         TaskRequest request = new TaskRequest();
         request.setAgentIp(localIp);
         request.setUuid(uuid);
         request.setClusterName(clusterName);
         request.setPullJobType(PullJobTypeEnum.NEW.getType());
-        request.setCommandInfo(unackedCommands);
+        request.setCommandInfo(null);
+        return request;
+    }
+
+    public AgentConfigRequest getAgentConfigInfoRequest() {
+        AgentConfigRequest request = new AgentConfigRequest();
+        request.setClusterTag(clusterTag);
+        request.setClusterName(clusterName);
+        request.setIp(localIp);
         return request;
     }
 
@@ -171,7 +182,7 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher {
      *
      * @return runnable profile.
      */
-    private Runnable taskConfigFetchThread() {
+    private Runnable configFetchThread() {
         return () -> {
             Thread.currentThread().setName("ManagerFetcher");
             while (isRunnable()) {
@@ -185,6 +196,10 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher {
                         });
                         agentManager.getTaskManager().submitTaskProfiles(taskProfiles);
                     }
+                    AgentConfigInfo config = getAgentConfigInfo();
+                    if (config != null) {
+                        agentManager.subNewAgentConfigInfo(config);
+                    }
                 } catch (Throwable ex) {
                     LOGGER.warn("exception caught", ex);
                     ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex);
@@ -243,7 +258,7 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher {
         // when agent start, check local ip and fetch manager ip list;
         localIp = fetchLocalIp();
         uuid = fetchLocalUuid();
-        submitWorker(taskConfigFetchThread());
+        submitWorker(configFetchThread());
     }
 
     @Override
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/AgentConfigRequest.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/AgentConfigRequest.java
index f81bd3a281..857583bc86 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/AgentConfigRequest.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/AgentConfigRequest.java
@@ -33,4 +33,5 @@ public class AgentConfigRequest {
 
     private String clusterName;
 
+    private String ip;
 }

Reply via email to