This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new d1cd775e16 [INLONG-9124][Agent] Add task and instance db (#9130)
d1cd775e16 is described below

commit d1cd775e164f28bdead9e21967cf296eb71b35cc
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Fri Oct 27 09:38:33 2023 +0800

    [INLONG-9124][Agent] Add task and instance db (#9130)
---
 .../org/apache/inlong/agent/db/InstanceDb.java     | 112 +++++++++++++++++++++
 .../org/apache/inlong/agent/db/KeyValueEntity.java |  16 +++
 .../org/apache/inlong/agent/db/TaskProfileDb.java  | 101 +++++++++++++++++++
 3 files changed, 229 insertions(+)

diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/InstanceDb.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/InstanceDb.java
new file mode 100644
index 0000000000..dfc8a17055
--- /dev/null
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/InstanceDb.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.db;
+
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.constant.CommonConstants;
+import org.apache.inlong.agent.constant.TaskConstants;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * db interface for instance profile.
+ */
+public class InstanceDb {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(TaskProfileDb.class);
+
+    private final Db db;
+
+    public InstanceDb(Db db) {
+        this.db = db;
+    }
+
+    /**
+     * get instance list from db.
+     *
+     * @return list of task
+     */
+    public List<InstanceProfile> getInstances(String taskId) {
+        List<KeyValueEntity> result = this.db.findAll(getKeyByTaskId(taskId));
+        List<InstanceProfile> instanceList = new ArrayList<>();
+        for (KeyValueEntity entity : result) {
+            instanceList.add(entity.getAsInstanceProfile());
+        }
+        return instanceList;
+    }
+
+    /**
+     * store instance profile.
+     *
+     * @param instance instance
+     */
+    public void storeInstance(InstanceProfile instance) {
+        if (instance.allRequiredKeyExist()) {
+            String keyName = 
getKeyByTaskAndInstanceId(instance.get(TaskConstants.TASK_ID),
+                    instance.get(TaskConstants.INSTANCE_ID));
+            KeyValueEntity entity = new KeyValueEntity(keyName,
+                    instance.toJsonStr(), 
instance.get(TaskConstants.INSTANCE_ID));
+            KeyValueEntity oldEntity = db.put(entity);
+            if (oldEntity != null) {
+                LOGGER.warn("instance profile {} has been replaced", 
oldEntity.getKey());
+            }
+        } else {
+            LOGGER.error("instance profile invalid!");
+        }
+    }
+
+    /**
+     * get instance profile.
+     *
+     * @param taskId task id from manager
+     * @param instanceId it can be file name(file collect), table name(db 
sync) etc
+     */
+    public InstanceProfile getInstance(String taskId, String instanceId) {
+        KeyValueEntity result = this.db.get(getKeyByTaskAndInstanceId(taskId, 
instanceId));
+        if (result == null) {
+            return null;
+        }
+        return result.getAsInstanceProfile();
+    }
+
+    /**
+     * delete instance
+     *
+     * @param taskId task id from manager
+     * @param instanceId it can be file name(file collect), table name(db 
sync) etc
+     */
+    public void deleteInstance(String taskId, String instanceId) {
+        db.remove(getKeyByTaskAndInstanceId(taskId, instanceId));
+    }
+
+    private String getKey() {
+        return CommonConstants.INSTANCE_ID_PREFIX;
+    }
+
+    private String getKeyByTaskId(String taskId) {
+        return CommonConstants.INSTANCE_ID_PREFIX + taskId;
+    }
+
+    private String getKeyByTaskAndInstanceId(String taskId, String instanceId) 
{
+        return CommonConstants.INSTANCE_ID_PREFIX + taskId + "_" + instanceId;
+    }
+}
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/KeyValueEntity.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/KeyValueEntity.java
index 5ca4dadcb6..528f17f61c 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/KeyValueEntity.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/KeyValueEntity.java
@@ -17,8 +17,10 @@
 
 package org.apache.inlong.agent.db;
 
+import org.apache.inlong.agent.conf.InstanceProfile;
 import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.conf.OffsetProfile;
+import org.apache.inlong.agent.conf.TaskProfile;
 import org.apache.inlong.agent.conf.TriggerProfile;
 
 /**
@@ -98,6 +100,20 @@ public class KeyValueEntity {
         return OffsetProfile.parseJsonStr(getJsonValue());
     }
 
+    /**
+     * convert keyValue to task profile
+     */
+    public TaskProfile getAsTaskProfile() {
+        return TaskProfile.parseJsonStr(getJsonValue());
+    }
+
+    /**
+     * convert keyValue to instance profile
+     */
+    public InstanceProfile getAsInstanceProfile() {
+        return InstanceProfile.parseJsonStr(getJsonValue());
+    }
+
     /**
      * check whether the entity is finished
      */
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/TaskProfileDb.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/TaskProfileDb.java
new file mode 100755
index 0000000000..b524bb09db
--- /dev/null
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/TaskProfileDb.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.db;
+
+import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.CommonConstants;
+import org.apache.inlong.agent.constant.TaskConstants;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * db interface for task profile.
+ */
+public class TaskProfileDb {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(TaskProfileDb.class);
+
+    private final Db db;
+
+    public TaskProfileDb(Db db) {
+        this.db = db;
+    }
+
+    /**
+     * get task list from db.
+     *
+     * @return list of task
+     */
+    public List<TaskProfile> getTasks() {
+        List<KeyValueEntity> result = this.db.findAll(getKey());
+        List<TaskProfile> taskList = new ArrayList<>();
+        for (KeyValueEntity entity : result) {
+            taskList.add(entity.getAsTaskProfile());
+        }
+        return taskList;
+    }
+
+    /**
+     * store task profile.
+     *
+     * @param task task
+     */
+    public void storeTask(TaskProfile task) {
+        if (task.allRequiredKeyExist()) {
+            String keyName = getKeyByTaskId(task.getTaskId());
+            KeyValueEntity entity = new KeyValueEntity(keyName,
+                    task.toJsonStr(), 
task.get(TaskConstants.FILE_DIR_FILTER_PATTERNS));
+            KeyValueEntity oldEntity = db.put(entity);
+            if (oldEntity != null) {
+                LOGGER.warn("task profile {} has been replaced", 
oldEntity.getKey());
+            }
+        }
+    }
+
+    /**
+     * get task profile.
+     *
+     * @param taskId taskId
+     */
+    public TaskProfile getTask(String taskId) {
+        KeyValueEntity result = this.db.get(getKeyByTaskId(taskId));
+        if (result == null) {
+            return null;
+        }
+        return result.getAsTaskProfile();
+    }
+
+    /**
+     * delete task by id.
+     */
+    public void deleteTask(String taskId) {
+        db.remove(getKeyByTaskId(taskId));
+    }
+
+    private String getKey() {
+        return CommonConstants.TASK_ID_PREFIX;
+    }
+
+    private String getKeyByTaskId(String taskId) {
+        return CommonConstants.TASK_ID_PREFIX + taskId;
+    }
+}

Reply via email to