This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git

The following commit(s) were added to refs/heads/dev by this push:
     new a06800e90b [Feature][Connector] Add Apache Cloudberry Support (#8985)

a06800e90b is described below

commit a06800e90b5685632ef788fd85aebd5c8fad2412
Author: chenhongyu <2795267...@qq.com>
AuthorDate: Mon Mar 24 10:54:09 2025 +0800

    [Feature][Connector] Add Apache Cloudberry Support (#8985)
---
 .../connector-v2/changelog/connector-cloudberry.md |   7 +
 docs/en/connector-v2/sink/Cloudberry.md            | 176 ++++++++++++++++++
 docs/en/connector-v2/source/Cloudberry.md          | 152 +++++++++++++++
 .../seatunnel/jdbc/JdbcCloudberryIT.java           | 206 +++++++++++++++++++++
 .../resources/jdbc_cloudberry_source_and_sink.conf |  57 ++++++
 5 files changed, 598 insertions(+)

diff --git a/docs/en/connector-v2/changelog/connector-cloudberry.md b/docs/en/connector-v2/changelog/connector-cloudberry.md
new file mode 100644
index 0000000000..749ae587cd
--- /dev/null
+++ b/docs/en/connector-v2/changelog/connector-cloudberry.md
@@ -0,0 +1,7 @@
+<details><summary> Change Log </summary>
+
+| Change | Commit | Version |
+| --- | --- | --- |
+|[Feature][Connector] Add Apache Cloudberry Support (#8985)|https://github.com/apache/seatunnel/commit/b6f82c1|dev|
+
+</details>
diff --git a/docs/en/connector-v2/sink/Cloudberry.md b/docs/en/connector-v2/sink/Cloudberry.md
new file mode 100644
index 0000000000..c7e5b6c99a
--- /dev/null
+++ b/docs/en/connector-v2/sink/Cloudberry.md
@@ -0,0 +1,176 @@
+import ChangeLog from '../changelog/connector-cloudberry.md';
+
+# Cloudberry
+
+> JDBC Cloudberry Sink Connector
+
+## Support Those Engines
+
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
+
+## Description
+
+Writes data through JDBC. Cloudberry does not currently have its own native JDBC driver; it uses the PostgreSQL driver for connectivity and follows the PostgreSQL implementation.
+
+Supports batch and streaming modes, concurrent writing, and exactly-once
+semantics (guaranteed by XA transactions).
+
+## Using Dependency
+
+### For Spark/Flink Engine
+
+> 1. You need to ensure that the [jdbc driver jar package](https://mvnrepository.com/artifact/org.postgresql/postgresql) has been placed in directory `${SEATUNNEL_HOME}/plugins/`.
+
+### For SeaTunnel Zeta Engine
+
+> 1. You need to ensure that the [jdbc driver jar package](https://mvnrepository.com/artifact/org.postgresql/postgresql) has been placed in directory `${SEATUNNEL_HOME}/lib/`.
+
+## Key Features
+
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [cdc](../../concept/connector-v2-features.md)
+
+> Use `Xa transactions` to ensure `exactly-once`, so `exactly-once` is only supported for databases that
+> support `Xa transactions`. You can set `is_exactly_once=true` to enable it.
+
+## Supported DataSource Info
+
+| Datasource | Supported Versions | Driver | Url | Maven |
+|------------|------------------------------------------|------------------------|---------------------------------------|--------------------------------------------------------------------------|
+| Cloudberry | Uses PostgreSQL driver implementation | org.postgresql.Driver | jdbc:postgresql://localhost:5432/test | [Download](https://mvnrepository.com/artifact/org.postgresql/postgresql) |
+
+## Database Dependency
+
+> Please download the PostgreSQL driver jar and copy it to the '$SEATUNNEL_HOME/plugins/jdbc/lib/' working directory<br/>
+> For example: cp postgresql-xxx.jar $SEATUNNEL_HOME/plugins/jdbc/lib/
+
+## Data Type Mapping
+
+Cloudberry uses PostgreSQL's data type implementation. Please refer to the PostgreSQL connector documentation for data type compatibility and mappings.
+
+## Options
+
+The Cloudberry connector uses the same options as the PostgreSQL connector. For detailed configuration options, please refer to the PostgreSQL connector documentation.
+
+Key options include:
+- url (required): The JDBC connection URL
+- driver (required): The driver class name (org.postgresql.Driver)
+- user/password: Authentication credentials
+- query or database/table: The INSERT statement to execute, or the target table when `generate_sink_sql = true`
+- is_exactly_once: Enable exactly-once semantics with XA transactions
+- batch_size: Number of buffered rows that triggers a batch write to the database
+
+## Task Example
+
+### Simple:
+
+```hocon
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    parallelism = 1
+    plugin_output = "fake"
+    row.num = 16
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
+  }
+}
+
+sink {
+  jdbc {
+    url = "jdbc:postgresql://localhost:5432/cloudberrydb"
+    driver = "org.postgresql.Driver"
+    user = "dbadmin"
+    password = "password"
+    query = "insert into test_table(name,age) values(?,?)"
+  }
+}
+```
+
+### Generate Sink SQL
+
+```hocon
+sink {
+  Jdbc {
+    url = "jdbc:postgresql://localhost:5432/cloudberrydb"
+    driver = "org.postgresql.Driver"
+    user = "dbadmin"
+    password = "password"
+
+    generate_sink_sql = true
+    database = "mydb"
+    table = "public.test_table"
+  }
+}
+```
+
+### Exactly-once:
+
+```hocon
+sink {
+  jdbc {
+    url = "jdbc:postgresql://localhost:5432/cloudberrydb"
+    driver = "org.postgresql.Driver"
+    user = "dbadmin"
+    password = "password"
+    query = "insert into test_table(name,age) values(?,?)"
+
+    is_exactly_once = "true"
+    xa_data_source_class_name = "org.postgresql.xa.PGXADataSource"
+  }
+}
+```
+
+### CDC (Change Data Capture) Event
+
+```hocon
+sink {
+  jdbc {
+    url = "jdbc:postgresql://localhost:5432/cloudberrydb"
+    driver = "org.postgresql.Driver"
+    user = "dbadmin"
+    password = "password"
+
+    generate_sink_sql = true
+    database = "mydb"
+    table = "sink_table"
+    primary_keys = ["id","name"]
+    field_ide = UPPERCASE
+  }
+}
+```
+
+### Save mode function
+
+```hocon
+sink {
+  Jdbc {
+    url = "jdbc:postgresql://localhost:5432/cloudberrydb"
+    driver = "org.postgresql.Driver"
+    user = "dbadmin"
+    password = "password"
+
+    generate_sink_sql = true
+    database = "mydb"
+    table = "public.test_table"
+    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+    data_save_mode = "APPEND_DATA"
+  }
+}
+```
+
+For more detailed examples and options, please refer to the PostgreSQL connector documentation.
+
+## Changelog
+
+<ChangeLog />
\ No newline at end of file
diff --git a/docs/en/connector-v2/source/Cloudberry.md b/docs/en/connector-v2/source/Cloudberry.md
new file mode 100644
index 0000000000..80880d6f98
--- /dev/null
+++ b/docs/en/connector-v2/source/Cloudberry.md
@@ -0,0 +1,152 @@
+import ChangeLog from '../changelog/connector-cloudberry.md';
+
+# Cloudberry
+
+> JDBC Cloudberry Source Connector
+
+## Support Those Engines
+
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
+
+## Using Dependency
+
+### For Spark/Flink Engine
+
+> 1. You need to ensure that the [jdbc driver jar package](https://mvnrepository.com/artifact/org.postgresql/postgresql) has been placed in directory `${SEATUNNEL_HOME}/plugins/`.
+
+### For SeaTunnel Zeta Engine
+
+> 1. You need to ensure that the [jdbc driver jar package](https://mvnrepository.com/artifact/org.postgresql/postgresql) has been placed in directory `${SEATUNNEL_HOME}/lib/`.
+
+## Key Features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [ ] [stream](../../concept/connector-v2-features.md)
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [column projection](../../concept/connector-v2-features.md)
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [x] [support user-defined split](../../concept/connector-v2-features.md)
+
+> Supports query SQL, which can be used to achieve a projection effect.
+
+## Description
+
+Reads data from an external data source through JDBC. Cloudberry does not currently have its own native JDBC driver; it uses the PostgreSQL driver and implementation.
+
+## Supported DataSource Info
+
+| Datasource | Supported Versions | Driver | Url | Maven |
+|------------|------------------------------------------|------------------------|---------------------------------------|--------------------------------------------------------------------------|
+| Cloudberry | Uses PostgreSQL driver implementation | org.postgresql.Driver | jdbc:postgresql://localhost:5432/test | [Download](https://mvnrepository.com/artifact/org.postgresql/postgresql) |
+
+## Database Dependency
+
+> Please download the PostgreSQL driver jar and copy it to the '$SEATUNNEL_HOME/plugins/jdbc/lib/' working directory<br/>
+> For example: cp postgresql-xxx.jar $SEATUNNEL_HOME/plugins/jdbc/lib/
+
+## Data Type Mapping
+
+Cloudberry uses PostgreSQL's data type implementation. Please refer to the PostgreSQL connector documentation for data type compatibility and mappings.
+
+## Options
+
+The Cloudberry connector uses the same options as the PostgreSQL connector. For detailed configuration options, please refer to the PostgreSQL connector documentation.
+
+Key options include:
+- url (required): The JDBC connection URL
+- driver (required): The driver class name (org.postgresql.Driver)
+- user/password: Authentication credentials
+- query or table_path: What data to read
+- Partition/split options (e.g. split.size): Control parallel reading
+
+## Parallel Reader
+
+Cloudberry supports parallel reading following the same rules as the PostgreSQL connector. For detailed information on split strategies and parallel reading options, please refer to the PostgreSQL connector documentation.
+
+## Task Example
+
+### Simple:
+
+```hocon
+env {
+  parallelism = 4
+  job.mode = "BATCH"
+}
+
+source {
+  Jdbc {
+    url = "jdbc:postgresql://localhost:5432/cloudberrydb"
+    driver = "org.postgresql.Driver"
+    user = "dbadmin"
+    password = "password"
+    query = "select * from mytable limit 100"
+  }
+}
+
+sink {
+  Console {}
+}
+```
+
+### Parallel reading with table_path:
+
+```hocon
+env {
+  parallelism = 4
+  job.mode = "BATCH"
+}
+
+source {
+  Jdbc {
+    url = "jdbc:postgresql://localhost:5432/cloudberrydb"
+    driver = "org.postgresql.Driver"
+    user = "dbadmin"
+    password = "password"
+    table_path = "public.mytable"
+    split.size = 10000
+  }
+}
+
+sink {
+  Console {}
+}
+```
+
+### Multiple table read:
+
+```hocon
+env {
+  job.mode = "BATCH"
+  parallelism = 4
+}
+
+source {
+  Jdbc {
+    url = "jdbc:postgresql://localhost:5432/cloudberrydb"
+    driver = "org.postgresql.Driver"
+    user = "dbadmin"
+    password = "password"
+    "table_list" = [
+      {
+        "table_path" = "public.table1"
+      },
+      {
+        "table_path" = "public.table2"
+      }
+    ]
+    split.size = 10000
+  }
+}
+
+sink {
+  Console {}
+}
+```
+
+For more detailed examples and configurations, please refer to the PostgreSQL connector documentation.
+
+## Changelog
+
+<ChangeLog />
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCloudberryIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCloudberryIT.java
new file mode 100644
index 0000000000..bb898d6761
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCloudberryIT.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc;
+
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.testcontainers.containers.BindMode;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.images.PullPolicy;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.given;
+
+public class JdbcCloudberryIT extends AbstractJdbcIT {
+    private static final String CLOUDBERRY_IMAGE = "lhrbest/cbdb:1.5.4";
+    private static final String CLOUDBERRY_CONTAINER_HOST = "cbdb";
+    private static final String CLOUDBERRY_DATABASE = "postgres";
+
+    private static final String CLOUDBERRY_SCHEMA = "public";
+    private static final String CLOUDBERRY_SOURCE = "source";
+    private static final String CLOUDBERRY_SINK = "sink";
+
+    private static final String CLOUDBERRY_USERNAME = "gpadmin";
+    private static final String CLOUDBERRY_PASSWORD = "gpadmin";
+    private static final int CLOUDBERRY_CONTAINER_PORT = 5432;
+
+    private static final String CLOUDBERRY_URL = "jdbc:postgresql://" + HOST + ":%s/%s";
+
+    private static final String DRIVER_CLASS = "org.postgresql.Driver";
+
+    private static final List<String> CONFIG_FILE =
+            Lists.newArrayList("/jdbc_cloudberry_source_and_sink.conf");
+
+    private static final String CREATE_SQL =
+            "CREATE TABLE %s (\n" + "age INT NOT NULL,\n" + "name VARCHAR(255) NOT NULL\n" + ")";
+
+    @Override
+    JdbcCase getJdbcCase() {
+        Map<String, String> containerEnv = new HashMap<>();
+        String jdbcUrl =
+                String.format(CLOUDBERRY_URL, CLOUDBERRY_CONTAINER_PORT, CLOUDBERRY_DATABASE);
+        Pair<String[], List<SeaTunnelRow>> testDataSet = initTestData();
+        String[] fieldNames = testDataSet.getKey();
+
+        String insertSql = insertTable(CLOUDBERRY_SCHEMA, CLOUDBERRY_SOURCE, fieldNames);
+
+        return JdbcCase.builder()
+                .dockerImage(CLOUDBERRY_IMAGE)
+                .networkAliases(CLOUDBERRY_CONTAINER_HOST)
+                .containerEnv(containerEnv)
+                .driverClass(DRIVER_CLASS)
+                .host(HOST)
+                .port(CLOUDBERRY_CONTAINER_PORT)
+                .localPort(CLOUDBERRY_CONTAINER_PORT)
+                .jdbcTemplate(CLOUDBERRY_URL)
+                .jdbcUrl(jdbcUrl)
+                .userName(CLOUDBERRY_USERNAME)
+                .password(CLOUDBERRY_PASSWORD)
+                .database(CLOUDBERRY_SCHEMA)
+                .sourceTable(CLOUDBERRY_SOURCE)
+                .sinkTable(CLOUDBERRY_SINK)
+                .createSql(CREATE_SQL)
+                .configFile(CONFIG_FILE)
+                .insertSql(insertSql)
+                .testData(testDataSet)
+                .tablePathFullName(CLOUDBERRY_SOURCE)
+                .useSaveModeCreateTable(false)
+                .build();
+    }
+
+    @Override
+    String driverUrl() {
+        return "https://repo1.maven.org/maven2/org/postgresql/postgresql/42.3.3/postgresql-42.3.3.jar";
+    }
+
+    @Override
+    Pair<String[], List<SeaTunnelRow>> initTestData() {
+        String[] fieldNames =
+                new String[] {
+                    "age", "name",
+                };
+
+        List<SeaTunnelRow> rows = new ArrayList<>();
+        for (int i = 0; i < 100; i++) {
+            SeaTunnelRow row =
+                    new SeaTunnelRow(
+                            new Object[] {
+                                i, "f_" + i,
+                            });
+            rows.add(row);
+        }
+
+        return Pair.of(fieldNames, rows);
+    }
+
+    @Override
+    GenericContainer<?> initContainer() {
+        DockerImageName imageName = DockerImageName.parse(CLOUDBERRY_IMAGE);
+        GenericContainer<?> container =
+                new GenericContainer<>(imageName)
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(CLOUDBERRY_CONTAINER_HOST)
+                        .withLogConsumer(
+                                new Slf4jLogConsumer(
+                                        DockerLoggerFactory.getLogger(CLOUDBERRY_IMAGE)))
+                        .withCommand("/usr/sbin/init") // Ensure container starts correctly
+                        .withPrivilegedMode(true); // Set privileged mode
+        // Mount cgroup volume
+        container.addFileSystemBind("/sys/fs/cgroup", "/sys/fs/cgroup", BindMode.READ_ONLY);
+        container.setPortBindings(
+                Lists.newArrayList(
+                        String.format(
+                                "%s:%s", CLOUDBERRY_CONTAINER_PORT, CLOUDBERRY_CONTAINER_PORT)));
+        return container;
+    }
+
+    @Override
+    public String quoteIdentifier(String field) {
+        return "\"" + field + "\"";
+    }
+
+    @Override
+    public void clearTable(String schema, String table) {
+        // do nothing.
+    }
+
+    @Override
+    protected void beforeStartUP() {
+        log.info("Setting up Apache Cloudberry...");
+        try {
+            // Wait for container to start
+            Thread.sleep(5000);
+            // Switch to gpadmin user and start database
+            Container.ExecResult execResult =
+                    dbServer.execInContainer("bash", "-c", "su - gpadmin -c 'gpstart -a'");
+            log.info("gpstart result: {}", execResult.getStdout());
+            // Set gpadmin password
+            execResult =
+                    dbServer.execInContainer(
+                            "bash",
+                            "-c",
+                            "su - gpadmin -c \"psql -c \\\"ALTER USER gpadmin WITH PASSWORD 'gpadmin';\\\"\"");
+            log.info("Set password result: {}", execResult.getStdout());
+            // Confirm database is started
+            execResult =
+                    dbServer.execInContainer(
+                            "bash", "-c", "su - gpadmin -c 'psql -c \"SELECT version();\"'");
+            log.info("Apache Cloudberry version: {}", execResult.getStdout());
+
+        } catch (InterruptedException | IOException e) {
+            log.error("Failed to initialize Apache Cloudberry", e);
+            throw new RuntimeException("Failed to initialize Apache Cloudberry", e);
+        }
+    }
+
+    @BeforeAll
+    @Override
+    public void startUp() {
+        dbServer = initContainer().withImagePullPolicy(PullPolicy.alwaysPull());
+        Startables.deepStart(Stream.of(dbServer)).join();
+        jdbcCase = getJdbcCase();
+        beforeStartUP();
+        // Increase retry count and timeout, CloudberryDB might need more time to start
+        given().ignoreExceptions()
+                .await()
+                .atMost(600, TimeUnit.SECONDS) // Increase waiting time
+                .pollInterval(10, TimeUnit.SECONDS) // Set polling interval
+                .untilAsserted(() -> this.initializeJdbcConnection(jdbcCase.getJdbcUrl()));
+        createSchemaIfNeeded();
+        createNeededTables();
+        insertTestData();
+        initCatalog();
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/resources/jdbc_cloudberry_source_and_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/resources/jdbc_cloudberry_source_and_sink.conf
new file mode 100644
index 0000000000..af574a7655
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/resources/jdbc_cloudberry_source_and_sink.conf
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  # This is an example source plugin **only for testing and demonstrating the feature source plugin**
+  Jdbc {
+    driver = org.postgresql.Driver
+    url = "jdbc:postgresql://cbdb:5432/postgres"
+    user = gpadmin
+    password = gpadmin
+    query = "select age, name from source"
+  }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/source/Jdbc
+}
+
+transform {
+
+  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+  # please go to https://seatunnel.apache.org/docs/transform-v2/sql
+}
+
+sink {
+  Jdbc {
+    driver = org.postgresql.Driver
+    url = "jdbc:postgresql://cbdb:5432/postgres"
+    user = gpadmin
+    password = gpadmin
+    query = "insert into sink(age, name) values(?, ?)"
+  }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc
+}
\ No newline at end of file
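Note on the startup logic in JdbcCloudberryIT.startUp(): the test does not assume the database is ready once the container starts. It runs `gpstart`, then uses Awaitility to retry the JDBC connection every 10 seconds for up to 600 seconds, ignoring exceptions between attempts. The sketch below illustrates that poll-until-ready pattern in plain Java. It is a hypothetical helper (`RetryUntil` is not part of SeaTunnel, Testcontainers, or Awaitility). As a simplifying assumption, it budgets `timeout / pollInterval + 1` attempts instead of measuring wall-clock time, and a simulated connection check stands in for `initializeJdbcConnection(...)`.

```java
import java.sql.SQLException;
import java.time.Duration;
import java.util.concurrent.Callable;

/**
 * Hypothetical helper, not part of SeaTunnel or Awaitility: retries a check until it
 * succeeds, ignoring exceptions between attempts, roughly like
 * given().ignoreExceptions().await().atMost(...).pollInterval(...) in JdbcCloudberryIT.
 */
public final class RetryUntil {

    /** Sleeping is injected so the loop can be exercised without real delays. */
    public interface Sleeper {
        void sleep(Duration duration) throws InterruptedException;
    }

    private RetryUntil() {}

    /**
     * Calls {@code check} until it returns without throwing. Assumption: the budget is
     * timeout / pollInterval + 1 attempts rather than a wall-clock deadline.
     *
     * @return the number of attempts that were needed
     */
    public static int await(
            Callable<?> check, Duration timeout, Duration pollInterval, Sleeper sleeper)
            throws InterruptedException {
        if (pollInterval.isZero() || pollInterval.isNegative()) {
            throw new IllegalArgumentException("pollInterval must be positive");
        }
        long maxAttempts = timeout.toMillis() / pollInterval.toMillis() + 1;
        Exception lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                check.call(); // e.g. open a JDBC connection to the database
                return attempt; // success: stop polling
            } catch (Exception e) {
                lastFailure = e; // ignored, like ignoreExceptions(); retried after a pause
            }
            if (attempt < maxAttempts) {
                sleeper.sleep(pollInterval);
            }
        }
        throw new IllegalStateException("Condition not met within " + timeout, lastFailure);
    }

    public static void main(String[] args) throws InterruptedException {
        int[] calls = {0};
        // Simulated database that refuses the first three connection attempts.
        Callable<Void> fakeConnect =
                () -> {
                    if (++calls[0] < 4) {
                        throw new SQLException("Connection refused");
                    }
                    return null;
                };
        int attempts =
                await(fakeConnect, Duration.ofSeconds(600), Duration.ofSeconds(10), d -> {});
        // Prints "Connected after 4 attempts"
        System.out.println("Connected after " + attempts + " attempts");
    }
}
```

With a 600 s timeout and a 10 s interval this sketch allows 61 attempts; in a real test the `Sleeper` would call `Thread.sleep(duration.toMillis())`. Injecting the sleeper keeps the retry logic testable without waiting, which is the main reason for not hard-coding `Thread.sleep` here.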