This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new dbd4ea4a1c [INLONG-8647][Agent] Stop sending task snapshot to manager 
(#8648)
dbd4ea4a1c is described below

commit dbd4ea4a1c35b401729abdd0c4fd7d3de69ccc41
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Mon Aug 7 17:18:06 2023 +0800

    [INLONG-8647][Agent] Stop sending task snapshot to manager (#8648)
---
 .../apache/inlong/agent/core/HeartbeatManager.java | 72 ++--------------------
 1 file changed, 4 insertions(+), 68 deletions(-)

diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
index b945969d6f..83946ac56f 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
@@ -34,16 +34,12 @@ import 
org.apache.inlong.common.heartbeat.AbstractHeartbeatManager;
 import org.apache.inlong.common.heartbeat.GroupHeartbeat;
 import org.apache.inlong.common.heartbeat.HeartbeatMsg;
 import org.apache.inlong.common.heartbeat.StreamHeartbeat;
-import org.apache.inlong.common.pojo.agent.TaskSnapshotMessage;
-import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
 
 import com.google.common.collect.Lists;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.regex.Pattern;
@@ -55,12 +51,8 @@ import static 
org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_TAG;
 import static org.apache.inlong.agent.constant.AgentConstants.AGENT_HTTP_PORT;
 import static org.apache.inlong.agent.constant.AgentConstants.AGENT_NODE_GROUP;
 import static 
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AGENT_HTTP_PORT;
-import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_HEARTBEAT_INTERVAL;
 import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_HEARTBEAT_HTTP_PATH;
-import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_REPORTSNAPSHOT_HTTP_PATH;
-import static 
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_HEARTBEAT_INTERVAL;
 import static 
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MANAGER_HEARTBEAT_HTTP_PATH;
-import static 
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MANAGER_REPORTSNAPSHOT_HTTP_PATH;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_GROUP_ID;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_STREAM_ID;
 
@@ -76,7 +68,6 @@ public class HeartbeatManager extends AbstractDaemon 
implements AbstractHeartbea
     private final AgentConfiguration conf;
     private final HttpManager httpManager;
     private final String baseManagerUrl;
-    private final String reportSnapshotUrl;
     private final String reportHeartbeatUrl;
     private final Pattern numberPattern = Pattern.compile("^[-+]?[\\d]*$");
 
@@ -88,7 +79,6 @@ public class HeartbeatManager extends AbstractDaemon 
implements AbstractHeartbea
         jobmanager = agentManager.getJobManager();
         httpManager = new HttpManager(conf);
         baseManagerUrl = HttpManager.buildBaseUrl();
-        reportSnapshotUrl = buildReportSnapShotUrl(baseManagerUrl);
         reportHeartbeatUrl = buildReportHeartbeatUrl(baseManagerUrl);
     }
 
@@ -96,7 +86,6 @@ public class HeartbeatManager extends AbstractDaemon 
implements AbstractHeartbea
         conf = AgentConfiguration.getAgentConf();
         httpManager = new HttpManager(conf);
         baseManagerUrl = HttpManager.buildBaseUrl();
-        reportSnapshotUrl = buildReportSnapShotUrl(baseManagerUrl);
         reportHeartbeatUrl = buildReportHeartbeatUrl(baseManagerUrl);
 
         jobmanager = null;
@@ -122,7 +111,6 @@ public class HeartbeatManager extends AbstractDaemon 
implements AbstractHeartbea
 
     @Override
     public void start() throws Exception {
-        submitWorker(snapshotReportThread());
         submitWorker(heartbeatReportThread());
         submitWorker(printMemoryPermitThread());
     }
@@ -136,24 +124,6 @@ public class HeartbeatManager extends AbstractDaemon 
implements AbstractHeartbea
         };
     }
 
-    private Runnable snapshotReportThread() {
-        return () -> {
-            while (isRunnable()) {
-                try {
-                    TaskSnapshotRequest taskSnapshotRequest = 
buildTaskSnapshotRequest();
-                    httpManager.doSentPost(reportSnapshotUrl, 
taskSnapshotRequest);
-                    if (LOGGER.isDebugEnabled()) {
-                        LOGGER.debug(" {} report snapshot to manager", 
taskSnapshotRequest);
-                    }
-                    SECONDS.sleep(conf.getInt(AGENT_HEARTBEAT_INTERVAL, 
DEFAULT_AGENT_HEARTBEAT_INTERVAL));
-                } catch (Throwable e) {
-                    LOGGER.error("interrupted while report snapshot", e);
-                    ThreadUtils.threadThrowableHandler(Thread.currentThread(), 
e);
-                }
-            }
-        };
-    }
-
     private Runnable heartbeatReportThread() {
         return () -> {
             while (isRunnable()) {
@@ -182,39 +152,6 @@ public class HeartbeatManager extends AbstractDaemon 
implements AbstractHeartbea
         httpManager.doSentPost(reportHeartbeatUrl, heartbeat);
     }
 
-    /**
-     * build task snapshot request of job
-     */
-    private TaskSnapshotRequest buildTaskSnapshotRequest() {
-        Map<String, JobWrapper> jobWrapperMap = jobmanager.getJobs();
-        List<TaskSnapshotMessage> taskSnapshotMessageList = new ArrayList<>();
-        TaskSnapshotRequest taskSnapshotRequest = new TaskSnapshotRequest();
-
-        Date date = new Date(System.currentTimeMillis());
-        for (Map.Entry<String, JobWrapper> entry : jobWrapperMap.entrySet()) {
-            if (StringUtils.isBlank(entry.getKey()) || entry.getValue() == 
null) {
-                LOGGER.info("key: {} or value: {} is null", entry.getKey(), 
entry.getValue());
-                continue;
-            }
-            String offset = entry.getValue().getSnapshot();
-            String jobId = entry.getKey();
-            TaskSnapshotMessage snapshotMessage = new TaskSnapshotMessage();
-            snapshotMessage.setSnapshot(offset);
-
-            // TODO Need to make sure the jobId is an Integer
-            if (!numberPattern.matcher(jobId).matches()) {
-                continue;
-            }
-            snapshotMessage.setJobId(Integer.valueOf(jobId));
-            taskSnapshotMessageList.add(snapshotMessage);
-        }
-        taskSnapshotRequest.setSnapshotList(taskSnapshotMessageList);
-        taskSnapshotRequest.setReportTime(date);
-        taskSnapshotRequest.setAgentIp(AgentUtils.fetchLocalIp());
-        taskSnapshotRequest.setUuid(AgentUtils.fetchLocalUuid());
-        return taskSnapshotRequest;
-    }
-
     /**
      * build heartbeat message of agent
      */
@@ -247,9 +184,11 @@ public class HeartbeatManager extends AbstractDaemon 
implements AbstractHeartbea
         Map<String, JobWrapper> jobWrapperMap = jobmanager.getJobs();
         List<GroupHeartbeat> groupHeartbeats = Lists.newArrayList();
         List<StreamHeartbeat> streamHeartbeats = Lists.newArrayList();
+        List<String> jobIds = Lists.newArrayList();
         jobWrapperMap.values().forEach(jobWrapper -> {
             Job job = jobWrapper.getJob();
             JobProfile jobProfile = job.getJobConf();
+            jobIds.add(jobProfile.getInstanceId());
             final String groupId = jobProfile.get(JOB_GROUP_ID);
             final String streamId = jobProfile.get(JOB_STREAM_ID);
             State currentState = jobWrapper.getCurrentState();
@@ -268,6 +207,8 @@ public class HeartbeatManager extends AbstractDaemon 
implements AbstractHeartbea
 
         heartbeatMsg.setGroupHeartbeats(groupHeartbeats);
         heartbeatMsg.setStreamHeartbeats(streamHeartbeats);
+
+        LOGGER.info("heartbeat jobIds {} heartbeatMsg {}", jobIds, 
heartbeatMsg);
         return heartbeatMsg;
     }
 
@@ -286,11 +227,6 @@ public class HeartbeatManager extends AbstractDaemon 
implements AbstractHeartbea
         return heartbeatMsg;
     }
 
-    private String buildReportSnapShotUrl(String baseUrl) {
-        return baseUrl
-                + conf.get(AGENT_MANAGER_REPORTSNAPSHOT_HTTP_PATH, 
DEFAULT_AGENT_MANAGER_REPORTSNAPSHOT_HTTP_PATH);
-    }
-
     private String buildReportHeartbeatUrl(String baseUrl) {
         return baseUrl + conf.get(AGENT_MANAGER_HEARTBEAT_HTTP_PATH, 
DEFAULT_AGENT_MANAGER_HEARTBEAT_HTTP_PATH);
     }

Reply via email to