This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new d1cd775e16 [INLONG-9124][Agent] Add task and instance db (#9130) d1cd775e16 is described below commit d1cd775e164f28bdead9e21967cf296eb71b35cc Author: justinwwhuang <hww_jus...@163.com> AuthorDate: Fri Oct 27 09:38:33 2023 +0800 [INLONG-9124][Agent] Add task and instance db (#9130) --- .../org/apache/inlong/agent/db/InstanceDb.java | 112 +++++++++++++++++++++ .../org/apache/inlong/agent/db/KeyValueEntity.java | 16 +++ .../org/apache/inlong/agent/db/TaskProfileDb.java | 101 +++++++++++++++++++ 3 files changed, 229 insertions(+) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/InstanceDb.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/InstanceDb.java new file mode 100644 index 0000000000..dfc8a17055 --- /dev/null +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/InstanceDb.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.agent.db; + +import org.apache.inlong.agent.conf.InstanceProfile; +import org.apache.inlong.agent.constant.CommonConstants; +import org.apache.inlong.agent.constant.TaskConstants; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; + +/** + * db interface for instance profile. + */ +public class InstanceDb { + + private static final Logger LOGGER = LoggerFactory.getLogger(TaskProfileDb.class); + + private final Db db; + + public InstanceDb(Db db) { + this.db = db; + } + + /** + * get instance list from db. + * + * @return list of task + */ + public List<InstanceProfile> getInstances(String taskId) { + List<KeyValueEntity> result = this.db.findAll(getKeyByTaskId(taskId)); + List<InstanceProfile> instanceList = new ArrayList<>(); + for (KeyValueEntity entity : result) { + instanceList.add(entity.getAsInstanceProfile()); + } + return instanceList; + } + + /** + * store instance profile. + * + * @param instance instance + */ + public void storeInstance(InstanceProfile instance) { + if (instance.allRequiredKeyExist()) { + String keyName = getKeyByTaskAndInstanceId(instance.get(TaskConstants.TASK_ID), + instance.get(TaskConstants.INSTANCE_ID)); + KeyValueEntity entity = new KeyValueEntity(keyName, + instance.toJsonStr(), instance.get(TaskConstants.INSTANCE_ID)); + KeyValueEntity oldEntity = db.put(entity); + if (oldEntity != null) { + LOGGER.warn("instance profile {} has been replaced", oldEntity.getKey()); + } + } else { + LOGGER.error("instance profile invalid!"); + } + } + + /** + * get instance profile. 
+ * + * @param taskId task id from manager + * @param instanceId it can be file name(file collect), table name(db sync) etc + */ + public InstanceProfile getInstance(String taskId, String instanceId) { + KeyValueEntity result = this.db.get(getKeyByTaskAndInstanceId(taskId, instanceId)); + if (result == null) { + return null; + } + return result.getAsInstanceProfile(); + } + + /** + * delete instance + * + * @param taskId task id from manager + * @param instanceId it can be file name(file collect), table name(db sync) etc + */ + public void deleteInstance(String taskId, String instanceId) { + db.remove(getKeyByTaskAndInstanceId(taskId, instanceId)); + } + + private String getKey() { + return CommonConstants.INSTANCE_ID_PREFIX; + } + + private String getKeyByTaskId(String taskId) { + return CommonConstants.INSTANCE_ID_PREFIX + taskId; + } + + private String getKeyByTaskAndInstanceId(String taskId, String instanceId) { + return CommonConstants.INSTANCE_ID_PREFIX + taskId + "_" + instanceId; + } +} diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/KeyValueEntity.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/KeyValueEntity.java index 5ca4dadcb6..528f17f61c 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/KeyValueEntity.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/KeyValueEntity.java @@ -17,8 +17,10 @@ package org.apache.inlong.agent.db; +import org.apache.inlong.agent.conf.InstanceProfile; import org.apache.inlong.agent.conf.JobProfile; import org.apache.inlong.agent.conf.OffsetProfile; +import org.apache.inlong.agent.conf.TaskProfile; import org.apache.inlong.agent.conf.TriggerProfile; /** @@ -98,6 +100,20 @@ public class KeyValueEntity { return OffsetProfile.parseJsonStr(getJsonValue()); } + /** + * convert keyValue to task profile + */ + public TaskProfile getAsTaskProfile() { + return TaskProfile.parseJsonStr(getJsonValue()); + } + + /** + * 
convert keyValue to instance profile + */ + public InstanceProfile getAsInstanceProfile() { + return InstanceProfile.parseJsonStr(getJsonValue()); + } + /** * check whether the entity is finished */ diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/TaskProfileDb.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/TaskProfileDb.java new file mode 100755 index 0000000000..b524bb09db --- /dev/null +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/TaskProfileDb.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.db; + +import org.apache.inlong.agent.conf.TaskProfile; +import org.apache.inlong.agent.constant.CommonConstants; +import org.apache.inlong.agent.constant.TaskConstants; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; + +/** + * db interface for task profile. + */ +public class TaskProfileDb { + + private static final Logger LOGGER = LoggerFactory.getLogger(TaskProfileDb.class); + + private final Db db; + + public TaskProfileDb(Db db) { + this.db = db; + } + + /** + * get task list from db. 
+ * + * @return list of task profiles converted from the entries db.findAll returns for TASK_ID_PREFIX + */ + public List<TaskProfile> getTasks() { + List<KeyValueEntity> result = this.db.findAll(getKey()); + List<TaskProfile> taskList = new ArrayList<>(); + for (KeyValueEntity entity : result) { + taskList.add(entity.getAsTaskProfile()); + } + return taskList; + } + + /** + * store task profile; logs a warning when db.put returns a previous entity for the same key. NOTE(review): the third KeyValueEntity argument is FILE_DIR_FILTER_PATTERNS here while InstanceDb passes the instance id - confirm intended. + * + * @param task task profile to store; silently skipped (no log) when allRequiredKeyExist() is false + */ + public void storeTask(TaskProfile task) { + if (task.allRequiredKeyExist()) { + String keyName = getKeyByTaskId(task.getTaskId()); + KeyValueEntity entity = new KeyValueEntity(keyName, + task.toJsonStr(), task.get(TaskConstants.FILE_DIR_FILTER_PATTERNS)); + KeyValueEntity oldEntity = db.put(entity); + if (oldEntity != null) { + LOGGER.warn("task profile {} has been replaced", oldEntity.getKey()); + } + } + } + + /** + * get task profile, returns null when db has no entry for the task id. + * + * @param taskId task id appended to TASK_ID_PREFIX to build the db key + */ + public TaskProfile getTask(String taskId) { + KeyValueEntity result = this.db.get(getKeyByTaskId(taskId)); + if (result == null) { + return null; + } + return result.getAsTaskProfile(); + } + + /** + * delete the task profile stored under the given task id. + */ + public void deleteTask(String taskId) { + db.remove(getKeyByTaskId(taskId)); + } + + private String getKey() { + return CommonConstants.TASK_ID_PREFIX; + } + + private String getKeyByTaskId(String taskId) { + return CommonConstants.TASK_ID_PREFIX + taskId; + } +}