This is an automated email from the ASF dual-hosted git repository.

luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new f62b2e87a1 [INLONG-9364][Agent] Remove expired instance from db (#9365)
f62b2e87a1 is described below

commit f62b2e87a12d5c7c2d75417715d3edb422230dab
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Thu Nov 30 11:16:23 2023 +0800

    [INLONG-9364][Agent] Remove expired instance from db (#9365)
---
 .../apache/inlong/agent/utils/DateTransUtils.java  | 40 ++++++++++++
 .../agent/core/instance/InstanceManager.java       | 74 ++++++++++++++++++----
 .../inlong/agent/core/task/file/TaskManager.java   | 22 +++++--
 .../agent/core/instance/TestInstanceManager.java   |  6 +-
 .../agent/plugin/task/filecollect/FileScanner.java |  4 +-
 .../task/filecollect/LogFileCollectTask.java       |  6 +-
 .../agent/plugin/utils/file/NewDateUtils.java      | 44 +------------
 .../inlong/agent/plugin/utils/TestUtils.java       | 14 ++--
 .../apache/inlong/common/enums/TaskStateEnum.java  |  4 +-
 9 files changed, 139 insertions(+), 75 deletions(-)

diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java
index 55182c7dd8..fe6257d64e 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java
@@ -111,4 +111,44 @@ public class DateTransUtils {
         return retTime;
     }
 
+    /**
+     * Calculate offset time based on offset
+     * The current offset will only be offset forward, or it can be offset 
backward to be compatible with the previous
+     * calculation method (subtraction).
+     * When it is offset backward, it returns negative;
+     * When offset forward, return positive
+     *
+     * @param timeOffset offset,such as -1d,-4h,-10m;
+     * @return
+     */
+    public static long calcOffset(String timeOffset) {
+        if (timeOffset.length() == 0) {
+            return 0;
+        }
+        String offsetUnit = timeOffset.substring(timeOffset.length() - 1);
+        int startIndex;
+        int symbol;
+        if (timeOffset.charAt(0) == '-') {
+            symbol = -1;
+            startIndex = 1;
+        } else {
+            symbol = 1;
+            startIndex = 0;
+        }
+
+        String strOffset = timeOffset.substring(startIndex, 
timeOffset.length() - 1);
+        if (strOffset.length() == 0) {
+            return 0;
+        }
+        int offsetTime = Integer.parseInt(strOffset);
+        if ("d".equalsIgnoreCase(offsetUnit)) {
+            return offsetTime * 24 * 3600 * 1000 * symbol;
+        } else if ("h".equalsIgnoreCase(offsetUnit)) {
+            return offsetTime * 3600 * 1000 * symbol;
+        } else if ("m".equalsIgnoreCase(offsetUnit)) {
+            return offsetTime * 60 * 1000 * symbol;
+        }
+        return 0;
+    }
+
 }
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
index a80ce8b53b..260e5a477f 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
@@ -21,16 +21,22 @@ import org.apache.inlong.agent.common.AbstractDaemon;
 import org.apache.inlong.agent.common.AgentThreadFactory;
 import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.CycleUnitType;
 import org.apache.inlong.agent.db.Db;
 import org.apache.inlong.agent.db.InstanceDb;
+import org.apache.inlong.agent.db.TaskProfileDb;
 import org.apache.inlong.agent.plugin.Instance;
 import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.DateTransUtils;
 import org.apache.inlong.agent.utils.ThreadUtils;
 import org.apache.inlong.common.enums.InstanceStateEnum;
+import org.apache.inlong.common.enums.TaskStateEnum;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
@@ -38,6 +44,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * handle the instance created by task, including add, delete, update etc.
@@ -47,11 +54,14 @@ public class InstanceManager extends AbstractDaemon {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(InstanceManager.class);
     private static final int ACTION_QUEUE_CAPACITY = 100;
+    public static final int CLEAN_INSTANCE_ONCE_LIMIT = 10;
     public volatile int CORE_THREAD_SLEEP_TIME_MS = 1000;
-    public static final int CORE_THREAD_PRINT_TIME = 10000;
-    private long lastPrintTime = 0;
-    // task in db
+    public static final int INSTANCE_DB_CLEAN_INTERVAL_MS = 10000;
+    private long lastCleanTime = 0;
+    public static final String DB_INSTANCE_EXPIRE_CYCLE_COUNT = "-3";
+    // instance in db
     private final InstanceDb instanceDb;
+    TaskProfileDb taskProfileDb;
     // task in memory
     private final ConcurrentHashMap<String, Instance> instanceMap;
     // instance profile queue.
@@ -105,9 +115,10 @@ public class InstanceManager extends AbstractDaemon {
     /**
      * Init task manager.
      */
-    public InstanceManager(String taskId, int instanceLimit, Db basicDb) {
+    public InstanceManager(String taskId, int instanceLimit, Db basicDb, 
TaskProfileDb taskProfileDb) {
         this.taskId = taskId;
         instanceDb = new InstanceDb(basicDb);
+        this.taskProfileDb = taskProfileDb;
         this.agentConf = AgentConfiguration.getAgentConf();
         instanceMap = new ConcurrentHashMap<>();
         this.instanceLimit = instanceLimit;
@@ -145,11 +156,11 @@ public class InstanceManager extends AbstractDaemon {
             while (isRunnable()) {
                 try {
                     AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME_MS);
-                    printInstanceDetail();
+                    cleanDbInstance();
                     dealWithActionQueue(actionQueue);
                     keepPaceWithDb();
                 } catch (Throwable ex) {
-                    LOGGER.error("coreThread {}", ex.getMessage());
+                    LOGGER.error("coreThread {}", ex);
                     ThreadUtils.threadThrowableHandler(Thread.currentThread(), 
ex);
                 }
                 runAtLeastOneTime = true;
@@ -158,9 +169,10 @@ public class InstanceManager extends AbstractDaemon {
         };
     }
 
-    private void printInstanceDetail() {
-        if (AgentUtils.getCurrentTime() - lastPrintTime > 
CORE_THREAD_PRINT_TIME) {
+    private void cleanDbInstance() {
+        if (AgentUtils.getCurrentTime() - lastCleanTime > 
INSTANCE_DB_CLEAN_INTERVAL_MS) {
             List<InstanceProfile> instances = instanceDb.getInstances(taskId);
+            doCleanDbInstance(instances);
             InstancePrintStat stat = new InstancePrintStat();
             for (int i = 0; i < instances.size(); i++) {
                 InstanceProfile instance = instances.get(i);
@@ -169,7 +181,45 @@ public class InstanceManager extends AbstractDaemon {
             LOGGER.info(
                     "instanceManager running! taskId {} mem {} db total {} {} 
action count {}",
                     taskId, instanceMap.size(), instances.size(), stat, 
actionQueue.size());
-            lastPrintTime = AgentUtils.getCurrentTime();
+            lastCleanTime = AgentUtils.getCurrentTime();
+        }
+    }
+
+    private void doCleanDbInstance(List<InstanceProfile> instances) {
+        AtomicInteger cleanCount = new AtomicInteger();
+        Iterator<InstanceProfile> iterator = instances.iterator();
+        while (iterator.hasNext()) {
+            if (cleanCount.get() > CLEAN_INSTANCE_ONCE_LIMIT) {
+                return;
+            }
+            InstanceProfile instanceFromDb = iterator.next();
+            if (instanceFromDb.getState() != InstanceStateEnum.FINISHED) {
+                return;
+            }
+            TaskProfile taskFromDb = taskProfileDb.getTask(taskId);
+            if (taskFromDb != null) {
+                if 
(taskFromDb.getCycleUnit().compareToIgnoreCase(CycleUnitType.REAL_TIME) == 0) {
+                    return;
+                }
+                if (taskFromDb.isRetry()) {
+                    if (taskFromDb.getState() != TaskStateEnum.RETRY_FINISH) {
+                        return;
+                    }
+                } else {
+                    if (instanceFromDb.getState() != 
InstanceStateEnum.FINISHED) {
+                        return;
+                    }
+                }
+            }
+            long expireTime = 
DateTransUtils.calcOffset(DB_INSTANCE_EXPIRE_CYCLE_COUNT + 
taskFromDb.getCycleUnit());
+            if (AgentUtils.getCurrentTime() - instanceFromDb.getModifyTime() > 
expireTime) {
+                cleanCount.getAndIncrement();
+                LOGGER.info("instance has expired, delete from db dataTime {} 
taskId {} instanceId {}",
+                        instanceFromDb.getSourceDataTime(), 
instanceFromDb.getTaskId(),
+                        instanceFromDb.getInstanceId());
+                instanceDb.deleteInstance(instanceFromDb.getTaskId(), 
instanceFromDb.getInstanceId());
+                iterator.remove();
+            }
         }
     }
 
@@ -181,10 +231,10 @@ public class InstanceManager extends AbstractDaemon {
     private void traverseDbTasksToMemory() {
         instanceDb.getInstances(taskId).forEach((profileFromDb) -> {
             InstanceStateEnum dbState = profileFromDb.getState();
-            Instance task = instanceMap.get(profileFromDb.getInstanceId());
+            Instance instance = instanceMap.get(profileFromDb.getInstanceId());
             switch (dbState) {
                 case DEFAULT: {
-                    if (task == null) {
+                    if (instance == null) {
                         LOGGER.info("traverseDbTasksToMemory add instance to 
mem taskId {} instanceId {}",
                                 profileFromDb.getTaskId(), 
profileFromDb.getInstanceId());
                         addToMemory(profileFromDb);
@@ -193,7 +243,7 @@ public class InstanceManager extends AbstractDaemon {
                 }
                 case FINISHED:
                     DELETE: {
-                        if (task != null) {
+                        if (instance != null) {
                             LOGGER.info("traverseDbTasksToMemory delete 
instance from mem taskId {} instanceId {}",
                                     profileFromDb.getTaskId(), 
profileFromDb.getInstanceId());
                             deleteFromMemory(profileFromDb.getInstanceId());
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
index 4d8bc9fea6..7027b798f8 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
@@ -63,6 +63,8 @@ public class TaskManager extends AbstractDaemon {
     private final Db taskBasicDb;
     // instance basic db
     private final Db instanceBasicDb;
+    // offset basic db
+    private final Db offsetBasicDb;
     // task in db
     private final TaskProfileDb taskDb;
     // task in memory
@@ -100,7 +102,7 @@ public class TaskManager extends AbstractDaemon {
                     frozenCount++;
                     break;
                 }
-                case FINISH: {
+                case RETRY_FINISH: {
                     finishedCount++;
                     break;
                 }
@@ -122,11 +124,13 @@ public class TaskManager extends AbstractDaemon {
      */
     public TaskManager() {
         this.agentConf = AgentConfiguration.getAgentConf();
-        this.taskBasicDb = initDb(
+        taskBasicDb = initDb(
                 agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH, 
AgentConstants.AGENT_LOCAL_DB_PATH_TASK));
-        this.instanceBasicDb = initDb(
-                agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH, 
AgentConstants.AGENT_LOCAL_DB_PATH_INSTANCE));
         taskDb = new TaskProfileDb(taskBasicDb);
+        instanceBasicDb = initDb(
+                agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH, 
AgentConstants.AGENT_LOCAL_DB_PATH_INSTANCE));
+        offsetBasicDb =
+                initDb(agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH, 
AgentConstants.AGENT_LOCAL_DB_PATH_OFFSET));
         this.runningPool = new ThreadPoolExecutor(
                 0, Integer.MAX_VALUE,
                 60L, TimeUnit.SECONDS,
@@ -139,6 +143,10 @@ public class TaskManager extends AbstractDaemon {
         actionQueue = new LinkedBlockingQueue<>(ACTION_QUEUE_CAPACITY);
     }
 
+    public TaskProfileDb getTaskDb() {
+        return taskDb;
+    }
+
     /**
      * init db by class name
      *
@@ -284,7 +292,7 @@ public class TaskManager extends AbstractDaemon {
                 if (managerState == dbState) {
                     return;
                 }
-                if (dbState == TaskStateEnum.FINISH) {
+                if (dbState == TaskStateEnum.RETRY_FINISH) {
                     LOGGER.info("traverseManagerTasksToDb task {} dbState {} 
retry {}, do nothing",
                             taskFromDb.getTaskId(), dbState,
                             taskFromDb.isRetry());
@@ -335,7 +343,7 @@ public class TaskManager extends AbstractDaemon {
                     deleteFromMemory(profileFromDb.getTaskId());
                 }
             } else {
-                if (dbState != TaskStateEnum.FINISH) {
+                if (dbState != TaskStateEnum.RETRY_FINISH) {
                     LOGGER.error("task {} invalid state {}", 
profileFromDb.getTaskId(), dbState);
                 }
             }
@@ -394,7 +402,7 @@ public class TaskManager extends AbstractDaemon {
     }
 
     private void finishTask(TaskProfile taskProfile) {
-        taskProfile.setState(TaskStateEnum.FINISH);
+        taskProfile.setState(TaskStateEnum.RETRY_FINISH);
         updateToDb(taskProfile);
         deleteFromMemory(taskProfile.getTaskId());
     }
diff --git 
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
 
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
index 558bed0204..262565022e 100755
--- 
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
+++ 
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
@@ -19,9 +19,11 @@ package org.apache.inlong.agent.core.instance;
 
 import org.apache.inlong.agent.conf.InstanceProfile;
 import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.AgentConstants;
 import org.apache.inlong.agent.core.AgentBaseTestsHelper;
 import org.apache.inlong.agent.core.task.file.TaskManager;
 import org.apache.inlong.agent.db.Db;
+import org.apache.inlong.agent.db.TaskProfileDb;
 import org.apache.inlong.agent.utils.AgentUtils;
 import org.apache.inlong.agent.utils.DateTransUtils;
 import org.apache.inlong.common.enums.InstanceStateEnum;
@@ -53,7 +55,9 @@ public class TestInstanceManager {
         String pattern = helper.getTestRootDir() + "/YYYYMMDD_[0-9]+.txt";
         Db basicDb = TaskManager.initDb("/localdb");
         taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, 
TaskStateEnum.RUNNING, "GMT+6:00");
-        manager = new InstanceManager("1", 2, basicDb);
+        Db taskBasicDb = 
TaskManager.initDb(AgentConstants.AGENT_LOCAL_DB_PATH_TASK);
+        TaskProfileDb taskDb = new TaskProfileDb(taskBasicDb);
+        manager = new InstanceManager("1", 2, basicDb, taskDb);
         manager.CORE_THREAD_SLEEP_TIME_MS = 100;
         manager.start();
     }
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java
index fc989b3bcf..9985214873 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java
@@ -61,8 +61,8 @@ public class FileScanner {
             long startTime,
             long endTime, boolean isRetry) {
         if (!isRetry) {
-            startTime += NewDateUtils.calcOffset(timeOffset);
-            endTime += NewDateUtils.calcOffset(timeOffset);
+            startTime += DateTransUtils.calcOffset(timeOffset);
+            endTime += DateTransUtils.calcOffset(timeOffset);
         }
         String strStartTime = 
DateTransUtils.millSecConvertToTimeStr(startTime, cycleUnit);
         String strEndTime = DateTransUtils.millSecConvertToTimeStr(endTime, 
cycleUnit);
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
index 355bf3909f..86ce040210 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
@@ -34,6 +34,7 @@ import org.apache.inlong.agent.plugin.utils.file.NewDateUtils;
 import org.apache.inlong.agent.plugin.utils.file.PathDateExpression;
 import org.apache.inlong.agent.state.State;
 import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.DateTransUtils;
 import org.apache.inlong.agent.utils.file.FileUtils;
 
 import org.slf4j.Logger;
@@ -70,6 +71,7 @@ public class LogFileCollectTask extends Task {
 
     public static final String DEFAULT_FILE_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.FileInstance";
     private static final Logger LOGGER = 
LoggerFactory.getLogger(LogFileCollectTask.class);
+    public static final String SCAN_CYCLE_RANCE = "-2";
     private TaskProfile taskProfile;
     private Db basicDb;
     private TaskManager taskManager;
@@ -117,7 +119,7 @@ public class LogFileCollectTask extends Task {
             isRealTime = true;
         }
         instanceManager = new InstanceManager(taskProfile.getTaskId(), 
taskProfile.getInt(TaskConstants.FILE_MAX_NUM),
-                basicDb);
+                basicDb, taskManager.getTaskDb());
         try {
             instanceManager.start();
         } catch (Exception e) {
@@ -318,7 +320,7 @@ public class LogFileCollectTask extends Task {
         if (!retry) {
             long currentTime = System.currentTimeMillis();
             // only scan two cycle, like two hours or two days
-            long offset = NewDateUtils.calcOffset("-2" + 
taskProfile.getCycleUnit());
+            long offset = DateTransUtils.calcOffset(SCAN_CYCLE_RANCE + 
taskProfile.getCycleUnit());
             startScanTime = currentTime + offset;
             endScanTime = currentTime;
         }
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java
index 706167788f..c38eb57be8 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java
@@ -223,54 +223,14 @@ public class NewDateUtils {
         }
 
         if (timeOffset.startsWith("-")) {
-            timeInterval -= calcOffset(timeOffset);
+            timeInterval -= DateTransUtils.calcOffset(timeOffset);
         } else {
-            timeInterval += calcOffset(timeOffset);
+            timeInterval += DateTransUtils.calcOffset(timeOffset);
         }
 
         return isValidCreationTime(dataTime, timeInterval);
     }
 
-    /**
-     * Calculate offset time based on offset
-     * The current offset will only be offset forward, or it can be offset 
backward to be compatible with the previous
-     * calculation method (subtraction).
-     * When it is offset backward, it returns negative;
-     * When offset forward, return positive
-     *
-     * @param timeOffset offset,such as -1d,-4h,-10m;
-     * @return
-     */
-    public static long calcOffset(String timeOffset) {
-        if (timeOffset.length() == 0) {
-            return 0;
-        }
-        String offsetUnit = timeOffset.substring(timeOffset.length() - 1);
-        int startIndex;
-        int symbol;
-        if (timeOffset.charAt(0) == '-') {
-            symbol = -1;
-            startIndex = 1;
-        } else {
-            symbol = 1;
-            startIndex = 0;
-        }
-
-        String strOffset = timeOffset.substring(startIndex, 
timeOffset.length() - 1);
-        if (strOffset.length() == 0) {
-            return 0;
-        }
-        int offsetTime = Integer.parseInt(strOffset);
-        if ("d".equalsIgnoreCase(offsetUnit)) {
-            return offsetTime * 24 * 3600 * 1000 * symbol;
-        } else if ("h".equalsIgnoreCase(offsetUnit)) {
-            return offsetTime * 3600 * 1000 * symbol;
-        } else if ("m".equalsIgnoreCase(offsetUnit)) {
-            return offsetTime * 60 * 1000 * symbol;
-        }
-        return 0;
-    }
-
     /*
      * Check whether the data time is between curTime - interval and curTime + 
interval.
      */
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java
index 46860ae20a..ea575e613d 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java
@@ -17,7 +17,7 @@
 
 package org.apache.inlong.agent.plugin.utils;
 
-import org.apache.inlong.agent.plugin.utils.file.NewDateUtils;
+import org.apache.inlong.agent.utils.DateTransUtils;
 import org.apache.inlong.common.metric.MetricRegister;
 
 import org.apache.commons.io.FileUtils;
@@ -46,12 +46,12 @@ public class TestUtils {
 
     @Test
     public void testCalcOffset() {
-        Assert.assertTrue(NewDateUtils.calcOffset("-1h") == -3600 * 1000);
-        Assert.assertTrue(NewDateUtils.calcOffset("1D") == 24 * 3600 * 1000);
-        Assert.assertTrue(NewDateUtils.calcOffset("0") == 0);
-        Assert.assertTrue(NewDateUtils.calcOffset("1") == 0);
-        Assert.assertTrue(NewDateUtils.calcOffset("10") == 0);
-        Assert.assertTrue(NewDateUtils.calcOffset("") == 0);
+        Assert.assertTrue(DateTransUtils.calcOffset("-1h") == -3600 * 1000);
+        Assert.assertTrue(DateTransUtils.calcOffset("1D") == 24 * 3600 * 1000);
+        Assert.assertTrue(DateTransUtils.calcOffset("0") == 0);
+        Assert.assertTrue(DateTransUtils.calcOffset("1") == 0);
+        Assert.assertTrue(DateTransUtils.calcOffset("10") == 0);
+        Assert.assertTrue(DateTransUtils.calcOffset("") == 0);
     }
 
     public static String getTestTriggerProfile() {
diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskStateEnum.java 
b/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskStateEnum.java
index 6401fa8ffb..583604b32b 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskStateEnum.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskStateEnum.java
@@ -25,7 +25,7 @@ public enum TaskStateEnum {
     NEW(0),
     RUNNING(1),
     FROZEN(2),
-    FINISH(3);
+    RETRY_FINISH(3);
 
     private final int state;
 
@@ -42,7 +42,7 @@ public enum TaskStateEnum {
             case 2:
                 return FROZEN;
             case 3:
-                return FINISH;
+                return RETRY_FINISH;
             default:
                 throw new RuntimeException("Unsupported task state " + state);
         }

Reply via email to