This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new a194f575f9 [INLONG-9117][Agent] Rewrite class RocksDbImp to enable it to be constructed with a child path (#9119) a194f575f9 is described below commit a194f575f9cf72a005634e5f20144668013e9e09 Author: justinwwhuang <hww_jus...@163.com> AuthorDate: Thu Oct 26 16:58:02 2023 +0800 [INLONG-9117][Agent] Rewrite class RocksDbImp to enable it to be constructed with a child path (#9119) --- .../org/apache/inlong/agent/db/KeyValueEntity.java | 8 ++++ .../org/apache/inlong/agent/db/RocksDbImp.java | 46 ++++++---------------- .../org/apache/inlong/agent/db/TestRocksDbImp.java | 37 +---------------- .../org/apache/inlong/agent/core/AgentManager.java | 8 ++-- .../inlong/agent/plugin/utils/RocksDBUtils.java | 6 ++- 5 files changed, 29 insertions(+), 76 deletions(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/KeyValueEntity.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/KeyValueEntity.java index 5cbda1e4f0..5ca4dadcb6 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/KeyValueEntity.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/KeyValueEntity.java @@ -18,6 +18,7 @@ package org.apache.inlong.agent.db; import org.apache.inlong.agent.conf.JobProfile; +import org.apache.inlong.agent.conf.OffsetProfile; import org.apache.inlong.agent.conf.TriggerProfile; /** @@ -90,6 +91,13 @@ public class KeyValueEntity { return TriggerProfile.parseJsonStr(getJsonValue()); } + /** + * convert keyValue to offset profile + */ + public OffsetProfile getAsOffsetProfile() { + return OffsetProfile.parseJsonStr(getJsonValue()); + } + /** * check whether the entity is finished */ diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/RocksDbImp.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/RocksDbImp.java index d8dd2aa522..eac8479aa4 100755 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/RocksDbImp.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/RocksDbImp.java @@ -62,10 +62,10 @@ public class RocksDbImp implements Db { private ConcurrentHashMap<String, ColumnFamilyDescriptor> columnDescriptorMap; private String storePath; - public RocksDbImp() { + public RocksDbImp(String childPath) { // init rocks db this.conf = AgentConfiguration.getAgentConf(); - this.db = initEnv(); + this.db = initEnv(childPath); // add a command column family addColumnFamily(commandFamilyName); } @@ -74,10 +74,10 @@ public class RocksDbImp implements Db { return new ColumnFamilyDescriptor(columnFamilyName, new ColumnFamilyOptions()); } - private RocksDB initEnv() { - String configPath = conf.get(AgentConstants.AGENT_ROCKS_DB_PATH, AgentConstants.DEFAULT_AGENT_ROCKS_DB_PATH); + private RocksDB initEnv(String childPath) { String parentPath = conf.get(AgentConstants.AGENT_HOME, AgentConstants.DEFAULT_AGENT_HOME); - File finalPath = new File(parentPath, configPath); + LOGGER.info("parentPath {} childPath {}", parentPath, childPath); + File finalPath = new File(parentPath, childPath); storePath = finalPath.getAbsolutePath(); RocksDB.loadLibrary(); final Options options = new Options(); @@ -160,23 +160,12 @@ public class RocksDbImp implements Db { @Override public CommandEntity getCommand(String commandId) { - try { - byte[] bytes = db.get(columnHandlesMap.get(commandFamilyName), commandId.getBytes()); - return bytes == null ? null : GSON.fromJson(new String(bytes), CommandEntity.class); - } catch (Exception e) { - throw new RuntimeException("get command value error", e); - } + return null; } @Override public CommandEntity putCommand(CommandEntity entity) { - requireNonNull(entity); - try { - db.put(columnHandlesMap.get(commandFamilyName), entity.getId().getBytes(), GSON.toJson(entity).getBytes()); - } catch (Exception e) { - throw new RuntimeException("put value to rocks db error", e); - } - return entity; + return null; } @Override @@ -228,6 +217,11 @@ public class RocksDbImp implements Db { return results; } + @Override + public List<CommandEntity> searchCommands(boolean isAcked) { + return null; + } + @Override public List<KeyValueEntity> search(StateSearchKey searchKey) { List<KeyValueEntity> results = new LinkedList<>(); @@ -260,22 +254,6 @@ public class RocksDbImp implements Db { return results; } - @Override - public List<CommandEntity> searchCommands(boolean isAcked) { - List<CommandEntity> results = new LinkedList<>(); - try (final RocksIterator it = db.newIterator(columnHandlesMap.get(commandFamilyName))) { - it.seekToFirst(); - while (it.isValid()) { - CommandEntity commandEntity = GSON.fromJson(new String(it.value()), CommandEntity.class); - if (commandEntity.isAcked() == isAcked) { - results.add(commandEntity); - } - it.next(); - } - } - return results; - } - @Override public KeyValueEntity searchOne(StateSearchKey searchKey) { try (final RocksIterator it = db.newIterator(columnHandlesMap.get(defaultFamilyName))) { diff --git a/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestRocksDbImp.java b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestRocksDbImp.java index 3a41ef8644..eccb9c298e 100644 --- a/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestRocksDbImp.java +++ b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestRocksDbImp.java @@ -18,9 +18,6 @@ package org.apache.inlong.agent.db; import org.apache.inlong.agent.AgentBaseTestsHelper; -import org.apache.inlong.agent.conf.JobProfile; -import org.apache.inlong.agent.utils.AgentUtils; -import org.apache.inlong.common.db.CommandEntity; import org.junit.AfterClass; import org.junit.Assert; @@ -30,10 +27,6 @@ import org.junit.Test; import java.io.IOException; import java.util.List; -import static org.apache.inlong.agent.constant.JobConstants.JOB_ID; -import static org.apache.inlong.agent.constant.JobConstants.JOB_ID_PREFIX; -import static org.apache.inlong.agent.constant.JobConstants.JOB_INSTANCE_ID; - public class TestRocksDbImp { private static RocksDbImp db; @@ -42,7 +35,7 @@ public class TestRocksDbImp { @BeforeClass public static void setup() throws Exception { helper = new AgentBaseTestsHelper(TestRocksDbImp.class.getName()).setupAgentHome(); - db = new RocksDbImp(); + db = new RocksDbImp("/localdb"); } @AfterClass @@ -81,22 +74,6 @@ public class TestRocksDbImp { db.put(entity); KeyValueEntity newEntity = db.get("test1"); Assert.assertEquals("testC", newEntity.getJsonValue()); - - } - - @Test - public void testCommandDb() { - CommandEntity commandEntity = new CommandEntity(); - commandEntity.setId("1"); - commandEntity.setCommandResult(0); - commandEntity.setAcked(false); - commandEntity.setTaskId(1); - commandEntity.setVersion(1); - db.putCommand(commandEntity); - CommandEntity command = db.getCommand("1"); - Assert.assertEquals("1", command.getId()); - List<CommandEntity> commandEntities = db.searchCommands(false); - Assert.assertEquals("1", commandEntities.get(0).getId()); } @Test @@ -115,16 +92,4 @@ public class TestRocksDbImp { KeyValueEntity entityResult = db.searchOne(StateSearchKey.ACCEPTED); Assert.assertEquals("searchKey1", entityResult.getKey()); } - - @Test - public void testBinlogJobStore() { - JobProfile jobProfile = JobProfile.parseJsonFile("binlogJob.json"); - JobProfileDb jobDb = new JobProfileDb(db); - String jobId = jobProfile.get(JOB_ID); - jobProfile.set(JOB_INSTANCE_ID, AgentUtils.getSingleJobId(JOB_ID_PREFIX, jobId)); - jobDb.storeJobFirstTime(jobProfile); - List<JobProfile> restarts = jobDb.getRestartJobs(); - Assert.assertEquals(1, restarts.size()); - } - } diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java index c3e2344b27..315667d86a 100755 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java @@ -29,6 +29,7 @@ import org.apache.inlong.agent.core.trigger.TriggerManager; import org.apache.inlong.agent.db.CommandDb; import org.apache.inlong.agent.db.Db; import org.apache.inlong.agent.db.JobProfileDb; +import org.apache.inlong.agent.db.RocksDbImp; import org.apache.inlong.agent.db.TriggerProfileDb; import org.slf4j.Logger; @@ -102,11 +103,8 @@ public class AgentManager extends AbstractDaemon { */ private Db initDb() { try { - // db is a required component, so if not init correctly, - // throw exception and stop running. - return (Db) Class.forName(conf.get( - AgentConstants.AGENT_DB_CLASSNAME, AgentConstants.DEFAULT_AGENT_DB_CLASSNAME)) - .newInstance(); + String childPath = conf.get(AgentConstants.AGENT_ROCKS_DB_PATH, AgentConstants.DEFAULT_AGENT_ROCKS_DB_PATH); + return new RocksDbImp(childPath); } catch (Exception ex) { throw new UnsupportedClassVersionError(ex.getMessage()); } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/RocksDBUtils.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/RocksDBUtils.java index 469acf387e..a16914dca3 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/RocksDBUtils.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/RocksDBUtils.java @@ -17,7 +17,9 @@ package org.apache.inlong.agent.plugin.utils; +import org.apache.inlong.agent.conf.AgentConfiguration; import org.apache.inlong.agent.conf.TriggerProfile; +import org.apache.inlong.agent.constant.AgentConstants; import org.apache.inlong.agent.constant.JobConstants; import org.apache.inlong.agent.db.Db; import org.apache.inlong.agent.db.RocksDbImp; @@ -31,7 +33,9 @@ import static org.apache.inlong.agent.constant.JobConstants.JOB_ID_PREFIX; public class RocksDBUtils { public static void main(String[] args) { - Db db = new RocksDbImp(); + AgentConfiguration agentConf = AgentConfiguration.getAgentConf(); + Db db = new RocksDbImp( + agentConf.get(AgentConstants.AGENT_ROCKS_DB_PATH, AgentConstants.DEFAULT_AGENT_ROCKS_DB_PATH)); upgrade(db); }