This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new afba2a8f80 [INLONG-9138][Agent] Add task manager (#9139)
afba2a8f80 is described below

commit afba2a8f8014dfc4ec8c19e91b07d1d4dcc26674
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Fri Oct 27 17:33:35 2023 +0800

    [INLONG-9138][Agent] Add task manager (#9139)
---
 .../org/apache/inlong/agent/plugin/file/Task.java  |  52 +++
 .../inlong/agent/core/task/file/MemoryManager.java | 117 ++++++
 .../inlong/agent/core/task/file/TaskManager.java   | 449 +++++++++++++++++++++
 .../inlong/agent/core/AgentBaseTestsHelper.java    |  37 +-
 .../apache/inlong/agent/core/task/MockTask.java    |  69 ++++
 .../inlong/agent/core/task/TestTaskManager.java    |  97 +++++
 .../agent/plugin/utils/file/NewDateUtils.java      | 237 +----------
 7 files changed, 832 insertions(+), 226 deletions(-)

diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Task.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Task.java
new file mode 100755
index 0000000000..ce580a0bb7
--- /dev/null
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Task.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.file;
+
+import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.db.Db;
+import org.apache.inlong.agent.state.AbstractStateWrapper;
+
+import java.io.IOException;
+
+/**
+ * Abstract base class for tasks, which generates instance in condition.
+ * Concrete subclasses provide initialization, teardown and access to the
+ * task profile and task id.
+ */
+public abstract class Task extends AbstractStateWrapper {
+
+    /**
+     * Initialize the task from its profile.
+     *
+     * @param srcManager the manager creating this task; it is typed as Object,
+     *        so implementations cast it to the manager type they expect
+     * @param profile the task profile to initialize from
+     * @param basicDb the db handed to the task by the manager
+     * @throws IOException if initialization fails (exact conditions are implementation-specific)
+     */
+    public abstract void init(Object srcManager, TaskProfile profile, Db 
basicDb) throws IOException;
+
+    /**
+     * Destroy the task.
+     */
+    public abstract void destroy();
+
+    /**
+     * Get the task profile.
+     *
+     * @return the profile of this task
+     */
+    public abstract TaskProfile getProfile();
+
+    /**
+     * Get the task id.
+     *
+     * @return the id of this task
+     */
+    public abstract String getTaskId();
+}
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/MemoryManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/MemoryManager.java
new file mode 100644
index 0000000000..fca9d37c72
--- /dev/null
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/MemoryManager.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.core.task.file;
+
+import org.apache.inlong.agent.conf.AgentConfiguration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
+
+import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_QUEUE_PERMIT;
+import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_SOURCE_PERMIT;
+import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_WRITER_PERMIT;
+import static 
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT;
+import static 
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_READER_SOURCE_PERMIT;
+import static 
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_WRITER_PERMIT;
+
+/**
+ * used to limit global memory to avoid oom
+ */
+public class MemoryManager {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(MemoryManager.class);
+    private static volatile MemoryManager memoryManager = null;
+    private final AgentConfiguration conf;
+    private ConcurrentHashMap<String, Semaphore> semaphoreMap = new 
ConcurrentHashMap<>();
+
+    private MemoryManager() {
+        this.conf = AgentConfiguration.getAgentConf();
+        Semaphore semaphore = null;
+        semaphore = new Semaphore(
+                conf.getInt(AGENT_GLOBAL_READER_SOURCE_PERMIT, 
DEFAULT_AGENT_GLOBAL_READER_SOURCE_PERMIT));
+        semaphoreMap.put(AGENT_GLOBAL_READER_SOURCE_PERMIT, semaphore);
+
+        semaphore = new Semaphore(
+                conf.getInt(AGENT_GLOBAL_READER_QUEUE_PERMIT, 
DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT));
+        semaphoreMap.put(AGENT_GLOBAL_READER_QUEUE_PERMIT, semaphore);
+
+        semaphore = new Semaphore(
+                conf.getInt(AGENT_GLOBAL_WRITER_PERMIT, 
DEFAULT_AGENT_GLOBAL_WRITER_PERMIT));
+        semaphoreMap.put(AGENT_GLOBAL_WRITER_PERMIT, semaphore);
+    }
+
+    /**
+     * manager singleton
+     */
+    public static MemoryManager getInstance() {
+        if (memoryManager == null) {
+            synchronized (MemoryManager.class) {
+                if (memoryManager == null) {
+                    memoryManager = new MemoryManager();
+                }
+            }
+        }
+        return memoryManager;
+    }
+
+    /**
+     * Try to take permits from the named semaphore without blocking.
+     *
+     * @param semaphoreName key of a semaphore registered in this manager
+     * @param permit number of permits to acquire
+     * @return true if the permits were acquired; false if the semaphore is unknown
+     *         or not enough permits are available right now
+     */
+    public boolean tryAcquire(String semaphoreName, int permit) {
+        Semaphore semaphore = semaphoreMap.get(semaphoreName);
+        if (semaphore == null) {
+            // pass the name so the {} placeholder is actually filled in
+            LOGGER.error("tryAcquire {} not exist", semaphoreName);
+            return false;
+        }
+        return semaphore.tryAcquire(permit);
+    }
+
+    /**
+     * Return permits to the named semaphore.
+     * Nothing is released if the semaphore name is unknown; an error is logged instead.
+     *
+     * @param semaphoreName key of a semaphore registered in this manager
+     * @param permit number of permits to release
+     */
+    public void release(String semaphoreName, int permit) {
+        Semaphore semaphore = semaphoreMap.get(semaphoreName);
+        if (semaphore == null) {
+            // pass the name so the {} placeholder is actually filled in
+            LOGGER.error("release {} not exist", semaphoreName);
+            return;
+        }
+        semaphore.release(permit);
+    }
+
+    /**
+     * Get the number of permits currently available in the named semaphore.
+     *
+     * @param semaphoreName key of a semaphore registered in this manager
+     * @return the available permits, or -1 if the semaphore is unknown
+     */
+    public int getLeft(String semaphoreName) {
+        Semaphore semaphore = semaphoreMap.get(semaphoreName);
+        if (semaphore == null) {
+            // pass the name so the {} placeholder is actually filled in
+            LOGGER.error("getLeft {} not exist", semaphoreName);
+            return -1;
+        }
+        return semaphore.availablePermits();
+    }
+
+    /**
+     * Log the available permits and the queue length (threads waiting to acquire)
+     * of the named semaphore. Logs an error if the semaphore name is unknown.
+     *
+     * @param semaphoreName key of a semaphore registered in this manager
+     */
+    public void printDetail(String semaphoreName) {
+        Semaphore semaphore = semaphoreMap.get(semaphoreName);
+        if (semaphore == null) {
+            // pass the name so the {} placeholder is actually filled in
+            LOGGER.error("printDetail {} not exist", semaphoreName);
+            return;
+        }
+        LOGGER.info("permit left {} wait {} {}", semaphore.availablePermits(), semaphore.getQueueLength(),
+                semaphoreName);
+    }
+
+    public void printAll() {
+        printDetail(AGENT_GLOBAL_READER_SOURCE_PERMIT);
+        printDetail(AGENT_GLOBAL_READER_QUEUE_PERMIT);
+        printDetail(AGENT_GLOBAL_WRITER_PERMIT);
+    }
+}
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
new file mode 100644
index 0000000000..b52a51c90a
--- /dev/null
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.core.task.file;
+
+import org.apache.inlong.agent.common.AbstractDaemon;
+import org.apache.inlong.agent.common.AgentThreadFactory;
+import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.AgentConstants;
+import org.apache.inlong.agent.core.task.TaskAction;
+import org.apache.inlong.agent.db.Db;
+import org.apache.inlong.agent.db.RocksDbImp;
+import org.apache.inlong.agent.db.TaskProfileDb;
+import org.apache.inlong.agent.plugin.file.Task;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.ThreadUtils;
+import org.apache.inlong.common.enums.TaskStateEnum;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_NUMBER_LIMIT;
+import static org.apache.inlong.agent.constant.AgentConstants.JOB_NUMBER_LIMIT;
+import static org.apache.inlong.agent.constant.TaskConstants.TASK_STATE;
+
+/**
+ * handle the task config from manager, including add, delete, update etc.
+ * the task config is stored in both db and memory.
+ */
+public class TaskManager extends AbstractDaemon {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(TaskManager.class);
+    public static final int CONFIG_QUEUE_CAPACITY = 1;
+    public static final int CORE_THREAD_SLEEP_TIME = 1000;
+    private static final int ACTION_QUEUE_CAPACITY = 100000;
+    // task basic db
+    private final Db taskBasicDb;
+    // instance basic db
+    private final Db instanceBasicDb;
+    // task in db
+    private final TaskProfileDb taskDb;
+    // task in memory
+    private final ConcurrentHashMap<String, Task> taskMap;
+    // task config from manager.
+    private final BlockingQueue<List<TaskProfile>> configQueue;
+    // task thread pool;
+    private final ThreadPoolExecutor runningPool;
+    // tasks which are not accepted by running pool.
+    private final BlockingQueue<Task> pendingTasks;
+    private final int taskMaxLimit;
+    private final AgentConfiguration agentConf;
+    // task action queue.
+    private final BlockingQueue<TaskAction> actionQueue;
+
+    /**
+     * Init task manager.
+     */
+    public TaskManager() {
+        this.agentConf = AgentConfiguration.getAgentConf();
+        this.taskBasicDb = initDb(
+                agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH, 
AgentConstants.AGENT_LOCAL_DB_PATH_TASK));
+        this.instanceBasicDb = initDb(
+                agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH, 
AgentConstants.AGENT_LOCAL_DB_PATH_INSTANCE));
+        taskDb = new TaskProfileDb(taskBasicDb);
+        this.runningPool = new ThreadPoolExecutor(
+                0, Integer.MAX_VALUE,
+                60L, TimeUnit.SECONDS,
+                new SynchronousQueue<>(),
+                new AgentThreadFactory("task-manager-running-pool"));
+        taskMap = new ConcurrentHashMap<>();
+        taskMaxLimit = agentConf.getInt(JOB_NUMBER_LIMIT, 
DEFAULT_JOB_NUMBER_LIMIT);
+        pendingTasks = new LinkedBlockingQueue<>(taskMaxLimit);
+        configQueue = new LinkedBlockingQueue<>(CONFIG_QUEUE_CAPACITY);
+        actionQueue = new LinkedBlockingQueue<>(ACTION_QUEUE_CAPACITY);
+    }
+
+    /**
+     * init db by class name
+     *
+     * @return db
+     */
+    public static Db initDb(String childPath) {
+        try {
+            return new RocksDbImp(childPath);
+        } catch (Exception ex) {
+            throw new UnsupportedClassVersionError(ex.getMessage());
+        }
+    }
+
+    public void submitTaskProfiles(List<TaskProfile> taskProfiles) {
+        if (taskProfiles == null) {
+            return;
+        }
+        while (configQueue.size() != 0) {
+            configQueue.poll();
+        }
+        configQueue.add(taskProfiles);
+    }
+
+    public boolean submitAction(TaskAction action) {
+        if (action == null) {
+            return false;
+        }
+        return actionQueue.offer(action);
+    }
+
+    /**
+     * thread for core thread.
+     *
+     * @return runnable profile.
+     */
+    private Runnable coreThread() {
+        return () -> {
+            Thread.currentThread().setName("task-manager-core");
+            while (isRunnable()) {
+                try {
+                    AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
+                    dealWithConfigQueue(configQueue);
+                    dealWithActionQueue(actionQueue);
+                } catch (Throwable ex) {
+                    LOGGER.error("exception caught", ex);
+                    ThreadUtils.threadThrowableHandler(Thread.currentThread(), 
ex);
+                }
+            }
+        };
+    }
+
+    private void dealWithConfigQueue(BlockingQueue<List<TaskProfile>> queue) {
+        List<TaskProfile> dataConfigs = queue.poll();
+        if (dataConfigs == null) {
+            return;
+        }
+        keepPaceWithManager(dataConfigs);
+        keepPaceWithDb();
+    }
+
+    private void dealWithActionQueue(BlockingQueue<TaskAction> queue) {
+        while (isRunnable()) {
+            try {
+                TaskAction action = queue.poll();
+                if (action == null) {
+                    break;
+                }
+                TaskProfile profile = action.getProfile();
+                switch (action.getActionType()) {
+                    case FINISH:
+                        LOGGER.info("deal finish action, taskId {}", 
profile.getTaskId());
+                        finishTask(profile);
+                        break;
+                    default:
+                        LOGGER.error("invalid action type for action queue: 
taskId {} type {}", profile.getTaskId(),
+                                action.getActionType());
+                }
+            } catch (Throwable ex) {
+                LOGGER.error("dealWithActionQueue", ex);
+                ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex);
+            }
+        }
+    }
+
+    /**
+     * keep pace with data config from manager, task state should be only 
RUNNING or FROZEN.
+     * NEW and STOP only used in manager
+     */
+    private void keepPaceWithManager(List<TaskProfile> taskProfiles) {
+        LOGGER.info("deal with List<TaskProfile> {}", taskProfiles);
+        Map<String, TaskProfile> tasksFromManager = new ConcurrentHashMap<>();
+        taskProfiles.forEach((profile) -> {
+            TaskStateEnum state = profile.getState();
+            if (state == TaskStateEnum.RUNNING || state == 
TaskStateEnum.FROZEN) {
+                tasksFromManager.put(profile.getTaskId(), profile);
+            } else {
+                LOGGER.error("task {} invalid task state {}", profile, state);
+            }
+        });
+        traverseManagerTasksToDb(tasksFromManager);
+        traverseDbTasksToManager(tasksFromManager);
+    }
+
+    /**
+     * keep pace with task in db
+     */
+    private void keepPaceWithDb() {
+        traverseDbTasksToMemory();
+        traverseMemoryTasksToDb();
+    }
+
+    /**
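+     * traverse tasks from manager, if not found in db then add it,
+     * if the state differs from db then activate or freeze it (a FINISH state in db is left untouched)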
+     */
+    private void traverseManagerTasksToDb(Map<String, TaskProfile> 
tasksFromManager) {
+        tasksFromManager.values().forEach((profileFromManager) -> {
+            TaskProfile taskFromDb = 
taskDb.getTask(profileFromManager.getTaskId());
+            if (taskFromDb == null) {
+                LOGGER.info("traverseManagerTasksToDb task {} not found in db 
retry {} state {}, add it",
+                        profileFromManager.getTaskId(),
+                        profileFromManager.isRetry(), 
profileFromManager.getState());
+                addTask(profileFromManager);
+            } else {
+                TaskStateEnum managerState = profileFromManager.getState();
+                TaskStateEnum dbState = taskFromDb.getState();
+                if (managerState == dbState) {
+                    return;
+                }
+                if (dbState == TaskStateEnum.FINISH) {
+                    LOGGER.info("traverseManagerTasksToDb task {} dbState {} 
retry {}, do nothing",
+                            taskFromDb.getTaskId(), dbState,
+                            taskFromDb.isRetry());
+                    return;
+                }
+                if (managerState == TaskStateEnum.RUNNING) {
+                    LOGGER.info("traverseManagerTasksToDb task {} dbState {} 
retry {}, active it",
+                            taskFromDb.getTaskId(), dbState, 
taskFromDb.isRetry());
+                    activeTask(profileFromManager);
+                } else {
+                    LOGGER.info("traverseManagerTasksToDb task {} dbState {} 
retry {}, freeze it",
+                            taskFromDb.getTaskId(), dbState, 
taskFromDb.isRetry());
+                    freezeTask(profileFromManager);
+                }
+            }
+        });
+    }
+
+    /**
+     * traverse tasks in db, if not found in tasks from manager then delete it
+     */
+    private void traverseDbTasksToManager(Map<String, TaskProfile> 
tasksFromManager) {
+        taskDb.getTasks().forEach((profileFromDb) -> {
+            if (!tasksFromManager.containsKey(profileFromDb.getTaskId())) {
+                LOGGER.info("traverseDbTasksToManager try to delete task {}", 
profileFromDb.getTaskId());
+                deleteTask(profileFromDb);
+            }
+        });
+    }
+
+    /**
+     * db task state is RUNNING and taskMap not found then add
+     * db task state is FROZEN and taskMap found then delete
+     */
+    private void traverseDbTasksToMemory() {
+        taskDb.getTasks().forEach((profileFromDb) -> {
+            TaskStateEnum dbState = profileFromDb.getState();
+            Task task = taskMap.get(profileFromDb.getTaskId());
+            if (dbState == TaskStateEnum.RUNNING) {
+                if (task == null) {
+                    LOGGER.info("traverseDbTasksToMemory add task to mem 
taskId {}", profileFromDb.getTaskId());
+                    addToMemory(profileFromDb);
+                }
+            } else if (dbState == TaskStateEnum.FROZEN) {
+                if (task != null) {
+                    LOGGER.info("traverseDbTasksToMemory delete task from mem 
taskId {}",
+                            profileFromDb.getTaskId());
+                    deleteFromMemory(profileFromDb.getTaskId());
+                }
+            } else {
+                if (dbState != TaskStateEnum.FINISH) {
+                    LOGGER.error("task {} invalid state {}", 
profileFromDb.getTaskId(), dbState);
+                }
+            }
+        });
+    }
+
+    /**
+     * task in taskMap but not in taskDb then delete
+     * task in taskMap but task state from db is not RUNNING then delete
+     */
+    private void traverseMemoryTasksToDb() {
+        taskMap.values().forEach((task) -> {
+            TaskProfile profileFromDb = taskDb.getTask(task.getTaskId());
+            if (profileFromDb == null) {
+                deleteFromMemory(task.getTaskId());
+                return;
+            }
+            TaskStateEnum stateFromDb = profileFromDb.getState();
+            if (stateFromDb != TaskStateEnum.RUNNING) {
+                deleteFromMemory(task.getTaskId());
+            }
+        });
+    }
+
+    /**
+     * add task profile to db
+     * if task state is RUNNING then add task to memory
+     */
+    private void addTask(TaskProfile taskProfile) {
+        if (taskMap.size() >= taskMaxLimit) {
+            LOGGER.error("taskMap size {} over limit {}", taskMap.size(), 
taskMaxLimit);
+            return;
+        }
+        addToDb(taskProfile);
+        TaskStateEnum state = 
TaskStateEnum.getTaskState(taskProfile.getInt(TASK_STATE));
+        if (state == TaskStateEnum.RUNNING) {
+            addToMemory(taskProfile);
+        } else {
+            LOGGER.info("taskId {} state {} no need to add to memory", 
taskProfile.getTaskId(),
+                    taskProfile.getState());
+        }
+    }
+
+    private void deleteTask(TaskProfile taskProfile) {
+        deleteFromDb(taskProfile);
+        deleteFromMemory(taskProfile.getTaskId());
+    }
+
+    private void freezeTask(TaskProfile taskProfile) {
+        updateToDb(taskProfile);
+        deleteFromMemory(taskProfile.getTaskId());
+    }
+
+    private void finishTask(TaskProfile taskProfile) {
+        taskProfile.setState(TaskStateEnum.FINISH);
+        updateToDb(taskProfile);
+        deleteFromMemory(taskProfile.getTaskId());
+    }
+
+    private void activeTask(TaskProfile taskProfile) {
+        updateToDb(taskProfile);
+        addToMemory(taskProfile);
+    }
+
+    private void restoreFromDb() {
+        List<TaskProfile> taskProfileList = taskDb.getTasks();
+        taskProfileList.forEach((profile) -> {
+            if (profile.getState() == TaskStateEnum.RUNNING) {
+                LOGGER.info("restoreFromDb taskId {}", profile.getTaskId());
+                addToMemory(profile);
+            }
+        });
+    }
+
+    private void stopAllTasks() {
+        taskMap.values().forEach((task) -> {
+            task.destroy();
+        });
+        taskMap.clear();
+    }
+
+    /**
+     * add task to db, it was expected that there is no record refer the task 
id.
+     * cause the task id will change if the task content changes, replace the 
record
+     * if it is found, the memory record will be updated by the db.
+     */
+    private void addToDb(TaskProfile taskProfile) {
+        if (taskDb.getTask(taskProfile.getTaskId()) != null) {
+            LOGGER.error("task {} should not exist", taskProfile);
+        }
+        taskDb.storeTask(taskProfile);
+    }
+
+    private void deleteFromDb(TaskProfile taskProfile) {
+        if (taskDb.getTask(taskProfile.getTaskId()) == null) {
+            LOGGER.error("try to delete task {} but not found in db", 
taskProfile);
+            return;
+        }
+        taskDb.deleteTask(taskProfile.getTaskId());
+    }
+
+    private void updateToDb(TaskProfile taskProfile) {
+        if (taskDb.getTask(taskProfile.getTaskId()) == null) {
+            LOGGER.error("task {} not found, agent may have been reinstalled", 
taskProfile);
+        }
+        taskDb.storeTask(taskProfile);
+    }
+
+    /**
+     * add task to memory, if a record referring to the task id already exists
+     * it is destroyed and removed first.
+     * The task class named in the profile is instantiated, initialized, stored in
+     * taskMap and submitted to the running pool. Any failure is logged and swallowed,
+     * so the caller is not interrupted.
+     *
+     * @param taskProfile profile providing the task id and the task class to instantiate
+     */
+    private void addToMemory(TaskProfile taskProfile) {
+        Task oldTask = taskMap.get(taskProfile.getTaskId());
+        if (oldTask != null) {
+            oldTask.destroy();
+            taskMap.remove(taskProfile.getTaskId());
+            LOGGER.error("old task {} should not exist, try stop it first",
+                    taskProfile);
+        }
+        try {
+            Class<?> taskClass = Class.forName(taskProfile.getTaskClass());
+            Task task = (Task) taskClass.newInstance();
+            task.init(this, taskProfile, instanceBasicDb);
+            taskMap.put(taskProfile.getTaskId(), task);
+            runningPool.submit(task);
+            LOGGER.info(
+                    "add task {} into memory, taskMap size {}, runningPool task total {}, runningPool task active {}",
+                    task.getTaskId(), taskMap.size(), runningPool.getTaskCount(),
+                    runningPool.getActiveCount());
+        } catch (Throwable t) {
+            // log the throwable itself: getMessage() may be null (e.g. NullPointerException)
+            // and never carries the stack trace needed to diagnose a failed task start
+            LOGGER.error("add task error, taskId {}", taskProfile.getTaskId(), t);
+        }
+    }
+
+    private void deleteFromMemory(String taskId) {
+        Task oldTask = taskMap.get(taskId);
+        if (oldTask == null) {
+            LOGGER.error("old task {} not found", taskId);
+            return;
+        }
+        oldTask.destroy();
+        taskMap.remove(oldTask.getTaskId());
+        LOGGER.info(
+                "delete task {} from memory, taskMap size {}, runningPool task 
total {}, runningPool task active {}",
+                oldTask.getTaskId(), taskMap.size(), 
runningPool.getTaskCount(),
+                runningPool.getActiveCount());
+    }
+
+    public Task getTask(String taskId) {
+        return taskMap.get(taskId);
+    }
+
+    public TaskProfile getTaskProfile(String taskId) {
+        return taskDb.getTask(taskId);
+    }
+
+    @Override
+    public void start() throws Exception {
+        restoreFromDb();
+        submitWorker(coreThread());
+    }
+
+    @Override
+    public void stop() throws Exception {
+        stopAllTasks();
+        waitForTerminate();
+        runningPool.shutdown();
+    }
+}
diff --git 
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java
 
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java
index ea771e96f4..d3d593d345 100755
--- 
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java
+++ 
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java
@@ -18,8 +18,14 @@
 package org.apache.inlong.agent.core;
 
 import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.agent.conf.TaskProfile;
 import org.apache.inlong.agent.constant.AgentConstants;
+import org.apache.inlong.agent.pojo.FileTask.FileTaskConfig;
+import org.apache.inlong.common.enums.TaskStateEnum;
+import org.apache.inlong.common.pojo.agent.DataConfig;
 
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
 import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,7 +39,8 @@ import java.nio.file.Paths;
 public class AgentBaseTestsHelper {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(AgentBaseTestsHelper.class);
-
+    private static final GsonBuilder gsonBuilder = new 
GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss");
+    private static final Gson GSON = gsonBuilder.create();
     private final String className;
     private Path testRootDir;
 
@@ -64,4 +71,32 @@ public class AgentBaseTestsHelper {
             }
         }
     }
+
+    public TaskProfile getTaskProfile(int taskId, String pattern, boolean 
retry, Long startTime, Long endTime,
+            TaskStateEnum state) {
+        DataConfig dataConfig = getDataConfig(taskId, pattern, retry, 
startTime, endTime, state);
+        TaskProfile profile = TaskProfile.convertToTaskProfile(dataConfig);
+        return profile;
+    }
+
+    private DataConfig getDataConfig(int taskId, String pattern, boolean 
retry, Long startTime, Long endTime,
+            TaskStateEnum state) {
+        DataConfig dataConfig = new DataConfig();
+        dataConfig.setInlongGroupId("testGroupId");
+        dataConfig.setInlongStreamId("testStreamId");
+        dataConfig.setDataReportType(1);
+        dataConfig.setTaskType(3);
+        dataConfig.setTaskId(taskId);
+        dataConfig.setState(state.ordinal());
+        FileTaskConfig fileTaskConfig = new FileTaskConfig();
+        fileTaskConfig.setPattern(pattern);
+        fileTaskConfig.setTimeOffset("0d");
+        fileTaskConfig.setMaxFileCount(100);
+        fileTaskConfig.setCycleUnit("D");
+        fileTaskConfig.setRetry(retry);
+        fileTaskConfig.setStartTime(startTime);
+        fileTaskConfig.setEndTime(endTime);
+        dataConfig.setExtParams(GSON.toJson(fileTaskConfig));
+        return dataConfig;
+    }
 }
diff --git 
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/MockTask.java
 
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/MockTask.java
new file mode 100644
index 0000000000..23c5ad5cc5
--- /dev/null
+++ 
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/MockTask.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.core.task;
+
+import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.core.task.file.TaskManager;
+import org.apache.inlong.agent.db.Db;
+import org.apache.inlong.agent.plugin.file.Task;
+
+import java.io.IOException;
+
+public class MockTask extends Task {
+
+    public static final int INIT_TIME = 100;
+    public static final int RUN_TIME = 101;
+    public static final int DESTROY_TIME = 102;
+    private TaskProfile profile;
+    private long index = INIT_TIME;
+    public long initTime = 0;
+    public long destroyTime = 0;
+    public long runtime = 0;
+    private TaskManager manager;
+
+    @Override
+    public void init(Object srcManager, TaskProfile profile, Db basicDb) 
throws IOException {
+        manager = (TaskManager) srcManager;
+        this.profile = profile;
+    }
+
+    @Override
+    public void destroy() {
+        destroyTime = index++;
+    }
+
+    @Override
+    public TaskProfile getProfile() {
+        return profile;
+    }
+
+    @Override
+    public String getTaskId() {
+        return profile.getTaskId();
+    }
+
+    @Override
+    public void addCallbacks() {
+
+    }
+
+    @Override
+    public void run() {
+        runtime = index++;
+    }
+}
\ No newline at end of file
diff --git 
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestTaskManager.java
 
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestTaskManager.java
new file mode 100755
index 0000000000..bf9047c40a
--- /dev/null
+++ 
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestTaskManager.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.core.task;
+
+import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.core.AgentBaseTestsHelper;
+import org.apache.inlong.agent.core.task.file.TaskManager;
+import org.apache.inlong.common.enums.TaskStateEnum;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.awaitility.Awaitility.await;
+
+/**
+ * Exercises {@link TaskManager} with {@link MockTask} instances: adding a task, freezing and
+ * resuming it, and replacing it with a different task.
+ */
+public class TestTaskManager {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(TestTaskManager.class);
+    private static TaskManager manager;
+    private static AgentBaseTestsHelper helper;
+
+    /**
+     * Sets up the agent home for this test class and starts the manager under test.
+     * A start-up failure is reported with its original cause attached.
+     */
+    @BeforeClass
+    public static void setup() {
+        helper = new AgentBaseTestsHelper(TestTaskManager.class.getName()).setupAgentHome();
+        try {
+            manager = new TaskManager();
+            manager.start();
+        } catch (Exception e) {
+            // Keep the cause so the report shows why the manager failed to start.
+            throw new AssertionError("manager start error", e);
+        }
+    }
+
+    /**
+     * Stops the manager and removes the agent home created by {@link #setup()}.
+     *
+     * @throws Exception if stopping the manager or cleaning up fails
+     */
+    @AfterClass
+    public static void teardown() throws Exception {
+        // setup() may have failed before these were assigned; avoid masking that failure with an NPE.
+        if (manager != null) {
+            manager.stop();
+        }
+        if (helper != null) {
+            helper.teardownAgentHome();
+        }
+    }
+
+    /**
+     * Submits task profiles to the manager and waits (up to 3 seconds per step) for the manager to
+     * reflect each change.
+     */
+    @Test
+    public void testTaskManager() {
+        String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
+        TaskProfile taskProfile1 = helper.getTaskProfile(1, pattern, false, 0L, 0L, TaskStateEnum.RUNNING);
+        String taskId1 = taskProfile1.getTaskId();
+        taskProfile1.setTaskClass(MockTask.class.getCanonicalName());
+        List<TaskProfile> taskProfiles1 = new ArrayList<>();
+        taskProfiles1.add(taskProfile1);
+        // test add
+        manager.submitTaskProfiles(taskProfiles1);
+        await().atMost(3, TimeUnit.SECONDS).until(() -> manager.getTask(taskId1) != null);
+        LOGGER.info("state {}", manager.getTaskProfile(taskId1).getState());
+        Assert.assertEquals(TaskStateEnum.RUNNING, manager.getTaskProfile(taskId1).getState());
+
+        // test freeze: the running task disappears while its profile is kept in FROZEN state
+        taskProfile1.setState(TaskStateEnum.FROZEN);
+        manager.submitTaskProfiles(taskProfiles1);
+        await().atMost(3, TimeUnit.SECONDS).until(() -> manager.getTask(taskId1) == null);
+        Assert.assertEquals(TaskStateEnum.FROZEN, manager.getTaskProfile(taskId1).getState());
+        // resume: switching the profile back to RUNNING brings the task back
+        taskProfile1.setState(TaskStateEnum.RUNNING);
+        manager.submitTaskProfiles(taskProfiles1);
+        await().atMost(3, TimeUnit.SECONDS).until(() -> manager.getTask(taskId1) != null);
+        Assert.assertEquals(TaskStateEnum.RUNNING, manager.getTaskProfile(taskId1).getState());
+
+        // test delete: submitting a list that no longer contains task 1 is expected to remove it
+        TaskProfile taskProfile2 = helper.getTaskProfile(2, pattern, false, 0L, 0L, TaskStateEnum.RUNNING);
+        taskProfile2.setTaskClass(MockTask.class.getCanonicalName());
+        List<TaskProfile> taskProfiles2 = new ArrayList<>();
+        taskProfiles2.add(taskProfile2);
+        manager.submitTaskProfiles(taskProfiles2);
+        await().atMost(3, TimeUnit.SECONDS).until(() -> manager.getTask(taskId1) == null);
+        Assert.assertNull(manager.getTaskProfile(taskId1));
+        String taskId2 = taskProfile2.getTaskId();
+        await().atMost(3, TimeUnit.SECONDS).until(() -> manager.getTask(taskId2) != null);
+        Assert.assertEquals(TaskStateEnum.RUNNING, manager.getTaskProfile(taskId2).getState());
+    }
+}
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java
index 9a9f8ece96..363b029783 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java
@@ -220,10 +220,10 @@ public class NewDateUtils {
             timeInterval = DAY_TIMEOUT_INTERVAL;
         }
 
-        // 处理偏移量,超时周期要加上时间偏移偏移量
+        // To handle the offset, add the time offset to the timeout period
         if (timeOffset.startsWith("-")) {
             timeInterval += caclOffset(timeOffset);
-        } else { // 处理向后偏移
+        } else { // Process Backward Offset
             timeInterval -= caclOffset(timeOffset);
         }
 
@@ -231,20 +231,23 @@ public class NewDateUtils {
     }
 
     /**
-     * 根据偏移量计算偏移时间
-     * 当前偏移只会向前偏移,也可向后偏移为兼容之前的计算方式(相减),当为向后偏移时,返回负;当向前偏移,返回正
+     * Calculates the offset time for the given offset expression.
+     * Offsets are currently forward only; backward offsets are also supported for compatibility with the previous
+     * calculation method (subtraction).
+     * For a backward offset the result is negative;
+     * for a forward offset the result is positive.
      *
-     * @param timeOffset 偏移量,如-1d,-4h,-10m等;
+     * @param timeOffset the offset, such as -1d, -4h or -10m
      * @return
      */
     public static long caclOffset(String timeOffset) {
         String offsetUnit = timeOffset.substring(timeOffset.length() - 1);
         int startIndex = timeOffset.charAt(0) == '-' ? 1 : 0;
-        // 默认向后偏移
+        // Default Backward Offset
         int symbol = 1;
         if (startIndex == 1) {
             symbol = 1;
-        } else if (startIndex == 0) { // 向前偏移
+        } else if (startIndex == 0) { // Forward offset
             symbol = -1;
         }
         int offsetTime = Integer
@@ -410,7 +413,8 @@ public class NewDateUtils {
 
         Matcher mat = Pattern.compile(longestDatePattern).matcher(fileName);
         boolean find = mat.find();
-        // TODO : 
存在文件名中有多个部分匹配到时间表达式的情况("/data/joox_logs/2000701106/201602170040.log" YYYYMMDDhh)
+        // TODO: more than one part of the file name may match the time regex, e.g.
+        // "/data/joox_logs/2000701106/201602170040.log" with YYYYMMDDhh
         if (!find) {
             logger.error("Can't find the pattern {} for file name {}", 
longestDatePattern,
                     fileName);
@@ -717,221 +721,4 @@ public class NewDateUtils {
 
         return ret;
     }
-
-    public static void main(String[] args) throws Exception {
-
-        // String aa = "/data/taox/YYYYMMDDt_log/[0-9]+_YYYYMMDD_hh00.log";
-        /*
-         * String aa = "/data/taox/YYYYt_logMMDD/[0-9]+_YYYYMMDD_hh00.log"; 
String bb =
-         * replaceDateExpressionWithRegex(aa); System.out.println("---------: 
" + bb);
-         *
-         * String cc = replaceDateExpression(Calendar.getInstance(), aa); 
System.out.println("---------: " + cc);
-         *
-         * String dd = replaceDateExpression1(Calendar.getInstance(), aa); 
System.out.println("---------: " + dd);
-         */
-
-        // String text = 
"/data1/qq_BaseInfo/YYYY-MM/YYYY-MM-DD/gamedr.gamedb[0-9]+.minigame
-        // .db/YYYY-MM-DD-[0-9]+.txt";
-        // System.out.println(replaceDateExpressionWithRegex(text));
-        //
-        // int timeInterval = 1000;
-        // String timeOffset = "10H";
-        //
-        // String offsetUnit = timeOffset.substring(timeOffset.length() - 1);
-        // int startIndex = timeOffset.charAt(0) == '-' ? 1 : 0;
-        // int offsetTime = Integer.parseInt(timeOffset.substring(startIndex, 
timeOffset.length()
-        // - 1));
-        // if("d".equalsIgnoreCase(offsetUnit)){
-        // timeInterval += offsetTime * 24 * 3600 * 1000;
-        // }else if("h".equalsIgnoreCase(offsetUnit)){
-        // timeInterval += offsetTime * 3600 * 1000;
-        // }else if("m".equalsIgnoreCase(offsetUnit)){
-        // timeInterval += offsetTime * 60 * 1000;
-        // }
-        //
-        // System.out.println(timeInterval);
-        //
-        // SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMdd 
HH:mm:ss");
-        //
-        // Calendar calendar = NewDateUtils.getCurDate("D", "-10D");
-        // System.out.println("year: " + calendar.get(Calendar.YEAR) + ", 
month: "
-        // + (calendar.get(Calendar.MONTH) + 1) + ", day: "
-        // + calendar.get(Calendar.DAY_OF_MONTH) + ", hour: "
-        // + calendar.get(Calendar.HOUR_OF_DAY) + ", minute: "
-        // + calendar.get(Calendar.MINUTE) + ", second: "
-        // + calendar.get(Calendar.SECOND));
-        // System.out.println(dateFormat.format(calendar.getTimeInMillis()));
-        //
-        // calendar = getCurDate("H", "-2H");
-        // System.out.println("year: " + calendar.get(Calendar.YEAR) + ", 
month: "
-        // + (calendar.get(Calendar.MONTH) + 1) + ", day: "
-        // + calendar.get(Calendar.DAY_OF_MONTH) + ", hour: "
-        // + calendar.get(Calendar.HOUR_OF_DAY) + ", minute: "
-        // + calendar.get(Calendar.MINUTE) + ", second: "
-        // + calendar.get(Calendar.SECOND));
-        // System.out.println(dateFormat.format(calendar.getTimeInMillis()));
-        //
-        // calendar = getCurDate("H", "-2D");
-        // System.out.println("year: " + calendar.get(Calendar.YEAR) + ", 
month: "
-        // + (calendar.get(Calendar.MONTH) + 1) + ", day: "
-        // + calendar.get(Calendar.DAY_OF_MONTH) + ", hour: "
-        // + calendar.get(Calendar.HOUR_OF_DAY) + ", minute: "
-        // + calendar.get(Calendar.MINUTE) + ", second: "
-        // + calendar.get(Calendar.SECOND));
-        // System.out.println(dateFormat.format(calendar.getTimeInMillis()));
-        //
-        // calendar = getCurDate("5m", "-20m");
-        // System.out.println("year: " + calendar.get(Calendar.YEAR) + ", 
month: "
-        // + (calendar.get(Calendar.MONTH) + 1) + ", day: "
-        // + calendar.get(Calendar.DAY_OF_MONTH) + ", hour: "
-        // + calendar.get(Calendar.HOUR_OF_DAY) + ", minute: "
-        // + calendar.get(Calendar.MINUTE) + ", second: "
-        // + calendar.get(Calendar.SECOND));
-        // System.out.println(dateFormat.format(calendar.getTimeInMillis()));
-        //
-        // String directory = 
"/data/home/user00/xyshome/logsvr/log/YYYYMMDD/[0-9]+_YYYYMMDD_hh00
-        // .log";
-        // calendar = getCurDate("H", "-3H");
-        // System.out.println(replaceDateExpression(calendar, directory));
-        //
-        // 
System.out.println(NewDateUtils.timeStrConvertTomillSec("201404031105",
-        // "m"));
-        // 
System.out.println(NewDateUtils.timeStrConvertTomillSec("2014040223",
-        // "H"));
-        // System.out.println(NewDateUtils
-        // .timeStrConvertTomillSec("20140402", "D"));
-        //
-        // System.out.println(NewDateUtils.millSecConvertToTimeStr(
-        // System.currentTimeMillis(), "Y"));
-        // System.out.println(NewDateUtils.millSecConvertToTimeStr(
-        // System.currentTimeMillis(), "M"));
-        // System.out.println(NewDateUtils.millSecConvertToTimeStr(
-        // System.currentTimeMillis(), "D"));
-        // System.out.println(NewDateUtils.millSecConvertToTimeStr(
-        // System.currentTimeMillis(), "H"));
-        // System.out.println(NewDateUtils.millSecConvertToTimeStr(
-        // System.currentTimeMillis(), "10m"));
-        // System.out.println(NewDateUtils.millSecConvertToTimeStr(
-        // System.currentTimeMillis(), "15m"));
-        // System.out.println(NewDateUtils.millSecConvertToTimeStr(
-        // System.currentTimeMillis(), "30m"));
-        // System.out.println(NewDateUtils.millSecConvertToTimeStr(
-        // NewDateUtils.timeStrConvertTomillSec("201404121900", "10m"),
-        // "10m"));
-        //
-        // NewDateUtils.getDateRegion("20120810", "20120813", "D");
-        // NewDateUtils.getDateRegion("2012081005", "2012081300", "H");
-        // NewDateUtils.getDateRegion("201404111649", "201404111600", "10m");
-        // String dataTime = "20160122";
-        // System.out.println(NewDateUtils.getShouldStartTime(dataTime, "D", 
"-2h"));
-
-        // String dataPath = "/data/herococo/YYYYMMDD_*/YYYYMMDDhhmm.log";
-        // dataPath = NewDateUtils.replaceDateExpressionWithRegex(dataPath);
-        // System.out.println("dataPath: " + dataPath);
-        //
-        // Pattern pattern = Pattern.compile(dataPath, Pattern.CASE_INSENSITIVE
-        // | Pattern.DOTALL | Pattern.MULTILINE);
-        // // Pattern pattern = Pattern.compile("/data/herococo/\\d+/\\d+.log",
-        // // Pattern.CASE_INSENSITIVE | Pattern.DOTALL | Pattern.MULTILINE);
-        // Matcher m = pattern
-        // .matcher("/data/herococo/20140406_a/20140406152730.log");
-        // System.out.println(m.matches());
-        //
-        // dataPath = 
"/data/home/user00/xyshome/logsvr/log/YYYYMMDD/[0-9]+_YYYYMMDD_hh00.log";
-        // dataPath = NewDateUtils.replaceDateExpressionWithRegex(dataPath);
-        // pattern = Pattern.compile(dataPath, Pattern.CASE_INSENSITIVE
-        // | Pattern.DOTALL | Pattern.MULTILINE);
-        // m = pattern
-        // 
.matcher("/data/home/user00/xyshome/logsvr/log/20140406/8_20140406_1600.log");
-        // System.out.println(dataPath);
-        // System.out.println(m.matches());
-        //
-        // dataPath = "/data/work/data2/abc/YYYYMMDDhh.*.txt";
-        // dataPath = NewDateUtils.replaceDateExpressionWithRegex(dataPath);
-        // pattern = Pattern.compile(dataPath, Pattern.CASE_INSENSITIVE
-        // | Pattern.DOTALL | Pattern.MULTILINE);
-        // m = pattern.matcher("/data/work/data2/abc/201404102242.txt");
-        // System.out.println(dataPath);
-        // System.out.println(m.matches());
-        //
-        // List<Long> retTimeList = NewDateUtils.getDateRegion("20140411",
-        // "20140411", "D");
-        // for (Long time : retTimeList) {
-        // System.out.println(NewDateUtils.millSecConvertToTimeStr(time, "D"));
-        // }
-        //
-        // pattern = Pattern
-        // .compile(
-        // 
"/data/home/tlog/logplat/log/tlogd_1/[0-9]+_\\d{4}\\d{2}\\d{2}_\\d{2}00
-        // .log",
-        // Pattern.CASE_INSENSITIVE | Pattern.DOTALL
-        // | Pattern.MULTILINE);
-        // m = pattern
-        // 
.matcher("/data/home/tlog/logplat/log/tlogd_1/65535_20140506_1600.log.1");
-        // System.out.println(m.matches());
-        // System.out.println(m.lookingAt());
-        //
-        // String unit = "h";
-        // if (StringUtils.endsWithIgnoreCase("h", "H")) {
-        // System.out.println("yes");
-        // }
-        //
-        // System.out.println(NewDateUtils.getDateTime("20160106", "D", 
"-4h"));
-
-        // PathDateExpression dateExpression = DateUtils
-        // .extractLongestTimeRegexWithPrefixOrSuffix
-        // ("/data/log/qqtalk/[0-9]+_[0-9]+_id20522_[0-9]+_YYYYMMDD_hh.log");
-        // System.out.println(dateExpression.getLongestDatePattern());
-        // String fileTime = 
getDateTime("/data/log/qqtalk/3900626911_11217_id20522_17_20160420
-        // .log", dateExpression);
-        // System.out.println(fileTime);
-
-        // String dataTime = "20180411";
-        //
-        // String shouldStart = getShouldStartTime(dataTime, "D", "4h");
-        // System.out.println(shouldStart);
-        //
-        // String fileName = "rc_trade_water[0-9]*.YYYY-MM-DD-hh.[0-9]+";
-        //
-        // String newFileName = "rc_trade_water.2016-11-20-12.9";
-        //
-        // /**
-        // * 打印出文件名中 最长的时间表达式
-        // */
-        // PathDateExpression dateExpression = 
DateUtils.extractLongestTimeRegexWithPrefixOrSuffix
-        // (fileName);
-        // System.out.println(dateExpression.getLongestDatePattern());
-        //
-        // /**
-        // * 检查正则表达式是否能匹配到文件
-        // */
-        // Pattern pattern = 
Pattern.compile(replaceDateExpressionWithRegex(fileName),
-        // Pattern.CASE_INSENSITIVE | Pattern.DOTALL | Pattern.MULTILINE);
-        //
-        // Matcher matcher = pattern.matcher(newFileName);
-        //
-        // if (matcher.matches() || matcher.lookingAt()) {
-        // System.out.println("Matched File");
-        // }
-        //
-        // /**
-        // * 打印文件名的时间
-        // */
-        // String fileTime = getDateTime(newFileName, dateExpression);
-        // System.out.println(fileTime);
-        //
-        //
-        // String fileName1 = "/data/joox_logs/2000701106/201602170040.log";
-        // String filePathRegx = 
"/data/joox_logs/2000701106/{YYYYMMDDhh}40.log";
-        // String fullRegx = replaceDateExpressionWithRegex(filePathRegx, 
"dateTimeGN");
-        // System.out.println(fullRegx);
-        // Pattern fullPattern = Pattern.compile(fullRegx);
-        // Matcher fullMatcher = fullPattern.matcher(fileName1);
-        // while (fullMatcher.find()) {
-        // System.out.println(fullMatcher.group("dateTimeGN"));
-        // }
-
-        System.out.println(
-                timeStrConvertTomillSec("2018111209", "h", 
TimeZone.getTimeZone("GMT+8:00")));
-    }
 }

Reply via email to