This is an automated email from the ASF dual-hosted git repository.

wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ca0983fceb [INLONG-10318][Agent] Add PostgreSQL data source for Agent 
(#10320)
ca0983fceb is described below

commit ca0983fcebf70c68f8e79f341c992b1d2c5970f4
Author: haifxu <xhf1208357...@gmail.com>
AuthorDate: Thu May 30 19:48:54 2024 +0800

    [INLONG-10318][Agent] Add PostgreSQL data source for Agent (#10320)
    
    * [INLONG-10318][Agent] Add PostgreSQL data source for Agent
    
    * [INLONG-10318][Agent] Add PostgreSQL data source for Agent
    
    * [INLONG-10318][Agent] code format
---
 .../inlong/agent/constant/TaskConstants.java       |  13 ++
 .../apache/inlong/agent/pojo/PostgreSQLTask.java   |   9 +-
 .../apache/inlong/agent/pojo/TaskProfileDto.java   |  15 +-
 .../agent/plugin/instance/PostgreSQLInstance.java  |  29 ++++
 .../agent/plugin/sources/PostgreSQLSource.java     | 160 +++++++++++++++++----
 .../inlong/agent/plugin/task/PostgreSQLTask.java   | 113 +++++++++++++++
 pom.xml                                            |   2 +-
 7 files changed, 302 insertions(+), 39 deletions(-)

diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
index 4cb1c70d12..1607742556 100755
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
@@ -97,6 +97,7 @@ public class TaskConstants extends CommonConstants {
     public static final String TASK_PULSAR_SUBSCRIPTION_POSITION = 
"task.pulsarTask.subscriptionPosition";
     public static final String TASK_PULSAR_RESET_TIME = 
"task.pulsarTask.resetTime";
 
+    // Mongo task
     public static final String TASK_MONGO_HOSTS = "task.mongoTask.hosts";
     public static final String TASK_MONGO_USER = "task.mongoTask.user";
     public static final String TASK_MONGO_PASSWORD = "task.mongoTask.password";
@@ -126,6 +127,18 @@ public class TaskConstants extends CommonConstants {
     public static final String TASK_MONGO_SSL_ENABLE = 
"task.mongoTask.sslEnabled";
     public static final String TASK_MONGO_POLL_INTERVAL = 
"task.mongoTask.pollIntervalInMs";
 
+    // PostgreSQL task
+    public static final String TASK_POSTGRES_HOSTNAME = 
"task.postgreSQLTask.hostname";
+    public static final String TASK_POSTGRES_PORT = "task.postgreSQLTask.port";
+    public static final String TASK_POSTGRES_USER = "task.postgreSQLTask.user";
+    public static final String TASK_POSTGRES_PASSWORD = 
"task.postgreSQLTask.password";
+    public static final String TASK_POSTGRES_DBNAME = 
"task.postgreSQLTask.dbname";
+    public static final String TASK_POSTGRES_SERVERNAME = 
"task.postgreSQLTask.servername";
+    public static final String TASK_POSTGRES_SCHEMA_INCLUDE_LIST = 
"task.postgreSQLTask.schemaIncludeList";
+    public static final String TASK_POSTGRES_TABLE_INCLUDE_LIST = 
"task.postgreSQLTask.tableIncludeList";
+    public static final String TASK_POSTGRES_PLUGIN_NAME = 
"task.postgreSQLTask.pluginName";
+    public static final String TASK_POSTGRES_SNAPSHOT_MODE = 
"task.postgreSQLTask.snapshotMode";
+
     public static final String TASK_STATE = "task.state";
 
     public static final String INSTANCE_STATE = "instance.state";
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/PostgreSQLTask.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/PostgreSQLTask.java
index 0486eb4a7b..33ec92a930 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/PostgreSQLTask.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/PostgreSQLTask.java
@@ -32,12 +32,11 @@ public class PostgreSQLTask {
     private String hostname;
     private String port;
     private String dbname;
-    private String schema;
-    private String servername;
-    private String pluginname;
-    private List<String> tableNameList;
+    private String schemaIncludeList;
+    private String pluginName;
+    private String tableIncludeList;
     private String serverTimeZone;
-    private String scanStartupMode;
+    private String snapshotMode;
     private String primaryKey;
 
     @Data
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
index ed6fef26c3..4f835c0445 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
@@ -38,6 +38,8 @@ import org.apache.inlong.common.pojo.agent.DataConfig;
 import com.google.gson.Gson;
 import lombok.Data;
 
+import java.util.stream.Collectors;
+
 import static java.util.Objects.requireNonNull;
 import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_ADDR;
 import static org.apache.inlong.agent.constant.TaskConstants.SYNC_SEND_OPEN;
@@ -50,6 +52,7 @@ public class TaskProfileDto {
     public static final String DEFAULT_KAFKA_TASK = 
"org.apache.inlong.agent.plugin.task.KafkaTask";
     public static final String DEFAULT_PULSAR_TASK = 
"org.apache.inlong.agent.plugin.task.PulsarTask";
     public static final String DEFAULT_MONGODB_TASK = 
"org.apache.inlong.agent.plugin.task.MongoDBTask";
+    public static final String DEFAULT_POSTGRESQL_TASK = 
"org.apache.inlong.agent.plugin.task.PostgreSQLTask";
     public static final String DEFAULT_CHANNEL = 
"org.apache.inlong.agent.plugin.channel.MemoryChannel";
     public static final String MANAGER_JOB = "MANAGER_JOB";
     public static final String DEFAULT_DATA_PROXY_SINK = 
"org.apache.inlong.agent.plugin.sinks.ProxySink";
@@ -242,11 +245,14 @@ public class TaskProfileDto {
         postgreSQLTask.setHostname(config.getHostname());
         postgreSQLTask.setPort(config.getPort());
         postgreSQLTask.setDbname(config.getDatabase());
-        postgreSQLTask.setServername(config.getSchema());
-        postgreSQLTask.setPluginname(config.getDecodingPluginName());
-        postgreSQLTask.setTableNameList(config.getTableNameList());
+        postgreSQLTask.setSchemaIncludeList(config.getSchema());
+        postgreSQLTask.setPluginName(config.getDecodingPluginName());
+        // Each identifier is of the form schemaName.tableName and connected 
with ","
+        postgreSQLTask.setTableIncludeList(
+                config.getTableNameList().stream().map(tableName -> 
config.getSchema() + "." + tableName).collect(
+                        Collectors.joining(",")));
         postgreSQLTask.setServerTimeZone(config.getServerTimeZone());
-        postgreSQLTask.setScanStartupMode(config.getScanStartupMode());
+        postgreSQLTask.setSnapshotMode(config.getScanStartupMode());
         postgreSQLTask.setPrimaryKey(config.getPrimaryKey());
 
         return postgreSQLTask;
@@ -475,6 +481,7 @@ public class TaskProfileDto {
                 profileDto.setTask(task);
                 break;
             case POSTGRES:
+                task.setTaskClass(DEFAULT_POSTGRESQL_TASK);
                 PostgreSQLTask postgreSQLTask = getPostgresTask(dataConfig);
                 task.setPostgreSQLTask(postgreSQLTask);
                 task.setSource(POSTGRESQL_SOURCE);
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/PostgreSQLInstance.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/PostgreSQLInstance.java
new file mode 100644
index 0000000000..be28f18f79
--- /dev/null
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/PostgreSQLInstance.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.instance;
+
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.constant.TaskConstants;
+
+public class PostgreSQLInstance extends CommonInstance {
+
+    @Override
+    public void setInodeInfo(InstanceProfile profile) {
+        profile.set(TaskConstants.INODE_INFO, "");
+    }
+}
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PostgreSQLSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PostgreSQLSource.java
index 400dc9ab49..706721ce43 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PostgreSQLSource.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PostgreSQLSource.java
@@ -17,67 +17,173 @@
 
 package org.apache.inlong.agent.plugin.sources;
 
+import org.apache.inlong.agent.common.AgentThreadFactory;
+import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.agent.conf.InstanceProfile;
 import org.apache.inlong.agent.conf.TaskProfile;
-import org.apache.inlong.agent.plugin.Message;
+import org.apache.inlong.agent.constant.AgentConstants;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.except.FileException;
 import org.apache.inlong.agent.plugin.file.Reader;
 import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
-import org.apache.inlong.agent.plugin.sources.reader.PostgreSQLReader;
 
+import io.debezium.connector.postgresql.PostgresConnector;
+import io.debezium.connector.postgresql.PostgresConnectorConfig;
+import io.debezium.engine.ChangeEvent;
+import io.debezium.engine.DebeziumEngine;
+import io.debezium.engine.DebeziumEngine.RecordCommitter;
+import io.debezium.engine.format.Json;
+import io.debezium.engine.spi.OffsetCommitPolicy;
+import org.apache.kafka.connect.storage.FileOffsetBackingStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_POSTGRES_DBNAME;
+import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_POSTGRES_HOSTNAME;
+import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_POSTGRES_PASSWORD;
+import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_POSTGRES_PLUGIN_NAME;
+import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_POSTGRES_PORT;
+import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_POSTGRES_SCHEMA_INCLUDE_LIST;
+import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_POSTGRES_TABLE_INCLUDE_LIST;
+import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_POSTGRES_USER;
 
-/**
- * PostgreSQL source, split PostgreSQL source job into multi readers
- */
 public class PostgreSQLSource extends AbstractSource {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(PostgreSQLSource.class);
+    private static final Integer DEBEZIUM_QUEUE_SIZE = 100;
+    private ExecutorService executor;
+    public InstanceProfile profile;
+    private BlockingQueue<SourceData> debeziumQueue;
+    private final Properties props = new Properties();
+    private String snapshotMode;
+    private String pluginName;
+    private String dbName;
+    private String tableName;
+
+    private boolean isRestoreFromDB = false;
 
     public PostgreSQLSource() {
 
     }
 
     @Override
-    public List<Reader> split(TaskProfile conf) {
-        PostgreSQLReader postgreSQLReader = new PostgreSQLReader();
-        List<Reader> readerList = new ArrayList<>();
-        readerList.add(postgreSQLReader);
-        sourceMetric.sourceSuccessCount.incrementAndGet();
-        return readerList;
+    protected void initSource(InstanceProfile profile) {
+        try {
+            LOGGER.info("PostgreSQLSource init: {}", profile.toJsonStr());
+            debeziumQueue = new LinkedBlockingQueue<>(DEBEZIUM_QUEUE_SIZE);
+            pluginName = profile.get(TASK_POSTGRES_PLUGIN_NAME);
+            dbName = profile.get(TASK_POSTGRES_DBNAME);
+            tableName = profile.get(TASK_POSTGRES_TABLE_INCLUDE_LIST);
+            snapshotMode = 
profile.get(TaskConstants.TASK_POSTGRES_SNAPSHOT_MODE, "initial");
+
+            props.setProperty("name", "PostgreSQL-" + instanceId);
+            props.setProperty("connector.class", 
PostgresConnector.class.getName());
+            props.setProperty("offset.storage", 
FileOffsetBackingStore.class.getName());
+            String agentPath = AgentConfiguration.getAgentConf()
+                    .get(AgentConstants.AGENT_HOME, 
AgentConstants.DEFAULT_AGENT_HOME);
+            String offsetPath = agentPath + "/" + getThreadName() + 
"offset.dat";
+            props.setProperty("offset.storage.file.filename", offsetPath);
+
+            
props.setProperty(String.valueOf(PostgresConnectorConfig.HOSTNAME), 
profile.get(TASK_POSTGRES_HOSTNAME));
+            props.setProperty(String.valueOf(PostgresConnectorConfig.PORT), 
profile.get(TASK_POSTGRES_PORT));
+            props.setProperty(String.valueOf(PostgresConnectorConfig.USER), 
profile.get(TASK_POSTGRES_USER));
+            
props.setProperty(String.valueOf(PostgresConnectorConfig.PASSWORD), 
profile.get(TASK_POSTGRES_PASSWORD));
+            
props.setProperty(String.valueOf(PostgresConnectorConfig.DATABASE_NAME), 
profile.get(TASK_POSTGRES_DBNAME));
+            
props.setProperty(String.valueOf(PostgresConnectorConfig.SERVER_NAME), 
getThreadName());
+            
props.setProperty(String.valueOf(PostgresConnectorConfig.SCHEMA_INCLUDE_LIST),
+                    profile.get(TASK_POSTGRES_SCHEMA_INCLUDE_LIST));
+            
props.setProperty(String.valueOf(PostgresConnectorConfig.TABLE_INCLUDE_LIST),
+                    profile.get(TASK_POSTGRES_TABLE_INCLUDE_LIST));
+            
props.setProperty(String.valueOf(PostgresConnectorConfig.PLUGIN_NAME), 
pluginName);
+            
props.setProperty(String.valueOf(PostgresConnectorConfig.SNAPSHOT_MODE), 
snapshotMode);
+
+            executor = Executors.newSingleThreadExecutor();
+            executor.execute(startDebeziumEngine());
+
+        } catch (Exception ex) {
+            stopRunning();
+            throw new FileException("error init stream for " + instanceId, ex);
+        }
+    }
+
+    private Runnable startDebeziumEngine() {
+        return () -> {
+            AgentThreadFactory.nameThread(getThreadName() + "debezium");
+            try (DebeziumEngine<ChangeEvent<String, String>> debeziumEngine = 
DebeziumEngine.create(Json.class)
+                    .using(props)
+                    .using(OffsetCommitPolicy.always())
+                    .notifying(this::handleConsumerEvent)
+                    .build()) {
+
+                debeziumEngine.run();
+            } catch (Throwable e) {
+                LOGGER.error("do run error in postgres debezium: ", e);
+            }
+        };
+    }
+
+    private void handleConsumerEvent(List<ChangeEvent<String, String>> records,
+            RecordCommitter<ChangeEvent<String, String>> committer) throws 
InterruptedException {
+        boolean offerSuc = false;
+        for (ChangeEvent<String, String> record : records) {
+            SourceData sourceData = new 
SourceData(record.value().getBytes(StandardCharsets.UTF_8), "0");
+            while (isRunnable() && !offerSuc) {
+                offerSuc = debeziumQueue.offer(sourceData, 1, 
TimeUnit.SECONDS);
+            }
+            committer.markProcessed(record);
+        }
+        committer.markBatchFinished();
     }
 
     @Override
-    protected String getThreadName() {
+    public List<Reader> split(TaskProfile conf) {
         return null;
     }
 
     @Override
-    protected void initSource(InstanceProfile profile) {
-
+    protected String getThreadName() {
+        return "postgres-source-" + taskId + "-" + instanceId;
     }
 
     @Override
     protected void printCurrentState() {
-
+        LOGGER.info("postgres databases is {} and table is {}", dbName, 
tableName);
     }
 
     @Override
     protected boolean doPrepareToRead() {
-        return false;
+        return true;
     }
 
     @Override
     protected List<SourceData> readFromSource() {
-        return null;
-    }
-
-    @Override
-    public Message read() {
-        return null;
+        List<SourceData> dataList = new ArrayList<>();
+        try {
+            int size = 0;
+            while (size < BATCH_READ_LINE_TOTAL_LEN) {
+                SourceData sourceData = debeziumQueue.poll(1, 
TimeUnit.SECONDS);
+                if (sourceData != null) {
+                    LOGGER.info("readFromSource: {}", sourceData.getData());
+                    size += sourceData.getData().length;
+                    dataList.add(sourceData);
+                } else {
+                    break;
+                }
+            }
+        } catch (InterruptedException e) {
+            LOGGER.error("poll {} data from debezium queue interrupted.", 
instanceId);
+        }
+        return dataList;
     }
 
     @Override
@@ -87,16 +193,12 @@ public class PostgreSQLSource extends AbstractSource {
 
     @Override
     protected void releaseSource() {
-
-    }
-
-    @Override
-    public boolean sourceFinish() {
-        return false;
+        LOGGER.info("release postgres source");
+        executor.shutdownNow();
     }
 
     @Override
     public boolean sourceExist() {
-        return false;
+        return true;
     }
 }
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PostgreSQLTask.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PostgreSQLTask.java
new file mode 100644
index 0000000000..554764911d
--- /dev/null
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PostgreSQLTask.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.task;
+
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.CycleUnitType;
+import org.apache.inlong.agent.utils.AgentUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+
+import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_POSTGRES_DBNAME;
+import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_POSTGRES_HOSTNAME;
+import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_POSTGRES_PASSWORD;
+import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_POSTGRES_PLUGIN_NAME;
+import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_POSTGRES_PORT;
+import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_POSTGRES_TABLE_INCLUDE_LIST;
+import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_POSTGRES_USER;
+
+public class PostgreSQLTask extends AbstractTask {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(PostgreSQLTask.class);
+    public static final String DEFAULT_KAFKA_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.KafkaInstance";
+    private final DateTimeFormatter dateTimeFormatter = 
DateTimeFormatter.ofPattern("yyyyMMddHH");
+    private boolean isAdded = false;
+    public static final int DEFAULT_INSTANCE_LIMIT = 1;
+
+    private String dbName;
+    private String tableName;
+    private String instanceId;
+
+    @Override
+    protected void initTask() {
+        LOGGER.info("postgres commonInit: {}", taskProfile.toJsonStr());
+        taskProfile.get(TASK_POSTGRES_DBNAME);
+        dbName = taskProfile.get(TASK_POSTGRES_DBNAME);
+        tableName = taskProfile.get(TASK_POSTGRES_TABLE_INCLUDE_LIST);
+        instanceId = dbName + "-" + tableName;
+    }
+
+    @Override
+    public boolean isProfileValid(TaskProfile profile) {
+        if (!profile.allRequiredKeyExist()) {
+            LOGGER.error("task profile needs all required key");
+            return false;
+        }
+        if (!profile.hasKey(profile.get(TASK_POSTGRES_HOSTNAME))) {
+            LOGGER.error("task profile needs hostname");
+            return false;
+        }
+        if (!profile.hasKey(profile.get(TASK_POSTGRES_PORT))) {
+            LOGGER.error("task profile needs port");
+            return false;
+        }
+        if (!profile.hasKey(profile.get(TASK_POSTGRES_USER))) {
+            LOGGER.error("task profile needs username");
+            return false;
+        }
+        if (!profile.hasKey(profile.get(TASK_POSTGRES_PASSWORD))) {
+            LOGGER.error("task profile needs password");
+            return false;
+        }
+        if (!profile.hasKey(profile.get(TASK_POSTGRES_DBNAME))) {
+            LOGGER.error("task profile needs DB name");
+            return false;
+        }
+        if (!profile.hasKey(profile.get(TASK_POSTGRES_PLUGIN_NAME))) {
+            LOGGER.error("task profile needs plugin name");
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    protected List<InstanceProfile> getNewInstanceList() {
+        List<InstanceProfile> list = new ArrayList<>();
+        if (isAdded) {
+            return list;
+        }
+        String dataTime = LocalDateTime.now().format(dateTimeFormatter);
+        InstanceProfile instanceProfile = 
taskProfile.createInstanceProfile(DEFAULT_KAFKA_INSTANCE, instanceId,
+                CycleUnitType.HOUR, dataTime, AgentUtils.getCurrentTime());
+        list.add(instanceProfile);
+        this.isAdded = true;
+        return list;
+    }
+
+    @Override
+    protected int getInstanceLimit() {
+        return DEFAULT_INSTANCE_LIMIT;
+    }
+}
diff --git a/pom.xml b/pom.xml
index 164b8b6eb5..c27d177e42 100644
--- a/pom.xml
+++ b/pom.xml
@@ -100,7 +100,7 @@
         <rocksdb.version>6.29.4.1</rocksdb.version>
         <redis-replicator.version>3.6.4</redis-replicator.version>
         <hadoop.version>2.10.2</hadoop.version>
-        <postgresql.version>42.4.4</postgresql.version>
+        <postgresql.version>42.4.5</postgresql.version>
         <oracle.jdbc.version>19.3.0.0</oracle.jdbc.version>
         <mysql.jdbc.version>8.0.28</mysql.jdbc.version>
         <mssql.jdbc.version>12.4.1.jre8</mssql.jdbc.version>

Reply via email to