This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new a194f575f9 [INLONG-9117][Agent] Rewrite class RocksDbImp to enable it to be constructed with a child path (#9119)
a194f575f9 is described below

commit a194f575f9cf72a005634e5f20144668013e9e09
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Thu Oct 26 16:58:02 2023 +0800

    [INLONG-9117][Agent] Rewrite class RocksDbImp to enable it to be constructed with a child path (#9119)
---
 .../org/apache/inlong/agent/db/KeyValueEntity.java |  8 ++++
 .../org/apache/inlong/agent/db/RocksDbImp.java     | 46 ++++++----------------
 .../org/apache/inlong/agent/db/TestRocksDbImp.java | 37 +----------------
 .../org/apache/inlong/agent/core/AgentManager.java |  8 ++--
 .../inlong/agent/plugin/utils/RocksDBUtils.java    |  6 ++-
 5 files changed, 29 insertions(+), 76 deletions(-)

diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/KeyValueEntity.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/KeyValueEntity.java
index 5cbda1e4f0..5ca4dadcb6 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/KeyValueEntity.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/KeyValueEntity.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.agent.db;
 
 import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.conf.OffsetProfile;
 import org.apache.inlong.agent.conf.TriggerProfile;
 
 /**
@@ -90,6 +91,13 @@ public class KeyValueEntity {
         return TriggerProfile.parseJsonStr(getJsonValue());
     }
 
+    /**
+     * convert keyValue to offset profile
+     */
+    public OffsetProfile getAsOffsetProfile() {
+        return OffsetProfile.parseJsonStr(getJsonValue());
+    }
+
     /**
      * check whether the entity is finished
      */
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/RocksDbImp.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/RocksDbImp.java
index d8dd2aa522..eac8479aa4 100755
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/RocksDbImp.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/RocksDbImp.java
@@ -62,10 +62,10 @@ public class RocksDbImp implements Db {
     private ConcurrentHashMap<String, ColumnFamilyDescriptor> 
columnDescriptorMap;
     private String storePath;
 
-    public RocksDbImp() {
+    public RocksDbImp(String childPath) {
         // init rocks db
         this.conf = AgentConfiguration.getAgentConf();
-        this.db = initEnv();
+        this.db = initEnv(childPath);
         // add a command column family
         addColumnFamily(commandFamilyName);
     }
@@ -74,10 +74,10 @@ public class RocksDbImp implements Db {
         return new ColumnFamilyDescriptor(columnFamilyName, new 
ColumnFamilyOptions());
     }
 
-    private RocksDB initEnv() {
-        String configPath = conf.get(AgentConstants.AGENT_ROCKS_DB_PATH, 
AgentConstants.DEFAULT_AGENT_ROCKS_DB_PATH);
+    private RocksDB initEnv(String childPath) {
         String parentPath = conf.get(AgentConstants.AGENT_HOME, 
AgentConstants.DEFAULT_AGENT_HOME);
-        File finalPath = new File(parentPath, configPath);
+        LOGGER.info("parentPath {} childPath {}", parentPath, childPath);
+        File finalPath = new File(parentPath, childPath);
         storePath = finalPath.getAbsolutePath();
         RocksDB.loadLibrary();
         final Options options = new Options();
@@ -160,23 +160,12 @@ public class RocksDbImp implements Db {
 
     @Override
     public CommandEntity getCommand(String commandId) {
-        try {
-            byte[] bytes = db.get(columnHandlesMap.get(commandFamilyName), 
commandId.getBytes());
-            return bytes == null ? null : GSON.fromJson(new String(bytes), 
CommandEntity.class);
-        } catch (Exception e) {
-            throw new RuntimeException("get command value error", e);
-        }
+        return null;
     }
 
     @Override
     public CommandEntity putCommand(CommandEntity entity) {
-        requireNonNull(entity);
-        try {
-            db.put(columnHandlesMap.get(commandFamilyName), 
entity.getId().getBytes(), GSON.toJson(entity).getBytes());
-        } catch (Exception e) {
-            throw new RuntimeException("put value to rocks db error", e);
-        }
-        return entity;
+        return null;
     }
 
     @Override
@@ -228,6 +217,11 @@ public class RocksDbImp implements Db {
         return results;
     }
 
+    @Override
+    public List<CommandEntity> searchCommands(boolean isAcked) {
+        return null;
+    }
+
     @Override
     public List<KeyValueEntity> search(StateSearchKey searchKey) {
         List<KeyValueEntity> results = new LinkedList<>();
@@ -260,22 +254,6 @@ public class RocksDbImp implements Db {
         return results;
     }
 
-    @Override
-    public List<CommandEntity> searchCommands(boolean isAcked) {
-        List<CommandEntity> results = new LinkedList<>();
-        try (final RocksIterator it = 
db.newIterator(columnHandlesMap.get(commandFamilyName))) {
-            it.seekToFirst();
-            while (it.isValid()) {
-                CommandEntity commandEntity = GSON.fromJson(new 
String(it.value()), CommandEntity.class);
-                if (commandEntity.isAcked() == isAcked) {
-                    results.add(commandEntity);
-                }
-                it.next();
-            }
-        }
-        return results;
-    }
-
     @Override
     public KeyValueEntity searchOne(StateSearchKey searchKey) {
         try (final RocksIterator it = 
db.newIterator(columnHandlesMap.get(defaultFamilyName))) {
diff --git 
a/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestRocksDbImp.java
 
b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestRocksDbImp.java
index 3a41ef8644..eccb9c298e 100644
--- 
a/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestRocksDbImp.java
+++ 
b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestRocksDbImp.java
@@ -18,9 +18,6 @@
 package org.apache.inlong.agent.db;
 
 import org.apache.inlong.agent.AgentBaseTestsHelper;
-import org.apache.inlong.agent.conf.JobProfile;
-import org.apache.inlong.agent.utils.AgentUtils;
-import org.apache.inlong.common.db.CommandEntity;
 
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -30,10 +27,6 @@ import org.junit.Test;
 import java.io.IOException;
 import java.util.List;
 
-import static org.apache.inlong.agent.constant.JobConstants.JOB_ID;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_ID_PREFIX;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_INSTANCE_ID;
-
 public class TestRocksDbImp {
 
     private static RocksDbImp db;
@@ -42,7 +35,7 @@ public class TestRocksDbImp {
     @BeforeClass
     public static void setup() throws Exception {
         helper = new 
AgentBaseTestsHelper(TestRocksDbImp.class.getName()).setupAgentHome();
-        db = new RocksDbImp();
+        db = new RocksDbImp("/localdb");
     }
 
     @AfterClass
@@ -81,22 +74,6 @@ public class TestRocksDbImp {
         db.put(entity);
         KeyValueEntity newEntity = db.get("test1");
         Assert.assertEquals("testC", newEntity.getJsonValue());
-
-    }
-
-    @Test
-    public void testCommandDb() {
-        CommandEntity commandEntity = new CommandEntity();
-        commandEntity.setId("1");
-        commandEntity.setCommandResult(0);
-        commandEntity.setAcked(false);
-        commandEntity.setTaskId(1);
-        commandEntity.setVersion(1);
-        db.putCommand(commandEntity);
-        CommandEntity command = db.getCommand("1");
-        Assert.assertEquals("1", command.getId());
-        List<CommandEntity> commandEntities = db.searchCommands(false);
-        Assert.assertEquals("1", commandEntities.get(0).getId());
     }
 
     @Test
@@ -115,16 +92,4 @@ public class TestRocksDbImp {
         KeyValueEntity entityResult = db.searchOne(StateSearchKey.ACCEPTED);
         Assert.assertEquals("searchKey1", entityResult.getKey());
     }
-
-    @Test
-    public void testBinlogJobStore() {
-        JobProfile jobProfile = JobProfile.parseJsonFile("binlogJob.json");
-        JobProfileDb jobDb = new JobProfileDb(db);
-        String jobId = jobProfile.get(JOB_ID);
-        jobProfile.set(JOB_INSTANCE_ID, 
AgentUtils.getSingleJobId(JOB_ID_PREFIX, jobId));
-        jobDb.storeJobFirstTime(jobProfile);
-        List<JobProfile> restarts = jobDb.getRestartJobs();
-        Assert.assertEquals(1, restarts.size());
-    }
-
 }
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
index c3e2344b27..315667d86a 100755
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
@@ -29,6 +29,7 @@ import org.apache.inlong.agent.core.trigger.TriggerManager;
 import org.apache.inlong.agent.db.CommandDb;
 import org.apache.inlong.agent.db.Db;
 import org.apache.inlong.agent.db.JobProfileDb;
+import org.apache.inlong.agent.db.RocksDbImp;
 import org.apache.inlong.agent.db.TriggerProfileDb;
 
 import org.slf4j.Logger;
@@ -102,11 +103,8 @@ public class AgentManager extends AbstractDaemon {
      */
     private Db initDb() {
         try {
-            // db is a required component, so if not init correctly,
-            // throw exception and stop running.
-            return (Db) Class.forName(conf.get(
-                    AgentConstants.AGENT_DB_CLASSNAME, 
AgentConstants.DEFAULT_AGENT_DB_CLASSNAME))
-                    .newInstance();
+            String childPath = conf.get(AgentConstants.AGENT_ROCKS_DB_PATH, 
AgentConstants.DEFAULT_AGENT_ROCKS_DB_PATH);
+            return new RocksDbImp(childPath);
         } catch (Exception ex) {
             throw new UnsupportedClassVersionError(ex.getMessage());
         }
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/RocksDBUtils.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/RocksDBUtils.java
index 469acf387e..a16914dca3 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/RocksDBUtils.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/RocksDBUtils.java
@@ -17,7 +17,9 @@
 
 package org.apache.inlong.agent.plugin.utils;
 
+import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.agent.conf.TriggerProfile;
+import org.apache.inlong.agent.constant.AgentConstants;
 import org.apache.inlong.agent.constant.JobConstants;
 import org.apache.inlong.agent.db.Db;
 import org.apache.inlong.agent.db.RocksDbImp;
@@ -31,7 +33,9 @@ import static 
org.apache.inlong.agent.constant.JobConstants.JOB_ID_PREFIX;
 public class RocksDBUtils {
 
     public static void main(String[] args) {
-        Db db = new RocksDbImp();
+        AgentConfiguration agentConf = AgentConfiguration.getAgentConf();
+        Db db = new RocksDbImp(
+                agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH, 
AgentConstants.DEFAULT_AGENT_ROCKS_DB_PATH));
         upgrade(db);
     }
 

Reply via email to