This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 093333de40 [INLONG-9906][Agent] Add configuration comparison logic and 
processing of comparison results (#9908)
093333de40 is described below

commit 093333de4084f2345e56d8c8646e72508e9a8503
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Mon Apr 1 15:36:02 2024 +0800

    [INLONG-9906][Agent] Add configuration comparison logic and processing of 
comparison results (#9908)
    
    * [INLONG-9906][Agent] Add configuration comparison logic and processing of 
comparison results
    
    * [INLONG-9906][Agent] Modify based on comments
    
    * Update 
inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ModuleManager.java
    
    Co-authored-by: AloysZhang <lofterzh...@gmail.com>
    
    ---------
    
    Co-authored-by: AloysZhang <lofterzh...@gmail.com>
---
 .../inlong/agent/installer/ModuleManager.java      | 151 +++++++++++++++++++++
 1 file changed, 151 insertions(+)

diff --git 
a/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ModuleManager.java
 
b/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ModuleManager.java
index bc7bf89e8c..00dfc9668d 100755
--- 
a/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ModuleManager.java
+++ 
b/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ModuleManager.java
@@ -22,10 +22,12 @@ import org.apache.inlong.agent.constant.AgentConstants;
 import org.apache.inlong.agent.installer.conf.InstallerConfiguration;
 import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.ExcuteLinux;
 import org.apache.inlong.agent.utils.HttpManager;
 import org.apache.inlong.agent.utils.ThreadUtils;
 import org.apache.inlong.common.pojo.agent.installer.ConfigResult;
 import org.apache.inlong.common.pojo.agent.installer.ModuleConfig;
+import org.apache.inlong.common.pojo.agent.installer.ModuleStateEnum;
 
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
@@ -216,9 +218,158 @@ public class ModuleManager extends AbstractDaemon {
         managerModuleList.forEach((moduleConfig) -> {
             modulesFromManager.put(moduleConfig.getId(), moduleConfig);
         });
+        traverseManagerModulesToLocal(modulesFromManager);
+        traverseLocalModulesToManager(modulesFromManager);
         return true;
     }
 
+    private void traverseManagerModulesToLocal(Map<Integer, ModuleConfig> 
modulesFromManager) {
+        modulesFromManager.values().forEach((managerModule) -> {
+            ModuleConfig localModule = 
currentModules.get(managerModule.getId());
+            if (localModule == null) {
+                LOGGER.info("traverseManagerModulesToLocal module {} {} {} not 
found in local, add it",
+                        managerModule.getId(), managerModule.getName(), 
managerModule.getVersion());
+                addModule(managerModule);
+            } else {
+                if (managerModule.getMd5().equals(localModule.getMd5())) {
+                    LOGGER.info("traverseManagerModulesToLocal module {} {} {} 
md5 no change, do nothing",
+                            localModule.getId(), localModule.getName(), 
localModule.getVersion());
+                } else {
+                    LOGGER.info("traverseManagerModulesToLocal module {} {} {} 
md5 changed, update it",
+                            localModule.getId(), localModule.getName(), 
localModule.getVersion());
+                    updateModule(localModule, managerModule);
+                }
+            }
+        });
+    }
+
+    private void traverseLocalModulesToManager(Map<Integer, ModuleConfig> 
modulesFromManager) {
+        currentModules.values().forEach((localModule) -> {
+            ModuleConfig managerModule = 
modulesFromManager.get(localModule.getId());
+            if (managerModule == null) {
+                LOGGER.info("traverseLocalModulesToManager module {} {} {} not 
found in local, delete it",
+                        localModule.getId(), localModule.getName(), 
localModule.getVersion());
+                deleteModule(localModule);
+            }
+        });
+    }
+
+    private void addModule(ModuleConfig module) {
+        LOGGER.info("add module {} start", module.getName());
+        addAndSaveModuleConfig(module);
+        if (!downloadModule(module)) {
+            LOGGER.error("add module {} but download failed", 
module.getName());
+            return;
+        }
+        saveModuleState(module.getId(), ModuleStateEnum.DOWNLOADED);
+        installModule(module);
+        saveModuleState(module.getId(), ModuleStateEnum.INSTALLED);
+        startModule(module);
+        LOGGER.info("add module {} end", module.getId());
+    }
+
+    private void deleteModule(ModuleConfig module) {
+        LOGGER.info("delete module {} start", module.getId());
+        stopModule(module);
+        uninstallModule(module);
+        deleteAndSaveModuleConfig(module);
+        LOGGER.info("delete module {} end", module.getId());
+    }
+
+    private void updateModule(ModuleConfig localModule, ModuleConfig 
managerModule) {
+        LOGGER.info("update module {} start", localModule.getId());
+        if 
(localModule.getPackageConfig().getMd5().equals(managerModule.getPackageConfig().getMd5()))
 {
+            LOGGER.info("package md5 changed, will reinstall", 
localModule.getId());
+            deleteModule(localModule);
+            addModule(managerModule);
+        } else {
+            LOGGER.info("package md5 no chang, will restart", 
localModule.getId());
+            restartModule(localModule, managerModule);
+        }
+        LOGGER.info("update module {} end", localModule.getId());
+    }
+
+    private void addAndSaveModuleConfig(ModuleConfig module) {
+        module.setState(ModuleStateEnum.NEW);
+        if (currentModules.containsKey(module.getId())) {
+            LOGGER.error("should not happen! module {} found! will force to 
replace it!", module.getId());
+        }
+        currentModules.put(module.getId(), module);
+        saveToLocalFile(confPath);
+    }
+
+    private void deleteAndSaveModuleConfig(ModuleConfig module) {
+        if (!currentModules.containsKey(module.getId())) {
+            LOGGER.error("should not happen! module {} not found!", 
module.getId());
+            return;
+        }
+        currentModules.remove(module.getId());
+        saveToLocalFile(confPath);
+    }
+
+    private boolean saveModuleState(Integer moduleId, ModuleStateEnum state) {
+        ModuleConfig module = currentModules.get(moduleId);
+        if (module == null) {
+            LOGGER.error("should not happen! module {} not found!", moduleId);
+            return false;
+        }
+        module.setState(state);
+        saveToLocalFile(confPath);
+        LOGGER.info("save module state to {} {}", moduleId, state);
+        return true;
+    }
+
+    private void restartModule(ModuleConfig localModule, ModuleConfig 
managerModule) {
+        stopModule(localModule);
+        startModule(managerModule);
+    }
+
+    private void installModule(ModuleConfig module) {
+        LOGGER.info("install module {} with cmd {}", module.getId(), 
module.getInstallCommand());
+        String ret = ExcuteLinux.exeCmd(module.getInstallCommand());
+        LOGGER.info("install module {} return {} ", module.getId(), ret);
+    }
+
+    private boolean startModule(ModuleConfig module) {
+        LOGGER.info("start module {} with cmd {}", module.getId(), 
module.getStartCommand());
+        for (int i = 0; i < module.getProcessesNum(); i++) {
+            String ret = ExcuteLinux.exeCmd(module.getStartCommand());
+            LOGGER.info("start [{}] module {} return {} ", i, module.getId(), 
ret);
+        }
+        if (isProcessAllStarted(module)) {
+            LOGGER.info("start module {} success", module.getId());
+            return true;
+        } else {
+            LOGGER.info("start module {} failed", module.getId());
+            return false;
+        }
+    }
+
+    private void stopModule(ModuleConfig module) {
+        LOGGER.info("stop module {} with cmd {}", module.getId(), 
module.getStopCommand());
+        String ret = ExcuteLinux.exeCmd(module.getStopCommand());
+        LOGGER.info("stop module {} return {} ", module.getId(), ret);
+    }
+
+    private void uninstallModule(ModuleConfig module) {
+        LOGGER.info("uninstall module {} with cmd {}", module.getId(), 
module.getUninstallCommand());
+        String ret = ExcuteLinux.exeCmd(module.getUninstallCommand());
+        LOGGER.info("uninstall module {} return {} ", module.getId(), ret);
+    }
+
+    private boolean isProcessAllStarted(ModuleConfig module) {
+        String ret = ExcuteLinux.exeCmd(module.getCheckCommand());
+        String[] processArray = ret.split("\n");
+        int cnt = 0;
+        for (int i = 0; i < processArray.length; i++) {
+            if (processArray[i].length() > 0) {
+                cnt++;
+            }
+        }
+        LOGGER.info("get module process num {} {}", module.getName(), cnt);
+        return cnt >= module.getProcessesNum();
+    }
+
     private boolean downloadModule(ModuleConfig module) {
         LOGGER.info("download module {} begin with url {}", module.getId(), 
module.getPackageConfig().getDownloadUrl());
         try {

Reply via email to