This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new e1c3c5ea64 [INLONG-10288][Agent] Update the Oracle Source (#10746)
e1c3c5ea64 is described below

commit e1c3c5ea64e760169f9bc65c1ed4c42a4f945cc5
Author: Zkplo <87751516+zk...@users.noreply.github.com>
AuthorDate: Mon Aug 5 10:35:52 2024 +0800

    [INLONG-10288][Agent] Update the Oracle Source (#10746)
    
    Co-authored-by: ZKpLo <14148880+zk...@user.noreply.gitee.com>
---
 .../inlong/agent/constant/TaskConstants.java       |  11 ++
 .../org/apache/inlong/agent/pojo/OracleTask.java   |  12 +-
 .../apache/inlong/agent/pojo/TaskProfileDto.java   |  14 ++-
 inlong-agent/agent-plugins/pom.xml                 |   6 +
 .../agent/plugin/instance/OracleInstance.java      |  31 +++++
 .../inlong/agent/plugin/sources/OracleSource.java  | 138 +++++++++++++++++++--
 .../task/{PostgreSQLTask.java => OracleTask.java}  |  69 ++++++-----
 .../inlong/agent/plugin/task/PostgreSQLTask.java   |   1 -
 .../agent/plugin/sources/TestOracleConnect.java    |  13 +-
 9 files changed, 236 insertions(+), 59 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
index 1f142f839e..297c163709 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
@@ -127,6 +127,17 @@ public class TaskConstants extends CommonConstants {
     public static final String TASK_MONGO_SSL_ENABLE = 
"task.mongoTask.sslEnabled";
     public static final String TASK_MONGO_POLL_INTERVAL = 
"task.mongoTask.pollIntervalInMs";
 
+    // Oracle task
+    public static final String TASK_ORACLE_HOSTNAME = 
"task.oracleTask.hostname";
+    public static final String TASK_ORACLE_PORT = "task.oracleTask.port";
+    public static final String TASK_ORACLE_USER = "task.oracleTask.user";
+    public static final String TASK_ORACLE_PASSWORD = 
"task.oracleTask.password";
+    public static final String TASK_ORACLE_DBNAME = "task.oracleTask.dbname";
+    public static final String TASK_ORACLE_SERVERNAME = 
"task.oracleTask.serverName";
+    public static final String TASK_ORACLE_SCHEMA_INCLUDE_LIST = 
"task.oracleTask.schemaIncludeList";
+    public static final String TASK_ORACLE_TABLE_INCLUDE_LIST = 
"task.oracleTask.tableIncludeList";
+    public static final String TASK_ORACLE_SNAPSHOT_MODE = 
"task.oracleTask.snapshotMode";
+
     // PostgreSQL task
     public static final String TASK_POSTGRES_HOSTNAME = 
"task.postgreSQLTask.hostname";
     public static final String TASK_POSTGRES_PORT = "task.postgreSQLTask.port";
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/OracleTask.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/OracleTask.java
index 7810c0a807..69ee8f6064 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/OracleTask.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/OracleTask.java
@@ -28,6 +28,8 @@ public class OracleTask {
     private String port;
     private String serverName;
     private String dbname;
+    private String tableIncludeList;
+    private String schemaIncludeList;
 
     private OracleTask.Snapshot snapshot;
     private OracleTask.Offset offset;
@@ -58,13 +60,15 @@ public class OracleTask {
     public static class OracleTaskConfig {
 
         private String hostname;
-        private String user;
+        private String username;
         private String password;
         private String port;
-        private String dbname;
-        private String serverName;
+        private String database;
+        private String schemaName;
+        private String tableName;
+        private String primaryKey;
 
-        private String snapshotMode;
+        private String scanStartupMode;
         private String intervalMs;
         private String offsetFilename;
         private String historyFilename;
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
index 039acea32d..6fa58ff14b 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
@@ -52,6 +52,7 @@ public class TaskProfileDto {
     public static final String DEFAULT_KAFKA_TASK = 
"org.apache.inlong.agent.plugin.task.KafkaTask";
     public static final String DEFAULT_PULSAR_TASK = 
"org.apache.inlong.agent.plugin.task.PulsarTask";
     public static final String DEFAULT_MONGODB_TASK = 
"org.apache.inlong.agent.plugin.task.MongoDBTask";
+    public static final String DEFAULT_ORACLE_TASK = 
"org.apache.inlong.agent.plugin.task.OracleTask";
     public static final String DEFAULT_POSTGRESQL_TASK = 
"org.apache.inlong.agent.plugin.task.PostgreSQLTask";
     public static final String DEFAULT_MQTT_TASK = 
"org.apache.inlong.agent.plugin.task.MqttTask";
     public static final String DEFAULT_SQLSERVER_TASK = 
"org.apache.inlong.agent.plugin.task.SQLServerTask";
@@ -310,12 +311,14 @@ public class TaskProfileDto {
         OracleTaskConfig config = GSON.fromJson(dataConfigs.getExtParams(),
                 OracleTaskConfig.class);
         OracleTask oracleTask = new OracleTask();
-        oracleTask.setUser(config.getUser());
+
         oracleTask.setHostname(config.getHostname());
-        oracleTask.setPassword(config.getPassword());
         oracleTask.setPort(config.getPort());
-        oracleTask.setServerName(config.getServerName());
-        oracleTask.setDbname(config.getDbname());
+        oracleTask.setUser(config.getUsername());
+        oracleTask.setPassword(config.getPassword());
+        oracleTask.setSchemaIncludeList(config.getSchemaName());
+        oracleTask.setDbname(config.getDatabase());
+        oracleTask.setTableIncludeList(config.getTableName());
 
         OracleTask.Offset offset = new OracleTask.Offset();
         offset.setFilename(config.getOffsetFilename());
@@ -324,7 +327,7 @@ public class TaskProfileDto {
         oracleTask.setOffset(offset);
 
         OracleTask.Snapshot snapshot = new OracleTask.Snapshot();
-        snapshot.setMode(config.getSnapshotMode());
+        snapshot.setMode(config.getScanStartupMode());
         oracleTask.setSnapshot(snapshot);
 
         OracleTask.History history = new OracleTask.History();
@@ -493,6 +496,7 @@ public class TaskProfileDto {
                 profileDto.setTask(task);
                 break;
             case ORACLE:
+                task.setTaskClass(DEFAULT_ORACLE_TASK);
                 OracleTask oracleTask = getOracleTask(dataConfig);
                 task.setOracleTask(oracleTask);
                 task.setSource(ORACLE_SOURCE);
diff --git a/inlong-agent/agent-plugins/pom.xml b/inlong-agent/agent-plugins/pom.xml
index 86cfe91a45..eb092bfed0 100644
--- a/inlong-agent/agent-plugins/pom.xml
+++ b/inlong-agent/agent-plugins/pom.xml
@@ -32,10 +32,16 @@
     <properties>
         <inlong.root.dir>${project.parent.parent.basedir}</inlong.root.dir>
         <debezium.version>1.8.0.Final</debezium.version>
+        <ojdbc.version>19.3.0.0</ojdbc.version>
         <darwinsys.version>1.5.1</darwinsys.version>
     </properties>
 
     <dependencies>
+        <dependency>
+            <groupId>com.oracle.ojdbc</groupId>
+            <artifactId>ojdbc8</artifactId>
+            <version>${ojdbc.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.inlong</groupId>
             <artifactId>agent-common</artifactId>
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/OracleInstance.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/OracleInstance.java
new file mode 100644
index 0000000000..43cf2fd4a8
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/OracleInstance.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.instance;
+
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.constant.TaskConstants;
+
+import java.io.IOException;
+
+public class OracleInstance extends CommonInstance {
+
+    @Override
+    public void setInodeInfo(InstanceProfile profile) throws IOException {
+        profile.set(TaskConstants.INODE_INFO, "");
+    }
+}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/OracleSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/OracleSource.java
index c17dadeeff..fe7fe796ce 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/OracleSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/OracleSource.java
@@ -17,25 +17,56 @@
 
 package org.apache.inlong.agent.plugin.sources;
 
+import org.apache.inlong.agent.common.AgentThreadFactory;
+import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.agent.conf.InstanceProfile;
 import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.AgentConstants;
+import org.apache.inlong.agent.except.FileException;
 import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.plugin.file.Reader;
 import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
 import org.apache.inlong.agent.plugin.sources.reader.OracleReader;
 
+import io.debezium.connector.oracle.OracleConnector;
+import io.debezium.connector.oracle.OracleConnectorConfig;
+import io.debezium.engine.ChangeEvent;
+import io.debezium.engine.DebeziumEngine;
+import io.debezium.engine.format.Json;
+import io.debezium.engine.spi.OffsetCommitPolicy;
+import io.debezium.relational.history.FileDatabaseHistory;
+import org.apache.kafka.connect.storage.FileOffsetBackingStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.inlong.agent.constant.TaskConstants.*;
 
 /**
  * Oracle SQL source
  */
 public class OracleSource extends AbstractSource {
 
-    private static final Logger logger = 
LoggerFactory.getLogger(OracleSource.class);
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(OracleSource.class);
+    private static final Integer DEBEZIUM_QUEUE_SIZE = 100;
+    private ExecutorService executor;
+    public InstanceProfile profile;
+    private BlockingQueue<SourceData> debeziumQueue;
+    private Properties props = new Properties();
+
+    private String snapshotMode;
+    private String dbName;
+    private String tableName;
+    private String schema;
 
     public OracleSource() {
     }
@@ -51,32 +82,120 @@ public class OracleSource extends AbstractSource {
 
     @Override
     protected String getThreadName() {
-        return null;
+        return "oracle-source-" + taskId + "-" + instanceId;
     }
 
     @Override
     protected void initSource(InstanceProfile profile) {
+        try {
+            LOGGER.info("OracleSource init: {}", profile.toJsonStr());
+            debeziumQueue = new LinkedBlockingQueue<>(DEBEZIUM_QUEUE_SIZE);
+
+            dbName = profile.get(TASK_ORACLE_DBNAME);
+            tableName = profile.get(TASK_ORACLE_TABLE_INCLUDE_LIST);
+            schema = profile.get(TASK_ORACLE_SCHEMA_INCLUDE_LIST);
+            snapshotMode = profile.get(TASK_ORACLE_SNAPSHOT_MODE, "initial");
+
+            props.setProperty("name", "Oracle-" + instanceId);
+            props.setProperty("connector.class", 
OracleConnector.class.getName());
+
+            // Unified storage in "[agentPath]/data/"
+            String agentPath =
+                    
AgentConfiguration.getAgentConf().get(AgentConstants.AGENT_HOME, 
AgentConstants.DEFAULT_AGENT_HOME);
+            String offsetPath = agentPath + "/data/" + getThreadName() + "/" + 
"offset.dat";
+            String historyPath = agentPath + "/data/" + getThreadName() + "/" 
+ "history.dat";
+            props.setProperty("offset.storage", 
FileOffsetBackingStore.class.getName());
+            props.setProperty("offset.storage.file.filename", offsetPath);
+            props.setProperty("database.history", 
FileDatabaseHistory.class.getCanonicalName());
+            props.setProperty("database.history.file.filename", historyPath);
+
+            props.setProperty(String.valueOf(OracleConnectorConfig.HOSTNAME), 
profile.get(TASK_ORACLE_HOSTNAME));
+            props.setProperty(String.valueOf(OracleConnectorConfig.PORT), 
profile.get(TASK_ORACLE_PORT));
+            props.setProperty(String.valueOf(OracleConnectorConfig.USER), 
profile.get(TASK_ORACLE_USER));
+            props.setProperty(String.valueOf(OracleConnectorConfig.PASSWORD), 
profile.get(TASK_ORACLE_PASSWORD));
+            
props.setProperty(String.valueOf(OracleConnectorConfig.TABLE_INCLUDE_LIST), 
schema + "." + tableName);
+            
props.setProperty(String.valueOf(OracleConnectorConfig.SERVER_NAME), 
getThreadName());
+            
props.setProperty(String.valueOf(OracleConnectorConfig.DATABASE_NAME), 
profile.get(TASK_ORACLE_DBNAME));
+            
props.setProperty(String.valueOf(OracleConnectorConfig.SCHEMA_INCLUDE_LIST), 
schema);
+            
props.setProperty(String.valueOf(OracleConnectorConfig.SNAPSHOT_MODE), 
snapshotMode);
+
+            // Prevent Base64 encoding of Oracle NUMBER type fields
+            
props.setProperty(String.valueOf(OracleConnectorConfig.DECIMAL_HANDLING_MODE), 
"string");
+
+            props.setProperty("key.converter.schemas.enable", "false");
+            props.setProperty("value.converter.schemas.enable", "false");
+
+            executor = Executors.newSingleThreadExecutor();
+            executor.execute(startDebeziumEngine());
+
+        } catch (Exception ex) {
+            stopRunning();
+            throw new FileException("error init stream for " + instanceId, ex);
+        }
+    }
 
+    private Runnable startDebeziumEngine() {
+        return () -> {
+            AgentThreadFactory.nameThread(getThreadName() + "debezium");
+            try (DebeziumEngine<ChangeEvent<String, String>> debeziumEngine = 
DebeziumEngine.create(Json.class)
+                    .using(props)
+                    .using(OffsetCommitPolicy.always())
+                    .notifying(this::handleConsumerEvent)
+                    .build()) {
+                debeziumEngine.run();
+            } catch (Throwable e) {
+                LOGGER.error("do run error in postgres debezium: ", e);
+            }
+        };
+    }
+
+    private void handleConsumerEvent(List<ChangeEvent<String, String>> records,
+            DebeziumEngine.RecordCommitter<ChangeEvent<String, String>> 
committer) throws InterruptedException {
+        for (ChangeEvent<String, String> record : records) {
+            boolean offerSuc = false;
+            SourceData sourceData = new 
SourceData(record.value().getBytes(StandardCharsets.UTF_8), "0L");
+            while (isRunnable() && !offerSuc) {
+                offerSuc = debeziumQueue.offer(sourceData, 1, 
TimeUnit.SECONDS);
+            }
+            committer.markProcessed(record);
+        }
+        committer.markBatchFinished();
     }
 
     @Override
     protected void printCurrentState() {
-
+        LOGGER.info("oracle table is {}", tableName);
     }
 
     @Override
     protected boolean doPrepareToRead() {
-        return false;
+        return true;
     }
 
     @Override
     protected List<SourceData> readFromSource() {
-        return null;
+        List<SourceData> dataList = new ArrayList<>();
+        try {
+            int size = 0;
+            while (size < BATCH_READ_LINE_TOTAL_LEN) {
+                SourceData sourceData = debeziumQueue.poll(1, 
TimeUnit.SECONDS);
+                if (sourceData != null) {
+                    LOGGER.info("readFromSource: {}", sourceData.getData());
+                    size += sourceData.getData().length;
+                    dataList.add(sourceData);
+                } else {
+                    break;
+                }
+            }
+        } catch (InterruptedException e) {
+            LOGGER.error("poll {} data from debezium queue interrupted.", 
instanceId);
+        }
+        return dataList;
     }
 
     @Override
     public Message read() {
-        return null;
+        return super.read();
     }
 
     @Override
@@ -86,16 +205,17 @@ public class OracleSource extends AbstractSource {
 
     @Override
     protected void releaseSource() {
-
+        LOGGER.info("release oracle source");
+        executor.shutdownNow();
     }
 
     @Override
     public boolean sourceFinish() {
-        return false;
+        return super.sourceFinish();
     }
 
     @Override
     public boolean sourceExist() {
-        return false;
+        return true;
     }
 }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PostgreSQLTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/OracleTask.java
similarity index 59%
copy from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PostgreSQLTask.java
copy to inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/OracleTask.java
index 554764911d..34b064d48d 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PostgreSQLTask.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/OracleTask.java
@@ -20,6 +20,7 @@ package org.apache.inlong.agent.plugin.task;
 import org.apache.inlong.agent.conf.InstanceProfile;
 import org.apache.inlong.agent.conf.TaskProfile;
 import org.apache.inlong.agent.constant.CycleUnitType;
+import org.apache.inlong.agent.constant.TaskConstants;
 import org.apache.inlong.agent.utils.AgentUtils;
 
 import org.slf4j.Logger;
@@ -29,34 +30,25 @@ import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 
-import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_POSTGRES_DBNAME;
-import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_POSTGRES_HOSTNAME;
-import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_POSTGRES_PASSWORD;
-import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_POSTGRES_PLUGIN_NAME;
-import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_POSTGRES_PORT;
-import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_POSTGRES_TABLE_INCLUDE_LIST;
-import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_POSTGRES_USER;
+import static org.apache.inlong.agent.constant.TaskConstants.*;
 
-public class PostgreSQLTask extends AbstractTask {
+public class OracleTask extends AbstractTask {
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(PostgreSQLTask.class);
-    public static final String DEFAULT_KAFKA_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.KafkaInstance";
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(OracleTask.class);
+
+    public static final String DEFAULT_ORACLE_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.OracleInstance";
+    private AtomicBoolean isAdded = new AtomicBoolean(false);
     private final DateTimeFormatter dateTimeFormatter = 
DateTimeFormatter.ofPattern("yyyyMMddHH");
-    private boolean isAdded = false;
-    public static final int DEFAULT_INSTANCE_LIMIT = 1;
 
     private String dbName;
     private String tableName;
     private String instanceId;
 
     @Override
-    protected void initTask() {
-        LOGGER.info("postgres commonInit: {}", taskProfile.toJsonStr());
-        taskProfile.get(TASK_POSTGRES_DBNAME);
-        dbName = taskProfile.get(TASK_POSTGRES_DBNAME);
-        tableName = taskProfile.get(TASK_POSTGRES_TABLE_INCLUDE_LIST);
-        instanceId = dbName + "-" + tableName;
+    protected int getInstanceLimit() {
+        return DEFAULT_INSTANCE_LIMIT;
     }
 
     @Override
@@ -65,49 +57,58 @@ public class PostgreSQLTask extends AbstractTask {
             LOGGER.error("task profile needs all required key");
             return false;
         }
-        if (!profile.hasKey(profile.get(TASK_POSTGRES_HOSTNAME))) {
+        if (!profile.hasKey(TaskConstants.TASK_ORACLE_HOSTNAME)) {
             LOGGER.error("task profile needs hostname");
             return false;
         }
-        if (!profile.hasKey(profile.get(TASK_POSTGRES_PORT))) {
+        if (!profile.hasKey(TaskConstants.TASK_ORACLE_PORT)) {
             LOGGER.error("task profile needs port");
             return false;
         }
-        if (!profile.hasKey(profile.get(TASK_POSTGRES_USER))) {
+        if (!profile.hasKey(TaskConstants.TASK_ORACLE_USER)) {
             LOGGER.error("task profile needs username");
             return false;
         }
-        if (!profile.hasKey(profile.get(TASK_POSTGRES_PASSWORD))) {
+        if (!profile.hasKey(TaskConstants.TASK_ORACLE_PASSWORD)) {
             LOGGER.error("task profile needs password");
             return false;
         }
-        if (!profile.hasKey(profile.get(TASK_POSTGRES_DBNAME))) {
+        if (!profile.hasKey(TaskConstants.TASK_ORACLE_DBNAME)) {
             LOGGER.error("task profile needs DB name");
             return false;
         }
-        if (!profile.hasKey(profile.get(TASK_POSTGRES_PLUGIN_NAME))) {
-            LOGGER.error("task profile needs plugin name");
+        if (!profile.hasKey(TaskConstants.TASK_ORACLE_SCHEMA_INCLUDE_LIST)) {
+            LOGGER.error("task profile needs schema name");
+            return false;
+        }
+        if (!profile.hasKey(TaskConstants.TASK_ORACLE_TABLE_INCLUDE_LIST)) {
+            LOGGER.error("task profile needs table list");
             return false;
         }
         return true;
     }
 
+    @Override
+    protected void initTask() {
+        LOGGER.info("oracle commonInit: {}", taskProfile.toJsonStr());
+        dbName = taskProfile.get(TASK_ORACLE_DBNAME);
+        tableName = taskProfile.get(TASK_ORACLE_TABLE_INCLUDE_LIST);
+        instanceId = dbName + "-" + tableName;
+    }
+
     @Override
     protected List<InstanceProfile> getNewInstanceList() {
         List<InstanceProfile> list = new ArrayList<>();
-        if (isAdded) {
+        if (isAdded.get()) {
             return list;
         }
         String dataTime = LocalDateTime.now().format(dateTimeFormatter);
-        InstanceProfile instanceProfile = 
taskProfile.createInstanceProfile(DEFAULT_KAFKA_INSTANCE, instanceId,
-                CycleUnitType.HOUR, dataTime, AgentUtils.getCurrentTime());
+        InstanceProfile instanceProfile =
+                taskProfile.createInstanceProfile(DEFAULT_ORACLE_INSTANCE, 
instanceId,
+                        CycleUnitType.HOUR, dataTime, 
AgentUtils.getCurrentTime());
+        LOGGER.info("taskProfile.createInstanceProfile: {}", 
instanceProfile.toJsonStr());
         list.add(instanceProfile);
-        this.isAdded = true;
+        this.isAdded.set(true);
         return list;
     }
-
-    @Override
-    protected int getInstanceLimit() {
-        return DEFAULT_INSTANCE_LIMIT;
-    }
 }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PostgreSQLTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PostgreSQLTask.java
index 554764911d..7cf382fbd6 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PostgreSQLTask.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PostgreSQLTask.java
@@ -53,7 +53,6 @@ public class PostgreSQLTask extends AbstractTask {
     @Override
     protected void initTask() {
         LOGGER.info("postgres commonInit: {}", taskProfile.toJsonStr());
-        taskProfile.get(TASK_POSTGRES_DBNAME);
         dbName = taskProfile.get(TASK_POSTGRES_DBNAME);
         tableName = taskProfile.get(TASK_POSTGRES_TABLE_INCLUDE_LIST);
         instanceId = dbName + "-" + tableName;
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestOracleConnect.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestOracleConnect.java
index ea1d9289cb..7f2e9e3c81 100644
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestOracleConnect.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestOracleConnect.java
@@ -41,13 +41,14 @@ public class TestOracleConnect {
     @Ignore
     public void testOracle() {
         InstanceProfile jobProfile = new InstanceProfile();
-        jobProfile.set("job.oracleJob.hostname", "localhost");
-        jobProfile.set("job.oracleJob.port", "1521");
-        jobProfile.set("job.oracleJob.user", "c##dbzuser");
-        jobProfile.set("job.oracleJob.password", "dbz");
-        jobProfile.set("job.oracleJob.sid", "ORCLCDB");
-        jobProfile.set("job.oracleJob.dbname", "ORCLCDB");
+        jobProfile.set("job.oracleJob.hostname", "192.168.101.11");
+        jobProfile.set("job.oracleJob.port", "49161");
+        jobProfile.set("job.oracleJob.user", "c##admin");
+        jobProfile.set("job.oracleJob.password", "inlong");
+        jobProfile.set("job.oracleJob.sid", "xe");
+        jobProfile.set("job.oracleJob.dbname", "xe");
         jobProfile.set("job.oracleJob.serverName", "server1");
+        jobProfile.set("instance.id", "instance_test");
         jobProfile.set(TaskConstants.JOB_INSTANCE_ID, 
UUID.randomUUID().toString());
         jobProfile.set(PROXY_INLONG_GROUP_ID, UUID.randomUUID().toString());
         jobProfile.set(PROXY_INLONG_STREAM_ID, UUID.randomUUID().toString());

Reply via email to