This is an automated email from the ASF dual-hosted git repository. wenweihuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new bd9ce03a34 [INLONG-11815][Agent] Add a unified reporting point for events (#11816) bd9ce03a34 is described below commit bd9ce03a34b60e4cf10b8698d6bfe770051cf0b1 Author: justinwwhuang <wenweihu...@apache.org> AuthorDate: Wed Apr 2 14:09:50 2025 +0800 [INLONG-11815][Agent] Add a unified reporting point for events (#11816) * [INLONG-11815][Agent] Add a unified reporting point for events * [INLONG-11815][Agent] Close all senders when creating sender error --- .../inlong/agent/constant/CommonConstants.java | 2 +- .../inlong/agent/metrics/AgentEventMetricItem.java | 72 ++++++++++++++ .../agent/metrics/AgentEventMetricItemSet.java | 39 ++++++++ .../inlong/agent/utils/EventReportUtils.java | 108 +++++++++++++++++++++ .../inlong/agent/core/AgentStatusManager.java | 15 ++- .../inlong/agent/core/FileStaticManager.java | 15 ++- .../apache/inlong/agent/core/HeartbeatManager.java | 7 ++ .../apache/inlong/agent/core/task/TaskManager.java | 13 +++ .../agent/plugin/fetcher/ManagerFetcher.java | 59 ++++++++--- .../agent/plugin/sinks/dataproxy/Sender.java | 15 ++- 10 files changed, 325 insertions(+), 20 deletions(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java index db0e509718..a20f281c63 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java @@ -41,7 +41,7 @@ public class CommonConstants { public static final boolean DEFAULT_PROXY_IS_COMPRESS = true; public static final String PROXY_MAX_SENDER_PER_GROUP = "proxy.max.sender.per.group"; - public static final int DEFAULT_PROXY_MAX_SENDER_PER_GROUP = 10; + public static final int DEFAULT_PROXY_MAX_SENDER_PER_GROUP = 3; // max size of message list public static final String PROXY_PACKAGE_MAX_SIZE = "proxy.package.maxSize"; diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentEventMetricItem.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentEventMetricItem.java new file mode 100644 index 0000000000..275326b611 --- /dev/null +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentEventMetricItem.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.metrics; + +import org.apache.inlong.common.metric.CountMetric; +import org.apache.inlong.common.metric.Dimension; +import org.apache.inlong.common.metric.MetricDomain; +import org.apache.inlong.common.metric.MetricItem; + +import java.text.SimpleDateFormat; +import java.util.concurrent.atomic.AtomicLong; + +@MetricDomain(name = "AgentEvent") +public class AgentEventMetricItem extends MetricItem { + + public final static SimpleDateFormat FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + public static final String KEY_INLONG_EVENT_TIME = "eventTime"; + public static final String KEY_INLONG_GROUP_ID = "groupId"; + public static final String KEY_INLONG_STREAM_ID = "streamId"; + public static final String KEY_INLONG_COMPONENT_TYPE = "componentType"; + public static final String KEY_INLONG_COMPONENT_NAME = "componentName"; + public static final String KEY_INLONG_AGENT_IP = "agentIp"; + public static final String KEY_INLONG_COMPONENT_VERSION = "componentVersion"; + public static final String KEY_INLONG_EVENT_TYPE = "eventType"; + public static final String KEY_INLONG_EVENT_LEVEL = "eventLevel"; + public static final String KEY_INLONG_EVENT_CODE = "eventCode"; + public static final String KEY_INLONG_EXT = "ext"; + public static final String KEY_INLONG_EVENT_DESC = "eventDesc"; + + @Dimension + public String eventTime; + @Dimension + public String groupId; + @Dimension + public String streamId; + @Dimension + public String componentType; + @Dimension + public String componentName; + @Dimension + public String agentIp; + @Dimension + public String componentVersion; + @Dimension + public String eventType; + @Dimension + public String eventLevel; + @Dimension + public String eventCode; + @Dimension + public String ext; + @Dimension + public String eventDesc; + + @CountMetric + public AtomicLong count = new AtomicLong(0); +} diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentEventMetricItemSet.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentEventMetricItemSet.java new file mode 100644 index 0000000000..48f4f25e42 --- /dev/null +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentEventMetricItemSet.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.metrics; + +import org.apache.inlong.common.metric.MetricDomain; +import org.apache.inlong.common.metric.MetricItemSet; + +@MetricDomain(name = "AgentEvent") +public class AgentEventMetricItemSet extends MetricItemSet<AgentEventMetricItem> { + + /** + * Constructor + * + * @param name + */ + public AgentEventMetricItemSet(String name) { + super(name); + } + + @Override + protected AgentEventMetricItem createItem() { + return new AgentEventMetricItem(); + } +} diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/EventReportUtils.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/EventReportUtils.java new file mode 100644 index 0000000000..3c3863fc1f --- /dev/null +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/EventReportUtils.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.utils; + +import org.apache.inlong.agent.conf.AgentConfiguration; +import org.apache.inlong.agent.metrics.AgentEventMetricItem; +import org.apache.inlong.agent.metrics.AgentEventMetricItemSet; +import org.apache.inlong.common.metric.MetricRegister; + +import java.util.Date; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.inlong.agent.constant.AgentConstants.AGENT_LOCAL_IP; +import static org.apache.inlong.agent.metrics.AgentEventMetricItem.KEY_INLONG_AGENT_IP; +import static org.apache.inlong.agent.metrics.AgentEventMetricItem.KEY_INLONG_COMPONENT_NAME; +import static org.apache.inlong.agent.metrics.AgentEventMetricItem.KEY_INLONG_COMPONENT_TYPE; +import static org.apache.inlong.agent.metrics.AgentEventMetricItem.KEY_INLONG_COMPONENT_VERSION; +import static org.apache.inlong.agent.metrics.AgentEventMetricItem.KEY_INLONG_EVENT_CODE; +import static org.apache.inlong.agent.metrics.AgentEventMetricItem.KEY_INLONG_EVENT_DESC; +import static org.apache.inlong.agent.metrics.AgentEventMetricItem.KEY_INLONG_EVENT_LEVEL; +import static org.apache.inlong.agent.metrics.AgentEventMetricItem.KEY_INLONG_EVENT_TIME; +import static org.apache.inlong.agent.metrics.AgentEventMetricItem.KEY_INLONG_EVENT_TYPE; +import static org.apache.inlong.agent.metrics.AgentEventMetricItem.KEY_INLONG_EXT; +import static org.apache.inlong.agent.metrics.AgentEventMetricItem.KEY_INLONG_GROUP_ID; +import static org.apache.inlong.agent.metrics.AgentEventMetricItem.KEY_INLONG_STREAM_ID; + +/** + * DiagUtils + */ +public class EventReportUtils { + + public enum EvenCodeEnum { + + CONFIG_UPDATE_SUC(0, "config update suc"), + CONFIG_NO_UPDATE(1, "config no update"), + CONFIG_UPDATE_VERSION_NO_CHANGE(2, "config update version no change"), + CONFIG_INVALID_RET_CODE(3, "config invalid ret code"), + CONFIG_INVALID_RESULT(4, "config invalid result maybe visit manager failed"), + TASK_ADD(5, "task add"), + TASK_DELETE(6, "task delete"); + + private final int code; + private final String message; + + EvenCodeEnum(int code, String message) { + this.code = code; + this.message = message; + } + + public int getCode() { + return code; + } + + public String getMessage() { + return message; + } + } + + private final static String COMPONENT_TYPE_AGENT = "AGENT"; + private final static String COMPONENT_NAME_AGENT = "AGENT"; + public static final String EVENT_TYPE_CONFIG_UPDATE = "CONFIG_UPDATE"; + public static final String EVENT_LEVEL_INFO = "INFO"; + public static final String EVENT_LEVEL_WARN = "WARN"; + public static final String EVENT_LEVEL_ERROR = "ERROR"; + private static AgentEventMetricItemSet metricItemSet; + + private EventReportUtils() { + } + + public static void init() { + metricItemSet = new AgentEventMetricItemSet(COMPONENT_NAME_AGENT); + MetricRegister.register(metricItemSet); + } + + public static void report(String groupId, String streamId, long eventTime, String eventType, + String eventLevel, EvenCodeEnum evenCode, String ext, String desc) { + Map<String, String> dims = new HashMap<>(); + dims.put(KEY_INLONG_GROUP_ID, groupId); + dims.put(KEY_INLONG_STREAM_ID, streamId); + dims.put(KEY_INLONG_COMPONENT_TYPE, COMPONENT_TYPE_AGENT); + dims.put(KEY_INLONG_COMPONENT_NAME, COMPONENT_NAME_AGENT); + dims.put(KEY_INLONG_AGENT_IP, AgentConfiguration.getAgentConf().get(AGENT_LOCAL_IP)); + dims.put(KEY_INLONG_COMPONENT_VERSION, EventReportUtils.class.getPackage().getImplementationVersion()); + dims.put(KEY_INLONG_EVENT_TIME, AgentEventMetricItem.FORMAT.format(new Date(eventTime))); + dims.put(KEY_INLONG_EVENT_TYPE, eventType); + dims.put(KEY_INLONG_EVENT_LEVEL, eventLevel); + dims.put(KEY_INLONG_EVENT_CODE, String.valueOf(evenCode.getCode())); + dims.put(KEY_INLONG_EXT, ext.replaceAll("\\|", "-")); + dims.put(KEY_INLONG_EVENT_DESC, desc); + metricItemSet.findMetricItem(dims).count.addAndGet(1); + } +} diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentStatusManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentStatusManager.java index 50085fa1bd..314fe5e669 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentStatusManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentStatusManager.java @@ -21,6 +21,7 @@ import org.apache.inlong.agent.conf.AgentConfiguration; import org.apache.inlong.agent.core.task.MemoryManager; import org.apache.inlong.agent.core.task.OffsetManager; import org.apache.inlong.agent.core.task.TaskManager; +import org.apache.inlong.agent.metrics.audit.AuditUtils; import org.apache.inlong.agent.utils.AgentUtils; import org.apache.inlong.agent.utils.ExcuteLinux; import org.apache.inlong.sdk.dataproxy.common.ProcessResult; @@ -167,10 +168,20 @@ public class AgentStatusManager { } try { ProcessResult procResult = new ProcessResult(); + long dataTime = AgentUtils.getCurrentTime(); + byte[] body = data.getFieldsString().getBytes(StandardCharsets.UTF_8); if (!sender.sendMessage(new TcpEventInfo(INLONG_AGENT_SYSTEM, - INLONG_AGENT_STATUS, AgentUtils.getCurrentTime(), null, - data.getFieldsString().getBytes(StandardCharsets.UTF_8)), procResult)) { + INLONG_AGENT_STATUS, dataTime, null, body), procResult)) { LOGGER.error("send status failed: ret = {}", procResult); + AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_FAILED, INLONG_AGENT_SYSTEM, INLONG_AGENT_STATUS, + dataTime, 1, body.length); + AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_FAILED_REAL_TIME, INLONG_AGENT_SYSTEM, + INLONG_AGENT_STATUS, dataTime, 1, body.length); + } else { + AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, INLONG_AGENT_SYSTEM, INLONG_AGENT_STATUS, + dataTime, 1, body.length); + AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS_REAL_TIME, INLONG_AGENT_SYSTEM, + INLONG_AGENT_STATUS, dataTime, 1, body.length); } } catch (Throwable ex) { LOGGER.error("send status throw exception", ex); diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/FileStaticManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/FileStaticManager.java index b3a47f2c6a..e598fa0092 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/FileStaticManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/FileStaticManager.java @@ -18,6 +18,7 @@ package org.apache.inlong.agent.core; import org.apache.inlong.agent.conf.AgentConfiguration; +import org.apache.inlong.agent.metrics.audit.AuditUtils; import org.apache.inlong.agent.utils.AgentUtils; import org.apache.inlong.sdk.dataproxy.common.ProcessResult; import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpEventInfo; @@ -134,10 +135,20 @@ public class FileStaticManager { } try { ProcessResult procResult = new ProcessResult(); + long dataTime = AgentUtils.getCurrentTime(); + byte[] body = data.getFieldsString().getBytes(StandardCharsets.UTF_8); if (!sender.sendMessage(new TcpEventInfo(INLONG_AGENT_SYSTEM, - INLONG_FILE_STATIC, AgentUtils.getCurrentTime(), null, - data.getFieldsString().getBytes(StandardCharsets.UTF_8)), procResult)) { + INLONG_FILE_STATIC, dataTime, null, body), procResult)) { LOGGER.error("send static failed: ret = {}", procResult); + AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_FAILED, INLONG_AGENT_SYSTEM, INLONG_FILE_STATIC, + dataTime, 1, body.length); + AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_FAILED_REAL_TIME, INLONG_AGENT_SYSTEM, + INLONG_FILE_STATIC, dataTime, 1, body.length); + } else { + AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, INLONG_AGENT_SYSTEM, INLONG_FILE_STATIC, + dataTime, 1, body.length); + AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS_REAL_TIME, INLONG_AGENT_SYSTEM, + INLONG_FILE_STATIC, dataTime, 1, body.length); } } catch (Throwable ex) { LOGGER.error("send static throw exception", ex); diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java index d1d759cd0a..7693eebed5 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java @@ -77,6 +77,9 @@ public class HeartbeatManager extends AbstractDaemon implements AbstractHeartbea httpManager = new HttpManager(conf); baseManagerUrl = httpManager.getBaseUrl(); reportHeartbeatUrl = buildReportHeartbeatUrl(baseManagerUrl); + createMessageSender(); + AgentStatusManager.init(agentManager); + FileStaticManager.init(); } public static HeartbeatManager getInstance(AgentManager agentManager) { @@ -123,6 +126,9 @@ public class HeartbeatManager extends AbstractDaemon implements AbstractHeartbea if (LOGGER.isDebugEnabled()) { LOGGER.debug(" {} report heartbeat to manager", heartbeatMsg); } + if (sender == null) { + createMessageSender(); + } AgentStatusManager.sendStatusMsg(sender); FileStaticManager.sendStaticMsg(sender); } catch (Throwable e) { @@ -212,6 +218,7 @@ public class HeartbeatManager extends AbstractDaemon implements AbstractHeartbea // start sender object ProcessResult procResult = new ProcessResult(); if (!sender.start(procResult)) { + sender.close(); throw new ProxySdkException("Sender start failure, " + procResult); } } catch (Throwable ex) { diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java index 2da20cde9b..e34e1ae2b3 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java @@ -27,6 +27,8 @@ import org.apache.inlong.agent.plugin.file.Task; import org.apache.inlong.agent.store.Store; import org.apache.inlong.agent.store.TaskStore; import org.apache.inlong.agent.utils.AgentUtils; +import org.apache.inlong.agent.utils.EventReportUtils; +import org.apache.inlong.agent.utils.EventReportUtils.EvenCodeEnum; import org.apache.inlong.agent.utils.ThreadUtils; import org.apache.inlong.common.enums.TaskStateEnum; @@ -144,6 +146,7 @@ public class TaskManager extends AbstractDaemon { pendingTasks = new LinkedBlockingQueue<>(taskMaxLimit); configQueue = new LinkedBlockingQueue<>(CONFIG_QUEUE_CAPACITY); actionQueue = new LinkedBlockingQueue<>(ACTION_QUEUE_CAPACITY); + EventReportUtils.init(); } public static TaskStore getTaskStore() { @@ -299,6 +302,11 @@ public class TaskManager extends AbstractDaemon { profileFromManager.getTaskId(), profileFromManager.isRetry(), profileFromManager.getState()); addTask(profileFromManager); + EventReportUtils.report(profileFromManager.getInlongGroupId(), + profileFromManager.getInlongStreamId(), AgentUtils.getCurrentTime(), + EventReportUtils.EVENT_TYPE_CONFIG_UPDATE, EventReportUtils.EVENT_LEVEL_INFO, + EvenCodeEnum.TASK_ADD, profileFromManager.toJsonStr(), + EvenCodeEnum.TASK_ADD.getMessage()); } else { TaskStateEnum managerState = profileFromManager.getState(); TaskStateEnum storeState = taskFromStore.getState(); @@ -331,6 +339,11 @@ public class TaskManager extends AbstractDaemon { taskStore.getTasks().forEach((profileFromStore) -> { if (!tasksFromManager.containsKey(profileFromStore.getTaskId())) { LOGGER.info("traverseStoreTasksToManager try to delete task {}", profileFromStore.getTaskId()); + EventReportUtils.report(profileFromStore.getInlongGroupId(), + profileFromStore.getInlongStreamId(), AgentUtils.getCurrentTime(), + EventReportUtils.EVENT_TYPE_CONFIG_UPDATE, EventReportUtils.EVENT_LEVEL_INFO, + EvenCodeEnum.TASK_DELETE, profileFromStore.toJsonStr(), + EvenCodeEnum.TASK_DELETE.getMessage()); deleteTask(profileFromStore); } }); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java index dc07605e3e..da2315e480 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java @@ -25,6 +25,8 @@ import org.apache.inlong.agent.constant.CycleUnitType; import org.apache.inlong.agent.core.AgentManager; import org.apache.inlong.agent.pojo.FileTask.FileTaskConfig; import org.apache.inlong.agent.utils.AgentUtils; +import org.apache.inlong.agent.utils.EventReportUtils; +import org.apache.inlong.agent.utils.EventReportUtils.EvenCodeEnum; import org.apache.inlong.agent.utils.HttpManager; import org.apache.inlong.agent.utils.ThreadUtils; import org.apache.inlong.common.enums.PullJobTypeEnum; @@ -132,10 +134,10 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher { JsonElement element = resultData.get(AGENT_MANAGER_RETURN_PARAM_DATA); LOGGER.info("Get static config end"); if (element != null) { - LOGGER.info("Get static config not null {}", resultData); + LOGGER.info("Get static config not null {}", resultData); return GSON.fromJson(element.getAsJsonObject(), TaskResult.class); } else { - LOGGER.info("Get static config nothing to do"); + LOGGER.info("Get static config nothing to do"); return null; } } @@ -192,16 +194,45 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher { while (isRunnable()) { try { TaskResult taskResult = getStaticConfig(); - if (taskResult != null && taskResult.getCode().equals(AgentResponseCode.SUCCESS) - && agentManager.getTaskManager().getTaskResultVersion() < taskResult.getVersion()) { - List<TaskProfile> taskProfiles = new ArrayList<>(); - taskResult.getDataConfigs().forEach((config) -> { - TaskProfile profile = TaskProfile.convertToTaskProfile(config); - taskProfiles.add(profile); - }); - agentManager.getTaskManager().submitTaskProfiles(taskProfiles); - agentManager.getTaskManager().setTaskResultMd5(taskResult.getMd5()); - agentManager.getTaskManager().setTaskResultVersion(taskResult.getVersion()); + if (taskResult != null) { + if (taskResult.getCode().equals(AgentResponseCode.SUCCESS)) { + if (agentManager.getTaskManager().getTaskResultVersion() < taskResult.getVersion()) { + EventReportUtils.report("", "", AgentUtils.getCurrentTime(), + EventReportUtils.EVENT_TYPE_CONFIG_UPDATE, EventReportUtils.EVENT_LEVEL_INFO, + EvenCodeEnum.CONFIG_UPDATE_SUC, taskResult.toString(), + EvenCodeEnum.CONFIG_UPDATE_SUC.getMessage()); + List<TaskProfile> taskProfiles = new ArrayList<>(); + taskResult.getDataConfigs().forEach((config) -> { + TaskProfile profile = TaskProfile.convertToTaskProfile(config); + taskProfiles.add(profile); + }); + agentManager.getTaskManager().submitTaskProfiles(taskProfiles); + agentManager.getTaskManager().setTaskResultMd5(taskResult.getMd5()); + agentManager.getTaskManager().setTaskResultVersion(taskResult.getVersion()); + } else { + EventReportUtils.report("", "", AgentUtils.getCurrentTime(), + EventReportUtils.EVENT_TYPE_CONFIG_UPDATE, EventReportUtils.EVENT_LEVEL_WARN, + EvenCodeEnum.CONFIG_UPDATE_VERSION_NO_CHANGE, taskResult.toString(), + EvenCodeEnum.CONFIG_UPDATE_VERSION_NO_CHANGE.getMessage()); + LOGGER.warn("%s: %s", EvenCodeEnum.CONFIG_UPDATE_VERSION_NO_CHANGE.getMessage(), + taskResult); + } + } else if (taskResult.getCode().equals(AgentResponseCode.NO_UPDATE)) { + EventReportUtils.report("", "", AgentUtils.getCurrentTime(), + EventReportUtils.EVENT_TYPE_CONFIG_UPDATE, EventReportUtils.EVENT_LEVEL_INFO, + EvenCodeEnum.CONFIG_NO_UPDATE, taskResult.toString(), + EvenCodeEnum.CONFIG_NO_UPDATE.getMessage()); + } else { + EventReportUtils.report("", "", AgentUtils.getCurrentTime(), + EventReportUtils.EVENT_TYPE_CONFIG_UPDATE, EventReportUtils.EVENT_LEVEL_ERROR, + EvenCodeEnum.CONFIG_INVALID_RET_CODE, taskResult.toString(), + EvenCodeEnum.CONFIG_INVALID_RET_CODE.getMessage()); + } + } else { + EventReportUtils.report("", "", AgentUtils.getCurrentTime(), + EventReportUtils.EVENT_TYPE_CONFIG_UPDATE, EventReportUtils.EVENT_LEVEL_ERROR, + EvenCodeEnum.CONFIG_INVALID_RESULT, taskResult.toString(), + EvenCodeEnum.CONFIG_INVALID_RESULT.getMessage()); } AgentConfigInfo config = getAgentConfigInfo(); if (config != null && config.getCode().equals(AgentResponseCode.SUCCESS)) { @@ -211,6 +242,10 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher { } } } catch (Throwable ex) { + EventReportUtils.report("", "", AgentUtils.getCurrentTime(), + EventReportUtils.EVENT_TYPE_CONFIG_UPDATE, EventReportUtils.EVENT_LEVEL_ERROR, + EvenCodeEnum.CONFIG_INVALID_RESULT, ex.getMessage(), + EvenCodeEnum.CONFIG_INVALID_RESULT.getMessage()); LOGGER.warn("exception caught", ex); ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex); } finally { diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/dataproxy/Sender.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/dataproxy/Sender.java index 7385cd3729..00a69f5a1b 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/dataproxy/Sender.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/dataproxy/Sender.java @@ -214,14 +214,23 @@ public class Sender { proxyClientConfig.setEnableDataCompress(isCompress); SHARED_FACTORY = new DefaultThreadFactory("agent-sender-manager-" + sourcePath, Thread.currentThread().isDaemon()); + boolean hasError = false; + ProcessResult procResult = null; for (int i = 0; i < maxSenderPerGroup; i++) { InLongTcpMsgSender sender = new InLongTcpMsgSender(proxyClientConfig, SHARED_FACTORY); - ProcessResult procResult = new ProcessResult(); + procResult = new ProcessResult(); if (!sender.start(procResult)) { - throw new ProxySdkException("Start sender failure, " + procResult); + hasError = true; + break; } senders.add(sender); } + if (hasError) { + senders.forEach(sender -> { + sender.close(); + }); + throw new ProxySdkException("Start sender failure, " + procResult); + } } public void sendBatch(SenderMessage message) { @@ -376,7 +385,7 @@ public class Sender { AgentStatusManager.sendPackageCount.addAndGet(message.getMsgCnt()); AgentStatusManager.sendDataLen.addAndGet(message.getTotalSize()); } else { - LOGGER.warn("send groupId {}, streamId {}, taskId {}, instanceId {}, dataTime {} fail with times {}, " + LOGGER.error("send groupId {}, streamId {}, taskId {}, instanceId {}, dataTime {} fail with times {}, " + "error {}", groupId, streamId, taskId, instanceId, dataTime, retry, result); getMetricItem(groupId, streamId).pluginSendFailCount.addAndGet(msgCnt); putInResendQueue(new AgentSenderCallback(message, retry));