This is an automated email from the ASF dual-hosted git repository.

wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ca4bc8dce9 [INLONG-10268][Agent] Get the data version from the 
auditVersion field (#10269)
ca4bc8dce9 is described below

commit ca4bc8dce95903b4a292356ab0c887e28c85ffde
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Mon May 27 10:36:37 2024 +0800

    [INLONG-10268][Agent] Get the data version from the auditVersion field 
(#10269)
    
    * [INLONG-10268][Agent] Get the data version from the auditVersion field
    
    * Update 
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
    
    Co-authored-by: Charles Zhang <dockerzh...@apache.org>
    
    ---------
    
    Co-authored-by: Charles Zhang <dockerzh...@apache.org>
---
 .../java/org/apache/inlong/agent/constant/TaskConstants.java     | 1 +
 .../org/apache/inlong/agent/message/file/ProxyMessageCache.java  | 4 +++-
 .../main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java   | 9 ++++++++-
 .../org/apache/inlong/agent/core/instance/InstanceManager.java   | 3 ++-
 .../java/org/apache/inlong/agent/core/task/OffsetManager.java    | 4 +++-
 .../org/apache/inlong/agent/plugin/instance/CommonInstance.java  | 4 +++-
 .../inlong/agent/plugin/sinks/filecollect/SenderManager.java     | 3 ++-
 .../apache/inlong/agent/plugin/sources/file/AbstractSource.java  | 3 ++-
 .../java/org/apache/inlong/agent/plugin/task/AbstractTask.java   | 4 +++-
 9 files changed, 27 insertions(+), 8 deletions(-)

diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
index 96669e1235..4cb1c70d12 100755
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
@@ -69,6 +69,7 @@ public class TaskConstants extends CommonConstants {
     public static final String FILE_SOURCE_EXTEND_CLASS = 
"task.fileTask.extendedClass";
     public static final String DEFAULT_FILE_SOURCE_EXTEND_CLASS =
             
"org.apache.inlong.agent.plugin.sources.file.extend.ExtendedHandler";
+    public static final String TASK_AUDIT_VERSION = "task.auditVersion";
 
     // Kafka task
     public static final String TASK_KAFKA_TOPIC = "task.kafkaTask.topic";
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/file/ProxyMessageCache.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/file/ProxyMessageCache.java
index 9216461579..8ad4d2b555 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/file/ProxyMessageCache.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/file/ProxyMessageCache.java
@@ -39,6 +39,7 @@ import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PAC
 import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER;
 import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE;
 import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_TIMEOUT_MS;
+import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_AUDIT_VERSION;
 import static org.apache.inlong.common.msg.AttributeConstants.AUDIT_VERSION;
 
 /**
@@ -61,6 +62,7 @@ public class ProxyMessageCache {
     private long lastPrintTime = 0;
     private long dataTime;
     private boolean isRealTime = false;
+    protected long auditVersion;
     /**
      * extra map used when sending to dataproxy
      */
@@ -78,7 +80,7 @@ public class ProxyMessageCache {
         dataTime = instanceProfile.getSinkDataTime();
         extraMap.put(AttributeConstants.MESSAGE_SYNC_SEND, "false");
         
extraMap.putAll(AgentUtils.parseAddAttrToMap(instanceProfile.getPredefineFields()));
-        extraMap.put(AUDIT_VERSION, taskId);
+        extraMap.put(AUDIT_VERSION, instanceProfile.get(TASK_AUDIT_VERSION));
     }
 
     public void generateExtraMap(String dataKey) {
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
index d0b378dd47..ed6fef26c3 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
@@ -99,6 +99,8 @@ public class TaskProfileDto {
 
     public static final String deafult_time_offset = "0";
 
+    private static final String DEFAULT_AUDIT_VERSION = "0";
+
     private Task task;
     private Proxy proxy;
 
@@ -417,7 +419,11 @@ public class TaskProfileDto {
         task.setPredefinedFields(dataConfig.getPredefinedFields());
         task.setCycleUnit(CycleUnitType.REAL_TIME);
         task.setTimeZone(dataConfig.getTimeZone());
-
+        if (dataConfig.getAuditVersion() == null) {
+            task.setAuditVersion(DEFAULT_AUDIT_VERSION);
+        } else {
+            task.setAuditVersion(dataConfig.getAuditVersion());
+        }
         // set sink type
         if (dataConfig.getDataReportType() == 
NORMAL_SEND_TO_DATAPROXY.ordinal()) {
             task.setSink(DEFAULT_DATA_PROXY_SINK);
@@ -537,6 +543,7 @@ public class TaskProfileDto {
         private Integer state;
         private String cycleUnit;
         private String timeZone;
+        private String auditVersion;
 
         private FileTask fileTask;
         private BinlogTask binlogTask;
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
index 25ce136ddf..0642e325a2 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
@@ -43,6 +43,7 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.inlong.agent.constant.TaskConstants.RESTORE_FROM_DB;
+import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_AUDIT_VERSION;
 
 /**
  * handle the instance created by task, including add, delete, update etc.
@@ -117,7 +118,6 @@ public class InstanceManager extends AbstractDaemon {
      */
     public InstanceManager(String taskId, int instanceLimit, Db basicDb, 
TaskProfileDb taskProfileDb) {
         this.taskId = taskId;
-        this.auditVersion = Long.parseLong(taskId);
         instanceDb = new InstanceDb(basicDb);
         this.taskProfileDb = taskProfileDb;
         this.agentConf = AgentConfiguration.getAgentConf();
@@ -298,6 +298,7 @@ public class InstanceManager extends AbstractDaemon {
 
     private void restoreFromDb() {
         taskFromDb = taskProfileDb.getTask(taskId);
+        auditVersion = Long.parseLong(taskFromDb.get(TASK_AUDIT_VERSION));
         List<InstanceProfile> profileList = instanceDb.getInstances(taskId);
         profileList.forEach((profile) -> {
             InstanceStateEnum state = profile.getState();
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
index 41dc95ca95..fc7c41a7e7 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/OffsetManager.java
@@ -40,6 +40,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_AUDIT_VERSION;
+
 /**
  * used to store instance offset to db
  * where key is task id + read file name and value is instance offset
@@ -168,7 +170,7 @@ public class OffsetManager extends AbstractDaemon {
                 instanceDb.deleteInstance(taskId, instanceId);
                 AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_DEL_INSTANCE_DB, 
instanceFromDb.getInlongGroupId(),
                         instanceFromDb.getInlongStreamId(), 
instanceFromDb.getSinkDataTime(), 1, 1,
-                        Long.parseLong(taskId));
+                        Long.parseLong(taskFromDb.get(TASK_AUDIT_VERSION)));
                 iterator.remove();
             }
         }
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java
index 566fb7be44..7eb77c7237 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java
@@ -38,6 +38,8 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.concurrent.TimeUnit;
 
+import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_AUDIT_VERSION;
+
 /**
  * common instance contains source and sink.
  * main job is to read from source and write to sink
@@ -66,7 +68,7 @@ public abstract class CommonInstance extends Instance {
         try {
             instanceManager = (InstanceManager) srcManager;
             profile = srcProfile;
-            auditVersion = Long.parseLong(getTaskId());
+            auditVersion = Long.parseLong(srcProfile.get(TASK_AUDIT_VERSION));
             setInodeInfo(profile);
             LOGGER.info("task id: {} submit new instance {} profile detail 
{}.", profile.getTaskId(),
                     profile.getInstanceId(), profile.toJsonStr());
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
index 0eba81e4d2..3422f3b05e 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
@@ -56,6 +56,7 @@ import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AD
 import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_ID;
 import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_KEY;
 import static 
org.apache.inlong.agent.constant.TaskConstants.DEFAULT_TASK_PROXY_SEND;
+import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_AUDIT_VERSION;
 import static org.apache.inlong.agent.constant.TaskConstants.TASK_PROXY_SEND;
 import static 
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID;
 import static 
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_STREAM_ID;
@@ -110,7 +111,7 @@ public class SenderManager {
 
     public SenderManager(InstanceProfile profile, String inlongGroupId, String 
sourcePath) {
         this.profile = profile;
-        auditVersion = Long.parseLong(profile.getTaskId());
+        auditVersion = Long.parseLong(profile.get(TASK_AUDIT_VERSION));
         managerAddr = agentConf.get(AGENT_MANAGER_ADDR);
         proxySend = profile.getBoolean(TASK_PROXY_SEND, 
DEFAULT_TASK_PROXY_SEND);
         totalAsyncBufSize = profile
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
index bc5225206a..4b36ad30cb 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
@@ -64,6 +64,7 @@ import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_REA
 import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_SOURCE_PERMIT;
 import static 
org.apache.inlong.agent.constant.TaskConstants.DEFAULT_FILE_SOURCE_EXTEND_CLASS;
 import static org.apache.inlong.agent.constant.TaskConstants.OFFSET;
+import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_AUDIT_VERSION;
 import static org.apache.inlong.agent.constant.TaskConstants.TASK_CYCLE_UNIT;
 import static 
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID;
 import static 
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_STREAM_ID;
@@ -119,7 +120,7 @@ public abstract class AbstractSource implements Source {
     public void init(InstanceProfile profile) {
         this.profile = profile;
         taskId = profile.getTaskId();
-        auditVersion = Long.parseLong(taskId);
+        auditVersion = Long.parseLong(profile.get(TASK_AUDIT_VERSION));
         instanceId = profile.getInstanceId();
         inlongGroupId = profile.getInlongGroupId();
         inlongStreamId = profile.getInlongStreamId();
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java
index b288ff1946..a6d8d03482 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/AbstractTask.java
@@ -36,6 +36,8 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.List;
 
+import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_AUDIT_VERSION;
+
 public abstract class AbstractTask extends Task {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(AbstractTask.class);
@@ -55,7 +57,7 @@ public abstract class AbstractTask extends Task {
         taskManager = (TaskManager) srcManager;
         this.taskProfile = taskProfile;
         this.basicDb = basicDb;
-        auditVersion = Long.parseLong(taskProfile.getTaskId());
+        auditVersion = Long.parseLong(taskProfile.get(TASK_AUDIT_VERSION));
         instanceManager = new InstanceManager(taskProfile.getTaskId(), 
taskProfile.getInt(TaskConstants.FILE_MAX_NUM),
                 basicDb, taskManager.getTaskDb());
         try {

Reply via email to