This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 88d3958e12 [INLONG-9858][Agent] Increase local read and write capabilities for module config (#9892) 88d3958e12 is described below commit 88d3958e127e6d5bcccfabdfb6ccc1f91b7e96c2 Author: justinwwhuang <hww_jus...@163.com> AuthorDate: Fri Mar 29 11:36:51 2024 +0800 [INLONG-9858][Agent] Increase local read and write capabilities for module config (#9892) --- .../inlong/agent/installer/ModuleManager.java | 106 +++++++++++++++++---- 1 file changed, 85 insertions(+), 21 deletions(-) diff --git a/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ModuleManager.java b/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ModuleManager.java index 303d9bb0f3..dd33a2ed70 100755 --- a/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ModuleManager.java +++ b/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ModuleManager.java @@ -18,6 +18,7 @@ package org.apache.inlong.agent.installer; import org.apache.inlong.agent.common.AbstractDaemon; +import org.apache.inlong.agent.constant.AgentConstants; import org.apache.inlong.agent.installer.conf.InstallerConfiguration; import org.apache.inlong.agent.metrics.audit.AuditUtils; import org.apache.inlong.agent.utils.AgentUtils; @@ -28,12 +29,27 @@ import org.apache.inlong.common.pojo.agent.installer.ModuleConfig; import com.google.gson.Gson; import com.google.gson.GsonBuilder; +import com.google.gson.JsonElement; +import com.google.gson.JsonParser; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.io.Reader; +import java.nio.charset.StandardCharsets; import java.util.List; +import java.util.Map; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; +import java.util.stream.Collectors; import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_REQUEST_TIMEOUT; import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PREFIX_PATH; @@ -50,27 +66,24 @@ public class ModuleManager extends AbstractDaemon { public static final String MANAGER_AUTH_SECRET_ID = "manager.auth.secretId"; public static final String MANAGER_AUTH_SECRET_KEY = "manager.auth.secretKey"; public static final int CONFIG_QUEUE_CAPACITY = 1; - public static final int CORE_THREAD_SLEEP_TIME = 1000; + public static final int CORE_THREAD_SLEEP_TIME = 10000; + public static final int DOWNLOAD_PACKAGE_READ_BUFF_SIZE = 1024 * 1024; + public static final String LOCAL_CONFIG_FILE = "modules.json"; private static final Logger LOGGER = LoggerFactory.getLogger(ModuleManager.class); private final InstallerConfiguration conf; + private final String confPath; private final BlockingQueue<ConfigResult> configQueue; private String currentMd5 = ""; - private String currentStoragePath = "/tmp"; - private ClassLoader classLoader; + private Map<Integer, ModuleConfig> currentModules = new ConcurrentHashMap<>(); private static final GsonBuilder gsonBuilder = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss"); private static final Gson GSON = gsonBuilder.create(); - private final HttpManager httpManager; + private HttpManager httpManager; public ModuleManager() { conf = InstallerConfiguration.getInstallerConf(); + confPath = conf.get(AgentConstants.AGENT_HOME, AgentConstants.DEFAULT_AGENT_HOME) + "/conf/"; configQueue = new LinkedBlockingQueue<>(CONFIG_QUEUE_CAPACITY); - classLoader = Thread.currentThread().getContextClassLoader(); - if (classLoader == null) { - classLoader = ModuleManager.class.getClassLoader(); - } - if (requiredKeys(conf)) { - httpManager = getHttpManager(conf); - } else { + if (!requiredKeys(conf)) { throw new RuntimeException("init module manager error, cannot find required key"); } } @@ -97,7 +110,7 @@ public class ModuleManager extends AbstractDaemon { configQueue.clear(); for (int i = 0; i < config.getModuleList().size(); i++) { LOGGER.info("submitModules index {} total {} {}", i, config.getModuleList().size(), - config.getModuleList().get(i)); + GSON.toJson(config.getModuleList().get(i))); } configQueue.add(config); } @@ -106,51 +119,103 @@ public class ModuleManager extends AbstractDaemon { return currentMd5; } + public ModuleConfig getModule(Integer moduleId) { + return currentModules.get(moduleId); + } + /** - * thread for core thread. + * Thread for core thread. * * @return runnable profile. */ private Runnable coreThread() { return () -> { - Thread.currentThread().setName("task-manager-core"); + Thread.currentThread().setName("module-manager-core"); + restoreFromLocalFile(confPath); while (isRunnable()) { try { - AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME); dealWithConfigQueue(configQueue); AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TASK_MGR_HEARTBEAT, "", "", AgentUtils.getCurrentTime(), 1, 1); + AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME); } catch (Throwable ex) { LOGGER.error("exception caught", ex); ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex); + AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME); } } }; } + public void restoreFromLocalFile(String confPath) { + LOGGER.info("restore modules from local file"); + String localModuleConfigPath = confPath + LOCAL_CONFIG_FILE; + try (Reader reader = new InputStreamReader( + new FileInputStream(localModuleConfigPath), StandardCharsets.UTF_8)) { + JsonElement tmpElement = JsonParser.parseReader(reader).getAsJsonObject(); + ConfigResult curConfig = GSON.fromJson(tmpElement.getAsJsonObject(), ConfigResult.class); + if (curConfig.getMd5() != null && curConfig.getModuleList() != null) { + currentMd5 = curConfig.getMd5(); + curConfig.getModuleList().forEach((module) -> { + currentModules.put(module.getId(), module); + }); + } else { + LOGGER.info("modules in local file invalid"); + } + } catch (FileNotFoundException e) { + LOGGER.info("local module json file {} not found", localModuleConfigPath); + } catch (Exception ioe) { + LOGGER.error("error restoredFromLocalFile {}", localModuleConfigPath, ioe); + } + } + + public void saveToLocalFile(String confPath) { + File temp = new File(confPath); + if (!temp.exists()) { + temp.mkdirs(); + } + File jsonPath = new File(temp.getPath() + "/" + LOCAL_CONFIG_FILE); + try (BufferedWriter writer = new BufferedWriter( + new OutputStreamWriter(new FileOutputStream(jsonPath), StandardCharsets.UTF_8))) { + String curConfig = GSON.toJson(ConfigResult.builder().md5(currentMd5).moduleNum(currentModules.size()) + .moduleList(currentModules.values().stream().collect(Collectors.toList())).build()); + writer.write(curConfig); + writer.flush(); + LOGGER.info("save modules to json file"); + } catch (IOException e) { + LOGGER.error("saveToLocalFile error: ", e); + } + } + private void dealWithConfigQueue(BlockingQueue<ConfigResult> queue) { ConfigResult config = queue.poll(); if (config == null) { return; } - LOGGER.info("Deal with config {}", config); - if (currentMd5.compareTo(config.getMd5()) == 0) { + LOGGER.info("deal with config {}", GSON.toJson(config)); + if (currentMd5.equals(config.getMd5())) { LOGGER.info("md5 no change {}, skip update", currentMd5); return; } if (updateModules(config.getModuleList())) { currentMd5 = config.getMd5(); + saveToLocalFile(confPath); } else { - LOGGER.error("Update modules failed!"); + LOGGER.error("update modules failed!"); } } - private boolean updateModules(List<ModuleConfig> modules) { + private boolean updateModules(List<ModuleConfig> managerModuleList) { + Map<Integer, ModuleConfig> modulesFromManager = new ConcurrentHashMap<>(); + managerModuleList.forEach((moduleConfig) -> { + modulesFromManager.put(moduleConfig.getId(), moduleConfig); + }); return true; } @Override public void start() throws Exception { + httpManager = getHttpManager(conf); submitWorker(coreThread()); } @@ -166,7 +231,6 @@ public class ModuleManager extends AbstractDaemon { */ @Override public void stop() throws Exception { - - LOGGER.info("stopping installer manager"); + waitForTerminate(); } }