This is an automated email from the ASF dual-hosted git repository. wenweihuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new ca0983fceb [INLONG-10318][Agent] Add PostgreSQL data source for Agent (#10320) ca0983fceb is described below commit ca0983fcebf70c68f8e79f341c992b1d2c5970f4 Author: haifxu <xhf1208357...@gmail.com> AuthorDate: Thu May 30 19:48:54 2024 +0800 [INLONG-10318][Agent] Add PostgreSQL data source for Agent (#10320) * [INLONG-10318][Agent] Add PostgreSQL data source for Agent * [INLONG-10318][Agent] Add PostgreSQL data source for Agent * [INLONG-10318][Agent] code format --- .../inlong/agent/constant/TaskConstants.java | 13 ++ .../apache/inlong/agent/pojo/PostgreSQLTask.java | 9 +- .../apache/inlong/agent/pojo/TaskProfileDto.java | 15 +- .../agent/plugin/instance/PostgreSQLInstance.java | 29 ++++ .../agent/plugin/sources/PostgreSQLSource.java | 160 +++++++++++++++++---- .../inlong/agent/plugin/task/PostgreSQLTask.java | 113 +++++++++++++++ pom.xml | 2 +- 7 files changed, 302 insertions(+), 39 deletions(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java index 4cb1c70d12..1607742556 100755 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java @@ -97,6 +97,7 @@ public class TaskConstants extends CommonConstants { public static final String TASK_PULSAR_SUBSCRIPTION_POSITION = "task.pulsarTask.subscriptionPosition"; public static final String TASK_PULSAR_RESET_TIME = "task.pulsarTask.resetTime"; + // Mongo task public static final String TASK_MONGO_HOSTS = "task.mongoTask.hosts"; public static final String TASK_MONGO_USER = "task.mongoTask.user"; public static final String TASK_MONGO_PASSWORD = "task.mongoTask.password"; @@ -126,6 +127,18 @@ public class TaskConstants extends CommonConstants { public static final String TASK_MONGO_SSL_ENABLE = "task.mongoTask.sslEnabled"; public static final String TASK_MONGO_POLL_INTERVAL = "task.mongoTask.pollIntervalInMs"; + // PostgreSQL task + public static final String TASK_POSTGRES_HOSTNAME = "task.postgreSQLTask.hostname"; + public static final String TASK_POSTGRES_PORT = "task.postgreSQLTask.port"; + public static final String TASK_POSTGRES_USER = "task.postgreSQLTask.user"; + public static final String TASK_POSTGRES_PASSWORD = "task.postgreSQLTask.password"; + public static final String TASK_POSTGRES_DBNAME = "task.postgreSQLTask.dbname"; + public static final String TASK_POSTGRES_SERVERNAME = "task.postgreSQLTask.servername"; + public static final String TASK_POSTGRES_SCHEMA_INCLUDE_LIST = "task.postgreSQLTask.schemaIncludeList"; + public static final String TASK_POSTGRES_TABLE_INCLUDE_LIST = "task.postgreSQLTask.tableIncludeList"; + public static final String TASK_POSTGRES_PLUGIN_NAME = "task.postgreSQLTask.pluginName"; + public static final String TASK_POSTGRES_SNAPSHOT_MODE = "task.postgreSQLTask.snapshotMode"; + public static final String TASK_STATE = "task.state"; public static final String INSTANCE_STATE = "instance.state"; diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/PostgreSQLTask.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/PostgreSQLTask.java index 0486eb4a7b..33ec92a930 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/PostgreSQLTask.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/PostgreSQLTask.java @@ -32,12 +32,11 @@ public class PostgreSQLTask { private String hostname; private String port; private String dbname; - private String schema; - private String servername; - private String pluginname; - private List<String> tableNameList; + private String schemaIncludeList; + private String pluginName; + private String tableIncludeList; private String serverTimeZone; - private String scanStartupMode; + private String snapshotMode; private String primaryKey; @Data diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java index ed6fef26c3..4f835c0445 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java @@ -38,6 +38,8 @@ import org.apache.inlong.common.pojo.agent.DataConfig; import com.google.gson.Gson; import lombok.Data; +import java.util.stream.Collectors; + import static java.util.Objects.requireNonNull; import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_ADDR; import static org.apache.inlong.agent.constant.TaskConstants.SYNC_SEND_OPEN; @@ -50,6 +52,7 @@ public class TaskProfileDto { public static final String DEFAULT_KAFKA_TASK = "org.apache.inlong.agent.plugin.task.KafkaTask"; public static final String DEFAULT_PULSAR_TASK = "org.apache.inlong.agent.plugin.task.PulsarTask"; public static final String DEFAULT_MONGODB_TASK = "org.apache.inlong.agent.plugin.task.MongoDBTask"; + public static final String DEFAULT_POSTGRESQL_TASK = "org.apache.inlong.agent.plugin.task.PostgreSQLTask"; public static final String DEFAULT_CHANNEL = "org.apache.inlong.agent.plugin.channel.MemoryChannel"; public static final String MANAGER_JOB = "MANAGER_JOB"; public static final String DEFAULT_DATA_PROXY_SINK = "org.apache.inlong.agent.plugin.sinks.ProxySink"; @@ -242,11 +245,14 @@ public class TaskProfileDto { postgreSQLTask.setHostname(config.getHostname()); postgreSQLTask.setPort(config.getPort()); postgreSQLTask.setDbname(config.getDatabase()); - postgreSQLTask.setServername(config.getSchema()); - postgreSQLTask.setPluginname(config.getDecodingPluginName()); - postgreSQLTask.setTableNameList(config.getTableNameList()); + postgreSQLTask.setSchemaIncludeList(config.getSchema()); + postgreSQLTask.setPluginName(config.getDecodingPluginName()); + // Each identifier is of the form schemaName.tableName and connected with "," + postgreSQLTask.setTableIncludeList( + config.getTableNameList().stream().map(tableName -> config.getSchema() + "." + tableName).collect( + Collectors.joining(","))); postgreSQLTask.setServerTimeZone(config.getServerTimeZone()); - postgreSQLTask.setScanStartupMode(config.getScanStartupMode()); + postgreSQLTask.setSnapshotMode(config.getScanStartupMode()); postgreSQLTask.setPrimaryKey(config.getPrimaryKey()); return postgreSQLTask; @@ -475,6 +481,7 @@ public class TaskProfileDto { profileDto.setTask(task); break; case POSTGRES: + task.setTaskClass(DEFAULT_POSTGRESQL_TASK); PostgreSQLTask postgreSQLTask = getPostgresTask(dataConfig); task.setPostgreSQLTask(postgreSQLTask); task.setSource(POSTGRESQL_SOURCE); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/PostgreSQLInstance.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/PostgreSQLInstance.java new file mode 100644 index 0000000000..be28f18f79 --- /dev/null +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/PostgreSQLInstance.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.plugin.instance; + +import org.apache.inlong.agent.conf.InstanceProfile; +import org.apache.inlong.agent.constant.TaskConstants; + +public class PostgreSQLInstance extends CommonInstance { + + @Override + public void setInodeInfo(InstanceProfile profile) { + profile.set(TaskConstants.INODE_INFO, ""); + } +} diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PostgreSQLSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PostgreSQLSource.java index 400dc9ab49..706721ce43 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PostgreSQLSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PostgreSQLSource.java @@ -17,67 +17,173 @@ package org.apache.inlong.agent.plugin.sources; +import org.apache.inlong.agent.common.AgentThreadFactory; +import org.apache.inlong.agent.conf.AgentConfiguration; import org.apache.inlong.agent.conf.InstanceProfile; import org.apache.inlong.agent.conf.TaskProfile; -import org.apache.inlong.agent.plugin.Message; +import org.apache.inlong.agent.constant.AgentConstants; +import org.apache.inlong.agent.constant.TaskConstants; +import org.apache.inlong.agent.except.FileException; import org.apache.inlong.agent.plugin.file.Reader; import org.apache.inlong.agent.plugin.sources.file.AbstractSource; -import org.apache.inlong.agent.plugin.sources.reader.PostgreSQLReader; +import io.debezium.connector.postgresql.PostgresConnector; +import io.debezium.connector.postgresql.PostgresConnectorConfig; +import io.debezium.engine.ChangeEvent; +import io.debezium.engine.DebeziumEngine; +import io.debezium.engine.DebeziumEngine.RecordCommitter; +import io.debezium.engine.format.Json; +import io.debezium.engine.spi.OffsetCommitPolicy; +import org.apache.kafka.connect.storage.FileOffsetBackingStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; +import java.util.Properties; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import static org.apache.inlong.agent.constant.TaskConstants.TASK_POSTGRES_DBNAME; +import static org.apache.inlong.agent.constant.TaskConstants.TASK_POSTGRES_HOSTNAME; +import static org.apache.inlong.agent.constant.TaskConstants.TASK_POSTGRES_PASSWORD; +import static org.apache.inlong.agent.constant.TaskConstants.TASK_POSTGRES_PLUGIN_NAME; +import static org.apache.inlong.agent.constant.TaskConstants.TASK_POSTGRES_PORT; +import static org.apache.inlong.agent.constant.TaskConstants.TASK_POSTGRES_SCHEMA_INCLUDE_LIST; +import static org.apache.inlong.agent.constant.TaskConstants.TASK_POSTGRES_TABLE_INCLUDE_LIST; +import static org.apache.inlong.agent.constant.TaskConstants.TASK_POSTGRES_USER; -/** - * PostgreSQL source, split PostgreSQL source job into multi readers - */ public class PostgreSQLSource extends AbstractSource { private static final Logger LOGGER = LoggerFactory.getLogger(PostgreSQLSource.class); + private static final Integer DEBEZIUM_QUEUE_SIZE = 100; + private ExecutorService executor; + public InstanceProfile profile; + private BlockingQueue<SourceData> debeziumQueue; + private final Properties props = new Properties(); + private String snapshotMode; + private String pluginName; + private String dbName; + private String tableName; + + private boolean isRestoreFromDB = false; public PostgreSQLSource() { } @Override - public List<Reader> split(TaskProfile conf) { - PostgreSQLReader postgreSQLReader = new PostgreSQLReader(); - List<Reader> readerList = new ArrayList<>(); - readerList.add(postgreSQLReader); - sourceMetric.sourceSuccessCount.incrementAndGet(); - return readerList; + protected void initSource(InstanceProfile profile) { + try { + LOGGER.info("PostgreSQLSource init: {}", profile.toJsonStr()); + debeziumQueue = new LinkedBlockingQueue<>(DEBEZIUM_QUEUE_SIZE); + pluginName = profile.get(TASK_POSTGRES_PLUGIN_NAME); + dbName = profile.get(TASK_POSTGRES_DBNAME); + tableName = profile.get(TASK_POSTGRES_TABLE_INCLUDE_LIST); + snapshotMode = profile.get(TaskConstants.TASK_POSTGRES_SNAPSHOT_MODE, "initial"); + + props.setProperty("name", "PostgreSQL-" + instanceId); + props.setProperty("connector.class", PostgresConnector.class.getName()); + props.setProperty("offset.storage", FileOffsetBackingStore.class.getName()); + String agentPath = AgentConfiguration.getAgentConf() + .get(AgentConstants.AGENT_HOME, AgentConstants.DEFAULT_AGENT_HOME); + String offsetPath = agentPath + "/" + getThreadName() + "offset.dat"; + props.setProperty("offset.storage.file.filename", offsetPath); + + props.setProperty(String.valueOf(PostgresConnectorConfig.HOSTNAME), profile.get(TASK_POSTGRES_HOSTNAME)); + props.setProperty(String.valueOf(PostgresConnectorConfig.PORT), profile.get(TASK_POSTGRES_PORT)); + props.setProperty(String.valueOf(PostgresConnectorConfig.USER), profile.get(TASK_POSTGRES_USER)); + props.setProperty(String.valueOf(PostgresConnectorConfig.PASSWORD), profile.get(TASK_POSTGRES_PASSWORD)); + props.setProperty(String.valueOf(PostgresConnectorConfig.DATABASE_NAME), profile.get(TASK_POSTGRES_DBNAME)); + props.setProperty(String.valueOf(PostgresConnectorConfig.SERVER_NAME), getThreadName()); + props.setProperty(String.valueOf(PostgresConnectorConfig.SCHEMA_INCLUDE_LIST), + profile.get(TASK_POSTGRES_SCHEMA_INCLUDE_LIST)); + props.setProperty(String.valueOf(PostgresConnectorConfig.TABLE_INCLUDE_LIST), + profile.get(TASK_POSTGRES_TABLE_INCLUDE_LIST)); + props.setProperty(String.valueOf(PostgresConnectorConfig.PLUGIN_NAME), pluginName); + props.setProperty(String.valueOf(PostgresConnectorConfig.SNAPSHOT_MODE), snapshotMode); + + executor = Executors.newSingleThreadExecutor(); + executor.execute(startDebeziumEngine()); + + } catch (Exception ex) { + stopRunning(); + throw new FileException("error init stream for " + instanceId, ex); + } + } + + private Runnable startDebeziumEngine() { + return () -> { + AgentThreadFactory.nameThread(getThreadName() + "debezium"); + try (DebeziumEngine<ChangeEvent<String, String>> debeziumEngine = DebeziumEngine.create(Json.class) + .using(props) + .using(OffsetCommitPolicy.always()) + .notifying(this::handleConsumerEvent) + .build()) { + + debeziumEngine.run(); + } catch (Throwable e) { + LOGGER.error("do run error in postgres debezium: ", e); + } + }; + } + + private void handleConsumerEvent(List<ChangeEvent<String, String>> records, + RecordCommitter<ChangeEvent<String, String>> committer) throws InterruptedException { + boolean offerSuc = false; + for (ChangeEvent<String, String> record : records) { + SourceData sourceData = new SourceData(record.value().getBytes(StandardCharsets.UTF_8), "0"); + while (isRunnable() && !offerSuc) { + offerSuc = debeziumQueue.offer(sourceData, 1, TimeUnit.SECONDS); + } + committer.markProcessed(record); + } + committer.markBatchFinished(); } @Override - protected String getThreadName() { + public List<Reader> split(TaskProfile conf) { return null; } @Override - protected void initSource(InstanceProfile profile) { - + protected String getThreadName() { + return "postgres-source-" + taskId + "-" + instanceId; } @Override protected void printCurrentState() { - + LOGGER.info("postgres databases is {} and table is {}", dbName, tableName); } @Override protected boolean doPrepareToRead() { - return false; + return true; } @Override protected List<SourceData> readFromSource() { - return null; - } - - @Override - public Message read() { - return null; + List<SourceData> dataList = new ArrayList<>(); + try { + int size = 0; + while (size < BATCH_READ_LINE_TOTAL_LEN) { + SourceData sourceData = debeziumQueue.poll(1, TimeUnit.SECONDS); + if (sourceData != null) { + LOGGER.info("readFromSource: {}", sourceData.getData()); + size += sourceData.getData().length; + dataList.add(sourceData); + } else { + break; + } + } + } catch (InterruptedException e) { + LOGGER.error("poll {} data from debezium queue interrupted.", instanceId); + } + return dataList; } @Override @@ -87,16 +193,12 @@ public class PostgreSQLSource extends AbstractSource { @Override protected void releaseSource() { - - } - - @Override - public boolean sourceFinish() { - return false; + LOGGER.info("release postgres source"); + executor.shutdownNow(); } @Override public boolean sourceExist() { - return false; + return true; } } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PostgreSQLTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PostgreSQLTask.java new file mode 100644 index 0000000000..554764911d --- /dev/null +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/PostgreSQLTask.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.plugin.task; + +import org.apache.inlong.agent.conf.InstanceProfile; +import org.apache.inlong.agent.conf.TaskProfile; +import org.apache.inlong.agent.constant.CycleUnitType; +import org.apache.inlong.agent.utils.AgentUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.inlong.agent.constant.TaskConstants.TASK_POSTGRES_DBNAME; +import static org.apache.inlong.agent.constant.TaskConstants.TASK_POSTGRES_HOSTNAME; +import static org.apache.inlong.agent.constant.TaskConstants.TASK_POSTGRES_PASSWORD; +import static org.apache.inlong.agent.constant.TaskConstants.TASK_POSTGRES_PLUGIN_NAME; +import static org.apache.inlong.agent.constant.TaskConstants.TASK_POSTGRES_PORT; +import static org.apache.inlong.agent.constant.TaskConstants.TASK_POSTGRES_TABLE_INCLUDE_LIST; +import static org.apache.inlong.agent.constant.TaskConstants.TASK_POSTGRES_USER; + +public class PostgreSQLTask extends AbstractTask { + + private static final Logger LOGGER = LoggerFactory.getLogger(PostgreSQLTask.class); + public static final String DEFAULT_KAFKA_INSTANCE = "org.apache.inlong.agent.plugin.instance.KafkaInstance"; + private final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyyMMddHH"); + private boolean isAdded = false; + public static final int DEFAULT_INSTANCE_LIMIT = 1; + + private String dbName; + private String tableName; + private String instanceId; + + @Override + protected void initTask() { + LOGGER.info("postgres commonInit: {}", taskProfile.toJsonStr()); + taskProfile.get(TASK_POSTGRES_DBNAME); + dbName = taskProfile.get(TASK_POSTGRES_DBNAME); + tableName = taskProfile.get(TASK_POSTGRES_TABLE_INCLUDE_LIST); + instanceId = dbName + "-" + tableName; + } + + @Override + public boolean isProfileValid(TaskProfile profile) { + if (!profile.allRequiredKeyExist()) { + LOGGER.error("task profile needs all required key"); + return false; + } + if (!profile.hasKey(profile.get(TASK_POSTGRES_HOSTNAME))) { + LOGGER.error("task profile needs hostname"); + return false; + } + if (!profile.hasKey(profile.get(TASK_POSTGRES_PORT))) { + LOGGER.error("task profile needs port"); + return false; + } + if (!profile.hasKey(profile.get(TASK_POSTGRES_USER))) { + LOGGER.error("task profile needs username"); + return false; + } + if (!profile.hasKey(profile.get(TASK_POSTGRES_PASSWORD))) { + LOGGER.error("task profile needs password"); + return false; + } + if (!profile.hasKey(profile.get(TASK_POSTGRES_DBNAME))) { + LOGGER.error("task profile needs DB name"); + return false; + } + if (!profile.hasKey(profile.get(TASK_POSTGRES_PLUGIN_NAME))) { + LOGGER.error("task profile needs plugin name"); + return false; + } + return true; + } + + @Override + protected List<InstanceProfile> getNewInstanceList() { + List<InstanceProfile> list = new ArrayList<>(); + if (isAdded) { + return list; + } + String dataTime = LocalDateTime.now().format(dateTimeFormatter); + InstanceProfile instanceProfile = taskProfile.createInstanceProfile(DEFAULT_KAFKA_INSTANCE, instanceId, + CycleUnitType.HOUR, dataTime, AgentUtils.getCurrentTime()); + list.add(instanceProfile); + this.isAdded = true; + return list; + } + + @Override + protected int getInstanceLimit() { + return DEFAULT_INSTANCE_LIMIT; + } +} diff --git a/pom.xml b/pom.xml index 164b8b6eb5..c27d177e42 100644 --- a/pom.xml +++ b/pom.xml @@ -100,7 +100,7 @@ <rocksdb.version>6.29.4.1</rocksdb.version> <redis-replicator.version>3.6.4</redis-replicator.version> <hadoop.version>2.10.2</hadoop.version> - <postgresql.version>42.4.4</postgresql.version> + <postgresql.version>42.4.5</postgresql.version> <oracle.jdbc.version>19.3.0.0</oracle.jdbc.version> <mysql.jdbc.version>8.0.28</mysql.jdbc.version> <mssql.jdbc.version>12.4.1.jre8</mssql.jdbc.version>