This is an automated email from the ASF dual-hosted git repository. wenweihuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new ca4bc8dce9 [INLONG-10268][Agent] Get the data version from the auditVersion field (#10269) ca4bc8dce9 is described below commit ca4bc8dce95903b4a292356ab0c887e28c85ffde Author: justinwwhuang <hww_jus...@163.com> AuthorDate: Mon May 27 10:36:37 2024 +0800 [INLONG-10268][Agent] Get the data version from the auditVersion field (#10269) * [INLONG-10268][Agent] Get the data version from the auditVersion field * Update inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java Co-authored-by: Charles Zhang <dockerzh...@apache.org> --------- Co-authored-by: Charles Zhang <dockerzh...@apache.org> --- .../java/org/apache/inlong/agent/constant/TaskConstants.java | 1 + .../org/apache/inlong/agent/message/file/ProxyMessageCache.java | 4 +++- .../main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java | 9 ++++++++- .../org/apache/inlong/agent/core/instance/InstanceManager.java | 3 ++- .../java/org/apache/inlong/agent/core/task/OffsetManager.java | 4 +++- .../org/apache/inlong/agent/plugin/instance/CommonInstance.java | 4 +++- .../inlong/agent/plugin/sinks/filecollect/SenderManager.java | 3 ++- .../apache/inlong/agent/plugin/sources/file/AbstractSource.java | 3 ++- .../java/org/apache/inlong/agent/plugin/task/AbstractTask.java | 4 +++- 9 files changed, 27 insertions(+), 8 deletions(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java index 96669e1235..4cb1c70d12 100755 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java @@ -69,6 +69,7 @@ public class TaskConstants extends CommonConstants { public static final String FILE_SOURCE_EXTEND_CLASS = "task.fileTask.extendedClass"; public static final String DEFAULT_FILE_SOURCE_EXTEND_CLASS = "org.apache.inlong.agent.plugin.sources.file.extend.ExtendedHandler"; + public static final String TASK_AUDIT_VERSION = "task.auditVersion"; // Kafka task public static final String TASK_KAFKA_TOPIC = "task.kafkaTask.topic"; diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/file/ProxyMessageCache.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/file/ProxyMessageCache.java index 9216461579..8ad4d2b555 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/file/ProxyMessageCache.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/file/ProxyMessageCache.java @@ -39,6 +39,7 @@ import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PAC import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER; import static org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE; import static org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_TIMEOUT_MS; +import static org.apache.inlong.agent.constant.TaskConstants.TASK_AUDIT_VERSION; import static org.apache.inlong.common.msg.AttributeConstants.AUDIT_VERSION; /** @@ -61,6 +62,7 @@ public class ProxyMessageCache { private long lastPrintTime = 0; private long dataTime; private boolean isRealTime = false; + protected long auditVersion; /** * extra map used when sending to dataproxy */ @@ -78,7 +80,7 @@ public class ProxyMessageCache { dataTime = instanceProfile.getSinkDataTime(); extraMap.put(AttributeConstants.MESSAGE_SYNC_SEND, "false"); extraMap.putAll(AgentUtils.parseAddAttrToMap(instanceProfile.getPredefineFields())); - extraMap.put(AUDIT_VERSION, taskId); + extraMap.put(AUDIT_VERSION, instanceProfile.get(TASK_AUDIT_VERSION)); } public void generateExtraMap(String dataKey) { diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java index d0b378dd47..ed6fef26c3 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java @@ -99,6 +99,8 @@ public class TaskProfileDto { public static final String deafult_time_offset = "0"; + private static final String DEFAULT_AUDIT_VERSION = "0"; + private Task task; private Proxy proxy; @@ -417,7 +419,11 @@ public class TaskProfileDto { task.setPredefinedFields(dataConfig.getPredefinedFields()); task.setCycleUnit(CycleUnitType.REAL_TIME); task.setTimeZone(dataConfig.getTimeZone()); - + if (dataConfig.getAuditVersion() == null) { + task.setAuditVersion(DEFAULT_AUDIT_VERSION); + } else { + task.setAuditVersion(dataConfig.getAuditVersion()); + } // set sink type if (dataConfig.getDataReportType() == NORMAL_SEND_TO_DATAPROXY.ordinal()) { task.setSink(DEFAULT_DATA_PROXY_SINK); @@ -537,6 +543,7 @@ public class TaskProfileDto { private Integer state; private String cycleUnit; private String timeZone; + private String auditVersion; private FileTask fileTask; private BinlogTask binlogTask; diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java index 25ce136ddf..0642e325a2 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java @@ -43,6 +43,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import static org.apache.inlong.agent.constant.TaskConstants.RESTORE_FROM_DB; +import static org.apache.inlong.agent.constant.TaskConstants.TASK_AUDIT_VERSION; /** * handle the instance created by task, including add, delete, update etc. @@ -117,7 +118,6 @@ public class InstanceManager extends AbstractDaemon { */ public InstanceManager(String taskId, int instanceLimit, Db basicDb, TaskProfileDb taskProfileDb) { this.taskId = taskId; - this.auditVersion = Long.parseLong(taskId); instanceDb = new InstanceDb(basicDb); this.taskProfileDb = taskProfileDb; this.agentConf = AgentConfiguration.getAgentConf(); @@ -298,6 +298,7 @@ public class InstanceManager extends AbstractDaemon { private void restoreFromDb() { taskFromDb = taskProfileDb.getTask(taskId); + auditVersion = Long.parseLong(taskFromDb.get(TASK_AUDIT_VERSION)); List<InstanceProfile> profileList = instanceDb.getInstances(taskId); profileList.forEach((profile) -> { InstanceStateEnum state = profile.getState(); diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java index 41dc95ca95..fc7c41a7e7 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java @@ -40,6 +40,8 @@ import java.util.Iterator; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; +import static org.apache.inlong.agent.constant.TaskConstants.TASK_AUDIT_VERSION; + /** * used to store instance offset to db * where key is task id + read file name and value is instance offset @@ -168,7 +170,7 @@ public class OffsetManager extends AbstractDaemon { instanceDb.deleteInstance(taskId, instanceId); AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_DEL_INSTANCE_DB, instanceFromDb.getInlongGroupId(), instanceFromDb.getInlongStreamId(), instanceFromDb.getSinkDataTime(), 1, 1, - Long.parseLong(taskId)); + Long.parseLong(taskFromDb.get(TASK_AUDIT_VERSION))); iterator.remove(); } } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java index 566fb7be44..7eb77c7237 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java @@ -38,6 +38,8 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.concurrent.TimeUnit; +import static org.apache.inlong.agent.constant.TaskConstants.TASK_AUDIT_VERSION; + /** * common instance contains source and sink. * main job is to read from source and write to sink @@ -66,7 +68,7 @@ public abstract class CommonInstance extends Instance { try { instanceManager = (InstanceManager) srcManager; profile = srcProfile; - auditVersion = Long.parseLong(getTaskId()); + auditVersion = Long.parseLong(srcProfile.get(TASK_AUDIT_VERSION)); setInodeInfo(profile); LOGGER.info("task id: {} submit new instance {} profile detail {}.", profile.getTaskId(), profile.getInstanceId(), profile.toJsonStr()); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java index 0eba81e4d2..3422f3b05e 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java @@ -56,6 +56,7 @@ import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AD import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_ID; import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_KEY; import static org.apache.inlong.agent.constant.TaskConstants.DEFAULT_TASK_PROXY_SEND; +import static org.apache.inlong.agent.constant.TaskConstants.TASK_AUDIT_VERSION; import static org.apache.inlong.agent.constant.TaskConstants.TASK_PROXY_SEND; import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID; import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_STREAM_ID; @@ -110,7 +111,7 @@ public class SenderManager { public SenderManager(InstanceProfile profile, String inlongGroupId, String sourcePath) { this.profile = profile; - auditVersion = Long.parseLong(profile.getTaskId()); + auditVersion = Long.parseLong(profile.get(TASK_AUDIT_VERSION)); managerAddr = agentConf.get(AGENT_MANAGER_ADDR); proxySend = profile.getBoolean(TASK_PROXY_SEND, DEFAULT_TASK_PROXY_SEND); totalAsyncBufSize = profile diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java index bc5225206a..4b36ad30cb 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java @@ -64,6 +64,7 @@ import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_REA import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_SOURCE_PERMIT; import static org.apache.inlong.agent.constant.TaskConstants.DEFAULT_FILE_SOURCE_EXTEND_CLASS; import static org.apache.inlong.agent.constant.TaskConstants.OFFSET; +import static org.apache.inlong.agent.constant.TaskConstants.TASK_AUDIT_VERSION; import static org.apache.inlong.agent.constant.TaskConstants.TASK_CYCLE_UNIT; import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID; import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_STREAM_ID; @@ -119,7 +120,7 @@ public abstract class AbstractSource implements Source { public void init(InstanceProfile profile) { this.profile = profile; taskId = profile.getTaskId(); - auditVersion = Long.parseLong(taskId); + auditVersion = Long.parseLong(profile.get(TASK_AUDIT_VERSION)); instanceId = profile.getInstanceId(); inlongGroupId = profile.getInlongGroupId(); inlongStreamId = profile.getInlongStreamId(); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java index b288ff1946..a6d8d03482 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java @@ -36,6 +36,8 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.List; +import static org.apache.inlong.agent.constant.TaskConstants.TASK_AUDIT_VERSION; + public abstract class AbstractTask extends Task { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractTask.class); @@ -55,7 +57,7 @@ public abstract class AbstractTask extends Task { taskManager = (TaskManager) srcManager; this.taskProfile = taskProfile; this.basicDb = basicDb; - auditVersion = Long.parseLong(taskProfile.getTaskId()); + auditVersion = Long.parseLong(taskProfile.get(TASK_AUDIT_VERSION)); instanceManager = new InstanceManager(taskProfile.getTaskId(), taskProfile.getInt(TaskConstants.FILE_MAX_NUM), basicDb, taskManager.getTaskDb()); try {