This is an automated email from the ASF dual-hosted git repository. aloyszhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 093333de40 [INLONG-9906][Agent] Add configuration comparison logic and processing of comparison results (#9908) 093333de40 is described below commit 093333de4084f2345e56d8c8646e72508e9a8503 Author: justinwwhuang <hww_jus...@163.com> AuthorDate: Mon Apr 1 15:36:02 2024 +0800 [INLONG-9906][Agent] Add configuration comparison logic and processing of comparison results (#9908) * [INLONG-9906][Agent] Add configuration comparison logic and processing of comparison results * [INLONG-9906][Agent] Modify based on comments * Update inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ModuleManager.java Co-authored-by: AloysZhang <lofterzh...@gmail.com> --------- Co-authored-by: AloysZhang <lofterzh...@gmail.com> --- .../inlong/agent/installer/ModuleManager.java | 151 +++++++++++++++++++++ 1 file changed, 151 insertions(+) diff --git a/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ModuleManager.java b/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ModuleManager.java index bc7bf89e8c..00dfc9668d 100755 --- a/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ModuleManager.java +++ b/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ModuleManager.java @@ -22,10 +22,12 @@ import org.apache.inlong.agent.constant.AgentConstants; import org.apache.inlong.agent.installer.conf.InstallerConfiguration; import org.apache.inlong.agent.metrics.audit.AuditUtils; import org.apache.inlong.agent.utils.AgentUtils; +import org.apache.inlong.agent.utils.ExcuteLinux; import org.apache.inlong.agent.utils.HttpManager; import org.apache.inlong.agent.utils.ThreadUtils; import org.apache.inlong.common.pojo.agent.installer.ConfigResult; import org.apache.inlong.common.pojo.agent.installer.ModuleConfig; +import org.apache.inlong.common.pojo.agent.installer.ModuleStateEnum; import com.google.gson.Gson; import com.google.gson.GsonBuilder; @@ -216,9 +218,158 @@ public class ModuleManager extends AbstractDaemon { managerModuleList.forEach((moduleConfig) -> { modulesFromManager.put(moduleConfig.getId(), moduleConfig); }); + traverseManagerModulesToLocal(modulesFromManager); + traverseLocalModulesToManager(modulesFromManager); return true; } + private void traverseManagerModulesToLocal(Map<Integer, ModuleConfig> modulesFromManager) { + modulesFromManager.values().forEach((managerModule) -> { + ModuleConfig localModule = currentModules.get(managerModule.getId()); + if (localModule == null) { + LOGGER.info("traverseManagerModulesToLocal module {} {} {} not found in local, add it", + managerModule.getId(), managerModule.getName(), managerModule.getVersion()); + addModule(managerModule); + } else { + if (managerModule.getMd5().equals(localModule.getMd5())) { + LOGGER.info("traverseManagerModulesToLocal module {} {} {} md5 no change, do nothing", + localModule.getId(), localModule.getName(), localModule.getVersion()); + } else { + LOGGER.info("traverseManagerModulesToLocal module {} {} {} md5 changed, update it", + localModule.getId(), localModule.getName(), localModule.getVersion()); + updateModule(localModule, managerModule); + } + } + }); + } + + private void traverseLocalModulesToManager(Map<Integer, ModuleConfig> modulesFromManager) { + currentModules.values().forEach((localModule) -> { + ModuleConfig managerModule = modulesFromManager.get(localModule.getId()); + if (managerModule == null) { + LOGGER.info("traverseLocalModulesToManager module {} {} {} not found in local, delete it", + localModule.getId(), localModule.getName(), localModule.getVersion()); + deleteModule(localModule); + } + }); + } + + private void addModule(ModuleConfig module) { + LOGGER.info("add module {} start", module.getName()); + addAndSaveModuleConfig(module); + if (!downloadModule(module)) { + LOGGER.error("add module {} but download failed", module.getName()); + return; + } + saveModuleState(module.getId(), ModuleStateEnum.DOWNLOADED); + installModule(module); + saveModuleState(module.getId(), ModuleStateEnum.INSTALLED); + startModule(module); + LOGGER.info("add module {} end", module.getId()); + } + + private void deleteModule(ModuleConfig module) { + LOGGER.info("delete module {} start", module.getId()); + stopModule(module); + uninstallModule(module); + deleteAndSaveModuleConfig(module); + LOGGER.info("delete module {} end", module.getId()); + } + + private void updateModule(ModuleConfig localModule, ModuleConfig managerModule) { + LOGGER.info("update module {} start", localModule.getId()); + if (localModule.getPackageConfig().getMd5().equals(managerModule.getPackageConfig().getMd5())) { + LOGGER.info("package md5 changed, will reinstall", localModule.getId()); + deleteModule(localModule); + addModule(managerModule); + } else { + LOGGER.info("package md5 no chang, will restart", localModule.getId()); + restartModule(localModule, managerModule); + } + LOGGER.info("update module {} end", localModule.getId()); + } + + private void addAndSaveModuleConfig(ModuleConfig module) { + module.setState(ModuleStateEnum.NEW); + if (currentModules.containsKey(module.getId())) { + LOGGER.error("should not happen! module {} found! will force to replace it!", module.getId()); + } + currentModules.put(module.getId(), module); + saveToLocalFile(confPath); + } + + private void deleteAndSaveModuleConfig(ModuleConfig module) { + if (!currentModules.containsKey(module.getId())) { + LOGGER.error("should not happen! module {} not found!", module.getId()); + return; + } + currentModules.remove(module.getId()); + saveToLocalFile(confPath); + } + + private boolean saveModuleState(Integer moduleId, ModuleStateEnum state) { + ModuleConfig module = currentModules.get(moduleId); + if (module == null) { + LOGGER.error("should not happen! module {} not found!", moduleId); + return false; + } + module.setState(state); + saveToLocalFile(confPath); + LOGGER.info("save module state to {} {}", moduleId, state); + return true; + } + + private void restartModule(ModuleConfig localModule, ModuleConfig managerModule) { + stopModule(localModule); + startModule(managerModule); + } + + private void installModule(ModuleConfig module) { + LOGGER.info("install module {} with cmd {}", module.getId(), module.getInstallCommand()); + String ret = ExcuteLinux.exeCmd(module.getInstallCommand()); + LOGGER.info("install module {} return {} ", module.getId(), ret); + } + + private boolean startModule(ModuleConfig module) { + LOGGER.info("start module {} with cmd {}", module.getId(), module.getStartCommand()); + for (int i = 0; i < module.getProcessesNum(); i++) { + String ret = ExcuteLinux.exeCmd(module.getStartCommand()); + LOGGER.info("start [{}] module {} return {} ", i, module.getId(), ret); + } + if (isProcessAllStarted(module)) { + LOGGER.info("start module {} success", module.getId()); + return true; + } else { + LOGGER.info("start module {} failed", module.getId()); + return false; + } + } + + private void stopModule(ModuleConfig module) { + LOGGER.info("stop module {} with cmd {}", module.getId(), module.getStopCommand()); + String ret = ExcuteLinux.exeCmd(module.getStopCommand()); + LOGGER.info("stop module {} return {} ", module.getId(), ret); + } + + private void uninstallModule(ModuleConfig module) { + LOGGER.info("uninstall module {} with cmd {}", module.getId(), module.getUninstallCommand()); + String ret = ExcuteLinux.exeCmd(module.getUninstallCommand()); + LOGGER.info("uninstall module {} return {} ", module.getId(), ret); + } + + private boolean isProcessAllStarted(ModuleConfig module) { + String ret = ExcuteLinux.exeCmd(module.getCheckCommand()); + String[] processArray = ret.split("\n"); + int cnt = 0; + for (int i = 0; i < processArray.length; i++) { + if (processArray[i].length() > 0) { + cnt++; + } + } + LOGGER.info("get module process num {} {}", module.getName(), cnt); + return cnt >= module.getProcessesNum(); + } + private boolean downloadModule(ModuleConfig module) { LOGGER.info("download module {} begin with url {}", module.getId(), module.getPackageConfig().getDownloadUrl()); try {