This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new dbd4ea4a1c [INLONG-8647][Agent] Stop sending task snapshot to manager (#8648) dbd4ea4a1c is described below commit dbd4ea4a1c35b401729abdd0c4fd7d3de69ccc41 Author: justinwwhuang <hww_jus...@163.com> AuthorDate: Mon Aug 7 17:18:06 2023 +0800 [INLONG-8647][Agent] Stop sending task snapshot to manager (#8648) --- .../apache/inlong/agent/core/HeartbeatManager.java | 72 ++-------------------- 1 file changed, 4 insertions(+), 68 deletions(-) diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java index b945969d6f..83946ac56f 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java @@ -34,16 +34,12 @@ import org.apache.inlong.common.heartbeat.AbstractHeartbeatManager; import org.apache.inlong.common.heartbeat.GroupHeartbeat; import org.apache.inlong.common.heartbeat.HeartbeatMsg; import org.apache.inlong.common.heartbeat.StreamHeartbeat; -import org.apache.inlong.common.pojo.agent.TaskSnapshotMessage; -import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest; import com.google.common.collect.Lists; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Date; import java.util.List; import java.util.Map; import java.util.regex.Pattern; @@ -55,12 +51,8 @@ import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_TAG; import static org.apache.inlong.agent.constant.AgentConstants.AGENT_HTTP_PORT; import static org.apache.inlong.agent.constant.AgentConstants.AGENT_NODE_GROUP; import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AGENT_HTTP_PORT; -import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_HEARTBEAT_INTERVAL; import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_HEARTBEAT_HTTP_PATH; -import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_REPORTSNAPSHOT_HTTP_PATH; -import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_HEARTBEAT_INTERVAL; import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MANAGER_HEARTBEAT_HTTP_PATH; -import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MANAGER_REPORTSNAPSHOT_HTTP_PATH; import static org.apache.inlong.agent.constant.JobConstants.JOB_GROUP_ID; import static org.apache.inlong.agent.constant.JobConstants.JOB_STREAM_ID; @@ -76,7 +68,6 @@ public class HeartbeatManager extends AbstractDaemon implements AbstractHeartbea private final AgentConfiguration conf; private final HttpManager httpManager; private final String baseManagerUrl; - private final String reportSnapshotUrl; private final String reportHeartbeatUrl; private final Pattern numberPattern = Pattern.compile("^[-+]?[\\d]*$"); @@ -88,7 +79,6 @@ public class HeartbeatManager extends AbstractDaemon implements AbstractHeartbea jobmanager = agentManager.getJobManager(); httpManager = new HttpManager(conf); baseManagerUrl = HttpManager.buildBaseUrl(); - reportSnapshotUrl = buildReportSnapShotUrl(baseManagerUrl); reportHeartbeatUrl = buildReportHeartbeatUrl(baseManagerUrl); } @@ -96,7 +86,6 @@ public class HeartbeatManager extends AbstractDaemon implements AbstractHeartbea conf = AgentConfiguration.getAgentConf(); httpManager = new HttpManager(conf); baseManagerUrl = HttpManager.buildBaseUrl(); - reportSnapshotUrl = buildReportSnapShotUrl(baseManagerUrl); reportHeartbeatUrl = buildReportHeartbeatUrl(baseManagerUrl); jobmanager = null; @@ -122,7 +111,6 @@ public class HeartbeatManager extends AbstractDaemon implements AbstractHeartbea @Override public void start() throws Exception { - submitWorker(snapshotReportThread()); submitWorker(heartbeatReportThread()); submitWorker(printMemoryPermitThread()); } @@ -136,24 +124,6 @@ public class HeartbeatManager extends AbstractDaemon implements AbstractHeartbea }; } - private Runnable snapshotReportThread() { - return () -> { - while (isRunnable()) { - try { - TaskSnapshotRequest taskSnapshotRequest = buildTaskSnapshotRequest(); - httpManager.doSentPost(reportSnapshotUrl, taskSnapshotRequest); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug(" {} report snapshot to manager", taskSnapshotRequest); - } - SECONDS.sleep(conf.getInt(AGENT_HEARTBEAT_INTERVAL, DEFAULT_AGENT_HEARTBEAT_INTERVAL)); - } catch (Throwable e) { - LOGGER.error("interrupted while report snapshot", e); - ThreadUtils.threadThrowableHandler(Thread.currentThread(), e); - } - } - }; - } - private Runnable heartbeatReportThread() { return () -> { while (isRunnable()) { @@ -182,39 +152,6 @@ public class HeartbeatManager extends AbstractDaemon implements AbstractHeartbea httpManager.doSentPost(reportHeartbeatUrl, heartbeat); } - /** - * build task snapshot request of job - */ - private TaskSnapshotRequest buildTaskSnapshotRequest() { - Map<String, JobWrapper> jobWrapperMap = jobmanager.getJobs(); - List<TaskSnapshotMessage> taskSnapshotMessageList = new ArrayList<>(); - TaskSnapshotRequest taskSnapshotRequest = new TaskSnapshotRequest(); - - Date date = new Date(System.currentTimeMillis()); - for (Map.Entry<String, JobWrapper> entry : jobWrapperMap.entrySet()) { - if (StringUtils.isBlank(entry.getKey()) || entry.getValue() == null) { - LOGGER.info("key: {} or value: {} is null", entry.getKey(), entry.getValue()); - continue; - } - String offset = entry.getValue().getSnapshot(); - String jobId = entry.getKey(); - TaskSnapshotMessage snapshotMessage = new TaskSnapshotMessage(); - snapshotMessage.setSnapshot(offset); - - // TODO Need to make sure the jobId is an Integer - if (!numberPattern.matcher(jobId).matches()) { - continue; - } - snapshotMessage.setJobId(Integer.valueOf(jobId)); - taskSnapshotMessageList.add(snapshotMessage); - } - taskSnapshotRequest.setSnapshotList(taskSnapshotMessageList); - taskSnapshotRequest.setReportTime(date); - taskSnapshotRequest.setAgentIp(AgentUtils.fetchLocalIp()); - taskSnapshotRequest.setUuid(AgentUtils.fetchLocalUuid()); - return taskSnapshotRequest; - } - /** * build heartbeat message of agent */ @@ -247,9 +184,11 @@ public class HeartbeatManager extends AbstractDaemon implements AbstractHeartbea Map<String, JobWrapper> jobWrapperMap = jobmanager.getJobs(); List<GroupHeartbeat> groupHeartbeats = Lists.newArrayList(); List<StreamHeartbeat> streamHeartbeats = Lists.newArrayList(); + List<String> jobIds = Lists.newArrayList(); jobWrapperMap.values().forEach(jobWrapper -> { Job job = jobWrapper.getJob(); JobProfile jobProfile = job.getJobConf(); + jobIds.add(jobProfile.getInstanceId()); final String groupId = jobProfile.get(JOB_GROUP_ID); final String streamId = jobProfile.get(JOB_STREAM_ID); State currentState = jobWrapper.getCurrentState(); @@ -268,6 +207,8 @@ public class HeartbeatManager extends AbstractDaemon implements AbstractHeartbea heartbeatMsg.setGroupHeartbeats(groupHeartbeats); heartbeatMsg.setStreamHeartbeats(streamHeartbeats); + + LOGGER.info("heartbeat jobIds {} heartbeatMsg {}", jobIds, heartbeatMsg); return heartbeatMsg; } @@ -286,11 +227,6 @@ public class HeartbeatManager extends AbstractDaemon implements AbstractHeartbea return heartbeatMsg; } - private String buildReportSnapShotUrl(String baseUrl) { - return baseUrl - + conf.get(AGENT_MANAGER_REPORTSNAPSHOT_HTTP_PATH, DEFAULT_AGENT_MANAGER_REPORTSNAPSHOT_HTTP_PATH); - } - private String buildReportHeartbeatUrl(String baseUrl) { return baseUrl + conf.get(AGENT_MANAGER_HEARTBEAT_HTTP_PATH, DEFAULT_AGENT_MANAGER_HEARTBEAT_HTTP_PATH); }