This is an automated email from the ASF dual-hosted git repository. wenweihuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 6c48c78dd3 [INLONG-10399][Agent] Add global configurations updater (#10400) 6c48c78dd3 is described below commit 6c48c78dd36d1119e1c40b1204ab9219c2ce1504 Author: justinwwhuang <hww_jus...@163.com> AuthorDate: Thu Jun 13 15:22:37 2024 +0800 [INLONG-10399][Agent] Add global configurations updater (#10400) * [INLONG-10399][Agent] Add global configurations updater * [INLONG-10399][Agent] Add global configurations updater --- .../inlong/agent/constant/FetcherConstants.java | 5 +- .../org/apache/inlong/agent/core/AgentManager.java | 19 +++++ .../agent/plugin/fetcher/ManagerFetcher.java | 83 +++++++++++++--------- .../common/pojo/agent/AgentConfigRequest.java | 1 + 4 files changed, 72 insertions(+), 36 deletions(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java index 727748ae05..00c77f3ca6 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java @@ -36,8 +36,9 @@ public class FetcherConstants { public static final String DEFAULT_AGENT_MANAGER_VIP_HTTP_PREFIX_PATH = "/inlong/manager/openapi"; public static final String AGENT_MANAGER_TASK_HTTP_PATH = "agent.manager.task.http.path"; - public static final String DEFAULT_AGENT_MANAGER_TASK_HTTP_PATH = "/agent/reportAndGetTask"; - public static final String DEFAULT_AGENT_MANAGER_CONFIG_HTTP_PATH = "/agent/getExistTaskConfig"; + public static final String DEFAULT_AGENT_MANAGER_EXIST_TASK_HTTP_PATH = "/agent/getExistTaskConfig"; + public static final String AGENT_MANAGER_CONFIG_HTTP_PATH = "agent.manager.config.http.path"; + public static final String DEFAULT_AGENT_MANAGER_CONFIG_HTTP_PATH = "/agent/getConfig"; public static final String INSTALLER_MANAGER_CONFIG_HTTP_PATH = "installer.manager.config.http.path"; public static final String DEFAULT_INSTALLER_MANAGER_CONFIG_HTTP_PATH = "/installer/getConfig"; diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java index ca5247fe59..6e8deba0ae 100755 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java @@ -22,14 +22,17 @@ import org.apache.inlong.agent.conf.AgentConfiguration; import org.apache.inlong.agent.conf.ProfileFetcher; import org.apache.inlong.agent.constant.AgentConstants; import org.apache.inlong.agent.core.task.TaskManager; +import org.apache.inlong.common.pojo.agent.AgentConfigInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; import java.lang.reflect.Constructor; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; /** * Agent Manager, the bridge for task manager, task store e.t.c it manages agent level operations and communicates @@ -43,6 +46,8 @@ public class AgentManager extends AbstractDaemon { private final ProfileFetcher fetcher; private final AgentConfiguration conf; private final ExecutorService agentConfMonitor; + public static final int CONFIG_QUEUE_CAPACITY = 2; + private static BlockingQueue<AgentConfigInfo> configQueue = new LinkedBlockingQueue<>(CONFIG_QUEUE_CAPACITY); public AgentManager() { conf = AgentConfiguration.getAgentConf(); @@ -52,6 +57,20 @@ public class AgentManager extends AbstractDaemon { heartbeatManager = HeartbeatManager.getInstance(this); } + public static AgentConfigInfo getAgentConfigInfo() { + return configQueue.peek(); + } + + public void subNewAgentConfigInfo(AgentConfigInfo info) { + if (info == null) { + return; + } + if (configQueue.size() == CONFIG_QUEUE_CAPACITY) { + configQueue.poll(); + } + configQueue.add(info); + } + /** * init fetch by class name */ diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java index f5fad95d23..812e4c2513 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java @@ -26,8 +26,9 @@ import org.apache.inlong.agent.pojo.FileTask.FileTaskConfig; import org.apache.inlong.agent.utils.AgentUtils; import org.apache.inlong.agent.utils.HttpManager; import org.apache.inlong.agent.utils.ThreadUtils; -import org.apache.inlong.common.db.CommandEntity; import org.apache.inlong.common.enums.PullJobTypeEnum; +import org.apache.inlong.common.pojo.agent.AgentConfigInfo; +import org.apache.inlong.common.pojo.agent.AgentConfigRequest; import org.apache.inlong.common.pojo.agent.DataConfig; import org.apache.inlong.common.pojo.agent.TaskRequest; import org.apache.inlong.common.pojo.agent.TaskResult; @@ -46,15 +47,17 @@ import java.util.Date; import java.util.List; import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_NAME; +import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_TAG; import static org.apache.inlong.agent.constant.AgentConstants.AGENT_UNIQ_ID; import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AGENT_UNIQ_ID; import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_FETCHER_INTERVAL; import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_ADDR; +import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_CONFIG_HTTP_PATH; import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_RETURN_PARAM_DATA; import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_TASK_HTTP_PATH; import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_FETCHER_INTERVAL; import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MANAGER_CONFIG_HTTP_PATH; -import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MANAGER_TASK_HTTP_PATH; +import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MANAGER_EXIST_TASK_HTTP_PATH; import static org.apache.inlong.agent.plugin.fetcher.ManagerResultFormatter.getResultData; import static org.apache.inlong.agent.utils.AgentUtils.fetchLocalIp; import static org.apache.inlong.agent.utils.AgentUtils.fetchLocalUuid; @@ -69,14 +72,15 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher { private static final GsonBuilder gsonBuilder = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss"); private static final Gson GSON = gsonBuilder.create(); private final String baseManagerUrl; - private final String taskConfigUrl; private final String staticConfigUrl; + private final String agentConfigInfoUrl; private final AgentConfiguration conf; private final String uniqId; private final AgentManager agentManager; private final HttpManager httpManager; private String localIp; private String uuid; + private String clusterTag; private String clusterName; public ManagerFetcher(AgentManager agentManager) { @@ -85,9 +89,10 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher { if (requiredKeys(conf)) { httpManager = new HttpManager(conf); baseManagerUrl = httpManager.getBaseUrl(); - taskConfigUrl = buildTaskConfigUrl(baseManagerUrl); staticConfigUrl = buildStaticConfigUrl(baseManagerUrl); + agentConfigInfoUrl = buildAgentConfigInfoUrl(baseManagerUrl); uniqId = conf.get(AGENT_UNIQ_ID, DEFAULT_AGENT_UNIQ_ID); + clusterTag = conf.get(AGENT_CLUSTER_TAG); clusterName = conf.get(AGENT_CLUSTER_NAME); } else { throw new RuntimeException("init manager error, cannot find required key"); @@ -101,68 +106,74 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher { /** * Build task config url for manager according to config * - * example - http://127.0.0.1:8080/inlong/manager/openapi/fileAgent/getTaskConf + * example - http://127.0.0.1:8080/inlong/manager/openapi/agent/getTaskConf */ - private String buildTaskConfigUrl(String baseUrl) { - return baseUrl + conf.get(AGENT_MANAGER_TASK_HTTP_PATH, DEFAULT_AGENT_MANAGER_TASK_HTTP_PATH); + private String buildStaticConfigUrl(String baseUrl) { + return baseUrl + conf.get(AGENT_MANAGER_TASK_HTTP_PATH, DEFAULT_AGENT_MANAGER_EXIST_TASK_HTTP_PATH); } /** - * Build task config url for manager according to config + * Build agent config info url for manager according to config * - * example - http://127.0.0.1:8080/inlong/manager/openapi/fileAgent/getTaskConf + * example - http://127.0.0.1:8080/inlong/manager/openapi/agent/getConfig */ - private String buildStaticConfigUrl(String baseUrl) { - return baseUrl + conf.get(AGENT_MANAGER_TASK_HTTP_PATH, DEFAULT_AGENT_MANAGER_CONFIG_HTTP_PATH); + private String buildAgentConfigInfoUrl(String baseUrl) { + return baseUrl + conf.get(AGENT_MANAGER_CONFIG_HTTP_PATH, DEFAULT_AGENT_MANAGER_CONFIG_HTTP_PATH); } /** - * Request manager to get commands, make sure it is not throwing exceptions + * Request manager to get task config, make sure it is not throwing exceptions */ - public TaskResult fetchTaskConfig() { - LOGGER.info("fetchTaskConfig start"); - String resultStr = httpManager.doSentPost(taskConfigUrl, getFetchRequest(null)); + public TaskResult getStaticConfig() { + LOGGER.info("Get static config start"); + String resultStr = httpManager.doSentPost(staticConfigUrl, getTaskRequest()); + LOGGER.info("Url to get static config staticConfigUrl {}", staticConfigUrl); JsonObject resultData = getResultData(resultStr); JsonElement element = resultData.get(AGENT_MANAGER_RETURN_PARAM_DATA); - LOGGER.info("fetchTaskConfig end"); + LOGGER.info("Get static config end"); if (element != null) { - LOGGER.info("fetchTaskConfig not null {}", resultData); + LOGGER.info("Get static config not null {}", resultData); return GSON.fromJson(element.getAsJsonObject(), TaskResult.class); } else { - LOGGER.info("fetchTaskConfig nothing to do"); + LOGGER.info("Get static config nothing to do"); return null; } } /** - * Request manager to get commands, make sure it is not throwing exceptions + * Request manager to get config, make sure it is not throwing exceptions */ - public TaskResult getStaticConfig() { - LOGGER.info("Get static config start"); - String resultStr = httpManager.doSentPost(staticConfigUrl, getFetchRequest(null)); - LOGGER.info("Url to get static config staticConfigUrl {}", staticConfigUrl); + public AgentConfigInfo getAgentConfigInfo() { + LOGGER.info("Get agent config info"); + String resultStr = httpManager.doSentPost(agentConfigInfoUrl, getAgentConfigInfoRequest()); + LOGGER.info("Url to get agent config agentConfigInfoUrl {}", agentConfigInfoUrl); JsonObject resultData = getResultData(resultStr); JsonElement element = resultData.get(AGENT_MANAGER_RETURN_PARAM_DATA); - LOGGER.info("Get static config end"); + LOGGER.info("Get agent config end"); if (element != null) { - LOGGER.info("Get static config not null {}", resultData); - return GSON.fromJson(element.getAsJsonObject(), TaskResult.class); + LOGGER.info("Get agent config not null {}", resultData); + return GSON.fromJson(element.getAsJsonObject(), AgentConfigInfo.class); } else { - LOGGER.info("Get static config nothing to do"); + LOGGER.info("Get agent config nothing to do"); return null; } } - /** - * Form file command fetch request - */ - public TaskRequest getFetchRequest(List<CommandEntity> unackedCommands) { + public TaskRequest getTaskRequest() { TaskRequest request = new TaskRequest(); request.setAgentIp(localIp); request.setUuid(uuid); request.setClusterName(clusterName); request.setPullJobType(PullJobTypeEnum.NEW.getType()); - request.setCommandInfo(unackedCommands); + request.setCommandInfo(null); + return request; + } + + public AgentConfigRequest getAgentConfigInfoRequest() { + AgentConfigRequest request = new AgentConfigRequest(); + request.setClusterTag(clusterTag); + request.setClusterName(clusterName); + request.setIp(localIp); return request; } @@ -171,7 +182,7 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher { * * @return runnable profile. */ - private Runnable taskConfigFetchThread() { + private Runnable configFetchThread() { return () -> { Thread.currentThread().setName("ManagerFetcher"); while (isRunnable()) { @@ -185,6 +196,10 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher { }); agentManager.getTaskManager().submitTaskProfiles(taskProfiles); } + AgentConfigInfo config = getAgentConfigInfo(); + if (config != null) { + agentManager.subNewAgentConfigInfo(config); + } } catch (Throwable ex) { LOGGER.warn("exception caught", ex); ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex); @@ -243,7 +258,7 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher { // when agent start, check local ip and fetch manager ip list; localIp = fetchLocalIp(); uuid = fetchLocalUuid(); - submitWorker(taskConfigFetchThread()); + submitWorker(configFetchThread()); } @Override diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/AgentConfigRequest.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/AgentConfigRequest.java index f81bd3a281..857583bc86 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/AgentConfigRequest.java +++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/AgentConfigRequest.java @@ -33,4 +33,5 @@ public class AgentConfigRequest { private String clusterName; + private String ip; }