This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new afba2a8f80 [INLONG-9138][Agent] Add task manager (#9139) afba2a8f80 is described below commit afba2a8f8014dfc4ec8c19e91b07d1d4dcc26674 Author: justinwwhuang <hww_jus...@163.com> AuthorDate: Fri Oct 27 17:33:35 2023 +0800 [INLONG-9138][Agent] Add task manager (#9139) --- .../org/apache/inlong/agent/plugin/file/Task.java | 52 +++ .../inlong/agent/core/task/file/MemoryManager.java | 117 ++++++ .../inlong/agent/core/task/file/TaskManager.java | 449 +++++++++++++++++++++ .../inlong/agent/core/AgentBaseTestsHelper.java | 37 +- .../apache/inlong/agent/core/task/MockTask.java | 69 ++++ .../inlong/agent/core/task/TestTaskManager.java | 97 +++++ .../agent/plugin/utils/file/NewDateUtils.java | 237 +---------- 7 files changed, 832 insertions(+), 226 deletions(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Task.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Task.java new file mode 100755 index 0000000000..ce580a0bb7 --- /dev/null +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Task.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.plugin.file; + +import org.apache.inlong.agent.conf.TaskProfile; +import org.apache.inlong.agent.db.Db; +import org.apache.inlong.agent.state.AbstractStateWrapper; + +import java.io.IOException; + +/** + * Task interface, which generates instance in condition. + */ +public abstract class Task extends AbstractStateWrapper { + + /** + * init task by profile + * + * @throws IOException + */ + public abstract void init(Object srcManager, TaskProfile profile, Db basicDb) throws IOException; + + /** + * destroy task. + */ + public abstract void destroy(); + + /** + * get task profile + */ + public abstract TaskProfile getProfile(); + + /** + * get task id + */ + public abstract String getTaskId(); +} diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/MemoryManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/MemoryManager.java new file mode 100644 index 0000000000..fca9d37c72 --- /dev/null +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/MemoryManager.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.core.task.file; + +import org.apache.inlong.agent.conf.AgentConfiguration; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Semaphore; + +import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_QUEUE_PERMIT; +import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_SOURCE_PERMIT; +import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_WRITER_PERMIT; +import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT; +import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_READER_SOURCE_PERMIT; +import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_WRITER_PERMIT; + +/** + * used to limit global memory to avoid oom + */ +public class MemoryManager { + + private static final Logger LOGGER = LoggerFactory.getLogger(MemoryManager.class); + private static volatile MemoryManager memoryManager = null; + private final AgentConfiguration conf; + private ConcurrentHashMap<String, Semaphore> semaphoreMap = new ConcurrentHashMap<>(); + + private MemoryManager() { + this.conf = AgentConfiguration.getAgentConf(); + Semaphore semaphore = null; + semaphore = new Semaphore( + conf.getInt(AGENT_GLOBAL_READER_SOURCE_PERMIT, DEFAULT_AGENT_GLOBAL_READER_SOURCE_PERMIT)); + semaphoreMap.put(AGENT_GLOBAL_READER_SOURCE_PERMIT, semaphore); + + semaphore = new Semaphore( + conf.getInt(AGENT_GLOBAL_READER_QUEUE_PERMIT, DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT)); + semaphoreMap.put(AGENT_GLOBAL_READER_QUEUE_PERMIT, semaphore); + + semaphore = new Semaphore( + conf.getInt(AGENT_GLOBAL_WRITER_PERMIT, DEFAULT_AGENT_GLOBAL_WRITER_PERMIT)); + semaphoreMap.put(AGENT_GLOBAL_WRITER_PERMIT, 
semaphore); + } + + /** + * manager singleton + */ + public static MemoryManager getInstance() { + if (memoryManager == null) { + synchronized (MemoryManager.class) { + if (memoryManager == null) { + memoryManager = new MemoryManager(); + } + } + } + return memoryManager; + } + + public boolean tryAcquire(String semaphoreName, int permit) { + Semaphore semaphore = semaphoreMap.get(semaphoreName); + if (semaphore == null) { + LOGGER.error("tryAcquire {} not exist"); + return false; + } + return semaphore.tryAcquire(permit); + } + + public void release(String semaphoreName, int permit) { + Semaphore semaphore = semaphoreMap.get(semaphoreName); + if (semaphore == null) { + LOGGER.error("release {} not exist"); + return; + } + semaphore.release(permit); + } + + public int getLeft(String semaphoreName) { + Semaphore semaphore = semaphoreMap.get(semaphoreName); + if (semaphore == null) { + LOGGER.error("getLeft {} not exist"); + return -1; + } + return semaphore.availablePermits(); + } + + public void printDetail(String semaphoreName) { + Semaphore semaphore = semaphoreMap.get(semaphoreName); + if (semaphore == null) { + LOGGER.error("printDetail {} not exist"); + return; + } + LOGGER.info("permit left {} wait {} {}", semaphore.availablePermits(), semaphore.getQueueLength(), + semaphoreName); + } + + public void printAll() { + printDetail(AGENT_GLOBAL_READER_SOURCE_PERMIT); + printDetail(AGENT_GLOBAL_READER_QUEUE_PERMIT); + printDetail(AGENT_GLOBAL_WRITER_PERMIT); + } +} diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java new file mode 100644 index 0000000000..b52a51c90a --- /dev/null +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/file/TaskManager.java @@ -0,0 +1,449 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.core.task.file; + +import org.apache.inlong.agent.common.AbstractDaemon; +import org.apache.inlong.agent.common.AgentThreadFactory; +import org.apache.inlong.agent.conf.AgentConfiguration; +import org.apache.inlong.agent.conf.TaskProfile; +import org.apache.inlong.agent.constant.AgentConstants; +import org.apache.inlong.agent.core.task.TaskAction; +import org.apache.inlong.agent.db.Db; +import org.apache.inlong.agent.db.RocksDbImp; +import org.apache.inlong.agent.db.TaskProfileDb; +import org.apache.inlong.agent.plugin.file.Task; +import org.apache.inlong.agent.utils.AgentUtils; +import org.apache.inlong.agent.utils.ThreadUtils; +import org.apache.inlong.common.enums.TaskStateEnum; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_NUMBER_LIMIT; +import static org.apache.inlong.agent.constant.AgentConstants.JOB_NUMBER_LIMIT; +import 
static org.apache.inlong.agent.constant.TaskConstants.TASK_STATE; + +/** + * handle the task config from manager, including add, delete, update etc. + * the task config is store in both db and memory. + */ +public class TaskManager extends AbstractDaemon { + + private static final Logger LOGGER = LoggerFactory.getLogger(TaskManager.class); + public static final int CONFIG_QUEUE_CAPACITY = 1; + public static final int CORE_THREAD_SLEEP_TIME = 1000; + private static final int ACTION_QUEUE_CAPACITY = 100000; + // task basic db + private final Db taskBasicDb; + // instance basic db + private final Db instanceBasicDb; + // task in db + private final TaskProfileDb taskDb; + // task in memory + private final ConcurrentHashMap<String, Task> taskMap; + // task config from manager. + private final BlockingQueue<List<TaskProfile>> configQueue; + // task thread pool; + private final ThreadPoolExecutor runningPool; + // tasks which are not accepted by running pool. + private final BlockingQueue<Task> pendingTasks; + private final int taskMaxLimit; + private final AgentConfiguration agentConf; + // instance profile queue. + private final BlockingQueue<TaskAction> actionQueue; + + /** + * Init task manager. 
+ */ + public TaskManager() { + this.agentConf = AgentConfiguration.getAgentConf(); + this.taskBasicDb = initDb( + agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH, AgentConstants.AGENT_LOCAL_DB_PATH_TASK)); + this.instanceBasicDb = initDb( + agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH, AgentConstants.AGENT_LOCAL_DB_PATH_INSTANCE)); + taskDb = new TaskProfileDb(taskBasicDb); + this.runningPool = new ThreadPoolExecutor( + 0, Integer.MAX_VALUE, + 60L, TimeUnit.SECONDS, + new SynchronousQueue<>(), + new AgentThreadFactory("task-manager-running-pool")); + taskMap = new ConcurrentHashMap<>(); + taskMaxLimit = agentConf.getInt(JOB_NUMBER_LIMIT, DEFAULT_JOB_NUMBER_LIMIT); + pendingTasks = new LinkedBlockingQueue<>(taskMaxLimit); + configQueue = new LinkedBlockingQueue<>(CONFIG_QUEUE_CAPACITY); + actionQueue = new LinkedBlockingQueue<>(ACTION_QUEUE_CAPACITY); + } + + /** + * init db by class name + * + * @return db + */ + public static Db initDb(String childPath) { + try { + return new RocksDbImp(childPath); + } catch (Exception ex) { + throw new UnsupportedClassVersionError(ex.getMessage()); + } + } + + public void submitTaskProfiles(List<TaskProfile> taskProfiles) { + if (taskProfiles == null) { + return; + } + while (configQueue.size() != 0) { + configQueue.poll(); + } + configQueue.add(taskProfiles); + } + + public boolean submitAction(TaskAction action) { + if (action == null) { + return false; + } + return actionQueue.offer(action); + } + + /** + * thread for core thread. + * + * @return runnable profile. 
+ */ + private Runnable coreThread() { + return () -> { + Thread.currentThread().setName("task-manager-core"); + while (isRunnable()) { + try { + AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME); + dealWithConfigQueue(configQueue); + dealWithActionQueue(actionQueue); + } catch (Throwable ex) { + LOGGER.error("exception caught", ex); + ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex); + } + } + }; + } + + private void dealWithConfigQueue(BlockingQueue<List<TaskProfile>> queue) { + List<TaskProfile> dataConfigs = queue.poll(); + if (dataConfigs == null) { + return; + } + keepPaceWithManager(dataConfigs); + keepPaceWithDb(); + } + + private void dealWithActionQueue(BlockingQueue<TaskAction> queue) { + while (isRunnable()) { + try { + TaskAction action = queue.poll(); + if (action == null) { + break; + } + TaskProfile profile = action.getProfile(); + switch (action.getActionType()) { + case FINISH: + LOGGER.info("deal finish action, taskId {}", profile.getTaskId()); + finishTask(profile); + break; + default: + LOGGER.error("invalid action type for action queue: taskId {} type {}", profile.getTaskId(), + action.getActionType()); + } + } catch (Throwable ex) { + LOGGER.error("dealWithActionQueue", ex); + ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex); + } + } + } + + /** + * keep pace with data config from manager, task state should be only RUNNING or FROZEN. 
+ * NEW and STOP only used in manager + */ + private void keepPaceWithManager(List<TaskProfile> taskProfiles) { + LOGGER.info("deal with List<TaskProfile> {}", taskProfiles); + Map<String, TaskProfile> tasksFromManager = new ConcurrentHashMap<>(); + taskProfiles.forEach((profile) -> { + TaskStateEnum state = profile.getState(); + if (state == TaskStateEnum.RUNNING || state == TaskStateEnum.FROZEN) { + tasksFromManager.put(profile.getTaskId(), profile); + } else { + LOGGER.error("task {} invalid task state {}", profile, state); + } + }); + traverseManagerTasksToDb(tasksFromManager); + traverseDbTasksToManager(tasksFromManager); + } + + /** + * keep pace with task in db + */ + private void keepPaceWithDb() { + traverseDbTasksToMemory(); + traverseMemoryTasksToDb(); + } + + /** + * keep pace with task in db + */ + private void traverseManagerTasksToDb(Map<String, TaskProfile> tasksFromManager) { + tasksFromManager.values().forEach((profileFromManager) -> { + TaskProfile taskFromDb = taskDb.getTask(profileFromManager.getTaskId()); + if (taskFromDb == null) { + LOGGER.info("traverseManagerTasksToDb task {} not found in db retry {} state {}, add it", + profileFromManager.getTaskId(), + profileFromManager.isRetry(), profileFromManager.getState()); + addTask(profileFromManager); + } else { + TaskStateEnum managerState = profileFromManager.getState(); + TaskStateEnum dbState = taskFromDb.getState(); + if (managerState == dbState) { + return; + } + if (dbState == TaskStateEnum.FINISH) { + LOGGER.info("traverseManagerTasksToDb task {} dbState {} retry {}, do nothing", + taskFromDb.getTaskId(), dbState, + taskFromDb.isRetry()); + return; + } + if (managerState == TaskStateEnum.RUNNING) { + LOGGER.info("traverseManagerTasksToDb task {} dbState {} retry {}, active it", + taskFromDb.getTaskId(), dbState, taskFromDb.isRetry()); + activeTask(profileFromManager); + } else { + LOGGER.info("traverseManagerTasksToDb task {} dbState {} retry {}, freeze it", + taskFromDb.getTaskId(), 
dbState, taskFromDb.isRetry()); + freezeTask(profileFromManager); + } + } + }); + } + + /** + * traverse tasks in db, if not found in tasks from manager then delete it + */ + private void traverseDbTasksToManager(Map<String, TaskProfile> tasksFromManager) { + taskDb.getTasks().forEach((profileFromDb) -> { + if (!tasksFromManager.containsKey(profileFromDb.getTaskId())) { + LOGGER.info("traverseDbTasksToManager try to delete task {}", profileFromDb.getTaskId()); + deleteTask(profileFromDb); + } + }); + } + + /** + * manager task state is RUNNING and taskMap not found then add + * manager task state is FROZE and taskMap found thrn delete + */ + private void traverseDbTasksToMemory() { + taskDb.getTasks().forEach((profileFromDb) -> { + TaskStateEnum dbState = profileFromDb.getState(); + Task task = taskMap.get(profileFromDb.getTaskId()); + if (dbState == TaskStateEnum.RUNNING) { + if (task == null) { + LOGGER.info("traverseDbTasksToMemory add task to mem taskId {}", profileFromDb.getTaskId()); + addToMemory(profileFromDb); + } + } else if (dbState == TaskStateEnum.FROZEN) { + if (task != null) { + LOGGER.info("traverseDbTasksToMemory delete task from mem taskId {}", + profileFromDb.getTaskId()); + deleteFromMemory(profileFromDb.getTaskId()); + } + } else { + if (dbState != TaskStateEnum.FINISH) { + LOGGER.error("task {} invalid state {}", profileFromDb.getTaskId(), dbState); + } + } + }); + } + + /** + * task in taskMap but not in taskDb then delete + * task in taskMap but task state from db is FROZEN then delete + */ + private void traverseMemoryTasksToDb() { + taskMap.values().forEach((task) -> { + TaskProfile profileFromDb = taskDb.getTask(task.getTaskId()); + if (profileFromDb == null) { + deleteFromMemory(task.getTaskId()); + return; + } + TaskStateEnum stateFromDb = profileFromDb.getState(); + if (stateFromDb != TaskStateEnum.RUNNING) { + deleteFromMemory(task.getTaskId()); + } + }); + } + + /** + * add task profile to db + * if task state is RUNNING then add 
task to memory + */ + private void addTask(TaskProfile taskProfile) { + if (taskMap.size() >= taskMaxLimit) { + LOGGER.error("taskMap size {} over limit {}", taskMap.size(), taskMaxLimit); + return; + } + addToDb(taskProfile); + TaskStateEnum state = TaskStateEnum.getTaskState(taskProfile.getInt(TASK_STATE)); + if (state == TaskStateEnum.RUNNING) { + addToMemory(taskProfile); + } else { + LOGGER.info("taskId {} state {} no need to add to memory", taskProfile.getTaskId(), + taskProfile.getState()); + } + } + + private void deleteTask(TaskProfile taskProfile) { + deleteFromDb(taskProfile); + deleteFromMemory(taskProfile.getTaskId()); + } + + private void freezeTask(TaskProfile taskProfile) { + updateToDb(taskProfile); + deleteFromMemory(taskProfile.getTaskId()); + } + + private void finishTask(TaskProfile taskProfile) { + taskProfile.setState(TaskStateEnum.FINISH); + updateToDb(taskProfile); + deleteFromMemory(taskProfile.getTaskId()); + } + + private void activeTask(TaskProfile taskProfile) { + updateToDb(taskProfile); + addToMemory(taskProfile); + } + + private void restoreFromDb() { + List<TaskProfile> taskProfileList = taskDb.getTasks(); + taskProfileList.forEach((profile) -> { + if (profile.getState() == TaskStateEnum.RUNNING) { + LOGGER.info("restoreFromDb taskId {}", profile.getTaskId()); + addToMemory(profile); + } + }); + } + + private void stopAllTasks() { + taskMap.values().forEach((task) -> { + task.destroy(); + }); + taskMap.clear(); + } + + /** + * add task to db, it was expected that there is no record refer the task id. + * cause the task id will change if the task content changes, replace the record + * if it is found, the memory record will be updated by the db. 
+ */ + private void addToDb(TaskProfile taskProfile) { + if (taskDb.getTask(taskProfile.getTaskId()) != null) { + LOGGER.error("task {} should not exist", taskProfile); + } + taskDb.storeTask(taskProfile); + } + + private void deleteFromDb(TaskProfile taskProfile) { + if (taskDb.getTask(taskProfile.getTaskId()) == null) { + LOGGER.error("try to delete task {} but not found in db", taskProfile); + return; + } + taskDb.deleteTask(taskProfile.getTaskId()); + } + + private void updateToDb(TaskProfile taskProfile) { + if (taskDb.getTask(taskProfile.getTaskId()) == null) { + LOGGER.error("task {} not found, agent may have been reinstalled", taskProfile); + } + taskDb.storeTask(taskProfile); + } + + /** + * add task to memory, if there is a record refer to the task id exist we need to destroy it first. + */ + private void addToMemory(TaskProfile taskProfile) { + Task oldTask = taskMap.get(taskProfile.getTaskId()); + if (oldTask != null) { + oldTask.destroy(); + taskMap.remove(taskProfile.getTaskId()); + LOGGER.error("old task {} should not exist, try stop it first", + taskProfile); + } + try { + Class<?> taskClass = Class.forName(taskProfile.getTaskClass()); + Task task = (Task) taskClass.newInstance(); + task.init(this, taskProfile, instanceBasicDb); + taskMap.put(taskProfile.getTaskId(), task); + runningPool.submit(task); + LOGGER.info( + "add task {} into memory, taskMap size {}, runningPool task total {}, runningPool task active {}", + task.getTaskId(), taskMap.size(), runningPool.getTaskCount(), + runningPool.getActiveCount()); + } catch (Throwable t) { + LOGGER.error("add task error {}", t.getMessage()); + } + } + + private void deleteFromMemory(String taskId) { + Task oldTask = taskMap.get(taskId); + if (oldTask == null) { + LOGGER.error("old task {} not found", taskId); + return; + } + oldTask.destroy(); + taskMap.remove(oldTask.getTaskId()); + LOGGER.info( + "delete task {} from memory, taskMap size {}, runningPool task total {}, runningPool task active {}", + 
oldTask.getTaskId(), taskMap.size(), runningPool.getTaskCount(), + runningPool.getActiveCount()); + } + + public Task getTask(String taskId) { + return taskMap.get(taskId); + } + + public TaskProfile getTaskProfile(String taskId) { + return taskDb.getTask(taskId); + } + + @Override + public void start() throws Exception { + restoreFromDb(); + submitWorker(coreThread()); + } + + @Override + public void stop() throws Exception { + stopAllTasks(); + waitForTerminate(); + runningPool.shutdown(); + } +} diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java index ea771e96f4..d3d593d345 100755 --- a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java +++ b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java @@ -18,8 +18,14 @@ package org.apache.inlong.agent.core; import org.apache.inlong.agent.conf.AgentConfiguration; +import org.apache.inlong.agent.conf.TaskProfile; import org.apache.inlong.agent.constant.AgentConstants; +import org.apache.inlong.agent.pojo.FileTask.FileTaskConfig; +import org.apache.inlong.common.enums.TaskStateEnum; +import org.apache.inlong.common.pojo.agent.DataConfig; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; import org.apache.commons.io.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,7 +39,8 @@ import java.nio.file.Paths; public class AgentBaseTestsHelper { private static final Logger LOGGER = LoggerFactory.getLogger(AgentBaseTestsHelper.class); - + private static final GsonBuilder gsonBuilder = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss"); + private static final Gson GSON = gsonBuilder.create(); private final String className; private Path testRootDir; @@ -64,4 +71,32 @@ public class AgentBaseTestsHelper { } } } + + public TaskProfile getTaskProfile(int taskId, 
String pattern, boolean retry, Long startTime, Long endTime, + TaskStateEnum state) { + DataConfig dataConfig = getDataConfig(taskId, pattern, retry, startTime, endTime, state); + TaskProfile profile = TaskProfile.convertToTaskProfile(dataConfig); + return profile; + } + + private DataConfig getDataConfig(int taskId, String pattern, boolean retry, Long startTime, Long endTime, + TaskStateEnum state) { + DataConfig dataConfig = new DataConfig(); + dataConfig.setInlongGroupId("testGroupId"); + dataConfig.setInlongStreamId("testStreamId"); + dataConfig.setDataReportType(1); + dataConfig.setTaskType(3); + dataConfig.setTaskId(taskId); + dataConfig.setState(state.ordinal()); + FileTaskConfig fileTaskConfig = new FileTaskConfig(); + fileTaskConfig.setPattern(pattern); + fileTaskConfig.setTimeOffset("0d"); + fileTaskConfig.setMaxFileCount(100); + fileTaskConfig.setCycleUnit("D"); + fileTaskConfig.setRetry(retry); + fileTaskConfig.setStartTime(startTime); + fileTaskConfig.setEndTime(endTime); + dataConfig.setExtParams(GSON.toJson(fileTaskConfig)); + return dataConfig; + } } diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/MockTask.java b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/MockTask.java new file mode 100644 index 0000000000..23c5ad5cc5 --- /dev/null +++ b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/MockTask.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.core.task; + +import org.apache.inlong.agent.conf.TaskProfile; +import org.apache.inlong.agent.core.task.file.TaskManager; +import org.apache.inlong.agent.db.Db; +import org.apache.inlong.agent.plugin.file.Task; + +import java.io.IOException; + +public class MockTask extends Task { + + public static final int INIT_TIME = 100; + public static final int RUN_TIME = 101; + public static final int DESTROY_TIME = 102; + private TaskProfile profile; + private long index = INIT_TIME; + public long initTime = 0; + public long destroyTime = 0; + public long runtime = 0; + private TaskManager manager; + + @Override + public void init(Object srcManager, TaskProfile profile, Db basicDb) throws IOException { + manager = (TaskManager) srcManager; + this.profile = profile; + } + + @Override + public void destroy() { + destroyTime = index++; + } + + @Override + public TaskProfile getProfile() { + return profile; + } + + @Override + public String getTaskId() { + return profile.getTaskId(); + } + + @Override + public void addCallbacks() { + + } + + @Override + public void run() { + runtime = index++; + } +} \ No newline at end of file diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestTaskManager.java b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestTaskManager.java new file mode 100755 index 0000000000..bf9047c40a --- /dev/null +++ b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestTaskManager.java @@ -0,0 +1,97 @@ +/* + * Licensed to the 
package org.apache.inlong.agent.core.task;

import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.core.AgentBaseTestsHelper;
import org.apache.inlong.agent.core.task.file.TaskManager;
import org.apache.inlong.common.enums.TaskStateEnum;

import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.awaitility.Awaitility.await;

/**
 * Exercises {@link TaskManager#submitTaskProfiles} with {@link MockTask} instances:
 * adding a task, freezing and resuming it, and replacing it with a different task.
 */
public class TestTaskManager {

    private static final Logger LOGGER = LoggerFactory.getLogger(TestTaskManager.class);
    /** Upper bound, in seconds, for the manager to apply a submitted profile list. */
    private static final long AWAIT_SECONDS = 3;
    private static TaskManager manager;
    private static AgentBaseTestsHelper helper;

    /** Creates the agent home directory and starts the manager under test. */
    @BeforeClass
    public static void setup() {
        helper = new AgentBaseTestsHelper(TestTaskManager.class.getName()).setupAgentHome();
        try {
            manager = new TaskManager();
            manager.start();
        } catch (Exception e) {
            // Keep the original failure message but preserve the cause for diagnosis.
            throw new AssertionError("manager start error", e);
        }
    }

    /**
     * Stops the manager and removes the agent home directory. Each step is guarded
     * so that a failed {@link #setup()} is not masked by a NullPointerException here,
     * and the directory is cleaned up even if stopping the manager throws.
     */
    @AfterClass
    public static void teardown() throws Exception {
        try {
            if (manager != null) {
                manager.stop();
            }
        } finally {
            if (helper != null) {
                helper.teardownAgentHome();
            }
        }
    }

    @Test
    public void testTaskManager() {
        String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
        TaskProfile taskProfile1 = helper.getTaskProfile(1, pattern, false, 0L, 0L, TaskStateEnum.RUNNING);
        String taskId1 = taskProfile1.getTaskId();
        taskProfile1.setTaskClass(MockTask.class.getCanonicalName());
        List<TaskProfile> taskProfiles1 = new ArrayList<>();
        taskProfiles1.add(taskProfile1);

        // test add
        manager.submitTaskProfiles(taskProfiles1);
        awaitTaskPresent(taskId1);
        LOGGER.info("state {}", manager.getTaskProfile(taskId1).getState());
        Assert.assertEquals(TaskStateEnum.RUNNING, manager.getTaskProfile(taskId1).getState());

        // test freeze: the task instance is removed while its profile is kept as FROZEN
        taskProfile1.setState(TaskStateEnum.FROZEN);
        manager.submitTaskProfiles(taskProfiles1);
        awaitTaskAbsent(taskId1);
        Assert.assertEquals(TaskStateEnum.FROZEN, manager.getTaskProfile(taskId1).getState());

        // test resume: switching the profile back to RUNNING recreates the task
        taskProfile1.setState(TaskStateEnum.RUNNING);
        manager.submitTaskProfiles(taskProfiles1);
        awaitTaskPresent(taskId1);
        Assert.assertEquals(TaskStateEnum.RUNNING, manager.getTaskProfile(taskId1).getState());

        // test delete: submitting a list without task 1 removes both its task and its profile
        TaskProfile taskProfile2 = helper.getTaskProfile(2, pattern, false, 0L, 0L, TaskStateEnum.RUNNING);
        taskProfile2.setTaskClass(MockTask.class.getCanonicalName());
        List<TaskProfile> taskProfiles2 = new ArrayList<>();
        taskProfiles2.add(taskProfile2);
        manager.submitTaskProfiles(taskProfiles2);
        awaitTaskAbsent(taskId1);
        Assert.assertNull(manager.getTaskProfile(taskId1));
        String taskId2 = taskProfile2.getTaskId();
        awaitTaskPresent(taskId2);
        Assert.assertEquals(TaskStateEnum.RUNNING, manager.getTaskProfile(taskId2).getState());
    }

    /** Waits until the manager holds a task instance for {@code taskId}. */
    private static void awaitTaskPresent(String taskId) {
        await().atMost(AWAIT_SECONDS, TimeUnit.SECONDS).until(() -> manager.getTask(taskId) != null);
    }

    /** Waits until the manager no longer holds a task instance for {@code taskId}. */
    private static void awaitTaskAbsent(String taskId) {
        await().atMost(AWAIT_SECONDS, TimeUnit.SECONDS).until(() -> manager.getTask(taskId) == null);
    }
}
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java index 9a9f8ece96..363b029783 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java @@ -220,10 +220,10 @@ public class NewDateUtils { timeInterval = DAY_TIMEOUT_INTERVAL; } - // 处理偏移量,超时周期要加上时间偏移偏移量 + // To handle the offset, add the time offset to the timeout period if (timeOffset.startsWith("-")) { timeInterval += caclOffset(timeOffset); - } else { // 处理向后偏移 + } else { // Process Backward Offset timeInterval -= caclOffset(timeOffset); } @@ -231,20 +231,23 @@ public class NewDateUtils { } /** - * 根据偏移量计算偏移时间 - * 当前偏移只会向前偏移,也可向后偏移为兼容之前的计算方式(相减),当为向后偏移时,返回负;当向前偏移,返回正 + * Calculate offset time based on offset + * The current offset will only be offset forward, or it can be offset backward to be compatible with the previous + * calculation method (subtraction). + * When it is offset backward, it returns negative; + * When offset forward, return positive * - * @param timeOffset 偏移量,如-1d,-4h,-10m等; + * @param timeOffset offset,such as -1d,-4h,-10m; * @return */ public static long caclOffset(String timeOffset) { String offsetUnit = timeOffset.substring(timeOffset.length() - 1); int startIndex = timeOffset.charAt(0) == '-' ? 
1 : 0; - // 默认向后偏移 + // Default Backward Offset int symbol = 1; if (startIndex == 1) { symbol = 1; - } else if (startIndex == 0) { // 向前偏移 + } else if (startIndex == 0) { // Forward offset symbol = -1; } int offsetTime = Integer @@ -410,7 +413,8 @@ public class NewDateUtils { Matcher mat = Pattern.compile(longestDatePattern).matcher(fileName); boolean find = mat.find(); - // TODO : 存在文件名中有多个部分匹配到时间表达式的情况("/data/joox_logs/2000701106/201602170040.log" YYYYMMDDhh) + // TODO : more than one part match the time regex in file name ("/data/joox_logs/2000701106/201602170040.log" + // YYYYMMDDhh) if (!find) { logger.error("Can't find the pattern {} for file name {}", longestDatePattern, fileName); @@ -717,221 +721,4 @@ public class NewDateUtils { return ret; } - - public static void main(String[] args) throws Exception { - - // String aa = "/data/taox/YYYYMMDDt_log/[0-9]+_YYYYMMDD_hh00.log"; - /* - * String aa = "/data/taox/YYYYt_logMMDD/[0-9]+_YYYYMMDD_hh00.log"; String bb = - * replaceDateExpressionWithRegex(aa); System.out.println("---------: " + bb); - * - * String cc = replaceDateExpression(Calendar.getInstance(), aa); System.out.println("---------: " + cc); - * - * String dd = replaceDateExpression1(Calendar.getInstance(), aa); System.out.println("---------: " + dd); - */ - - // String text = "/data1/qq_BaseInfo/YYYY-MM/YYYY-MM-DD/gamedr.gamedb[0-9]+.minigame - // .db/YYYY-MM-DD-[0-9]+.txt"; - // System.out.println(replaceDateExpressionWithRegex(text)); - // - // int timeInterval = 1000; - // String timeOffset = "10H"; - // - // String offsetUnit = timeOffset.substring(timeOffset.length() - 1); - // int startIndex = timeOffset.charAt(0) == '-' ? 
1 : 0; - // int offsetTime = Integer.parseInt(timeOffset.substring(startIndex, timeOffset.length() - // - 1)); - // if("d".equalsIgnoreCase(offsetUnit)){ - // timeInterval += offsetTime * 24 * 3600 * 1000; - // }else if("h".equalsIgnoreCase(offsetUnit)){ - // timeInterval += offsetTime * 3600 * 1000; - // }else if("m".equalsIgnoreCase(offsetUnit)){ - // timeInterval += offsetTime * 60 * 1000; - // } - // - // System.out.println(timeInterval); - // - // SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMdd HH:mm:ss"); - // - // Calendar calendar = NewDateUtils.getCurDate("D", "-10D"); - // System.out.println("year: " + calendar.get(Calendar.YEAR) + ", month: " - // + (calendar.get(Calendar.MONTH) + 1) + ", day: " - // + calendar.get(Calendar.DAY_OF_MONTH) + ", hour: " - // + calendar.get(Calendar.HOUR_OF_DAY) + ", minute: " - // + calendar.get(Calendar.MINUTE) + ", second: " - // + calendar.get(Calendar.SECOND)); - // System.out.println(dateFormat.format(calendar.getTimeInMillis())); - // - // calendar = getCurDate("H", "-2H"); - // System.out.println("year: " + calendar.get(Calendar.YEAR) + ", month: " - // + (calendar.get(Calendar.MONTH) + 1) + ", day: " - // + calendar.get(Calendar.DAY_OF_MONTH) + ", hour: " - // + calendar.get(Calendar.HOUR_OF_DAY) + ", minute: " - // + calendar.get(Calendar.MINUTE) + ", second: " - // + calendar.get(Calendar.SECOND)); - // System.out.println(dateFormat.format(calendar.getTimeInMillis())); - // - // calendar = getCurDate("H", "-2D"); - // System.out.println("year: " + calendar.get(Calendar.YEAR) + ", month: " - // + (calendar.get(Calendar.MONTH) + 1) + ", day: " - // + calendar.get(Calendar.DAY_OF_MONTH) + ", hour: " - // + calendar.get(Calendar.HOUR_OF_DAY) + ", minute: " - // + calendar.get(Calendar.MINUTE) + ", second: " - // + calendar.get(Calendar.SECOND)); - // System.out.println(dateFormat.format(calendar.getTimeInMillis())); - // - // calendar = getCurDate("5m", "-20m"); - // System.out.println("year: " + 
calendar.get(Calendar.YEAR) + ", month: " - // + (calendar.get(Calendar.MONTH) + 1) + ", day: " - // + calendar.get(Calendar.DAY_OF_MONTH) + ", hour: " - // + calendar.get(Calendar.HOUR_OF_DAY) + ", minute: " - // + calendar.get(Calendar.MINUTE) + ", second: " - // + calendar.get(Calendar.SECOND)); - // System.out.println(dateFormat.format(calendar.getTimeInMillis())); - // - // String directory = "/data/home/user00/xyshome/logsvr/log/YYYYMMDD/[0-9]+_YYYYMMDD_hh00 - // .log"; - // calendar = getCurDate("H", "-3H"); - // System.out.println(replaceDateExpression(calendar, directory)); - // - // System.out.println(NewDateUtils.timeStrConvertTomillSec("201404031105", - // "m")); - // System.out.println(NewDateUtils.timeStrConvertTomillSec("2014040223", - // "H")); - // System.out.println(NewDateUtils - // .timeStrConvertTomillSec("20140402", "D")); - // - // System.out.println(NewDateUtils.millSecConvertToTimeStr( - // System.currentTimeMillis(), "Y")); - // System.out.println(NewDateUtils.millSecConvertToTimeStr( - // System.currentTimeMillis(), "M")); - // System.out.println(NewDateUtils.millSecConvertToTimeStr( - // System.currentTimeMillis(), "D")); - // System.out.println(NewDateUtils.millSecConvertToTimeStr( - // System.currentTimeMillis(), "H")); - // System.out.println(NewDateUtils.millSecConvertToTimeStr( - // System.currentTimeMillis(), "10m")); - // System.out.println(NewDateUtils.millSecConvertToTimeStr( - // System.currentTimeMillis(), "15m")); - // System.out.println(NewDateUtils.millSecConvertToTimeStr( - // System.currentTimeMillis(), "30m")); - // System.out.println(NewDateUtils.millSecConvertToTimeStr( - // NewDateUtils.timeStrConvertTomillSec("201404121900", "10m"), - // "10m")); - // - // NewDateUtils.getDateRegion("20120810", "20120813", "D"); - // NewDateUtils.getDateRegion("2012081005", "2012081300", "H"); - // NewDateUtils.getDateRegion("201404111649", "201404111600", "10m"); - // String dataTime = "20160122"; - // 
System.out.println(NewDateUtils.getShouldStartTime(dataTime, "D", "-2h")); - - // String dataPath = "/data/herococo/YYYYMMDD_*/YYYYMMDDhhmm.log"; - // dataPath = NewDateUtils.replaceDateExpressionWithRegex(dataPath); - // System.out.println("dataPath: " + dataPath); - // - // Pattern pattern = Pattern.compile(dataPath, Pattern.CASE_INSENSITIVE - // | Pattern.DOTALL | Pattern.MULTILINE); - // // Pattern pattern = Pattern.compile("/data/herococo/\\d+/\\d+.log", - // // Pattern.CASE_INSENSITIVE | Pattern.DOTALL | Pattern.MULTILINE); - // Matcher m = pattern - // .matcher("/data/herococo/20140406_a/20140406152730.log"); - // System.out.println(m.matches()); - // - // dataPath = "/data/home/user00/xyshome/logsvr/log/YYYYMMDD/[0-9]+_YYYYMMDD_hh00.log"; - // dataPath = NewDateUtils.replaceDateExpressionWithRegex(dataPath); - // pattern = Pattern.compile(dataPath, Pattern.CASE_INSENSITIVE - // | Pattern.DOTALL | Pattern.MULTILINE); - // m = pattern - // .matcher("/data/home/user00/xyshome/logsvr/log/20140406/8_20140406_1600.log"); - // System.out.println(dataPath); - // System.out.println(m.matches()); - // - // dataPath = "/data/work/data2/abc/YYYYMMDDhh.*.txt"; - // dataPath = NewDateUtils.replaceDateExpressionWithRegex(dataPath); - // pattern = Pattern.compile(dataPath, Pattern.CASE_INSENSITIVE - // | Pattern.DOTALL | Pattern.MULTILINE); - // m = pattern.matcher("/data/work/data2/abc/201404102242.txt"); - // System.out.println(dataPath); - // System.out.println(m.matches()); - // - // List<Long> retTimeList = NewDateUtils.getDateRegion("20140411", - // "20140411", "D"); - // for (Long time : retTimeList) { - // System.out.println(NewDateUtils.millSecConvertToTimeStr(time, "D")); - // } - // - // pattern = Pattern - // .compile( - // "/data/home/tlog/logplat/log/tlogd_1/[0-9]+_\\d{4}\\d{2}\\d{2}_\\d{2}00 - // .log", - // Pattern.CASE_INSENSITIVE | Pattern.DOTALL - // | Pattern.MULTILINE); - // m = pattern - // 
.matcher("/data/home/tlog/logplat/log/tlogd_1/65535_20140506_1600.log.1"); - // System.out.println(m.matches()); - // System.out.println(m.lookingAt()); - // - // String unit = "h"; - // if (StringUtils.endsWithIgnoreCase("h", "H")) { - // System.out.println("yes"); - // } - // - // System.out.println(NewDateUtils.getDateTime("20160106", "D", "-4h")); - - // PathDateExpression dateExpression = DateUtils - // .extractLongestTimeRegexWithPrefixOrSuffix - // ("/data/log/qqtalk/[0-9]+_[0-9]+_id20522_[0-9]+_YYYYMMDD_hh.log"); - // System.out.println(dateExpression.getLongestDatePattern()); - // String fileTime = getDateTime("/data/log/qqtalk/3900626911_11217_id20522_17_20160420 - // .log", dateExpression); - // System.out.println(fileTime); - - // String dataTime = "20180411"; - // - // String shouldStart = getShouldStartTime(dataTime, "D", "4h"); - // System.out.println(shouldStart); - // - // String fileName = "rc_trade_water[0-9]*.YYYY-MM-DD-hh.[0-9]+"; - // - // String newFileName = "rc_trade_water.2016-11-20-12.9"; - // - // /** - // * 打印出文件名中 最长的时间表达式 - // */ - // PathDateExpression dateExpression = DateUtils.extractLongestTimeRegexWithPrefixOrSuffix - // (fileName); - // System.out.println(dateExpression.getLongestDatePattern()); - // - // /** - // * 检查正则表达式是否能匹配到文件 - // */ - // Pattern pattern = Pattern.compile(replaceDateExpressionWithRegex(fileName), - // Pattern.CASE_INSENSITIVE | Pattern.DOTALL | Pattern.MULTILINE); - // - // Matcher matcher = pattern.matcher(newFileName); - // - // if (matcher.matches() || matcher.lookingAt()) { - // System.out.println("Matched File"); - // } - // - // /** - // * 打印文件名的时间 - // */ - // String fileTime = getDateTime(newFileName, dateExpression); - // System.out.println(fileTime); - // - // - // String fileName1 = "/data/joox_logs/2000701106/201602170040.log"; - // String filePathRegx = "/data/joox_logs/2000701106/{YYYYMMDDhh}40.log"; - // String fullRegx = replaceDateExpressionWithRegex(filePathRegx, "dateTimeGN"); - // 
System.out.println(fullRegx); - // Pattern fullPattern = Pattern.compile(fullRegx); - // Matcher fullMatcher = fullPattern.matcher(fileName1); - // while (fullMatcher.find()) { - // System.out.println(fullMatcher.group("dateTimeGN")); - // } - - System.out.println( - timeStrConvertTomillSec("2018111209", "h", TimeZone.getTimeZone("GMT+8:00"))); - } }