This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ef4d4ff [INLONG-3317][Agent] Change the default heartbeat and report
interval to 10s (#3319)
ef4d4ff is described below
commit ef4d4ff3968a3a18dac4e273cd7e3b0039fe553b
Author: kipshi <[email protected]>
AuthorDate: Wed Mar 23 12:45:15 2022 +0800
[INLONG-3317][Agent] Change the default heartbeat and report interval to
10s (#3319)
---
.../inlong/agent/constant/FetcherConstants.java | 4 +++-
.../apache/inlong/agent/core/HeartbeatManager.java | 9 +++++---
.../agent/core/task/TaskPositionManager.java | 26 ++++++++++++++--------
inlong-agent/agent-docker/Dockerfile | 2 ++
inlong-agent/agent-docker/agent-docker.sh | 2 ++
5 files changed, 30 insertions(+), 13 deletions(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java
index 216a7ff..95da1bf 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java
@@ -23,8 +23,10 @@ package org.apache.inlong.agent.constant;
public class FetcherConstants {
public static final String AGENT_FETCHER_INTERVAL =
"agent.fetcher.interval";
- public static final int DEFAULT_AGENT_FETCHER_INTERVAL = 60;
+ public static final int DEFAULT_AGENT_FETCHER_INTERVAL = 10;
+ public static final String AGENT_HEARTBEAT_INTERVAL =
"agent.heartbeat.interval";
+ public static final int DEFAULT_AGENT_HEARTBEAT_INTERVAL = 10;
public static final String AGENT_MANAGER_REQUEST_TIMEOUT =
"agent.manager.request.timeout";
// default is 30s
public static final int DEFAULT_AGENT_MANAGER_REQUEST_TIMEOUT = 30;
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
index 56fd8e7..2160a3f 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
@@ -36,13 +36,14 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
+import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_HEARTBEAT_INTERVAL;
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_REPORTSNAPSHOT_HTTP_PATH;
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST;
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PORT;
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PREFIX_PATH;
+import static
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_HEARTBEAT_INTERVAL;
import static
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MANAGER_REPORTSNAPSHOT_HTTP_PATH;
import static
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MANAGER_VIP_HTTP_PREFIX_PATH;
-import static
org.apache.inlong.agent.core.task.TaskPositionManager.DEFAULT_FLUSH_TIMEOUT;
public class HeartbeatManager extends AbstractDaemon {
@@ -60,9 +61,9 @@ public class HeartbeatManager extends AbstractDaemon {
* Init heartbeat manager.
*/
public HeartbeatManager(AgentManager agentManager) {
+ this.conf = AgentConfiguration.getAgentConf();
this.agentManager = agentManager;
jobmanager = agentManager.getJobManager();
- conf = AgentConfiguration.getAgentConf();
httpManager = new HttpManager(conf);
baseManagerUrl = buildBaseUrl();
reportSnapshotUrl = builReportSnapShotUrl(baseManagerUrl);
@@ -129,7 +130,9 @@ public class HeartbeatManager extends AbstractDaemon {
TaskSnapshotRequest taskSnapshotRequest = getHeartBeat();
httpManager.doSentPost(reportSnapshotUrl,
taskSnapshotRequest);
LOGGER.info(" {} report to manager", taskSnapshotRequest);
- TimeUnit.SECONDS.sleep(DEFAULT_FLUSH_TIMEOUT);
+ int heartbeatInterval =
conf.getInt(AGENT_HEARTBEAT_INTERVAL,
+ DEFAULT_AGENT_HEARTBEAT_INTERVAL);
+ TimeUnit.SECONDS.sleep(heartbeatInterval);
} catch (Exception ex) {
LOGGER.error("error caught", ex);
}
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java
index 72fc137..1ee670e 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java
@@ -17,17 +17,21 @@
package org.apache.inlong.agent.core.task;
-import static org.apache.inlong.agent.constant.CommonConstants.POSITION_SUFFIX;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
import org.apache.inlong.agent.common.AbstractDaemon;
+import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.core.AgentManager;
import org.apache.inlong.agent.db.JobProfileDb;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.inlong.agent.constant.CommonConstants.POSITION_SUFFIX;
+import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_HEARTBEAT_INTERVAL;
+import static
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_FETCHER_INTERVAL;
+
/**
* used to store task position to db, task position is stored as properties in
JobProfile.
* where key is task read file name and value is task sink position
@@ -36,15 +40,15 @@ import org.slf4j.LoggerFactory;
public class TaskPositionManager extends AbstractDaemon {
private static final Logger LOGGER =
LoggerFactory.getLogger(TaskPositionManager.class);
- public static final int DEFAULT_FLUSH_TIMEOUT = 30;
private final AgentManager agentManager;
private final JobProfileDb jobConfDb;
private ConcurrentHashMap<String, ConcurrentHashMap<String, Long>>
jobTaskPositionMap;
-
+ private final AgentConfiguration conf;
private static volatile TaskPositionManager taskPositionManager = null;
private TaskPositionManager(AgentManager agentManager) {
+ this.conf = AgentConfiguration.getAgentConf();
this.agentManager = agentManager;
this.jobConfDb = agentManager.getJobManager().getJobConfDb();
this.jobTaskPositionMap = new ConcurrentHashMap<>();
@@ -52,6 +56,7 @@ public class TaskPositionManager extends AbstractDaemon {
/**
* task position manager singleton, can only generated by agent manager
+ *
* @param agentManager
* @return
*/
@@ -68,6 +73,7 @@ public class TaskPositionManager extends AbstractDaemon {
/**
* get taskPositionManager singleton
+ *
* @return
*/
public static TaskPositionManager getTaskPositionManager() {
@@ -91,13 +97,15 @@ public class TaskPositionManager extends AbstractDaemon {
JobProfile jobProfile = jobConfDb.getJobById(jobId);
if (jobProfile == null) {
LOGGER.warn("jobProfile {} cannot be found in db, "
- + "might be deleted by standalone mode, now
delete job position in memory", jobId);
+ + "might be deleted by standalone mode,
now delete job position in memory", jobId);
deleteJobPosition(jobId);
continue;
}
flushJobProfile(jobId, jobProfile);
}
- TimeUnit.SECONDS.sleep(DEFAULT_FLUSH_TIMEOUT);
+ int flushTime = conf.getInt(AGENT_HEARTBEAT_INTERVAL,
+ DEFAULT_AGENT_FETCHER_INTERVAL);
+ TimeUnit.SECONDS.sleep(flushTime);
} catch (Exception ex) {
LOGGER.error("error caught", ex);
}
@@ -107,7 +115,7 @@ public class TaskPositionManager extends AbstractDaemon {
private void flushJobProfile(String jobId, JobProfile jobProfile) {
jobTaskPositionMap.get(jobId).forEach(
- (fileName, position) -> jobProfile.setLong(fileName +
POSITION_SUFFIX, position)
+ (fileName, position) -> jobProfile.setLong(fileName +
POSITION_SUFFIX, position)
);
if (jobConfDb.checkJobfinished(jobProfile)) {
LOGGER.info("Cannot update job profile {}, delete memory job in
jobTaskPosition", jobId);
diff --git a/inlong-agent/agent-docker/Dockerfile
b/inlong-agent/agent-docker/Dockerfile
index 131145b..5a9189d 100644
--- a/inlong-agent/agent-docker/Dockerfile
+++ b/inlong-agent/agent-docker/Dockerfile
@@ -33,6 +33,8 @@ ENV MANAGER_OPENAPI_PORT=8082
ENV DATAPROXY_IP=127.0.0.1
ENV DATAPROXY_PORT=46801
ENV ETH_NETWORK=eth0
+ENV AGENT_FETCH_INTERVAL=10
+ENV AGENT_HEARTBEAT_INTERVAL=10
ADD agent-docker.sh bin/
RUN chmod +x bin/agent-docker.sh
CMD ["bin/agent-docker.sh"]
diff --git a/inlong-agent/agent-docker/agent-docker.sh
b/inlong-agent/agent-docker/agent-docker.sh
index d16cb1b..400e9bd 100644
--- a/inlong-agent/agent-docker/agent-docker.sh
+++ b/inlong-agent/agent-docker/agent-docker.sh
@@ -22,6 +22,8 @@ local_ip=$(ifconfig $ETH_NETWORK | grep "inet" | grep -v
"inet6" | awk '{print $
cat <<EOF > ${file_path}/conf/agent.properties
agent.fetcher.classname=org.apache.inlong.agent.plugin.fetcher.ManagerFetcher
agent.local.ip=$local_ip
+agent.fetcher.interval=$AGENT_FETCH_INTERVAL
+agent.heartbeat.interval=$AGENT_HEARTBEAT_INTERVAL
agent.manager.vip.http.host=$MANAGER_OPENAPI_IP
agent.manager.vip.http.port=$MANAGER_OPENAPI_PORT
agent.dataproxy.http.host=$DATAPROXY_IP