This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new e1c3c5ea64 [INLONG-10288][Agent] Update the Oracle Source (#10746) e1c3c5ea64 is described below commit e1c3c5ea64e760169f9bc65c1ed4c42a4f945cc5 Author: Zkplo <87751516+zk...@users.noreply.github.com> AuthorDate: Mon Aug 5 10:35:52 2024 +0800 [INLONG-10288][Agent] Update the Oracle Source (#10746) Co-authored-by: ZKpLo <14148880+zk...@user.noreply.gitee.com> --- .../inlong/agent/constant/TaskConstants.java | 11 ++ .../org/apache/inlong/agent/pojo/OracleTask.java | 12 +- .../apache/inlong/agent/pojo/TaskProfileDto.java | 14 ++- inlong-agent/agent-plugins/pom.xml | 6 + .../agent/plugin/instance/OracleInstance.java | 31 +++++ .../inlong/agent/plugin/sources/OracleSource.java | 138 +++++++++++++++++++-- .../task/{PostgreSQLTask.java => OracleTask.java} | 69 ++++++----- .../inlong/agent/plugin/task/PostgreSQLTask.java | 1 - .../agent/plugin/sources/TestOracleConnect.java | 13 +- 9 files changed, 236 insertions(+), 59 deletions(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java index 1f142f839e..297c163709 100755 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java @@ -127,6 +127,17 @@ public class TaskConstants extends CommonConstants { public static final String TASK_MONGO_SSL_ENABLE = "task.mongoTask.sslEnabled"; public static final String TASK_MONGO_POLL_INTERVAL = "task.mongoTask.pollIntervalInMs"; + // Oracle task + public static final String TASK_ORACLE_HOSTNAME = "task.oracleTask.hostname"; + public static final String TASK_ORACLE_PORT = "task.oracleTask.port"; + public static final String TASK_ORACLE_USER = "task.oracleTask.user"; + public static final String TASK_ORACLE_PASSWORD = "task.oracleTask.password"; + public static final String TASK_ORACLE_DBNAME = "task.oracleTask.dbname"; + public static final String TASK_ORACLE_SERVERNAME = "task.oracleTask.serverName"; + public static final String TASK_ORACLE_SCHEMA_INCLUDE_LIST = "task.oracleTask.schemaIncludeList"; + public static final String TASK_ORACLE_TABLE_INCLUDE_LIST = "task.oracleTask.tableIncludeList"; + public static final String TASK_ORACLE_SNAPSHOT_MODE = "task.oracleTask.snapshotMode"; + // PostgreSQL task public static final String TASK_POSTGRES_HOSTNAME = "task.postgreSQLTask.hostname"; public static final String TASK_POSTGRES_PORT = "task.postgreSQLTask.port"; diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/OracleTask.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/OracleTask.java index 7810c0a807..69ee8f6064 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/OracleTask.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/OracleTask.java @@ -28,6 +28,8 @@ public class OracleTask { private String port; private String serverName; private String dbname; + private String tableIncludeList; + private String schemaIncludeList; private OracleTask.Snapshot snapshot; private OracleTask.Offset offset; @@ -58,13 +60,15 @@ public class OracleTask { public static class OracleTaskConfig { private String hostname; - private String user; + private String username; private String password; private String port; - private String dbname; - private String serverName; + private String database; + private String schemaName; + private String tableName; + private String primaryKey; - private String snapshotMode; + private String scanStartupMode; private String intervalMs; private String offsetFilename; private String historyFilename; diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java index 039acea32d..6fa58ff14b 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java @@ -52,6 +52,7 @@ public class TaskProfileDto { public static final String DEFAULT_KAFKA_TASK = "org.apache.inlong.agent.plugin.task.KafkaTask"; public static final String DEFAULT_PULSAR_TASK = "org.apache.inlong.agent.plugin.task.PulsarTask"; public static final String DEFAULT_MONGODB_TASK = "org.apache.inlong.agent.plugin.task.MongoDBTask"; + public static final String DEFAULT_ORACLE_TASK = "org.apache.inlong.agent.plugin.task.OracleTask"; public static final String DEFAULT_POSTGRESQL_TASK = "org.apache.inlong.agent.plugin.task.PostgreSQLTask"; public static final String DEFAULT_MQTT_TASK = "org.apache.inlong.agent.plugin.task.MqttTask"; public static final String DEFAULT_SQLSERVER_TASK = "org.apache.inlong.agent.plugin.task.SQLServerTask"; @@ -310,12 +311,14 @@ public class TaskProfileDto { OracleTaskConfig config = GSON.fromJson(dataConfigs.getExtParams(), OracleTaskConfig.class); OracleTask oracleTask = new OracleTask(); - oracleTask.setUser(config.getUser()); + oracleTask.setHostname(config.getHostname()); - oracleTask.setPassword(config.getPassword()); oracleTask.setPort(config.getPort()); - oracleTask.setServerName(config.getServerName()); - oracleTask.setDbname(config.getDbname()); + oracleTask.setUser(config.getUsername()); + oracleTask.setPassword(config.getPassword()); + oracleTask.setSchemaIncludeList(config.getSchemaName()); + oracleTask.setDbname(config.getDatabase()); + oracleTask.setTableIncludeList(config.getTableName()); OracleTask.Offset offset = new OracleTask.Offset(); offset.setFilename(config.getOffsetFilename()); @@ -324,7 +327,7 @@ public class TaskProfileDto { oracleTask.setOffset(offset); OracleTask.Snapshot snapshot = new OracleTask.Snapshot(); - snapshot.setMode(config.getSnapshotMode()); + snapshot.setMode(config.getScanStartupMode()); oracleTask.setSnapshot(snapshot); OracleTask.History history = new OracleTask.History(); @@ -493,6 +496,7 @@ public class TaskProfileDto { profileDto.setTask(task); break; case ORACLE: + task.setTaskClass(DEFAULT_ORACLE_TASK); OracleTask oracleTask = getOracleTask(dataConfig); task.setOracleTask(oracleTask); task.setSource(ORACLE_SOURCE); diff --git a/inlong-agent/agent-plugins/pom.xml b/inlong-agent/agent-plugins/pom.xml index 86cfe91a45..eb092bfed0 100644 --- a/inlong-agent/agent-plugins/pom.xml +++ b/inlong-agent/agent-plugins/pom.xml @@ -32,10 +32,16 @@ <properties> <inlong.root.dir>${project.parent.parent.basedir}</inlong.root.dir> <debezium.version>1.8.0.Final</debezium.version> + <ojdbc.version>19.3.0.0</ojdbc.version> <darwinsys.version>1.5.1</darwinsys.version> </properties> <dependencies> + <dependency> + <groupId>com.oracle.ojdbc</groupId> + <artifactId>ojdbc8</artifactId> + <version>${ojdbc.version}</version> + </dependency> <dependency> <groupId>org.apache.inlong</groupId> <artifactId>agent-common</artifactId> diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/OracleInstance.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/OracleInstance.java new file mode 100644 index 0000000000..43cf2fd4a8 --- /dev/null +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/OracleInstance.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.plugin.instance; + +import org.apache.inlong.agent.conf.InstanceProfile; +import org.apache.inlong.agent.constant.TaskConstants; + +import java.io.IOException; + +public class OracleInstance extends CommonInstance { + + @Override + public void setInodeInfo(InstanceProfile profile) throws IOException { + profile.set(TaskConstants.INODE_INFO, ""); + } +} diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/OracleSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/OracleSource.java index c17dadeeff..fe7fe796ce 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/OracleSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/OracleSource.java @@ -17,25 +17,56 @@ package org.apache.inlong.agent.plugin.sources; +import org.apache.inlong.agent.common.AgentThreadFactory; +import org.apache.inlong.agent.conf.AgentConfiguration; import org.apache.inlong.agent.conf.InstanceProfile; import org.apache.inlong.agent.conf.TaskProfile; +import org.apache.inlong.agent.constant.AgentConstants; +import org.apache.inlong.agent.except.FileException; import org.apache.inlong.agent.plugin.Message; import org.apache.inlong.agent.plugin.file.Reader; import org.apache.inlong.agent.plugin.sources.file.AbstractSource; import org.apache.inlong.agent.plugin.sources.reader.OracleReader; +import io.debezium.connector.oracle.OracleConnector; +import io.debezium.connector.oracle.OracleConnectorConfig; +import io.debezium.engine.ChangeEvent; +import io.debezium.engine.DebeziumEngine; +import io.debezium.engine.format.Json; +import io.debezium.engine.spi.OffsetCommitPolicy; +import io.debezium.relational.history.FileDatabaseHistory; +import org.apache.kafka.connect.storage.FileOffsetBackingStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; +import java.util.Properties; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import static org.apache.inlong.agent.constant.TaskConstants.*; /** * Oracle SQL source */ public class OracleSource extends AbstractSource { - private static final Logger logger = LoggerFactory.getLogger(OracleSource.class); + private static final Logger LOGGER = LoggerFactory.getLogger(OracleSource.class); + private static final Integer DEBEZIUM_QUEUE_SIZE = 100; + private ExecutorService executor; + public InstanceProfile profile; + private BlockingQueue<SourceData> debeziumQueue; + private Properties props = new Properties(); + + private String snapshotMode; + private String dbName; + private String tableName; + private String schema; public OracleSource() { } @@ -51,32 +82,120 @@ public class OracleSource extends AbstractSource { @Override protected String getThreadName() { - return null; + return "oracle-source-" + taskId + "-" + instanceId; } @Override protected void initSource(InstanceProfile profile) { + try { + LOGGER.info("OracleSource init: {}", profile.toJsonStr()); + debeziumQueue = new LinkedBlockingQueue<>(DEBEZIUM_QUEUE_SIZE); + + dbName = profile.get(TASK_ORACLE_DBNAME); + tableName = profile.get(TASK_ORACLE_TABLE_INCLUDE_LIST); + schema = profile.get(TASK_ORACLE_SCHEMA_INCLUDE_LIST); + snapshotMode = profile.get(TASK_ORACLE_SNAPSHOT_MODE, "initial"); + + props.setProperty("name", "Oracle-" + instanceId); + props.setProperty("connector.class", OracleConnector.class.getName()); + + // Unified storage in "[agentPath]/data/" + String agentPath = + AgentConfiguration.getAgentConf().get(AgentConstants.AGENT_HOME, AgentConstants.DEFAULT_AGENT_HOME); + String offsetPath = agentPath + "/data/" + getThreadName() + "/" + "offset.dat"; + String historyPath = agentPath + "/data/" + getThreadName() + "/" + "history.dat"; + props.setProperty("offset.storage", FileOffsetBackingStore.class.getName()); + props.setProperty("offset.storage.file.filename", offsetPath); + props.setProperty("database.history", FileDatabaseHistory.class.getCanonicalName()); + props.setProperty("database.history.file.filename", historyPath); + + props.setProperty(String.valueOf(OracleConnectorConfig.HOSTNAME), profile.get(TASK_ORACLE_HOSTNAME)); + props.setProperty(String.valueOf(OracleConnectorConfig.PORT), profile.get(TASK_ORACLE_PORT)); + props.setProperty(String.valueOf(OracleConnectorConfig.USER), profile.get(TASK_ORACLE_USER)); + props.setProperty(String.valueOf(OracleConnectorConfig.PASSWORD), profile.get(TASK_ORACLE_PASSWORD)); + props.setProperty(String.valueOf(OracleConnectorConfig.TABLE_INCLUDE_LIST), schema + "." + tableName); + props.setProperty(String.valueOf(OracleConnectorConfig.SERVER_NAME), getThreadName()); + props.setProperty(String.valueOf(OracleConnectorConfig.DATABASE_NAME), profile.get(TASK_ORACLE_DBNAME)); + props.setProperty(String.valueOf(OracleConnectorConfig.SCHEMA_INCLUDE_LIST), schema); + props.setProperty(String.valueOf(OracleConnectorConfig.SNAPSHOT_MODE), snapshotMode); + + // Prevent Base64 encoding of Oracle NUMBER type fields + props.setProperty(String.valueOf(OracleConnectorConfig.DECIMAL_HANDLING_MODE), "string"); + + props.setProperty("key.converter.schemas.enable", "false"); + props.setProperty("value.converter.schemas.enable", "false"); + + executor = Executors.newSingleThreadExecutor(); + executor.execute(startDebeziumEngine()); + + } catch (Exception ex) { + stopRunning(); + throw new FileException("error init stream for " + instanceId, ex); + } + } + private Runnable startDebeziumEngine() { + return () -> { + AgentThreadFactory.nameThread(getThreadName() + "debezium"); + try (DebeziumEngine<ChangeEvent<String, String>> debeziumEngine = DebeziumEngine.create(Json.class) + .using(props) + .using(OffsetCommitPolicy.always()) + .notifying(this::handleConsumerEvent) + .build()) { + debeziumEngine.run(); + } catch (Throwable e) { + LOGGER.error("do run error in postgres debezium: ", e); + } + }; + } + + private void handleConsumerEvent(List<ChangeEvent<String, String>> records, + DebeziumEngine.RecordCommitter<ChangeEvent<String, String>> committer) throws InterruptedException { + for (ChangeEvent<String, String> record : records) { + boolean offerSuc = false; + SourceData sourceData = new SourceData(record.value().getBytes(StandardCharsets.UTF_8), "0L"); + while (isRunnable() && !offerSuc) { + offerSuc = debeziumQueue.offer(sourceData, 1, TimeUnit.SECONDS); + } + committer.markProcessed(record); + } + committer.markBatchFinished(); } @Override protected void printCurrentState() { - + LOGGER.info("oracle table is {}", tableName); } @Override protected boolean doPrepareToRead() { - return false; + return true; } @Override protected List<SourceData> readFromSource() { - return null; + List<SourceData> dataList = new ArrayList<>(); + try { + int size = 0; + while (size < BATCH_READ_LINE_TOTAL_LEN) { + SourceData sourceData = debeziumQueue.poll(1, TimeUnit.SECONDS); + if (sourceData != null) { + LOGGER.info("readFromSource: {}", sourceData.getData()); + size += sourceData.getData().length; + dataList.add(sourceData); + } else { + break; + } + } + } catch (InterruptedException e) { + LOGGER.error("poll {} data from debezium queue interrupted.", instanceId); + } + return dataList; } @Override public Message read() { - return null; + return super.read(); } @Override @@ -86,16 +205,17 @@ public class OracleSource extends AbstractSource { @Override protected void releaseSource() { - + LOGGER.info("release oracle source"); + executor.shutdownNow(); } @Override public boolean sourceFinish() { - return false; + return super.sourceFinish(); } @Override public boolean sourceExist() { - return false; + return true; } } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PostgreSQLTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/OracleTask.java similarity index 59% copy from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PostgreSQLTask.java copy to inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/OracleTask.java index 554764911d..34b064d48d 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PostgreSQLTask.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/OracleTask.java @@ -20,6 +20,7 @@ package org.apache.inlong.agent.plugin.task; import org.apache.inlong.agent.conf.InstanceProfile; import org.apache.inlong.agent.conf.TaskProfile; import org.apache.inlong.agent.constant.CycleUnitType; +import org.apache.inlong.agent.constant.TaskConstants; import org.apache.inlong.agent.utils.AgentUtils; import org.slf4j.Logger; @@ -29,34 +30,25 @@ import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; -import static org.apache.inlong.agent.constant.TaskConstants.TASK_POSTGRES_DBNAME; -import static org.apache.inlong.agent.constant.TaskConstants.TASK_POSTGRES_HOSTNAME; -import static org.apache.inlong.agent.constant.TaskConstants.TASK_POSTGRES_PASSWORD; -import static org.apache.inlong.agent.constant.TaskConstants.TASK_POSTGRES_PLUGIN_NAME; -import static org.apache.inlong.agent.constant.TaskConstants.TASK_POSTGRES_PORT; -import static org.apache.inlong.agent.constant.TaskConstants.TASK_POSTGRES_TABLE_INCLUDE_LIST; -import static org.apache.inlong.agent.constant.TaskConstants.TASK_POSTGRES_USER; +import static org.apache.inlong.agent.constant.TaskConstants.*; -public class PostgreSQLTask extends AbstractTask { +public class OracleTask extends AbstractTask { - private static final Logger LOGGER = LoggerFactory.getLogger(PostgreSQLTask.class); - public static final String DEFAULT_KAFKA_INSTANCE = "org.apache.inlong.agent.plugin.instance.KafkaInstance"; + private static final Logger LOGGER = LoggerFactory.getLogger(OracleTask.class); + + public static final String DEFAULT_ORACLE_INSTANCE = "org.apache.inlong.agent.plugin.instance.OracleInstance"; + private AtomicBoolean isAdded = new AtomicBoolean(false); private final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyyMMddHH"); - private boolean isAdded = false; - public static final int DEFAULT_INSTANCE_LIMIT = 1; private String dbName; private String tableName; private String instanceId; @Override - protected void initTask() { - LOGGER.info("postgres commonInit: {}", taskProfile.toJsonStr()); - taskProfile.get(TASK_POSTGRES_DBNAME); - dbName = taskProfile.get(TASK_POSTGRES_DBNAME); - tableName = taskProfile.get(TASK_POSTGRES_TABLE_INCLUDE_LIST); - instanceId = dbName + "-" + tableName; + protected int getInstanceLimit() { + return DEFAULT_INSTANCE_LIMIT; } @Override @@ -65,49 +57,58 @@ public class PostgreSQLTask extends AbstractTask { LOGGER.error("task profile needs all required key"); return false; } - if (!profile.hasKey(profile.get(TASK_POSTGRES_HOSTNAME))) { + if (!profile.hasKey(TaskConstants.TASK_ORACLE_HOSTNAME)) { LOGGER.error("task profile needs hostname"); return false; } - if (!profile.hasKey(profile.get(TASK_POSTGRES_PORT))) { + if (!profile.hasKey(TaskConstants.TASK_ORACLE_PORT)) { LOGGER.error("task profile needs port"); return false; } - if (!profile.hasKey(profile.get(TASK_POSTGRES_USER))) { + if (!profile.hasKey(TaskConstants.TASK_ORACLE_USER)) { LOGGER.error("task profile needs username"); return false; } - if (!profile.hasKey(profile.get(TASK_POSTGRES_PASSWORD))) { + if (!profile.hasKey(TaskConstants.TASK_ORACLE_PASSWORD)) { LOGGER.error("task profile needs password"); return false; } - if (!profile.hasKey(profile.get(TASK_POSTGRES_DBNAME))) { + if (!profile.hasKey(TaskConstants.TASK_ORACLE_DBNAME)) { LOGGER.error("task profile needs DB name"); return false; } - if (!profile.hasKey(profile.get(TASK_POSTGRES_PLUGIN_NAME))) { - LOGGER.error("task profile needs plugin name"); + if (!profile.hasKey(TaskConstants.TASK_ORACLE_SCHEMA_INCLUDE_LIST)) { + LOGGER.error("task profile needs schema name"); + return false; + } + if (!profile.hasKey(TaskConstants.TASK_ORACLE_TABLE_INCLUDE_LIST)) { + LOGGER.error("task profile needs table list"); return false; } return true; } + @Override + protected void initTask() { + LOGGER.info("oracle commonInit: {}", taskProfile.toJsonStr()); + dbName = taskProfile.get(TASK_ORACLE_DBNAME); + tableName = taskProfile.get(TASK_ORACLE_TABLE_INCLUDE_LIST); + instanceId = dbName + "-" + tableName; + } + @Override protected List<InstanceProfile> getNewInstanceList() { List<InstanceProfile> list = new ArrayList<>(); - if (isAdded) { + if (isAdded.get()) { return list; } String dataTime = LocalDateTime.now().format(dateTimeFormatter); - InstanceProfile instanceProfile = taskProfile.createInstanceProfile(DEFAULT_KAFKA_INSTANCE, instanceId, - CycleUnitType.HOUR, dataTime, AgentUtils.getCurrentTime()); + InstanceProfile instanceProfile = + taskProfile.createInstanceProfile(DEFAULT_ORACLE_INSTANCE, instanceId, + CycleUnitType.HOUR, dataTime, AgentUtils.getCurrentTime()); + LOGGER.info("taskProfile.createInstanceProfile: {}", instanceProfile.toJsonStr()); list.add(instanceProfile); - this.isAdded = true; + this.isAdded.set(true); return list; } - - @Override - protected int getInstanceLimit() { - return DEFAULT_INSTANCE_LIMIT; - } } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PostgreSQLTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PostgreSQLTask.java index 554764911d..7cf382fbd6 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PostgreSQLTask.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PostgreSQLTask.java @@ -53,7 +53,6 @@ public class PostgreSQLTask extends AbstractTask { @Override protected void initTask() { LOGGER.info("postgres commonInit: {}", taskProfile.toJsonStr()); - taskProfile.get(TASK_POSTGRES_DBNAME); dbName = taskProfile.get(TASK_POSTGRES_DBNAME); tableName = taskProfile.get(TASK_POSTGRES_TABLE_INCLUDE_LIST); instanceId = dbName + "-" + tableName; diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestOracleConnect.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestOracleConnect.java index ea1d9289cb..7f2e9e3c81 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestOracleConnect.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestOracleConnect.java @@ -41,13 +41,14 @@ public class TestOracleConnect { @Ignore public void testOracle() { InstanceProfile jobProfile = new InstanceProfile(); - jobProfile.set("job.oracleJob.hostname", "localhost"); - jobProfile.set("job.oracleJob.port", "1521"); - jobProfile.set("job.oracleJob.user", "c##dbzuser"); - jobProfile.set("job.oracleJob.password", "dbz"); - jobProfile.set("job.oracleJob.sid", "ORCLCDB"); - jobProfile.set("job.oracleJob.dbname", "ORCLCDB"); + jobProfile.set("job.oracleJob.hostname", "192.168.101.11"); + jobProfile.set("job.oracleJob.port", "49161"); + jobProfile.set("job.oracleJob.user", "c##admin"); + jobProfile.set("job.oracleJob.password", "inlong"); + jobProfile.set("job.oracleJob.sid", "xe"); + jobProfile.set("job.oracleJob.dbname", "xe"); jobProfile.set("job.oracleJob.serverName", "server1"); + jobProfile.set("instance.id", "instance_test"); jobProfile.set(TaskConstants.JOB_INSTANCE_ID, UUID.randomUUID().toString()); jobProfile.set(PROXY_INLONG_GROUP_ID, UUID.randomUUID().toString()); jobProfile.set(PROXY_INLONG_STREAM_ID, UUID.randomUUID().toString());