This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 88d3958e12 [INLONG-9858][Agent] Increase local read and write capabilities for module config (#9892)
88d3958e12 is described below

commit 88d3958e127e6d5bcccfabdfb6ccc1f91b7e96c2
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Fri Mar 29 11:36:51 2024 +0800

    [INLONG-9858][Agent] Increase local read and write capabilities for module config (#9892)
---
 .../inlong/agent/installer/ModuleManager.java      | 106 +++++++++++++++++----
 1 file changed, 85 insertions(+), 21 deletions(-)

diff --git a/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ModuleManager.java b/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ModuleManager.java
index 303d9bb0f3..dd33a2ed70 100755
--- a/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ModuleManager.java
+++ b/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ModuleManager.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.agent.installer;
 
 import org.apache.inlong.agent.common.AbstractDaemon;
+import org.apache.inlong.agent.constant.AgentConstants;
 import org.apache.inlong.agent.installer.conf.InstallerConfiguration;
 import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.utils.AgentUtils;
@@ -28,12 +29,27 @@ import 
org.apache.inlong.common.pojo.agent.installer.ModuleConfig;
 
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Reader;
+import java.nio.charset.StandardCharsets;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
 
 import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_REQUEST_TIMEOUT;
 import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PREFIX_PATH;
@@ -50,27 +66,24 @@ public class ModuleManager extends AbstractDaemon {
     public static final String MANAGER_AUTH_SECRET_ID = 
"manager.auth.secretId";
     public static final String MANAGER_AUTH_SECRET_KEY = 
"manager.auth.secretKey";
     public static final int CONFIG_QUEUE_CAPACITY = 1;
-    public static final int CORE_THREAD_SLEEP_TIME = 1000;
+    public static final int CORE_THREAD_SLEEP_TIME = 10000;
+    public static final int DOWNLOAD_PACKAGE_READ_BUFF_SIZE = 1024 * 1024;
+    public static final String LOCAL_CONFIG_FILE = "modules.json";
     private static final Logger LOGGER = 
LoggerFactory.getLogger(ModuleManager.class);
     private final InstallerConfiguration conf;
+    private final String confPath;
     private final BlockingQueue<ConfigResult> configQueue;
     private String currentMd5 = "";
-    private String currentStoragePath = "/tmp";
-    private ClassLoader classLoader;
+    private Map<Integer, ModuleConfig> currentModules = new 
ConcurrentHashMap<>();
     private static final GsonBuilder gsonBuilder = new 
GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss");
     private static final Gson GSON = gsonBuilder.create();
-    private final HttpManager httpManager;
+    private HttpManager httpManager;
 
     public ModuleManager() {
         conf = InstallerConfiguration.getInstallerConf();
+        confPath = conf.get(AgentConstants.AGENT_HOME, 
AgentConstants.DEFAULT_AGENT_HOME) + "/conf/";
         configQueue = new LinkedBlockingQueue<>(CONFIG_QUEUE_CAPACITY);
-        classLoader = Thread.currentThread().getContextClassLoader();
-        if (classLoader == null) {
-            classLoader = ModuleManager.class.getClassLoader();
-        }
-        if (requiredKeys(conf)) {
-            httpManager = getHttpManager(conf);
-        } else {
+        if (!requiredKeys(conf)) {
             throw new RuntimeException("init module manager error, cannot find 
required key");
         }
     }
@@ -97,7 +110,7 @@ public class ModuleManager extends AbstractDaemon {
         configQueue.clear();
         for (int i = 0; i < config.getModuleList().size(); i++) {
             LOGGER.info("submitModules index {} total {} {}", i, 
config.getModuleList().size(),
-                    config.getModuleList().get(i));
+                    GSON.toJson(config.getModuleList().get(i)));
         }
         configQueue.add(config);
     }
@@ -106,51 +119,103 @@ public class ModuleManager extends AbstractDaemon {
         return currentMd5;
     }
 
+    public ModuleConfig getModule(Integer moduleId) {
+        return currentModules.get(moduleId);
+    }
+
     /**
-     * thread for core thread.
+     * Thread for core thread.
      *
      * @return runnable profile.
      */
     private Runnable coreThread() {
         return () -> {
-            Thread.currentThread().setName("task-manager-core");
+            Thread.currentThread().setName("module-manager-core");
+            restoreFromLocalFile(confPath);
             while (isRunnable()) {
                 try {
-                    AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
                     dealWithConfigQueue(configQueue);
                     
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TASK_MGR_HEARTBEAT, "", "",
                             AgentUtils.getCurrentTime(), 1, 1);
+                    AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
                 } catch (Throwable ex) {
                     LOGGER.error("exception caught", ex);
                     ThreadUtils.threadThrowableHandler(Thread.currentThread(), 
ex);
+                    AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
                 }
             }
         };
     }
 
+    public void restoreFromLocalFile(String confPath) {
+        LOGGER.info("restore modules from local file");
+        String localModuleConfigPath = confPath + LOCAL_CONFIG_FILE;
+        try (Reader reader = new InputStreamReader(
+                new FileInputStream(localModuleConfigPath), 
StandardCharsets.UTF_8)) {
+            JsonElement tmpElement = 
JsonParser.parseReader(reader).getAsJsonObject();
+            ConfigResult curConfig = 
GSON.fromJson(tmpElement.getAsJsonObject(), ConfigResult.class);
+            if (curConfig.getMd5() != null && curConfig.getModuleList() != 
null) {
+                currentMd5 = curConfig.getMd5();
+                curConfig.getModuleList().forEach((module) -> {
+                    currentModules.put(module.getId(), module);
+                });
+            } else {
+                LOGGER.info("modules in local file invalid");
+            }
+        } catch (FileNotFoundException e) {
+            LOGGER.info("local module json file {} not found", 
localModuleConfigPath);
+        } catch (Exception ioe) {
+            LOGGER.error("error restoredFromLocalFile {}", 
localModuleConfigPath, ioe);
+        }
+    }
+
+    public void saveToLocalFile(String confPath) {
+        File temp = new File(confPath);
+        if (!temp.exists()) {
+            temp.mkdirs();
+        }
+        File jsonPath = new File(temp.getPath() + "/" + LOCAL_CONFIG_FILE);
+        try (BufferedWriter writer = new BufferedWriter(
+                new OutputStreamWriter(new FileOutputStream(jsonPath), 
StandardCharsets.UTF_8))) {
+            String curConfig = 
GSON.toJson(ConfigResult.builder().md5(currentMd5).moduleNum(currentModules.size())
+                    
.moduleList(currentModules.values().stream().collect(Collectors.toList())).build());
+            writer.write(curConfig);
+            writer.flush();
+            LOGGER.info("save modules to json file");
+        } catch (IOException e) {
+            LOGGER.error("saveToLocalFile error: ", e);
+        }
+    }
+
     private void dealWithConfigQueue(BlockingQueue<ConfigResult> queue) {
         ConfigResult config = queue.poll();
         if (config == null) {
             return;
         }
-        LOGGER.info("Deal with config {}", config);
-        if (currentMd5.compareTo(config.getMd5()) == 0) {
+        LOGGER.info("deal with config {}", GSON.toJson(config));
+        if (currentMd5.equals(config.getMd5())) {
             LOGGER.info("md5 no change {}, skip update", currentMd5);
             return;
         }
         if (updateModules(config.getModuleList())) {
             currentMd5 = config.getMd5();
+            saveToLocalFile(confPath);
         } else {
-            LOGGER.error("Update modules failed!");
+            LOGGER.error("update modules failed!");
         }
     }
 
-    private boolean updateModules(List<ModuleConfig> modules) {
+    private boolean updateModules(List<ModuleConfig> managerModuleList) {
+        Map<Integer, ModuleConfig> modulesFromManager = new 
ConcurrentHashMap<>();
+        managerModuleList.forEach((moduleConfig) -> {
+            modulesFromManager.put(moduleConfig.getId(), moduleConfig);
+        });
         return true;
     }
 
     @Override
     public void start() throws Exception {
+        httpManager = getHttpManager(conf);
         submitWorker(coreThread());
     }
 
@@ -166,7 +231,6 @@ public class ModuleManager extends AbstractDaemon {
      */
     @Override
     public void stop() throws Exception {
-
-        LOGGER.info("stopping installer manager");
+        waitForTerminate();
     }
 }

Reply via email to