This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ef4d4ff  [INLONG-3317][Agent] Change the default heartbeat and report interval to 10s (#3319)
ef4d4ff is described below

commit ef4d4ff3968a3a18dac4e273cd7e3b0039fe553b
Author: kipshi <[email protected]>
AuthorDate: Wed Mar 23 12:45:15 2022 +0800

    [INLONG-3317][Agent] Change the default heartbeat and report interval to 10s (#3319)
---
 .../inlong/agent/constant/FetcherConstants.java    |  4 +++-
 .../apache/inlong/agent/core/HeartbeatManager.java |  9 +++++---
 .../agent/core/task/TaskPositionManager.java       | 26 ++++++++++++++--------
 inlong-agent/agent-docker/Dockerfile               |  2 ++
 inlong-agent/agent-docker/agent-docker.sh          |  2 ++
 5 files changed, 30 insertions(+), 13 deletions(-)

diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java
index 216a7ff..95da1bf 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java
@@ -23,8 +23,10 @@ package org.apache.inlong.agent.constant;
 public class FetcherConstants {
 
     public static final String AGENT_FETCHER_INTERVAL = 
"agent.fetcher.interval";
-    public static final int DEFAULT_AGENT_FETCHER_INTERVAL = 60;
+    public static final int DEFAULT_AGENT_FETCHER_INTERVAL = 10;
 
+    public static final String AGENT_HEARTBEAT_INTERVAL = 
"agent.heartbeat.interval";
+    public static final int DEFAULT_AGENT_HEARTBEAT_INTERVAL = 10;
     public static final String AGENT_MANAGER_REQUEST_TIMEOUT = 
"agent.manager.request.timeout";
     // default is 30s
     public static final int DEFAULT_AGENT_MANAGER_REQUEST_TIMEOUT = 30;
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
index 56fd8e7..2160a3f 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
@@ -36,13 +36,14 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 
+import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_HEARTBEAT_INTERVAL;
 import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_REPORTSNAPSHOT_HTTP_PATH;
 import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST;
 import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PORT;
 import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PREFIX_PATH;
+import static 
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_HEARTBEAT_INTERVAL;
 import static 
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MANAGER_REPORTSNAPSHOT_HTTP_PATH;
 import static 
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MANAGER_VIP_HTTP_PREFIX_PATH;
-import static 
org.apache.inlong.agent.core.task.TaskPositionManager.DEFAULT_FLUSH_TIMEOUT;
 
 public class HeartbeatManager extends AbstractDaemon {
 
@@ -60,9 +61,9 @@ public class HeartbeatManager extends AbstractDaemon {
      * Init heartbeat manager.
      */
     public HeartbeatManager(AgentManager agentManager) {
+        this.conf = AgentConfiguration.getAgentConf();
         this.agentManager = agentManager;
         jobmanager = agentManager.getJobManager();
-        conf = AgentConfiguration.getAgentConf();
         httpManager = new HttpManager(conf);
         baseManagerUrl = buildBaseUrl();
         reportSnapshotUrl = builReportSnapShotUrl(baseManagerUrl);
@@ -129,7 +130,9 @@ public class HeartbeatManager extends AbstractDaemon {
                     TaskSnapshotRequest taskSnapshotRequest = getHeartBeat();
                     httpManager.doSentPost(reportSnapshotUrl, 
taskSnapshotRequest);
                     LOGGER.info(" {} report to manager", taskSnapshotRequest);
-                    TimeUnit.SECONDS.sleep(DEFAULT_FLUSH_TIMEOUT);
+                    int heartbeatInterval = 
conf.getInt(AGENT_HEARTBEAT_INTERVAL,
+                            DEFAULT_AGENT_HEARTBEAT_INTERVAL);
+                    TimeUnit.SECONDS.sleep(heartbeatInterval);
                 } catch (Exception ex) {
                     LOGGER.error("error caught", ex);
                 }
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java
index 72fc137..1ee670e 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java
@@ -17,17 +17,21 @@
 
 package org.apache.inlong.agent.core.task;
 
-import static org.apache.inlong.agent.constant.CommonConstants.POSITION_SUFFIX;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
 import org.apache.inlong.agent.common.AbstractDaemon;
+import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.core.AgentManager;
 import org.apache.inlong.agent.db.JobProfileDb;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.inlong.agent.constant.CommonConstants.POSITION_SUFFIX;
+import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_HEARTBEAT_INTERVAL;
+import static 
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_FETCHER_INTERVAL;
+
 /**
  * used to store task position to db, task position is stored as properties in 
JobProfile.
  * where key is task read file name and value is task sink position
@@ -36,15 +40,15 @@ import org.slf4j.LoggerFactory;
 public class TaskPositionManager extends AbstractDaemon {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(TaskPositionManager.class);
-    public static final int DEFAULT_FLUSH_TIMEOUT = 30;
 
     private final AgentManager agentManager;
     private final JobProfileDb jobConfDb;
     private ConcurrentHashMap<String, ConcurrentHashMap<String, Long>> 
jobTaskPositionMap;
-
+    private final AgentConfiguration conf;
     private static volatile TaskPositionManager taskPositionManager = null;
 
     private TaskPositionManager(AgentManager agentManager) {
+        this.conf = AgentConfiguration.getAgentConf();
         this.agentManager = agentManager;
         this.jobConfDb = agentManager.getJobManager().getJobConfDb();
         this.jobTaskPositionMap = new ConcurrentHashMap<>();
@@ -52,6 +56,7 @@ public class TaskPositionManager extends AbstractDaemon {
 
     /**
      * task position manager singleton, can only generated by agent manager
+     *
      * @param agentManager
      * @return
      */
@@ -68,6 +73,7 @@ public class TaskPositionManager extends AbstractDaemon {
 
     /**
      * get taskPositionManager singleton
+     *
      * @return
      */
     public static TaskPositionManager getTaskPositionManager() {
@@ -91,13 +97,15 @@ public class TaskPositionManager extends AbstractDaemon {
                         JobProfile jobProfile = jobConfDb.getJobById(jobId);
                         if (jobProfile == null) {
                             LOGGER.warn("jobProfile {} cannot be found in db, "
-                                + "might be deleted by standalone mode, now 
delete job position in memory", jobId);
+                                    + "might be deleted by standalone mode, 
now delete job position in memory", jobId);
                             deleteJobPosition(jobId);
                             continue;
                         }
                         flushJobProfile(jobId, jobProfile);
                     }
-                    TimeUnit.SECONDS.sleep(DEFAULT_FLUSH_TIMEOUT);
+                    int flushTime = conf.getInt(AGENT_HEARTBEAT_INTERVAL,
+                            DEFAULT_AGENT_FETCHER_INTERVAL);
+                    TimeUnit.SECONDS.sleep(flushTime);
                 } catch (Exception ex) {
                     LOGGER.error("error caught", ex);
                 }
@@ -107,7 +115,7 @@ public class TaskPositionManager extends AbstractDaemon {
 
     private void flushJobProfile(String jobId, JobProfile jobProfile) {
         jobTaskPositionMap.get(jobId).forEach(
-            (fileName, position) -> jobProfile.setLong(fileName + 
POSITION_SUFFIX, position)
+                (fileName, position) -> jobProfile.setLong(fileName + 
POSITION_SUFFIX, position)
         );
         if (jobConfDb.checkJobfinished(jobProfile)) {
             LOGGER.info("Cannot update job profile {}, delete memory job in 
jobTaskPosition", jobId);
diff --git a/inlong-agent/agent-docker/Dockerfile 
b/inlong-agent/agent-docker/Dockerfile
index 131145b..5a9189d 100644
--- a/inlong-agent/agent-docker/Dockerfile
+++ b/inlong-agent/agent-docker/Dockerfile
@@ -33,6 +33,8 @@ ENV MANAGER_OPENAPI_PORT=8082
 ENV DATAPROXY_IP=127.0.0.1
 ENV DATAPROXY_PORT=46801
 ENV ETH_NETWORK=eth0
+ENV AGENT_FETCH_INTERVAL=10
+ENV AGENT_HEARTBEAT_INTERVAL=10
 ADD agent-docker.sh bin/
 RUN chmod +x bin/agent-docker.sh
 CMD ["bin/agent-docker.sh"]
diff --git a/inlong-agent/agent-docker/agent-docker.sh 
b/inlong-agent/agent-docker/agent-docker.sh
index d16cb1b..400e9bd 100644
--- a/inlong-agent/agent-docker/agent-docker.sh
+++ b/inlong-agent/agent-docker/agent-docker.sh
@@ -22,6 +22,8 @@ local_ip=$(ifconfig $ETH_NETWORK | grep "inet" | grep -v 
"inet6" | awk '{print $
 cat <<EOF > ${file_path}/conf/agent.properties
 agent.fetcher.classname=org.apache.inlong.agent.plugin.fetcher.ManagerFetcher
 agent.local.ip=$local_ip
+agent.fetcher.interval=$AGENT_FETCH_INTERVAL
+agent.heartbeat.interval=$AGENT_HEARTBEAT_INTERVAL
 agent.manager.vip.http.host=$MANAGER_OPENAPI_IP
 agent.manager.vip.http.port=$MANAGER_OPENAPI_PORT
 agent.dataproxy.http.host=$DATAPROXY_IP

Reply via email to