This is an automated email from the ASF dual-hosted git repository.

wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new bd9ce03a34 [INLONG-11815][Agent] Add a unified reporting point for 
events (#11816)
bd9ce03a34 is described below

commit bd9ce03a34b60e4cf10b8698d6bfe770051cf0b1
Author: justinwwhuang <wenweihu...@apache.org>
AuthorDate: Wed Apr 2 14:09:50 2025 +0800

    [INLONG-11815][Agent] Add a unified reporting point for events (#11816)
    
    * [INLONG-11815][Agent] Add a unified reporting point for events
    
    * [INLONG-11815][Agent] Close all senders when creating sender error
---
 .../inlong/agent/constant/CommonConstants.java     |   2 +-
 .../inlong/agent/metrics/AgentEventMetricItem.java |  72 ++++++++++++++
 .../agent/metrics/AgentEventMetricItemSet.java     |  39 ++++++++
 .../inlong/agent/utils/EventReportUtils.java       | 108 +++++++++++++++++++++
 .../inlong/agent/core/AgentStatusManager.java      |  15 ++-
 .../inlong/agent/core/FileStaticManager.java       |  15 ++-
 .../apache/inlong/agent/core/HeartbeatManager.java |   7 ++
 .../apache/inlong/agent/core/task/TaskManager.java |  13 +++
 .../agent/plugin/fetcher/ManagerFetcher.java       |  59 ++++++++---
 .../agent/plugin/sinks/dataproxy/Sender.java       |  15 ++-
 10 files changed, 325 insertions(+), 20 deletions(-)

diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
index db0e509718..a20f281c63 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
@@ -41,7 +41,7 @@ public class CommonConstants {
     public static final boolean DEFAULT_PROXY_IS_COMPRESS = true;
 
     public static final String PROXY_MAX_SENDER_PER_GROUP = 
"proxy.max.sender.per.group";
-    public static final int DEFAULT_PROXY_MAX_SENDER_PER_GROUP = 10;
+    public static final int DEFAULT_PROXY_MAX_SENDER_PER_GROUP = 3;
 
     // max size of message list
     public static final String PROXY_PACKAGE_MAX_SIZE = 
"proxy.package.maxSize";
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentEventMetricItem.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentEventMetricItem.java
new file mode 100644
index 0000000000..275326b611
--- /dev/null
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentEventMetricItem.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.metrics;
+
+import org.apache.inlong.common.metric.CountMetric;
+import org.apache.inlong.common.metric.Dimension;
+import org.apache.inlong.common.metric.MetricDomain;
+import org.apache.inlong.common.metric.MetricItem;
+
+import java.text.SimpleDateFormat;
+import java.util.concurrent.atomic.AtomicLong;
+
+@MetricDomain(name = "AgentEvent")
+public class AgentEventMetricItem extends MetricItem {
+
+    public final static SimpleDateFormat FORMAT = new 
SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    public static final String KEY_INLONG_EVENT_TIME = "eventTime";
+    public static final String KEY_INLONG_GROUP_ID = "groupId";
+    public static final String KEY_INLONG_STREAM_ID = "streamId";
+    public static final String KEY_INLONG_COMPONENT_TYPE = "componentType";
+    public static final String KEY_INLONG_COMPONENT_NAME = "componentName";
+    public static final String KEY_INLONG_AGENT_IP = "agentIp";
+    public static final String KEY_INLONG_COMPONENT_VERSION = 
"componentVersion";
+    public static final String KEY_INLONG_EVENT_TYPE = "eventType";
+    public static final String KEY_INLONG_EVENT_LEVEL = "eventLevel";
+    public static final String KEY_INLONG_EVENT_CODE = "eventCode";
+    public static final String KEY_INLONG_EXT = "ext";
+    public static final String KEY_INLONG_EVENT_DESC = "eventDesc";
+
+    @Dimension
+    public String eventTime;
+    @Dimension
+    public String groupId;
+    @Dimension
+    public String streamId;
+    @Dimension
+    public String componentType;
+    @Dimension
+    public String componentName;
+    @Dimension
+    public String agentIp;
+    @Dimension
+    public String componentVersion;
+    @Dimension
+    public String eventType;
+    @Dimension
+    public String eventLevel;
+    @Dimension
+    public String eventCode;
+    @Dimension
+    public String ext;
+    @Dimension
+    public String eventDesc;
+
+    @CountMetric
+    public AtomicLong count = new AtomicLong(0);
+}
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentEventMetricItemSet.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentEventMetricItemSet.java
new file mode 100644
index 0000000000..48f4f25e42
--- /dev/null
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentEventMetricItemSet.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.metrics;
+
+import org.apache.inlong.common.metric.MetricDomain;
+import org.apache.inlong.common.metric.MetricItemSet;
+
+@MetricDomain(name = "AgentEvent")
+public class AgentEventMetricItemSet extends 
MetricItemSet<AgentEventMetricItem> {
+
+    /**
+     * Constructor
+     *
+     * @param name
+     */
+    public AgentEventMetricItemSet(String name) {
+        super(name);
+    }
+
+    @Override
+    protected AgentEventMetricItem createItem() {
+        return new AgentEventMetricItem();
+    }
+}
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/EventReportUtils.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/EventReportUtils.java
new file mode 100644
index 0000000000..3c3863fc1f
--- /dev/null
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/EventReportUtils.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.utils;
+
+import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.agent.metrics.AgentEventMetricItem;
+import org.apache.inlong.agent.metrics.AgentEventMetricItemSet;
+import org.apache.inlong.common.metric.MetricRegister;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.inlong.agent.constant.AgentConstants.AGENT_LOCAL_IP;
+import static 
org.apache.inlong.agent.metrics.AgentEventMetricItem.KEY_INLONG_AGENT_IP;
+import static 
org.apache.inlong.agent.metrics.AgentEventMetricItem.KEY_INLONG_COMPONENT_NAME;
+import static 
org.apache.inlong.agent.metrics.AgentEventMetricItem.KEY_INLONG_COMPONENT_TYPE;
+import static 
org.apache.inlong.agent.metrics.AgentEventMetricItem.KEY_INLONG_COMPONENT_VERSION;
+import static 
org.apache.inlong.agent.metrics.AgentEventMetricItem.KEY_INLONG_EVENT_CODE;
+import static 
org.apache.inlong.agent.metrics.AgentEventMetricItem.KEY_INLONG_EVENT_DESC;
+import static 
org.apache.inlong.agent.metrics.AgentEventMetricItem.KEY_INLONG_EVENT_LEVEL;
+import static 
org.apache.inlong.agent.metrics.AgentEventMetricItem.KEY_INLONG_EVENT_TIME;
+import static 
org.apache.inlong.agent.metrics.AgentEventMetricItem.KEY_INLONG_EVENT_TYPE;
+import static 
org.apache.inlong.agent.metrics.AgentEventMetricItem.KEY_INLONG_EXT;
+import static 
org.apache.inlong.agent.metrics.AgentEventMetricItem.KEY_INLONG_GROUP_ID;
+import static 
org.apache.inlong.agent.metrics.AgentEventMetricItem.KEY_INLONG_STREAM_ID;
+
+/**
+ * DiagUtils
+ */
+public class EventReportUtils {
+
+    public enum EvenCodeEnum {
+
+        CONFIG_UPDATE_SUC(0, "config update suc"),
+        CONFIG_NO_UPDATE(1, "config no update"),
+        CONFIG_UPDATE_VERSION_NO_CHANGE(2, "config update version no change"),
+        CONFIG_INVALID_RET_CODE(3, "config invalid ret code"),
+        CONFIG_INVALID_RESULT(4, "config invalid result maybe visit manager 
failed"),
+        TASK_ADD(5, "task add"),
+        TASK_DELETE(6, "task delete");
+
+        private final int code;
+        private final String message;
+
+        EvenCodeEnum(int code, String message) {
+            this.code = code;
+            this.message = message;
+        }
+
+        public int getCode() {
+            return code;
+        }
+
+        public String getMessage() {
+            return message;
+        }
+    }
+
+    private final static String COMPONENT_TYPE_AGENT = "AGENT";
+    private final static String COMPONENT_NAME_AGENT = "AGENT";
+    public static final String EVENT_TYPE_CONFIG_UPDATE = "CONFIG_UPDATE";
+    public static final String EVENT_LEVEL_INFO = "INFO";
+    public static final String EVENT_LEVEL_WARN = "WARN";
+    public static final String EVENT_LEVEL_ERROR = "ERROR";
+    private static AgentEventMetricItemSet metricItemSet;
+
+    private EventReportUtils() {
+    }
+
+    public static void init() {
+        metricItemSet = new AgentEventMetricItemSet(COMPONENT_NAME_AGENT);
+        MetricRegister.register(metricItemSet);
+    }
+
+    public static void report(String groupId, String streamId, long eventTime, 
String eventType,
+            String eventLevel, EvenCodeEnum evenCode, String ext, String desc) 
{
+        Map<String, String> dims = new HashMap<>();
+        dims.put(KEY_INLONG_GROUP_ID, groupId);
+        dims.put(KEY_INLONG_STREAM_ID, streamId);
+        dims.put(KEY_INLONG_COMPONENT_TYPE, COMPONENT_TYPE_AGENT);
+        dims.put(KEY_INLONG_COMPONENT_NAME, COMPONENT_NAME_AGENT);
+        dims.put(KEY_INLONG_AGENT_IP, 
AgentConfiguration.getAgentConf().get(AGENT_LOCAL_IP));
+        dims.put(KEY_INLONG_COMPONENT_VERSION, 
EventReportUtils.class.getPackage().getImplementationVersion());
+        dims.put(KEY_INLONG_EVENT_TIME, AgentEventMetricItem.FORMAT.format(new 
Date(eventTime)));
+        dims.put(KEY_INLONG_EVENT_TYPE, eventType);
+        dims.put(KEY_INLONG_EVENT_LEVEL, eventLevel);
+        dims.put(KEY_INLONG_EVENT_CODE, String.valueOf(evenCode.getCode()));
+        dims.put(KEY_INLONG_EXT, ext.replaceAll("\\|", "-"));
+        dims.put(KEY_INLONG_EVENT_DESC, desc);
+        metricItemSet.findMetricItem(dims).count.addAndGet(1);
+    }
+}
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentStatusManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentStatusManager.java
index 50085fa1bd..314fe5e669 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentStatusManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentStatusManager.java
@@ -21,6 +21,7 @@ import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.agent.core.task.MemoryManager;
 import org.apache.inlong.agent.core.task.OffsetManager;
 import org.apache.inlong.agent.core.task.TaskManager;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.utils.AgentUtils;
 import org.apache.inlong.agent.utils.ExcuteLinux;
 import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
@@ -167,10 +168,20 @@ public class AgentStatusManager {
         }
         try {
             ProcessResult procResult = new ProcessResult();
+            long dataTime = AgentUtils.getCurrentTime();
+            byte[] body = 
data.getFieldsString().getBytes(StandardCharsets.UTF_8);
             if (!sender.sendMessage(new TcpEventInfo(INLONG_AGENT_SYSTEM,
-                    INLONG_AGENT_STATUS, AgentUtils.getCurrentTime(), null,
-                    data.getFieldsString().getBytes(StandardCharsets.UTF_8)), 
procResult)) {
+                    INLONG_AGENT_STATUS, dataTime, null, body), procResult)) {
                 LOGGER.error("send status failed: ret = {}", procResult);
+                AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_FAILED, 
INLONG_AGENT_SYSTEM, INLONG_AGENT_STATUS,
+                        dataTime, 1, body.length);
+                
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_FAILED_REAL_TIME, 
INLONG_AGENT_SYSTEM,
+                        INLONG_AGENT_STATUS, dataTime, 1, body.length);
+            } else {
+                AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, 
INLONG_AGENT_SYSTEM, INLONG_AGENT_STATUS,
+                        dataTime, 1, body.length);
+                
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS_REAL_TIME, 
INLONG_AGENT_SYSTEM,
+                        INLONG_AGENT_STATUS, dataTime, 1, body.length);
             }
         } catch (Throwable ex) {
             LOGGER.error("send status throw exception", ex);
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/FileStaticManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/FileStaticManager.java
index b3a47f2c6a..e598fa0092 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/FileStaticManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/FileStaticManager.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.agent.core;
 
 import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.utils.AgentUtils;
 import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
 import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpEventInfo;
@@ -134,10 +135,20 @@ public class FileStaticManager {
             }
             try {
                 ProcessResult procResult = new ProcessResult();
+                long dataTime = AgentUtils.getCurrentTime();
+                byte[] body = 
data.getFieldsString().getBytes(StandardCharsets.UTF_8);
                 if (!sender.sendMessage(new TcpEventInfo(INLONG_AGENT_SYSTEM,
-                        INLONG_FILE_STATIC, AgentUtils.getCurrentTime(), null,
-                        
data.getFieldsString().getBytes(StandardCharsets.UTF_8)), procResult)) {
+                        INLONG_FILE_STATIC, dataTime, null, body), 
procResult)) {
                     LOGGER.error("send static failed: ret = {}", procResult);
+                    AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_FAILED, 
INLONG_AGENT_SYSTEM, INLONG_FILE_STATIC,
+                            dataTime, 1, body.length);
+                    
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_FAILED_REAL_TIME, 
INLONG_AGENT_SYSTEM,
+                            INLONG_FILE_STATIC, dataTime, 1, body.length);
+                } else {
+                    AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, 
INLONG_AGENT_SYSTEM, INLONG_FILE_STATIC,
+                            dataTime, 1, body.length);
+                    
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS_REAL_TIME, 
INLONG_AGENT_SYSTEM,
+                            INLONG_FILE_STATIC, dataTime, 1, body.length);
                 }
             } catch (Throwable ex) {
                 LOGGER.error("send static throw exception", ex);
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
index d1d759cd0a..7693eebed5 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
@@ -77,6 +77,9 @@ public class HeartbeatManager extends AbstractDaemon 
implements AbstractHeartbea
         httpManager = new HttpManager(conf);
         baseManagerUrl = httpManager.getBaseUrl();
         reportHeartbeatUrl = buildReportHeartbeatUrl(baseManagerUrl);
+        createMessageSender();
+        AgentStatusManager.init(agentManager);
+        FileStaticManager.init();
     }
 
     public static HeartbeatManager getInstance(AgentManager agentManager) {
@@ -123,6 +126,9 @@ public class HeartbeatManager extends AbstractDaemon 
implements AbstractHeartbea
                     if (LOGGER.isDebugEnabled()) {
                         LOGGER.debug(" {} report heartbeat to manager", 
heartbeatMsg);
                     }
+                    if (sender == null) {
+                        createMessageSender();
+                    }
                     AgentStatusManager.sendStatusMsg(sender);
                     FileStaticManager.sendStaticMsg(sender);
                 } catch (Throwable e) {
@@ -212,6 +218,7 @@ public class HeartbeatManager extends AbstractDaemon 
implements AbstractHeartbea
             // start sender object
             ProcessResult procResult = new ProcessResult();
             if (!sender.start(procResult)) {
+                sender.close();
                 throw new ProxySdkException("Sender start failure, " + 
procResult);
             }
         } catch (Throwable ex) {
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
index 2da20cde9b..e34e1ae2b3 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
@@ -27,6 +27,8 @@ import org.apache.inlong.agent.plugin.file.Task;
 import org.apache.inlong.agent.store.Store;
 import org.apache.inlong.agent.store.TaskStore;
 import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.EventReportUtils;
+import org.apache.inlong.agent.utils.EventReportUtils.EvenCodeEnum;
 import org.apache.inlong.agent.utils.ThreadUtils;
 import org.apache.inlong.common.enums.TaskStateEnum;
 
@@ -144,6 +146,7 @@ public class TaskManager extends AbstractDaemon {
         pendingTasks = new LinkedBlockingQueue<>(taskMaxLimit);
         configQueue = new LinkedBlockingQueue<>(CONFIG_QUEUE_CAPACITY);
         actionQueue = new LinkedBlockingQueue<>(ACTION_QUEUE_CAPACITY);
+        EventReportUtils.init();
     }
 
     public static TaskStore getTaskStore() {
@@ -299,6 +302,11 @@ public class TaskManager extends AbstractDaemon {
                         profileFromManager.getTaskId(),
                         profileFromManager.isRetry(), 
profileFromManager.getState());
                 addTask(profileFromManager);
+                EventReportUtils.report(profileFromManager.getInlongGroupId(),
+                        profileFromManager.getInlongStreamId(), 
AgentUtils.getCurrentTime(),
+                        EventReportUtils.EVENT_TYPE_CONFIG_UPDATE, 
EventReportUtils.EVENT_LEVEL_INFO,
+                        EvenCodeEnum.TASK_ADD, profileFromManager.toJsonStr(),
+                        EvenCodeEnum.TASK_ADD.getMessage());
             } else {
                 TaskStateEnum managerState = profileFromManager.getState();
                 TaskStateEnum storeState = taskFromStore.getState();
@@ -331,6 +339,11 @@ public class TaskManager extends AbstractDaemon {
         taskStore.getTasks().forEach((profileFromStore) -> {
             if (!tasksFromManager.containsKey(profileFromStore.getTaskId())) {
                 LOGGER.info("traverseStoreTasksToManager try to delete task 
{}", profileFromStore.getTaskId());
+                EventReportUtils.report(profileFromStore.getInlongGroupId(),
+                        profileFromStore.getInlongStreamId(), 
AgentUtils.getCurrentTime(),
+                        EventReportUtils.EVENT_TYPE_CONFIG_UPDATE, 
EventReportUtils.EVENT_LEVEL_INFO,
+                        EvenCodeEnum.TASK_DELETE, profileFromStore.toJsonStr(),
+                        EvenCodeEnum.TASK_DELETE.getMessage());
                 deleteTask(profileFromStore);
             }
         });
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
index dc07605e3e..da2315e480 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
@@ -25,6 +25,8 @@ import org.apache.inlong.agent.constant.CycleUnitType;
 import org.apache.inlong.agent.core.AgentManager;
 import org.apache.inlong.agent.pojo.FileTask.FileTaskConfig;
 import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.EventReportUtils;
+import org.apache.inlong.agent.utils.EventReportUtils.EvenCodeEnum;
 import org.apache.inlong.agent.utils.HttpManager;
 import org.apache.inlong.agent.utils.ThreadUtils;
 import org.apache.inlong.common.enums.PullJobTypeEnum;
@@ -132,10 +134,10 @@ public class ManagerFetcher extends AbstractDaemon 
implements ProfileFetcher {
         JsonElement element = resultData.get(AGENT_MANAGER_RETURN_PARAM_DATA);
         LOGGER.info("Get static config  end");
         if (element != null) {
-            LOGGER.info("Get static config  not null {}", resultData);
+            LOGGER.info("Get static config not null {}", resultData);
             return GSON.fromJson(element.getAsJsonObject(), TaskResult.class);
         } else {
-            LOGGER.info("Get static config  nothing to do");
+            LOGGER.info("Get static config nothing to do");
             return null;
         }
     }
@@ -192,16 +194,45 @@ public class ManagerFetcher extends AbstractDaemon 
implements ProfileFetcher {
             while (isRunnable()) {
                 try {
                     TaskResult taskResult = getStaticConfig();
-                    if (taskResult != null && 
taskResult.getCode().equals(AgentResponseCode.SUCCESS)
-                            && 
agentManager.getTaskManager().getTaskResultVersion() < taskResult.getVersion()) 
{
-                        List<TaskProfile> taskProfiles = new ArrayList<>();
-                        taskResult.getDataConfigs().forEach((config) -> {
-                            TaskProfile profile = 
TaskProfile.convertToTaskProfile(config);
-                            taskProfiles.add(profile);
-                        });
-                        
agentManager.getTaskManager().submitTaskProfiles(taskProfiles);
-                        
agentManager.getTaskManager().setTaskResultMd5(taskResult.getMd5());
-                        
agentManager.getTaskManager().setTaskResultVersion(taskResult.getVersion());
+                    if (taskResult != null) {
+                        if 
(taskResult.getCode().equals(AgentResponseCode.SUCCESS)) {
+                            if 
(agentManager.getTaskManager().getTaskResultVersion() < 
taskResult.getVersion()) {
+                                EventReportUtils.report("", "", 
AgentUtils.getCurrentTime(),
+                                        
EventReportUtils.EVENT_TYPE_CONFIG_UPDATE, EventReportUtils.EVENT_LEVEL_INFO,
+                                        EvenCodeEnum.CONFIG_UPDATE_SUC, 
taskResult.toString(),
+                                        
EvenCodeEnum.CONFIG_UPDATE_SUC.getMessage());
+                                List<TaskProfile> taskProfiles = new 
ArrayList<>();
+                                taskResult.getDataConfigs().forEach((config) 
-> {
+                                    TaskProfile profile = 
TaskProfile.convertToTaskProfile(config);
+                                    taskProfiles.add(profile);
+                                });
+                                
agentManager.getTaskManager().submitTaskProfiles(taskProfiles);
+                                
agentManager.getTaskManager().setTaskResultMd5(taskResult.getMd5());
+                                
agentManager.getTaskManager().setTaskResultVersion(taskResult.getVersion());
+                            } else {
+                                EventReportUtils.report("", "", 
AgentUtils.getCurrentTime(),
+                                        
EventReportUtils.EVENT_TYPE_CONFIG_UPDATE, EventReportUtils.EVENT_LEVEL_WARN,
+                                        
EvenCodeEnum.CONFIG_UPDATE_VERSION_NO_CHANGE, taskResult.toString(),
+                                        
EvenCodeEnum.CONFIG_UPDATE_VERSION_NO_CHANGE.getMessage());
+                                LOGGER.warn("%s: %s", 
EvenCodeEnum.CONFIG_UPDATE_VERSION_NO_CHANGE.getMessage(),
+                                        taskResult);
+                            }
+                        } else if 
(taskResult.getCode().equals(AgentResponseCode.NO_UPDATE)) {
+                            EventReportUtils.report("", "", 
AgentUtils.getCurrentTime(),
+                                    EventReportUtils.EVENT_TYPE_CONFIG_UPDATE, 
EventReportUtils.EVENT_LEVEL_INFO,
+                                    EvenCodeEnum.CONFIG_NO_UPDATE, 
taskResult.toString(),
+                                    
EvenCodeEnum.CONFIG_NO_UPDATE.getMessage());
+                        } else {
+                            EventReportUtils.report("", "", 
AgentUtils.getCurrentTime(),
+                                    EventReportUtils.EVENT_TYPE_CONFIG_UPDATE, 
EventReportUtils.EVENT_LEVEL_ERROR,
+                                    EvenCodeEnum.CONFIG_INVALID_RET_CODE, 
taskResult.toString(),
+                                    
EvenCodeEnum.CONFIG_INVALID_RET_CODE.getMessage());
+                        }
+                    } else {
+                        EventReportUtils.report("", "", 
AgentUtils.getCurrentTime(),
+                                EventReportUtils.EVENT_TYPE_CONFIG_UPDATE, 
EventReportUtils.EVENT_LEVEL_ERROR,
+                                EvenCodeEnum.CONFIG_INVALID_RESULT, 
taskResult.toString(),
+                                
EvenCodeEnum.CONFIG_INVALID_RESULT.getMessage());
                     }
                     AgentConfigInfo config = getAgentConfigInfo();
                     if (config != null && 
config.getCode().equals(AgentResponseCode.SUCCESS)) {
@@ -211,6 +242,10 @@ public class ManagerFetcher extends AbstractDaemon 
implements ProfileFetcher {
                         }
                     }
                 } catch (Throwable ex) {
+                    EventReportUtils.report("", "", 
AgentUtils.getCurrentTime(),
+                            EventReportUtils.EVENT_TYPE_CONFIG_UPDATE, 
EventReportUtils.EVENT_LEVEL_ERROR,
+                            EvenCodeEnum.CONFIG_INVALID_RESULT, 
ex.getMessage(),
+                            EvenCodeEnum.CONFIG_INVALID_RESULT.getMessage());
                     LOGGER.warn("exception caught", ex);
                     ThreadUtils.threadThrowableHandler(Thread.currentThread(), 
ex);
                 } finally {
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/dataproxy/Sender.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/dataproxy/Sender.java
index 7385cd3729..00a69f5a1b 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/dataproxy/Sender.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/dataproxy/Sender.java
@@ -214,14 +214,23 @@ public class Sender {
         proxyClientConfig.setEnableDataCompress(isCompress);
         SHARED_FACTORY = new DefaultThreadFactory("agent-sender-manager-" + 
sourcePath,
                 Thread.currentThread().isDaemon());
+        boolean hasError = false;
+        ProcessResult procResult = null;
         for (int i = 0; i < maxSenderPerGroup; i++) {
             InLongTcpMsgSender sender = new 
InLongTcpMsgSender(proxyClientConfig, SHARED_FACTORY);
-            ProcessResult procResult = new ProcessResult();
+            procResult = new ProcessResult();
             if (!sender.start(procResult)) {
-                throw new ProxySdkException("Start sender failure, " + 
procResult);
+                hasError = true;
+                break;
             }
             senders.add(sender);
         }
+        if (hasError) {
+            senders.forEach(sender -> {
+                sender.close();
+            });
+            throw new ProxySdkException("Start sender failure, " + procResult);
+        }
     }
 
     public void sendBatch(SenderMessage message) {
@@ -376,7 +385,7 @@ public class Sender {
                 
AgentStatusManager.sendPackageCount.addAndGet(message.getMsgCnt());
                 
AgentStatusManager.sendDataLen.addAndGet(message.getTotalSize());
             } else {
-                LOGGER.warn("send groupId {}, streamId {}, taskId {}, 
instanceId {}, dataTime {} fail with times {}, "
+                LOGGER.error("send groupId {}, streamId {}, taskId {}, 
instanceId {}, dataTime {} fail with times {}, "
                         + "error {}", groupId, streamId, taskId, instanceId, 
dataTime, retry, result);
                 getMetricItem(groupId, 
streamId).pluginSendFailCount.addAndGet(msgCnt);
                 putInResendQueue(new AgentSenderCallback(message, retry));

Reply via email to